Steps are the fundamental building blocks of Kubiya workflows. Each step represents a single operation or task that can be executed independently or as part of a larger workflow.

Basic Step Structure

Creating a Simple Step

from kubiya_sdk import Step

# Basic step definition: name the step, pick the tool it runs, then pass the
# tool's inputs as keyword arguments. The three calls are chained on one
# expression and the result is bound to check_status.
check_status = Step("check-service-status").tool("http-client").inputs(
    url="https://api.myservice.com/health",
    method="GET",
    timeout=30  # NOTE(review): unit is not stated here — presumably seconds; confirm
)

Step with Multiple Inputs

# Deploy application step: .inputs() takes several keyword arguments of mixed
# types here (strings, integers, and a nested dict).
deploy_app = Step("deploy-application").tool("kubernetes-deployer").inputs(
    service_name="user-service",
    image="user-service:v1.2.0",
    namespace="production",
    replicas=3,
    port=8080,
    # The "${...}" values are plain Python strings at this point; presumably
    # they are resolved by the workflow engine at run time — confirm.
    env_vars={
        "DATABASE_URL": "${database.connection_string}",
        "REDIS_URL": "${cache.redis_url}",
        "LOG_LEVEL": "info"
    }
)

Step Configuration

Step Names and Descriptions

# Well-named step with description: Step is constructed with explicit name=
# and description= keywords instead of a single positional name.
validate_deployment = Step(
    name="validate-deployment-prerequisites",
    description="Verify all prerequisites are met before deployment"
).tool("deployment-validator").inputs(
    environment="production",
    service="user-service",
    # A list value is passed as a single input.
    checks=["health_endpoints", "database_migrations", "feature_flags"]
)

Step Timeouts and Retries

# Step with custom timeout: .timeout() is chained after .inputs(), so it is
# configured on the step itself rather than passed as a tool input.
long_running_task = Step("database-migration").tool("migration-runner").inputs(
    migration_script="001_add_user_preferences.sql",
    database="production"
).timeout(minutes=30)  # 30 minute timeout

# Step with retry configuration: .retry() is chained after .inputs() and takes
# the attempt limit, the backoff strategy name, and the delay bounds.
flaky_operation = Step("external-api-call").tool("http-client").inputs(
    url="https://external-api.com/data",
    # The token is referenced through a placeholder, not written inline.
    headers={"Authorization": "Bearer ${api_token}"}
).retry(
    max_attempts=3,
    backoff_strategy="exponential",
    initial_delay=5,  # seconds
    max_delay=60  # NOTE(review): presumably seconds as well, like initial_delay — confirm
)

Step Dependencies

Sequential Dependencies

# Steps that must run in order: build -> push -> deploy.
# Each later step names its predecessor by passing that Step object to
# .depends_on().
build_image = Step("build-docker-image").tool("docker-builder").inputs(
    dockerfile_path="./Dockerfile",
    image_name="myapp",
    tag="${BUILD_VERSION}"
)

# Depends on build_image.
push_image = Step("push-to-registry").tool("docker-pusher").inputs(
    image="myapp:${BUILD_VERSION}",
    registry="gcr.io/myproject"
).depends_on(build_image)

# Depends on push_image, and therefore transitively on build_image.
deploy_service = Step("deploy-to-kubernetes").tool("kubectl").inputs(
    image="gcr.io/myproject/myapp:${BUILD_VERSION}",
    namespace="production"
).depends_on(push_image)

Multiple Dependencies

# Step that depends on multiple previous steps.
# The two test steps below declare no dependency on each other.
integration_tests = Step("run-integration-tests").tool("test-runner").inputs(
    test_suite="integration",
    environment="staging"
)

load_tests = Step("run-load-tests").tool("load-tester").inputs(
    target_url="https://staging.myapp.com",
    concurrent_users=100
)

# Deploy only after both tests pass: .depends_on() is given a list of Step
# objects here, where the sequential example passed a single Step.
production_deploy = Step("deploy-to-production").tool("kubernetes-deployer").inputs(
    service="myapp",
    environment="production"
).depends_on([integration_tests, load_tests])

Conditional Steps

Simple Conditions

