Advanced workflow patterns for complex automation scenarios.

Conditional execution

Run steps based on previous results:
from kubiya import workflow, step

@workflow
def conditional_deploy():
    # Run tests
    tests = step("test").docker(
        image="node:18",
        command="npm test"
    ).output("TEST_RESULTS")
    
    # Deploy only if tests pass
    step("deploy").docker(
        image="kubectl:latest",
        command="kubectl apply -f app.yaml"
    ).depends("test").preconditions(
        "${TEST_RESULTS.passed} == true",
        "${TEST_RESULTS.coverage} > 80"
    )

Retry and error handling

Handle failures gracefully:
@workflow
def resilient_workflow():
    # Step with retry logic
    data = step("fetch").docker(
        image="curlimages/curl:latest", 
        command="curl -f https://api.example.com/data"
    ).retry(
        limit=3,
        backoff="exponential",
        max_interval="5m"
    ).timeout("10m")
    
    # Continue even if the step exits with specific non-zero codes
    step("optional-step").docker(
        image="python:3.11",
        script="optional_process.py"
    ).continue_on_exit_codes([1, 2])  # process exit codes (0-255), not HTTP status codes
    
    # Cleanup that always runs
    step("cleanup").shell(
        "rm -rf /tmp/workspace/*"
    ).always_run(True)

Parallel processing

Process multiple items simultaneously:
@workflow
def parallel_processing():
    # Get list of items
    items = step("list-items").shell(
        "ls /data/*.csv"
    ).output("FILE_LIST")
    
    # Process each item in parallel
    step("process-files").parallel(
        items="${FILE_LIST}",
        max_concurrent=5,
        step_config={
            "image": "python:3.11",
            "script": """
import sys
filename = sys.argv[1]
process_file(filename)  # placeholder; see the standalone sketch below
"""
        }
    ).depends("list-items")

Data sharing between steps

Pass data and files between workflow steps:
@workflow
def data_pipeline():
    # Extract data
    extract = step("extract").docker(
        image="postgres:15",
        command="pg_dump mydb > /output/data.sql"
    ).output("DATA_PATH")
    
    # Transform using extracted data
    transform = step("transform").docker(
        image="python:3.11",
        script="""
import pandas as pd
# Read the CSV written by the extract step (DATA_PATH is mounted at /input)
df = pd.read_csv('/input/users.csv')
df['processed'] = True
df.to_csv('/output/transformed.csv', index=False)
"""
    ).depends("extract").volumes([{
        "source": "${DATA_PATH}",
        "target": "/input"
    }]).output("TRANSFORMED_DATA")
    
    # Load transformed data
    step("load").docker(
        image="snowflake/cli:latest",
        command="snow stage copy ${TRANSFORMED_DATA} @warehouse"
    ).depends("transform")

Multi-environment workflows

Deploy to different environments with different configurations:
@workflow
def multi_env_deploy(environment: str, version: str):
    # Build application
    build = step("build").docker(
        image="docker:latest",
        command=f"docker build -t myapp:{version} ."
    )
    
    # Get environment-specific config
    config = step("get-config").docker(
        image="alpine:latest",
        command=f"cp /configs/{environment}.yaml /output/config.yaml"
    ).output("CONFIG")
    
    # Deploy with environment-specific settings
    step("deploy").docker(
        image="kubectl:latest",
        script=f"""
kubectl config use-context {environment}
kubectl apply -f ${{CONFIG}}
kubectl set image deployment/myapp myapp=myapp:{version}
"""
    ).depends(["build", "get-config"])

Workflow orchestration

Chain multiple workflows together:
@workflow
def main_pipeline():
    # Run data ingestion workflow
    ingest = step("run-ingestion").workflow(
        name="data-ingestion",
        inputs={"source": "production-db"}
    )
    
    # Run processing workflow
    process = step("run-processing").workflow(
        name="data-processing", 
        inputs={"data_path": "${ingest.output_path}"}
    ).depends("run-ingestion")
    
    # Run ML training workflow
    step("run-training").workflow(
        name="ml-training",
        inputs={"features": "${process.features}"}
    ).depends("run-processing")

Dynamic workflow generation

Generate workflow steps based on runtime data:
@workflow
def dynamic_processing():
    # Get list of datasets
    datasets = step("list-datasets").docker(
        image="awscli/aws-cli:latest",
        command="aws s3 ls s3://data-bucket/ --recursive | grep .csv"
    ).output("DATASET_LIST")
    
    # AI agent creates processing plan
    plan = step("create-plan").inline_agent(
        message="Create a processing plan for these datasets: ${DATASET_LIST}",
        agent_name="data-planner",
        tools=[
            {
                "name": "analyze-dataset",
                "image": "python:3.11",
                "script": "analyze_data_schema.py"
            }
        ]
    ).depends("list-datasets").output("PROCESSING_PLAN")
    
    # Execute the generated plan
    step("execute-plan").dynamic_workflow(
        plan="${PROCESSING_PLAN}"
    ).depends("create-plan")

Production monitoring

Monitor workflow execution and performance:
@workflow
def monitored_workflow():
    # Step with custom metrics
    process = step("process-data").docker(
        image="python:3.11",
        script="process_data.py"
    ).metrics([
        {"name": "records_processed", "type": "counter"},
        {"name": "processing_time", "type": "gauge"}
    ])
    
    # Health check step
    step("health-check").docker(
        image="curlimages/curl:latest",
        command="curl -f http://service/health"
    ).depends("process-data").on_failure("send-alert")
    
    # Send notification on completion
    step("notify").docker(
        image="alpine:latest",
        script="""
curl -X POST $SLACK_WEBHOOK \
  -H "Content-Type: application/json" \
  -d '{"text": "Workflow completed: ${workflow.name}"}'
"""
    ).always_run(True)

Resource management

Control resource allocation for steps:
@workflow  
def resource_managed():
    # Memory-intensive step
    step("process-large-data").docker(
        image="python:3.11",
        script="process_big_data.py"
    ).resources(
        cpu="2000m",
        memory="8Gi",
        timeout="2h"
    )
    
    # GPU-accelerated step
    step("ml-inference").docker(
        image="tensorflow/tensorflow:latest-gpu",
        script="run_inference.py"
    ).resources(
        gpu=1,
        memory="4Gi"
    )

Best practices

- Use specific timeouts: Set reasonable timeouts for each step to prevent hanging.
- Implement proper cleanup: Always include cleanup steps that run even on failure.
- Monitor resource usage: Set appropriate CPU and memory limits.
- Handle secrets securely: Use environment variables for sensitive data, never hardcode.
- Test failure scenarios: Verify your error handling and retry logic work correctly.
- Use meaningful step names: Make workflow execution logs easy to understand.
- Optimize for parallelism: Identify independent steps that can run simultaneously.
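
The sketch below combines several of these practices using only the step options shown earlier on this page; the workflow name, images, and commands are placeholders:
from kubiya import workflow, step

@workflow
def hardened_backup():
    # Meaningful step name, bounded retries, and an explicit timeout
    step("export-database").docker(
        image="postgres:15",
        command="pg_dump mydb > /output/backup.sql"
    ).retry(limit=2, backoff="exponential", max_interval="2m").timeout("30m")

    # Resource limits keep a heavy step from starving the rest of the pipeline
    step("compress-backup").docker(
        image="alpine:latest",
        command="gzip /output/backup.sql"
    ).depends("export-database").resources(cpu="500m", memory="1Gi")

    # Cleanup runs even when earlier steps fail
    step("cleanup").shell(
        "rm -rf /tmp/workspace/*"
    ).always_run(True)
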
Next: Learn about DSL syntax or see more workflow examples