Common SDK usage patterns with practical examples.

Basic workflow

Create and execute a simple workflow:
from kubiya import workflow, step

# Create workflow
@workflow
def hello_world():
    step("greet").shell("echo 'Hello from Kubiya!'")

# Execute
result = hello_world.execute(api_key="your-api-key")
print(result.status)
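
In practice, avoid hard-coding the API key. A minimal sketch that reads it from an environment variable instead; the variable name KUBIYA_API_KEY is illustrative, not an SDK requirement:
import os

# Read the key from the environment rather than embedding it in source code.
# KUBIYA_API_KEY is an illustrative name; use whatever your environment defines.
api_key = os.environ["KUBIYA_API_KEY"]

result = hello_world.execute(api_key=api_key)
print(result.status)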

Multi-step pipeline

Build a data processing pipeline:
from kubiya import workflow, step

@workflow
def process_data():
    # Download data
    step("download").shell(
        "wget https://example.com/data.csv -O /tmp/data.csv"
    )
    
    # Process with Python
    step("transform").docker(
        image="python:3.11",
        script="""
import pandas as pd
df = pd.read_csv('/tmp/data.csv')
df_clean = df.dropna()
df_clean.to_csv('/tmp/clean.csv')
print(f"Cleaned {len(df_clean)} rows")
"""
    ).depends("download")
    
    # Upload results
    step("upload").shell(
        "aws s3 cp /tmp/clean.csv s3://bucket/processed/"
    ).depends("transform")

# Execute and stream results
for event in process_data.execute_stream():
    print(f"{event.timestamp}: {event.message}")

AI-powered workflow

Use AI agents within workflows:
from kubiya import workflow, step

@workflow
def ai_code_review(repo_url: str):
    # Clone repository
    step("clone").shell(f"git clone {repo_url} /tmp/repo")
    
    # AI agent reviews code
    step("review").inline_agent(
        message="Review this codebase for issues and improvements",
        tools=[
            {
                "name": "analyze-code",
                "image": "python:3.11",
                "script": "analyze_code.py /tmp/repo"
            }
        ]
    ).depends("clone").output("REVIEW")
    
    # Create report
    step("report").shell(
        "echo '${REVIEW}' > /tmp/code_review.md"
    ).depends("review")

# Execute with parameters
result = ai_code_review.execute(
    params={"repo_url": "https://github.com/myorg/myapp"},
    api_key="your-api-key"
)

Error handling

Build resilient workflows with retry and error handling:
from kubiya import workflow, step, WorkflowError  # assumes WorkflowError is exported at the package top level

@workflow
def resilient_pipeline():
    # Step with retry
    step("fetch-data").shell(
        "curl -f https://api.example.com/data"
    ).retry(limit=3, backoff="exponential")
    
    # Step with timeout
    step("process").docker(
        image="python:3.11",
        script="process_data.py"
    ).timeout("10m").depends("fetch-data")
    
    # Cleanup that always runs
    step("cleanup").shell(
        "rm -rf /tmp/workspace/*"
    ).always_run(True)

# Execute with error handling
try:
    result = resilient_pipeline.execute()
    print(f"Success: {result.output}")
except WorkflowError as e:
    print(f"Failed: {e.message}")
    # Access failed step details
    for step_error in e.step_errors:
        print(f"Step {step_error.name}: {step_error.error}")

Parallel execution

Process multiple items simultaneously:
from kubiya import workflow, step

@workflow
def parallel_processing():
    # Generate file list
    step("list-files").shell(
        "ls /data/*.csv"
    ).output("FILE_LIST")
    
    # Process files in parallel
    step("process-all").parallel(
        items="${FILE_LIST}",
        max_concurrent=3,
        step_template={
            "image": "python:3.11",
            "script": "process_file.py ${ITEM}"
        }
    ).depends("list-files")

result = parallel_processing.execute()

Conditional workflows

Execute steps based on conditions:
from kubiya import workflow, step

@workflow
def conditional_deploy(environment: str):
    # Run tests
    step("test").docker(
        image="node:18",
        command="npm test"
    ).output("TEST_RESULTS")
    
    # Deploy to staging (always)
    step("deploy-staging").docker(
        image="kubectl:latest",
        command="kubectl apply -f staging.yaml"
    ).depends("test")
    
    # Deploy to production (only if environment is prod and tests pass)
    step("deploy-prod").docker(
        image="kubectl:latest", 
        command="kubectl apply -f production.yaml"
    ).depends("deploy-staging").preconditions(
        "${environment} == 'production'",
        "${TEST_RESULTS.passed} == true"
    )

# Execute with parameters
result = conditional_deploy.execute(
    params={"environment": "production"}
)
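
For comparison, a non-production run of the same workflow should leave the production step unexecuted, assuming that unmet preconditions skip a step rather than fail the run:
# Staging run: "${environment} == 'production'" is false, so deploy-prod is skipped
result = conditional_deploy.execute(
    params={"environment": "staging"}
)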

Configuration and secrets

Handle configuration and sensitive data:
from kubiya import workflow, step

@workflow
def secure_deployment():
    # Use environment variables for config
    step("build").docker(
        image="docker:latest",
        command="docker build -t app:${VERSION} .",
        env={
            "VERSION": "${workflow.params.version}",
            "BUILD_ENV": "production"
        }
    )
    
    # Use secrets for sensitive data
    step("deploy").docker(
        image="kubectl:latest",
        script="""
kubectl create secret generic app-secrets \
  --from-literal=db-password="${secrets.DB_PASSWORD}" \
  --from-literal=api-key="${secrets.API_KEY}"
kubectl apply -f deployment.yaml
"""
    ).depends("build")

# Execute with secrets
result = secure_deployment.execute(
    params={"version": "1.2.3"},
    secrets={"DB_PASSWORD": "secret123", "API_KEY": "key456"}
)
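
In real deployments, avoid literal secret values in source code. A minimal sketch that pulls the same secrets from environment variables before passing them to execute(); the variable names are illustrative:
import os

# Illustrative variable names; supply whatever your environment actually defines.
result = secure_deployment.execute(
    params={"version": "1.2.3"},
    secrets={
        "DB_PASSWORD": os.environ["DB_PASSWORD"],
        "API_KEY": os.environ["API_KEY"],
    },
)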

Custom tools

Define reusable tools:
from kubiya import workflow, step, tool

# Define custom tool
@tool("slack-notify")
def slack_notification(message: str, channel: str = "#general"):
    return {
        "image": "curlimages/curl:latest",
        "script": f"""
curl -X POST $SLACK_WEBHOOK \
  -H "Content-Type: application/json" \
  -d '{{"text": "{message}", "channel": "{channel}"}}'
"""
    }

@workflow
def deployment_with_notifications():
    # Deploy application
    step("deploy").docker(
        image="kubectl:latest",
        command="kubectl apply -f app.yaml"
    )
    
    # Send notification using custom tool
    step("notify").use_tool(
        slack_notification,
        message="Deployment completed successfully",
        channel="#deployments"
    ).depends("deploy")

result = deployment_with_notifications.execute()

Streaming execution

Monitor workflow execution in real-time:
from kubiya import workflow, step

@workflow
def monitored_pipeline():
    step("prepare").shell("echo 'Starting pipeline'")
    step("process").shell("sleep 30 && echo 'Processing complete'")
    step("finalize").shell("echo 'Pipeline finished'").depends("process")

# Stream execution events
print("Starting workflow...")
for event in monitored_pipeline.execute_stream():
    if event.type == "step_started":
        print(f"→ Starting: {event.step_name}")
    elif event.type == "step_completed": 
        print(f"✓ Completed: {event.step_name}")
    elif event.type == "step_output":
        print(f"  Output: {event.message}")
    elif event.type == "workflow_completed":
        print("✓ Workflow completed successfully")

Best practices

- Keep workflows focused: one workflow should handle one logical process.
- Use meaningful names: step and workflow names should be self-documenting.
- Handle errors gracefully: add retry logic and cleanup steps.
- Pass data efficiently: use step outputs to share data between steps (see the sketch after this list).
- Test locally: validate workflows in development before using them in production.
- Use version control: store workflow definitions in git repositories.
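
As a small illustration of the data-passing point above, a minimal sketch that captures one step's output and reuses it in a dependent step, combining only constructs shown earlier in this guide (.output() and ${} templating):
from kubiya import workflow, step

@workflow
def passes_data():
    # Capture the first step's output under the template variable BUILD_ID
    step("build").shell("echo build-1234").output("BUILD_ID")

    # Reuse the captured value in a dependent step
    step("tag").shell(
        "echo 'Tagging release ${BUILD_ID}'"
    ).depends("build")
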
Next: check the API reference for complete method documentation.