Release history and changes for the Kubiya Workflow SDK
workflow.add_step()
from kubiya_sdk import KubiyaClient client = KubiyaClient( token="...", base_url="..." )
from kubiya_workflow_sdk import Client client = Client( api_key="key_...", api_url="https://api.kubiya.ai" )
workflow = Workflow("my-workflow") workflow.add_step(ShellStep("echo", "echo 'Hello'")) workflow.add_step(PythonStep("process", "print('Processing')"))
@workflow(name="my-workflow") def my_workflow(): step.shell("echo 'Hello'", name="echo") step.python("print('Processing')", name="process")
# Not supported result = client.run_workflow(workflow) print(result)
# Real-time streaming for event in client.execute_workflow(workflow, stream=True): if event.type == "log": print(event.data["message"])
try: client.run_workflow(workflow) except Exception as e: print(f"Error: {e}")
from kubiya_workflow_sdk.errors import ( ValidationError, ExecutionError, StepError ) try: client.execute_workflow(workflow) except StepError as e: print(f"Step {e.step_name} failed: {e.message}") except ExecutionError as e: print(f"Workflow failed: {e}")
Was this page helpful?