# Step that only runs under certain conditions: the condition is passed to
# .condition() as a string expression. "${health_check.status}" refers to a
# step that is not defined in this snippet.
rollback_step = Step("rollback-deployment").tool("kubernetes-rollback").inputs(
    service="myapp",
    namespace="production",
    target_revision="previous"
).condition("${health_check.status} == 'failed'")

# Environment-specific step: the condition compares a workflow variable
# placeholder against a quoted literal.
staging_notification = Step("notify-staging-team").tool("slack").inputs(
    channel="#staging-deploys",
    message="🚀 Deployment to staging completed"
).condition("${ENVIRONMENT} == 'staging'")

Complex Conditions

# Multiple condition logic: three clauses joined with `and` inside a single
# triple-quoted string.
# NOTE(review): the string as written contains newlines and leading
# indentation; presumably the condition parser tolerates that — confirm.
canary_promotion = Step("promote-canary").tool("traffic-manager").inputs(
    service="myapp",
    traffic_split=100  # Full traffic
).condition("""
    ${canary_deploy.status} == 'success' and
    ${health_metrics.error_rate} < 0.01 and
    ${health_metrics.response_time} < 200
""")

Step Outputs and Variables

Capturing Step Outputs

# Step that produces output for other steps: .outputs() is given a list of
# output names.
database_backup = Step("backup-database").tool("pg_dump").inputs(
    database="production",
    backup_location="/backups/"
).outputs([
    "backup_file_path",
    "backup_size",
    "backup_timestamp"
])

# Step that uses previous step output via a "${<step>.<output>}" placeholder
# and also declares the dependency explicitly with .depends_on().
# NOTE(review): the placeholder says `database_backup` (the Python variable
# name) while the step's own name is "backup-database" — confirm which
# identifier the interpolation resolves against.
verify_backup = Step("verify-backup").tool("backup-verifier").inputs(
    backup_file="${database_backup.backup_file_path}",
    expected_min_size="1GB"
).depends_on(database_backup)

Variable Interpolation

# Using workflow variables in steps: placeholders can make up a whole value
# or be combined with other placeholders and literal text in one string.
deploy_step = Step("deploy-service").tool("helm").inputs(
    chart="./helm-chart",
    release_name="${SERVICE_NAME}-${ENVIRONMENT}",
    # Keys are dotted paths written as flat strings, not nested dicts.
    values={
        "image.repository": "${DOCKER_REGISTRY}/${SERVICE_NAME}",
        "image.tag": "${BUILD_VERSION}",
        # Passed as a string placeholder, not a Python integer.
        "replicaCount": "${REPLICA_COUNT}",
        "environment": "${ENVIRONMENT}"
    }
)

Step Types and Patterns

HTTP API Steps

# RESTful API calls: a POST with explicit headers, a structured body, a
# timeout, and the list of status codes given as expected.
api_call = Step("call-external-api").tool("http-client").inputs(
    url="https://api.external-service.com/v1/data",
    method="POST",
    headers={
        "Authorization": "Bearer ${api_token}",
        "Content-Type": "application/json"
    },
    # The body is passed as a Python dict.
    # NOTE(review): presumably serialized as JSON by the tool, given the
    # Content-Type header above — confirm.
    body={
        "query": "user_data",
        "filters": ["active", "verified"]
    },
    timeout=60,
    expected_status_codes=[200, 201]
)

Database Operations

# Database query step: the user id is supplied through `parameters` and
# referenced as $1 in the SQL text, rather than being interpolated into the
# query string itself.
user_lookup = Step("lookup-user").tool("postgres-client").inputs(
    query="SELECT id, email, status FROM users WHERE id = $1",
    parameters=["${USER_ID}"],
    connection_string="${DATABASE_URL}"
)

# Database migration step: runs the "flyway" tool with a migration location,
# the target database URL, and the command to execute.
run_migration = Step("apply-migration").tool("flyway").inputs(
    migration_location="classpath:db/migration",
    database_url="${DATABASE_URL}",
    migrate_command="migrate"
)

File System Operations

# File manipulation steps.
# copy_config: source and destination paths plus a `replacements` mapping
# from "{{NAME}}" template markers to "${NAME}" workflow placeholders.
copy_config = Step("copy-configuration").tool("file-manager").inputs(
    source_path="/templates/app.conf.template", 
    destination_path="/app/config/app.conf",
    replacements={
        "{{DATABASE_HOST}}": "${DATABASE_HOST}",
        "{{API_KEY}}": "${API_KEY}"
    }
)

