Real-world workflow patterns using the Kubiya Workflow SDK.

Basic Deployment Pipeline

Build, test, and deploy an application:
from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import execute_workflow

# Create deployment workflow
deploy_pipeline = (
    workflow("deploy-app")
    .description("Build, test, and deploy application")
    .params(APP_NAME="myapp", VERSION="${BUILD_ID}")
    
    # Run tests
    .step("test", "npm test")
    
    # Build Docker image
    .step("build", "docker build -t ${APP_NAME}:${VERSION} .")
    
    # Deploy to Kubernetes
    .step("deploy", "kubectl set image deployment/${APP_NAME} ${APP_NAME}=${APP_NAME}:${VERSION}")
)

# Execute the workflow
result = execute_workflow(deploy_pipeline.to_dict(), api_key="your-api-key")

Data Processing Pipeline

Extract, transform, and load data:
from kubiya_workflow_sdk.dsl import workflow, python_executor

# Create ETL workflow
etl_pipeline = (
    workflow("etl-pipeline")
    .description("Extract, transform, and load data")
    .env(DB_HOST="${DB_HOST}", DB_USER="${DB_USER}", DB_PASS="${DB_PASS}")
    
    # Extract data
    .step("extract", "pg_dump -h ${DB_HOST} -U ${DB_USER} users > /tmp/users.sql")
    
    # Transform data
    .step("transform")
    .executor(python_executor("""
import pandas as pd

# Load the extracted CSV
df = pd.read_csv('/tmp/users.csv')
df['processed_date'] = pd.Timestamp.now()
df.to_csv('/tmp/transformed_users.csv', index=False)
print(f"Processed {len(df)} records")
    """, packages=["pandas"]))
    
    # Load to warehouse
    .step("load", "aws s3 cp /tmp/transformed_users.csv s3://warehouse/data/")
)
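
Execution follows the same pattern as the deployment pipeline; the sketch below assumes the ${DB_HOST}-style values referenced in .env() are resolved in the execution environment at run time:
from kubiya_workflow_sdk import execute_workflow

# Execute the ETL workflow (same call shape as the deployment pipeline above)
result = execute_workflow(etl_pipeline.to_dict(), api_key="your-api-key")
print(result)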

AI-Powered Code Review

Use AI agents to analyze pull requests:
from kubiya_workflow_sdk.dsl import workflow, inline_agent_executor

# Create code review workflow
code_review = (
    workflow("ai-code-review")
    .description("AI-powered code review for pull requests")
    .params(PR_NUMBER="${PR_NUMBER}", REPO="${REPO}")
    
    # Fetch PR changes
    .step("fetch-changes", "gh pr diff ${PR_NUMBER} > /tmp/pr-changes.diff")
    
    # AI analyzes the code
    .step("analyze")
    .executor(inline_agent_executor(
        message="Review the code changes in /tmp/pr-changes.diff for quality, security, and best practices",
        runners=["kubiya-hosted"],
        ai_instructions="""You are an expert code reviewer. Analyze the changes and provide:
        1. Code quality issues
        2. Security vulnerabilities  
        3. Performance concerns
        4. Best practice recommendations"""
    ))
    
    # Post review comment
    .step("post-review", "gh pr comment ${PR_NUMBER} --body-file /tmp/review-results.md")
)
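
Because the analyze step can take a while, it helps to stream execution events and watch the agent's progress; a minimal sketch using the KubiyaClient covered later in this guide:
from kubiya_workflow_sdk import KubiyaClient

# Stream events so the agent's review progress is visible as it runs
client = KubiyaClient(api_key="your-api-key", base_url="https://api.kubiya.ai")
for event in client.execute_workflow(code_review.to_dict(), stream=True):
    print(event)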

Parallel Processing

Process multiple items simultaneously:
from kubiya_workflow_sdk.dsl import workflow

# Process multiple services in parallel
parallel_deployment = (
    workflow("parallel-deploy")
    .description("Deploy multiple services simultaneously")
    .params(ENVIRONMENT="staging")
    
    # Deploy services in parallel
    .parallel_steps(
        "deploy-services",
        items=["api", "frontend", "worker"],
        command="kubectl set image deployment/${ITEM} ${ITEM}:latest -n ${ENVIRONMENT}",
        max_concurrent=3
    )
    
    # Verify all deployments
    .parallel_steps(
        "verify-services", 
        items=["api", "frontend", "worker"],
        command="kubectl rollout status deployment/${ITEM} -n ${ENVIRONMENT}",
        max_concurrent=3
    )
)

Database Migration with Backup

Safely run database changes with backup:
from kubiya_workflow_sdk.dsl import workflow

# Safe database migration
db_migration = (
    workflow("db-migration")
    .description("Run database migration with backup")
    .env(DB_NAME="production_db", BACKUP_BUCKET="s3://backups/db/")
    
    # Backup database
    .step("backup", "pg_dump ${DB_NAME} | gzip > backup_$(date +%Y%m%d_%H%M%S).sql.gz")
    
    # Upload backup
    .step("upload-backup", "aws s3 cp backup_*.sql.gz ${BACKUP_BUCKET}")
    
    # Run migration
    .step("migrate", "python manage.py migrate --database=${DB_NAME}")
    
    # Verify migration
    .step("verify", "python manage.py showmigrations --database=${DB_NAME}")
)
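
If a migration ever has to be rolled back, the uploaded backup can be restored with standard PostgreSQL tooling. A minimal sketch using only the DSL calls shown above; the backup filename is illustrative:
from kubiya_workflow_sdk.dsl import workflow

# Restore workflow: pull a backup from S3 and load it back into the database
db_restore = (
    workflow("db-restore")
    .description("Restore the database from a previous backup")
    .env(DB_NAME="production_db", BACKUP_BUCKET="s3://backups/db/")

    # Download the backup (filename shown here is illustrative)
    .step("download-backup", "aws s3 cp ${BACKUP_BUCKET}backup_20240101_120000.sql.gz .")

    # Restore into the database
    .step("restore", "gunzip -c backup_20240101_120000.sql.gz | psql ${DB_NAME}")
)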

Error Handling with Retries

Build resilient workflows with retry logic:
from kubiya_workflow_sdk.dsl import workflow

# Workflow with error handling
resilient_workflow = (
    workflow("resilient-api-call")
    .description("API call with retries and fallback")
    .params(API_URL="https://api.example.com/data", MAX_RETRIES="3")
    
    # Main API call with retry
    .step("fetch-data", """
        for i in $(seq 1 ${MAX_RETRIES}); do
            if curl -f ${API_URL} -o /tmp/data.json; then
                echo "API call succeeded on attempt $i"
                exit 0
            fi
            echo "Attempt $i failed, retrying..."
            sleep $((i * 2))
        done
        exit 1
    """)
    
    # Process data if fetch succeeded
    .step("process-data", "python process_api_data.py /tmp/data.json")
    
    # Clean up temporary files
    .step("cleanup", "rm -f /tmp/data.json")
)
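
If retries alone are not enough, a fallback step can reuse a previously cached response; a minimal sketch of a variant, assuming a cached copy exists at /var/cache/api/data.json (hypothetical path):
from kubiya_workflow_sdk.dsl import workflow

# Variant of the workflow above with a cached fallback
resilient_with_fallback = (
    workflow("resilient-api-call-fallback")
    .description("API call with retries and a cached fallback")
    .params(API_URL="https://api.example.com/data", MAX_RETRIES="3")

    # Retry loop; do not fail here, the fallback step decides what to do next
    .step("fetch-data", """
        for i in $(seq 1 ${MAX_RETRIES}); do
            curl -f ${API_URL} -o /tmp/data.json && break
            sleep $((i * 2))
        done
    """)

    # Reuse the cached copy if the live fetch never produced a usable file
    .step("fallback", """
        if [ ! -s /tmp/data.json ]; then
            echo "Using cached response"
            cp /var/cache/api/data.json /tmp/data.json
        fi
    """)

    # Processing and cleanup continue as in the workflow above
    .step("process-data", "python process_api_data.py /tmp/data.json")
    .step("cleanup", "rm -f /tmp/data.json")
)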

Using the Kubiya Client

Execute workflows using the KubiyaClient:
from kubiya_workflow_sdk import KubiyaClient
from kubiya_workflow_sdk.dsl import workflow

# Initialize client
client = KubiyaClient(
    api_key="your-api-key",
    base_url="https://api.kubiya.ai"
)

# Create workflow
simple_workflow = (
    workflow("hello-world")
    .description("Simple greeting workflow")
    .step("greet", "echo 'Hello from Kubiya!'")
    .step("timestamp", "date")
)

# Execute with streaming
for event in client.execute_workflow(simple_workflow.to_dict(), stream=True):
    print(event)

# Execute without streaming  
result = client.execute_workflow(simple_workflow.to_dict(), stream=False)
print(f"Workflow result: {result}")

Best Practices

  • Use clear step names that describe what each step does
  • Set environment variables for configuration instead of hardcoding values (see the sketch after this list)
  • Handle errors gracefully with retry logic and fallback steps
  • Use parallel processing for independent operations to save time
  • Keep workflows focused: each workflow should have a single purpose
  • Test workflows incrementally, starting simple and adding complexity as you go
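
For example, configuration can be supplied through .params() and .env() rather than baked into step commands; a minimal sketch reusing only the DSL calls shown above (names and variables are illustrative):
from kubiya_workflow_sdk.dsl import workflow

# Configuration lives in params/env, so the same workflow works across environments
configurable_deploy = (
    workflow("configurable-deploy")
    .description("Deploy using parameterized configuration")
    .params(APP_NAME="myapp", ENVIRONMENT="staging")
    .env(REGISTRY="${REGISTRY}")
    .step("deploy", "kubectl set image deployment/${APP_NAME} ${APP_NAME}=${REGISTRY}/${APP_NAME}:latest -n ${ENVIRONMENT}")
)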

Next Steps