Common workflow patterns using containers, AI agents, and automation tools.

Simple deployment pipeline

Build, test, and deploy an application:
from kubiya import workflow, step

@workflow
def deploy_app():
    # Run tests in Node.js container
    test = step("test").docker(
        image="node:18",
        command="npm test"
    )
    
    # Build Docker image
    build = step("build").docker(
        image="docker:latest",
        command="docker build -t myapp:latest ."
    ).depends("test")
    
    # Deploy to Kubernetes
    step("deploy").docker(
        image="kubectl:latest",
        command="kubectl apply -f deployment.yaml"
    ).depends("build")

Data processing workflow

Extract, transform, and load data:
@workflow
def etl_pipeline():
    # Extract data from database
    extract = step("extract").docker(
        image="postgres:15",
        command="pg_dump -h $DB_HOST -t users > users.sql"
    )
    
    # Transform with Python
    transform = step("transform").docker(
        image="python:3.11",
        script="""
import os
import pandas as pd
from sqlalchemy import create_engine
# Assumes the step receives a DATABASE_URL environment variable for the source database
engine = create_engine(os.environ["DATABASE_URL"])
df = pd.read_sql_query("SELECT * FROM users", engine)
df['processed'] = pd.to_datetime(df['created_at']).dt.date
df.to_csv('transformed_users.csv', index=False)
"""
    ).depends("extract")
    
    # Load to data warehouse
    step("load").docker(
        image="snowflake/snowcli:latest",
        command="snow stage copy transformed_users.csv @warehouse"
    ).depends("transform")

AI-powered code review

Use AI agents to analyze pull requests:
@workflow
def ai_code_review(pr_number: int):
    # Fetch PR changes
    fetch = step("fetch-pr").docker(
        image="alpine/git:latest",
        command=f"git diff main..pr-{pr_number} > changes.diff"
    ).output("PR_CHANGES")
    
    # AI agent analyzes the code
    review = step("analyze-code").inline_agent(
        message="Review this code for quality, bugs, and best practices: ${PR_CHANGES}",
        agent_name="code-reviewer",
        tools=[
            {
                "name": "security-scanner",
                "image": "securecodewarrior/semgrep:latest",
                "command": "semgrep --config=auto ."
            }
        ]
    ).depends("fetch-pr").output("REVIEW_RESULTS")
    
    # Post review comments
    step("post-review").docker(
        image="github/hub:latest",
        command="hub api repos/:owner/:repo/pulls/${pr_number}/reviews -f body='${REVIEW_RESULTS}'"
    ).depends("analyze-code")

Custom tool integration

Use custom Tool class instances for reusable, typed functionality:
from kubiya_workflow_sdk.tools import Tool, Arg

# Define custom tools
url_validator = Tool(
    name="url_validator",
    description="Validate URL format and connectivity",
    type="docker",
    image="curlimages/curl:latest",
    content="""
#!/bin/sh
echo "Validating URL: $url"
if echo "$url" | grep -q '^https\\?://'; then
    echo "✓ Valid URL format"
    curl -s --head "$url" && echo "✓ URL is accessible"
else
    echo "✗ Invalid URL format"
    exit 1
fi
    """,
    args=[
        Arg(name="url", type="str", description="URL to validate", required=True)
    ]
)

json_processor = Tool(
    name="json_processor",
    description="Process and validate JSON data",
    type="docker",
    image="python:3.11-alpine",
    content="""
#!/bin/sh
python3 -c "
import json
import sys

try:
    data = json.loads('$json_data')
    if '$operation' == 'validate':
        print('✓ Valid JSON')
        print(f'Keys: {list(data.keys())}')
    elif '$operation' == 'pretty':
        print(json.dumps(data, indent=2))
except json.JSONDecodeError as e:
    print(f'✗ Invalid JSON: {e}')
    sys.exit(1)
"
    """,
    args=[
        Arg(name="json_data", type="str", description="JSON data to process", required=True),
        Arg(name="operation", type="str", description="Operation type", 
            default="validate", options=["validate", "pretty"])
    ]
)
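
These tool scripts read their arguments as environment variables ($url, $json_data, and $operation match the declared arg names). A minimal sketch for exercising a tool's content on the host before wiring it into a workflow, assuming a POSIX sh and curl are available locally and that Tool exposes its constructor fields as attributes:
import os
import subprocess

def run_tool_locally(tool, **arg_values):
    """Run a tool's shell content in a subprocess, passing its args as env vars."""
    env = {**os.environ, **{k: str(v) for k, v in arg_values.items()}}
    result = subprocess.run(
        ["sh", "-c", tool.content],
        env=env,
        capture_output=True,
        text=True,
    )
    print(result.stdout)
    return result.returncode

# Example: check a URL locally without pulling the curlimages/curl image
run_tool_locally(url_validator, url="https://example.com")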

Incident response automation

Automated response to system alerts:
@workflow
def incident_response():
    # Detect issue from monitoring
    detect = step("detect").docker(
        image="alpine:latest",
        script="curl -s http://prometheus:9090/api/v1/alerts"
    ).output("ALERTS")
    
    # AI agent determines severity and actions
    assess = step("assess-incident").inline_agent(
        message="Analyze these alerts and recommend actions: ${ALERTS}",
        agent_name="incident-commander",
        tools=[
            {
                "name": "check-logs",
                "image": "elasticsearch/elasticsearch:8.0",
                "command": "curl -X GET 'localhost:9200/_search'"
            },
            {
                "name": "check-resources", 
                "image": "kubernetes/kubectl:latest",
                "command": "kubectl top nodes"
            }
        ]
    ).depends("detect").output("RESPONSE_PLAN")
    
    # Execute remediation
    step("remediate").docker(
        image="ansible/ansible:latest",
        command="ansible-playbook fix-${RESPONSE_PLAN.issue_type}.yml"
    ).depends("assess-incident")

Parallel file processing

Process multiple files simultaneously:
@workflow
def process_files():
    # List files to process
    list_files = step("list").shell(
        "ls /data/*.csv > files.txt"
    ).output("FILE_LIST")
    
    # Process each file in parallel
    step("process").parallel(
        items="${FILE_LIST}",
        max_concurrent=3,
        docker_config={
            "image": "python:3.11",
            "script": """
import os
import sys
import pandas as pd

# Each parallel item is a full path produced by the list step (e.g. /data/sales.csv)
path = sys.argv[1]
df = pd.read_csv(path)
df['processed'] = True
df.to_csv(f'/output/processed_{os.path.basename(path)}', index=False)
"""
        }
    ).depends("list")

Database migration with validation

Run database changes safely:
@workflow
def database_migration():
    # Backup database
    backup = step("backup").docker(
        image="postgres:15",
        command="pg_dump production_db > backup_$(date +%Y%m%d).sql"
    )
    
    # Test migration on copy
    test = step("test-migration").docker(
        image="postgres:15",
        script="""
# Create test database from backup
createdb test_migration
cat backup_*.sql | psql test_migration
# Run migration
psql test_migration < migration.sql
# Validate
psql test_migration -c "SELECT COUNT(*) FROM users;"
"""
    ).depends("backup")
    
    # Apply to production only if test passes
    step("migrate-prod").docker(
        image="postgres:15",
        command="psql production_db < migration.sql"
    ).depends("test").preconditions("${test.exit_code} == 0")

Error handling patterns

Robust workflows with retry and cleanup:
@workflow
def resilient_workflow():
    # Step with retry logic
    fetch = step("fetch-data").docker(
        image="curlimages/curl:latest",
        command="curl -f https://api.example.com/data"
    ).retry(limit=3, backoff="exponential")
    
    # Process data with timeout
    process = step("process").docker(
        image="python:3.11",
        script="process_data.py"
    ).depends("fetch").timeout("10m")
    
    # Cleanup that always runs
    step("cleanup").shell(
        "rm -rf /tmp/workspace/*"
    ).continue_on_failure(True)

Multi-environment deployment

Deploy to different environments with conditions:
@workflow
def multi_env_deploy(environment: str):
    # Build once
    build = step("build").docker(
        image="docker:latest",
        command="docker build -t myapp:${GIT_SHA} ."
    )
    
    # Deploy to staging (always)
    deploy_staging = step("deploy-staging").docker(
        image="kubectl:latest",
        command="kubectl apply -f staging.yaml"
    ).depends("build")
    
    # Deploy to production (only if staging succeeds and env is prod)
    step("deploy-production").docker(
        image="kubectl:latest",
        command="kubectl apply -f production.yaml"
    ).depends("deploy-staging").preconditions(
        "${deploy_staging.exit_code} == 0",
        "${environment} == 'production'"
    )

Best practices

Use specific container images: Pin versions for reproducibility (node:18.17, not node:latest).
Handle failures gracefully: Add retry logic and cleanup steps that always run (see the sketch after this list).
Pass data between steps: Use .output() to capture results and reference them in later steps.
Leverage parallel execution: Independent steps will run simultaneously by default.
Include AI where it adds value: Use inline agents for analysis, decision-making, and dynamic responses.
Test incrementally: Start with simple workflows and add complexity gradually.
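
A minimal sketch pulling several of these practices together, using only the constructs shown above; the pinned image tags and the API endpoint are placeholders:
@workflow
def best_practice_example():
    # Pinned image plus retry on a flaky network call
    step("fetch").docker(
        image="curlimages/curl:8.5.0",
        command="curl -f https://api.example.com/report"
    ).retry(limit=3, backoff="exponential").output("REPORT")

    # Downstream step consumes the captured output
    step("summarize").docker(
        image="python:3.11.8",
        command="echo 'Summarizing report: ${REPORT}'"
    ).depends("fetch")

    # Cleanup whose failure does not fail the workflow
    step("cleanup").shell("rm -rf /tmp/workspace/*").continue_on_failure(True)
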
Next: Learn the workflow syntax or try the quick start guide