# backup_logs: the archive name embeds two placeholders so each archive gets
# a distinct, traceable file name.
backup_logs = Step("archive-logs").tool("tar").inputs(
    source_directory="/var/log/myapp",
    archive_name="logs-${BUILD_VERSION}-${TIMESTAMP}.tar.gz",
    destination="/backups/logs/"
)

Kubernetes Operations

# Kubernetes deployment step: the `action` input selects the operation
# ("apply" here); namespace and context come from workflow variables.
k8s_deploy = Step("deploy-to-k8s").tool("kubectl").inputs(
    action="apply",
    manifest_file="deployment.yaml",
    namespace="${ENVIRONMENT}",
    context="${K8S_CONTEXT}"
)

# Kubernetes scaling step: same tool with action="scale"; the target resource
# and replica count are given as placeholders.
scale_deployment = Step("scale-service").tool("kubectl").inputs(
    action="scale",
    resource_type="deployment",
    resource_name="${SERVICE_NAME}",
    replicas="${TARGET_REPLICAS}",
    namespace="${ENVIRONMENT}"
)

Advanced Step Features

Dynamic Step Generation

def create_service_deployment_steps(services):
    """Build a build -> test -> deploy chain of steps for every service.

    Returns a single flat list. Services appear in the order given, and each
    service contributes its build, test and deploy steps, in that order.
    """

    def pipeline_for(name):
        # Stage 1: build the service's image from its own Dockerfile.
        build = Step(f"build-{name}").tool("docker-builder").inputs(
            service_name=name,
            dockerfile_path=f"./services/{name}/Dockerfile"
        )

        # Stage 2: unit tests, gated on the build.
        test = Step(f"test-{name}").tool("test-runner").inputs(
            service=name,
            test_type="unit"
        ).depends_on(build)

        # Stage 3: deployment, gated on the tests.
        deploy = Step(f"deploy-{name}").tool("kubernetes-deployer").inputs(
            service=name,
            environment="${ENVIRONMENT}"
        ).depends_on(test)

        return [build, test, deploy]

    return [step for name in services for step in pipeline_for(name)]

# Use dynamic generation: three services in, so the returned list holds nine
# steps (build, test, deploy for each service, in the order listed).
services = ["user-service", "payment-service", "notification-service"]
deployment_steps = create_service_deployment_steps(services)

Step Templates

# Reusable step template
def create_health_check_step(service_name, endpoint="/health"):
    """Return an HTTP GET health-check step for ``service_name``.

    Args:
        service_name: Used in the step name (``health-check-<service_name>``)
            and as the subdomain of ``myapp.com`` in the probed URL.
        endpoint: Path appended to the host. A missing leading slash is
            added, so ``"api/health"`` and ``"/api/health"`` are equivalent.

    Returns:
        The result of the ``Step(...).tool("http-client").inputs(...)`` chain.
    """
    # Without this, endpoint="status" would produce
    # "https://<service>.myapp.comstatus". An empty endpoint is left untouched
    # so the bare host can still be probed.
    if endpoint and not endpoint.startswith("/"):
        endpoint = f"/{endpoint}"

    # NOTE(review): the "HTTP API Steps" example passes
    # expected_status_codes=[200, 201] (plural, list) to the same tool, while
    # this template uses the singular form — confirm which spelling the
    # "http-client" tool accepts.
    return Step(f"health-check-{service_name}").tool("http-client").inputs(
        url=f"https://{service_name}.myapp.com{endpoint}",
        method="GET",
        expected_status_code=200,
        timeout=30
    )

# Use template: the first call relies on the default "/health" endpoint, the
# second overrides it.
frontend_health = create_health_check_step("frontend-service")
api_health = create_health_check_step("api-service", "/api/health")

Step Hooks and Callbacks

