Effective error handling is crucial for building reliable workflows. The Kubiya Workflow DSL provides comprehensive mechanisms for handling failures gracefully and implementing recovery strategies.

Basic Error Handling

Step-Level Error Handling

from kubiya_sdk import Step, ErrorHandler

# Basic error handling for a step
risky_deployment = Step("deploy-to-production").tool("kubernetes-deployer").inputs(
    service="payment-service",
    environment="production"
).on_error(
    ErrorHandler().on_failure([
        Step("notify-failure").tool("slack").inputs(
            channel="#incidents",
            message="🚨 Production deployment failed!"
        ),
        Step("rollback-deployment").tool("kubernetes-rollback").inputs(
            service="payment-service",
            environment="production"
        )
    ])
)

Multiple Error Types

# Handle different types of errors differently
database_operation = Step("complex-database-migration").tool("migration-runner").inputs(
    migration_file="complex_schema_change.sql",
    database="production"
).on_error(
    ErrorHandler()
    .on_timeout([
        Step("extend-maintenance-window").tool("maintenance-scheduler").inputs(
            duration="additional-30m"
        ),
        Step("retry-with-longer-timeout").tool("migration-runner").inputs(
            migration_file="complex_schema_change.sql",
            timeout="45m"
        )
    ])
    .on_failure([
        Step("restore-database-backup").tool("backup-restorer").inputs(
            backup_file="${pre_migration_backup.backup_file}"
        ),
        Step("alert-dba-team").tool("pager-duty").inputs(
            service="database-team",
            message="Critical: Migration rollback initiated"
        )
    ])
    .on_resource_limit([
        Step("scale-up-database").tool("database-scaler").inputs(
            cpu_cores=8,
            memory="16Gi"
        ),
        Step("retry-migration").tool("migration-runner").inputs(
            migration_file="complex_schema_change.sql"
        )
    ])
)

Retry Mechanisms

Simple Retry

# Step with automatic retry
flaky_external_api = Step("call-external-service").tool("http-client").inputs(
    url="https://unreliable-api.com/data",
    headers={"Authorization": "Bearer ${API_TOKEN}"}
).retry(
    max_attempts=3,
    backoff_strategy="exponential",
    initial_delay=5,  # seconds
    max_delay=60,
    retry_on_status=[500, 502, 503, 504]
)
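
As a point of reference, the delays implied by these parameters can be worked out with a small standalone sketch (plain Python, not part of the SDK), assuming each retry doubles the previous delay up to max_delay:

# Illustrative only: delays between attempts under exponential backoff,
# assuming the delay doubles after each failed attempt and is capped at max_delay.
def backoff_delays(max_attempts=3, initial_delay=5, max_delay=60):
    # With 3 attempts there are at most 2 waits: one after each failure except the last.
    return [min(initial_delay * 2 ** i, max_delay) for i in range(max_attempts - 1)]

print(backoff_delays())  # [5, 10]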

Advanced Retry Logic

# Conditional retry with custom logic
intelligent_retry = Step("smart-deployment").tool("smart-deployer").inputs(
    service="user-service",
    strategy="blue-green"
).retry(
    max_attempts=5,
    backoff_strategy="custom",
    retry_condition="""
        ${error.type} == 'TemporaryError' or
        (${error.type} == 'ResourceError' and ${attempt_count} < 3)
    """,
    custom_backoff=lambda attempt: min(2 ** attempt * 10, 300),  # Cap at 5 minutes
    
    # Different action per retry attempt
    on_retry_attempt=[
        (1, Step("log-first-retry").tool("logger").inputs(level="info")),
        (3, Step("alert-operations").tool("slack").inputs(channel="#ops")),
        (5, Step("escalate-to-engineer").tool("pager-duty"))
    ]
)
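
Assuming attempts are numbered from 1, the custom backoff above yields delays of 20, 40, 80, 160, and 300 seconds (the raw 320-second value for attempt 5 is capped at the 5-minute limit), which can be checked directly:

# Delays produced by the custom_backoff lambda for attempts 1-5
print([min(2 ** attempt * 10, 300) for attempt in range(1, 6)])  # [20, 40, 80, 160, 300]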

Circuit Breaker Pattern

# Circuit breaker for frequently failing operations
circuit_breaker_step = Step("external-dependency").tool("http-client").inputs(
    url="https://flaky-service.com/api"
).circuit_breaker(
    failure_threshold=5,        # Open circuit after 5 failures
    recovery_timeout=60,        # Try to close circuit after 60 seconds
    success_threshold=3,        # Close circuit after 3 successful calls
    
    # Fallback when circuit is open
    fallback_action=Step("use-cached-data").tool("cache-reader").inputs(
        cache_key="fallback_data"
    )
)
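
The three thresholds follow the conventional closed/open/half-open circuit-breaker semantics. A minimal standalone sketch, assuming those semantics (plain Python, not part of the SDK), shows how the parameters interact:

import time

# Illustrative only: conventional circuit-breaker states driven by the
# same three parameters used above.
class SimpleCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, success_threshold=3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow_request(self):
        # After recovery_timeout, let probe requests through ("half-open").
        if self.state == "open" and time.time() - self.opened_at >= self.recovery_timeout:
            self.state = "half-open"
            self.successes = 0
        return self.state != "open"  # when open, callers use the fallback (e.g. cached data)

    def record(self, success):
        if success:
            self.failures = 0
            self.successes += 1
            # Close the circuit again after success_threshold consecutive successes.
            if self.state == "half-open" and self.successes >= self.success_threshold:
                self.state = "closed"
        else:
            self.successes = 0
            self.failures += 1
            # Open the circuit after failure_threshold consecutive failures,
            # or immediately if a probe fails while half-open.
            if self.failures >= self.failure_threshold or self.state == "half-open":
                self.state = "open"
                self.opened_at = time.time()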

Workflow-Level Error Handling

Global Error Handlers

from kubiya_sdk import Workflow, GlobalErrorHandler, workflow

@workflow
def resilient_deployment_pipeline():
    """Deployment pipeline with comprehensive error handling"""
    
    # Define global error handler
    global_handler = GlobalErrorHandler()
    global_handler.on_any_failure([
        Step("capture-workflow-state").tool("state-capturer").inputs(
            workflow_id="${WORKFLOW_ID}",
            timestamp="${CURRENT_TIMESTAMP}"
        ),
        Step("notify-platform-team").tool("slack").inputs(
            channel="#platform",
            message="Workflow ${WORKFLOW_ID} encountered an error"
        )
    ])
    
    # Define workflow steps
    build_step = Step("build-application").tool("builder")
    test_step = Step("run-tests").tool("test-runner")
    deploy_step = Step("deploy-application").tool("deployer")
    
    return Workflow(
        steps=[build_step, test_step, deploy_step],
        error_handler=global_handler
    )

Workflow Recovery Strategies

@workflow
def self_healing_deployment():
    """Workflow that can recover from various failure scenarios"""
    
    # Pre-deployment backup
    backup_step = Step("create-backup").tool("backup-creator").inputs(
        environment="production"
    )
    
    # Main deployment
    deployment_step = Step("deploy-service").tool("deployer").inputs(
        service="critical-service"
    ).on_error(
        ErrorHandler()
        .on_failure([
            # Immediate rollback
            Step("emergency-rollback").tool("rollback-tool").inputs(
                backup_id="${backup_step.backup_id}"
            ),
            
            # Health check after rollback
            Step("verify-rollback").tool("health-checker"),
            
            # If rollback fails, escalate immediately
            Step("escalate-critical-failure").tool("incident-manager").inputs(
                severity="critical",
                auto_page=True
            ).condition("${emergency_rollback.status} == 'failed'")
        ])
    )
    
    # Post-deployment verification
    verification_step = Step("verify-deployment").tool("verification-suite").on_error(
        ErrorHandler().on_failure([
            # Verification failed - safe rollback
            Step("safe-rollback").tool("blue-green-rollback"),
            Step("analyze-failure").tool("log-analyzer").inputs(
                time_range="last-10m"
            )
        ])
    )
    
    return Workflow([backup_step, deployment_step, verification_step])

Error Context and Debugging

Rich Error Information

# Capture detailed error context
detailed_error_handling = Step("complex-operation").tool("data-processor").inputs(
    input_file="/large-dataset.csv"
).on_error(
    ErrorHandler().on_failure([
        # Capture system state
        Step("capture-system-metrics").tool("metrics-collector").inputs(
            metrics=["cpu", "memory", "disk_space", "network"],
            duration="5m"
        ),
        
        # Capture application logs
        Step("collect-application-logs").tool("log-collector").inputs(
            service="data-processor",
            time_range="last-15m",
            log_levels=["error", "warn"]
        ),
        
        # Capture environment information
        Step("environment-snapshot").tool("env-capturer").inputs(
            include=["env_vars", "running_processes", "open_files"]
        ),
        
        # Create debugging package
        Step("create-debug-package").tool("debug-packager").inputs(
            metrics="${capture-system-metrics.output}",
            logs="${collect-application-logs.output}",
            environment="${environment-snapshot.output}",
            error_details="${complex-operation.error}"
        )
    ])
)

Error Classification

# Classify and handle errors based on type
classified_error_handling = Step("service-deployment").tool("deployer").on_error(
    ErrorHandler()
    .on_error_type("NetworkError", [
        Step("check-network-connectivity").tool("network-tester"),
        Step("retry-with-different-endpoint").tool("deployer").inputs(
            endpoint="backup-endpoint"
        )
    ])
    .on_error_type("AuthenticationError", [
        Step("refresh-credentials").tool("auth-refresher"),
        Step("retry-deployment").tool("deployer")
    ])
    .on_error_type("ResourceLimitError", [
        Step("scale-resources").tool("resource-scaler").inputs(
            scale_factor=1.5
        ),
        Step("retry-with-more-resources").tool("deployer")
    ])
    .on_error_pattern(".*timeout.*", [  # Regex pattern matching
        Step("increase-timeout").tool("config-updater").inputs(
            timeout_multiplier=2
        ),
        Step("retry-deployment").tool("deployer")
    ])
)

Graceful Degradation

Feature Degradation

# Gracefully degrade functionality on errors
feature_deployment = Step("deploy-new-feature").tool("feature-deployer").inputs(
    feature_name="advanced-analytics"
).on_error(
    ErrorHandler().on_failure([
        # Disable the new feature
        Step("disable-feature-flag").tool("feature-flag-manager").inputs(
            feature="advanced-analytics",
            enabled=False
        ),
        
        # Ensure core functionality still works
        Step("verify-core-features").tool("core-feature-tester"),
        
        # Schedule retry for later
        Step("schedule-retry").tool("scheduler").inputs(
            job_name="retry-feature-deployment",
            schedule="+2h"  # Try again in 2 hours
        )
    ])
)

Partial Success Handling

# Handle partial successes gracefully
bulk_operation = Step("process-user-batch").tool("batch-processor").inputs(
    user_batch_size=1000
).on_error(
    ErrorHandler()
    .on_partial_failure([  # Some users processed, some failed
        Step("analyze-failed-users").tool("failure-analyzer").inputs(
            failed_items="${process-user-batch.failed_items}"
        ),
        
        Step("retry-failed-subset").tool("batch-processor").inputs(
            user_list="${analyze-failed-users.retryable_items}",
            batch_size=100  # Smaller batches for problematic items
        ),
        
        Step("quarantine-problematic-users").tool("quarantine-manager").inputs(
            user_list="${analyze-failed-users.problematic_items}"
        )
    ])
    .on_complete_failure([  # Nothing processed successfully
        Step("investigate-systematic-issue").tool("system-analyzer"),
        Step("alert-engineering-team").tool("slack").inputs(
            channel="#engineering-alerts"
        )
    ])
)

Monitoring and Alerting

Error Rate Monitoring

# Monitor error rates and adjust behavior
monitored_operation = Step("high-frequency-operation").tool("api-client").inputs(
    endpoint="/api/data"
).monitor_errors(
    error_rate_threshold=0.05,  # 5% error rate
    time_window="5m",
    
    # Actions when error rate is high
    on_high_error_rate=[
        Step("enable-circuit-breaker").tool("circuit-breaker-manager"),
        Step("scale-down-traffic").tool("traffic-manager").inputs(
            traffic_percentage=50
        )
    ],
    
    # Actions when error rate normalizes
    on_error_rate_normal=[
        Step("disable-circuit-breaker").tool("circuit-breaker-manager"),
        Step("restore-full-traffic").tool("traffic-manager")
    ]
)
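
What a 5% error rate over a 5-minute window means can be sketched outside the DSL (plain Python, illustrative only; none of the names below are part of the SDK):

import time
from collections import deque

# Illustrative only: sliding-window error-rate check corresponding to
# error_rate_threshold=0.05 and time_window="5m" above.
class ErrorRateMonitor:
    def __init__(self, threshold=0.05, window_seconds=300):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.outcomes = deque()  # (timestamp, succeeded) pairs

    def record(self, succeeded):
        now = time.time()
        self.outcomes.append((now, succeeded))
        # Drop outcomes that have aged out of the window.
        while self.outcomes and now - self.outcomes[0][0] > self.window_seconds:
            self.outcomes.popleft()

    def error_rate_high(self):
        if not self.outcomes:
            return False
        errors = sum(1 for _, ok in self.outcomes if not ok)
        return errors / len(self.outcomes) > self.threshold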

Intelligent Alerting

# Smart alerting based on error patterns
smart_alerting = Step("critical-business-operation").tool("payment-processor").on_error(
    ErrorHandler().configure_alerting(
        # Different alert levels based on error characteristics
        alert_rules=[
            {
                "condition": "error.type == 'PaymentFailure' and error.count > 10",
                "alert_level": "critical",
                "recipients": ["payments-team@company.com", "on-call-engineer"]
            },
            {
                "condition": "error.type == 'NetworkTimeout' and error.rate > 0.1",
                "alert_level": "warning", 
                "recipients": ["platform-team@company.com"]
            },
            {
                "condition": "error.duration > 300",  # 5 minutes
                "alert_level": "info",
                "recipients": ["engineering-team@company.com"]
            }
        ],
        
        # Suppress duplicate alerts
        suppression_window="10m",
        
        # Escalation rules
        escalation=[
            {"after": "15m", "to": ["engineering-manager@company.com"]},
            {"after": "30m", "to": ["cto@company.com"]}
        ]
    )
)

Testing Error Scenarios

Error Injection for Testing

# Inject errors for testing error handling
def create_chaos_testing_workflow():
    """Workflow that intentionally introduces errors for testing"""
    
    normal_operation = Step("normal-service-call").tool("service-client")
    
    # Chaos engineering - randomly inject errors
    chaos_step = Step("chaos-injection").tool("chaos-engineer").inputs(
        chaos_type="random",
        failure_probability=0.1,  # 10% chance of failure
        failure_types=["timeout", "network_error", "service_unavailable"]
    )
    
    # Test how the system handles the chaos
    test_recovery = Step("test-error-recovery").tool("recovery-tester").inputs(
        expected_recovery_time="30s",
        recovery_strategies=["retry", "fallback", "circuit_breaker"]
    )
    
    return Workflow([normal_operation, chaos_step, test_recovery])

Error Simulation

# Simulate specific error conditions
error_simulation_tests = [
    Step("simulate-database-down").tool("error-simulator").inputs(
        error_type="database_connection_error",
        duration="2m"
    ),
    
    Step("simulate-high-latency").tool("network-simulator").inputs(
        latency="5000ms",
        packet_loss="10%"
    ),
    
    Step("simulate-resource-exhaustion").tool("resource-simulator").inputs(
        memory_pressure=True,
        cpu_usage=95
    )
]

Best Practices

Error Handler Design

  • Handle errors at the appropriate level (step vs workflow); see the sketch after this list
  • Provide meaningful error messages and context
  • Implement proper logging and monitoring
  • Design for graceful degradation
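
Layering the two levels can be sketched by reusing the constructs introduced earlier on this page (the step and tool names below are hypothetical):

from kubiya_sdk import Step, ErrorHandler, GlobalErrorHandler, Workflow

# Step-level: failures this step knows how to recover from stay local to it.
deploy = Step("deploy-service").tool("deployer").on_error(
    ErrorHandler().on_failure([
        Step("rollback-service").tool("rollback-tool")
    ])
)

# Workflow-level: anything that escapes step-level handling is still captured.
global_handler = GlobalErrorHandler()
global_handler.on_any_failure([
    Step("notify-platform-team").tool("slack").inputs(channel="#platform")
])

pipeline = Workflow(steps=[deploy], error_handler=global_handler)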

Recovery Strategies

  • Plan recovery strategies before implementation
  • Test all error handling paths
  • Implement circuit breakers for external dependencies
  • Use exponential backoff for retries

Monitoring and Alerting

  • Monitor error rates and patterns
  • Implement intelligent alerting to reduce noise
  • Create runbooks for common error scenarios
  • Track error handling effectiveness

Testing

  • Test error scenarios regularly
  • Use chaos engineering practices
  • Validate error handling in staging environments
  • Document error handling behaviors