# Step with pre and post execution hooks: .on_start(), .on_success() and
# .on_failure() are chained after .inputs(), and each receives a complete
# Step of its own (here, Slack notifications to different channels).
deployment_step = Step("deploy-application").tool("kubernetes-deployer").inputs(
    service="myapp",
    environment="production"
).on_start(
    # Execute before step starts
    Step("pre-deployment-notification").tool("slack").inputs(
        channel="#deployments",
        message="🚀 Starting deployment of myapp to production"
    )
).on_success(
    # Execute on successful completion
    Step("post-deployment-notification").tool("slack").inputs(
        channel="#deployments", 
        message="✅ Successfully deployed myapp to production"
    )
).on_failure(
    # Execute on failure; note this one posts to #incidents, not #deployments
    Step("deployment-failure-alert").tool("slack").inputs(
        channel="#incidents",
        message="🚨 Deployment of myapp to production failed!"
    )
)

Error Handling in Steps

Step-Level Error Handling

# Step with custom error handling: .on_error() takes a list of steps as
# `actions` plus a `continue_workflow` flag.
risky_operation = Step("risky-database-operation").tool("postgres-client").inputs(
    query="UPDATE users SET status = 'migrated' WHERE created_at < '2023-01-01'",
    connection_string="${DATABASE_URL}"
).on_error(
    # What to do if this step fails
    actions=[
        # "${error.message}" is a placeholder; presumably filled with the
        # failure details at run time — confirm.
        Step("log-error").tool("logger").inputs(
            level="error",
            message="Database migration failed: ${error.message}"
        ),
        # NOTE(review): this resets every row whose status is 'migrated' to
        # 'active'. It does not filter on created_at and does not restore a
        # row's earlier status, so it is broader than the update above —
        # confirm that is acceptable before copying this pattern.
        Step("rollback-changes").tool("postgres-client").inputs(
            query="UPDATE users SET status = 'active' WHERE status = 'migrated'",
            connection_string="${DATABASE_URL}"
        )
    ],
    continue_workflow=False  # Stop workflow on error
)

Graceful Degradation

# Step with fallback behavior: .on_error() is given a single `fallback` step
# instead of an `actions` list, and continue_workflow=True.
primary_service_call = Step("call-primary-service").tool("http-client").inputs(
    url="https://primary-service.com/api/data",
    timeout=10
).on_error(
    # Fallback to secondary service; it is given a longer timeout (30) than
    # the primary call (10).
    fallback=Step("call-secondary-service").tool("http-client").inputs(
        url="https://secondary-service.com/api/data",
        timeout=30
    ),
    continue_workflow=True
)

Step Performance and Monitoring

Step Metrics

# Step with performance monitoring: .monitor() takes the metric names to
# collect and a list of alert rules.
monitored_step = Step("performance-critical-operation").tool("data-processor").inputs(
    input_file="/data/large_dataset.csv",
    output_file="/processed/results.json"
).monitor(
    metrics=["duration", "memory_usage", "cpu_usage"],
    # Each rule is a dict with "metric", "threshold" and "action" keys.
    # Thresholds are strings with unit suffixes ("5m", "2GB"). No rule is
    # defined for "cpu_usage" even though it is listed in `metrics`.
    alerts=[
        {"metric": "duration", "threshold": "5m", "action": "notify"},
        {"metric": "memory_usage", "threshold": "2GB", "action": "alert"}
    ]
)

Step Logging

# Step with detailed logging: .log_level(), .log_outputs() and
# .inputs_logging() are chained after .inputs().
logged_step = Step("important-business-operation").tool("business-processor").inputs(
    transaction_id="${TRANSACTION_ID}",
    amount="${AMOUNT}",
    currency="${CURRENCY}"
).log_level("debug").log_outputs(True).inputs_logging(
    # Don't log sensitive inputs. None of the field names listed here appear
    # among the inputs above; they illustrate the kind of field to exclude.
    exclude_fields=["credit_card", "ssn", "password"]
)

Best Practices

Step Naming

  • Use descriptive, action-oriented names
  • Include the purpose and context
  • Use consistent naming conventions
  • Avoid abbreviations

Step Design

  • Keep steps focused on single responsibilities
  • Make steps idempotent when possible, so that retries and re-runs are safe
  • Use appropriate timeouts
  • Handle errors gracefully

Variable Usage

  • Use meaningful variable names
  • Document variable purposes
  • Validate inputs when possible
  • Use defaults for optional parameters

Dependencies

  • Minimize unnecessary dependencies
  • Use parallel execution when possible
  • Consider failure scenarios
  • Document dependency relationships