Effective error handling is crucial for building reliable workflows. The Kubiya Workflow DSL provides comprehensive mechanisms for handling failures gracefully and implementing recovery strategies.

Basic Error Handling

Step-Level Error Handling

from kubiya_sdk import Step, ErrorHandler

# Basic error handling for a step
risky_deployment = Step("deploy-to-production").tool("kubernetes-deployer").inputs(
    service="payment-service",
    environment="production"
).on_error(
    ErrorHandler().on_failure([
        Step("notify-failure").tool("slack").inputs(
            channel="#incidents",
            message="🚨 Production deployment failed!"
        ),
        Step("rollback-deployment").tool("kubernetes-rollback").inputs(
            service="payment-service",
            environment="production"
        )
    ])
)

Multiple Error Types

# Handle different types of errors differently
database_operation = Step("complex-database-migration").tool("migration-runner").inputs(
    migration_file="complex_schema_change.sql",
    database="production"
).on_error(
    ErrorHandler()
    .on_timeout([
        Step("extend-maintenance-window").tool("maintenance-scheduler").inputs(
            duration="additional-30m"
        ),
        Step("retry-with-longer-timeout").tool("migration-runner").inputs(
            migration_file="complex_schema_change.sql",
            timeout="45m"
        )
    ])
    .on_failure([
        Step("restore-database-backup").tool("backup-restorer").inputs(
            backup_file="${pre_migration_backup.backup_file}"
        ),
        Step("alert-dba-team").tool("pager-duty").inputs(
            service="database-team",
            message="Critical: Migration rollback initiated"
        )
    ])
    .on_resource_limit([
        Step("scale-up-database").tool("database-scaler").inputs(
            cpu_cores=8,
            memory="16Gi"
        ),
        Step("retry-migration").tool("migration-runner").inputs(
            migration_file="complex_schema_change.sql"
        )
    ])
)

Retry Mechanisms

Simple Retry

# Step with automatic retry
flaky_external_api = Step("call-external-service").tool("http-client").inputs(
    url="https://unreliable-api.com/data",
    headers={"Authorization": "Bearer ${API_TOKEN}"}
).retry(
    max_attempts=3,
    backoff_strategy="exponential",
    initial_delay=5,  # seconds
    max_delay=60,
    retry_on_status=[500, 502, 503, 504]
)
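
The exact delay schedule is computed by the SDK; assuming the common formula delay = min(initial_delay * 2 ** (attempt - 1), max_delay), the configuration above would wait 5 seconds after the first failure and 10 seconds after the second failure before the final attempt:

# Hypothetical backoff schedule for the settings above (an assumption, not the
# SDK's documented formula): delay before retry n = min(5 * 2 ** (n - 1), 60)
delays = [min(5 * 2 ** (n - 1), 60) for n in range(1, 3)]
print(delays)  # [5, 10] -> waits before attempts 2 and 3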

Advanced Retry Logic

# Conditional retry with custom logic
intelligent_retry = Step("smart-deployment").tool("smart-deployer").inputs(
    service="user-service",
    strategy="blue-green"
).retry(
    max_attempts=5,
    backoff_strategy="custom",
    retry_condition="""
        ${error.type} == 'TemporaryError' or
        (${error.type} == 'ResourceError' and ${attempt_count} < 3)
    """,
    custom_backoff=lambda attempt: min(2 ** attempt * 10, 300),  # Cap at 5 minutes
    
    # Different action per retry attempt
    on_retry_attempt=[
        (1, Step("log-first-retry").tool("logger").inputs(level="info")),
        (3, Step("alert-operations").tool("slack").inputs(channel="#ops")),
        (5, Step("escalate-to-engineer").tool("pager-duty"))
    ]
)
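
For reference, the custom_backoff lambda above produces waits of 20s, 40s, 80s, 160s, and 300s if attempt counts from 1 (the indexing is an assumption); the fifth value is capped at the 5-minute maximum:

# Waits produced by min(2 ** attempt * 10, 300) for attempts 1 through 5
print([min(2 ** attempt * 10, 300) for attempt in range(1, 6)])
# [20, 40, 80, 160, 300]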

Circuit Breaker Pattern

# Circuit breaker for frequently failing operations
circuit_breaker_step = Step("external-dependency").tool("http-client").inputs(
    url="https://flaky-service.com/api"
).circuit_breaker(
    failure_threshold=5,        # Open circuit after 5 failures
    recovery_timeout=60,        # Try to close circuit after 60 seconds
    success_threshold=3,        # Close circuit after 3 successful calls
    
    # Fallback when circuit is open
    fallback_action=Step("use-cached-data").tool("cache-reader").inputs(
        cache_key="fallback_data"
    )
)
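
The three parameters map onto the standard circuit breaker state machine: the circuit opens after failure_threshold consecutive failures, stays open (serving the fallback) for recovery_timeout seconds, then allows trial calls and closes again after success_threshold consecutive successes. The sketch below is purely illustrative of those semantics; CircuitBreakerSketch is not part of kubiya_sdk, and the SDK tracks this state for you.

# Illustrative circuit breaker state machine (not the SDK's implementation)
import time

class CircuitBreakerSketch:
    def __init__(self, failure_threshold=5, recovery_timeout=60, success_threshold=3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def call(self, operation, fallback):
        if self.state == "open":
            if time.time() - self.opened_at < self.recovery_timeout:
                return fallback()         # circuit open: serve the fallback
            self.state = "half-open"      # recovery window elapsed: allow a trial call
        try:
            result = operation()
        except Exception:
            self.failures += 1
            self.successes = 0
            # A failed trial call, or too many consecutive failures, opens the circuit
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state, self.opened_at = "open", time.time()
            raise
        self.failures = 0
        if self.state == "half-open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state, self.successes = "closed", 0
        return result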

Workflow-Level Error Handling

Global Error Handlers

from kubiya_sdk import Step, Workflow, GlobalErrorHandler, workflow

@workflow
def resilient_deployment_pipeline():
    """Deployment pipeline with comprehensive error handling"""
    
    # Define global error handler
    global_handler = GlobalErrorHandler()
    global_handler.on_any_failure([
        Step("capture-workflow-state").tool("state-capturer").inputs(
            workflow_id="${WORKFLOW_ID}",
            timestamp="${CURRENT_TIMESTAMP}"
        ),
        Step("notify-platform-team").tool("slack").inputs(
            channel="#platform",
            message="Workflow ${WORKFLOW_ID} encountered an error"
        )
    ])
    
    # Define workflow steps
    build_step = Step("build-application").tool("builder")
    test_step = Step("run-tests").tool("test-runner")
    deploy_step = Step("deploy-application").tool("deployer")
    
    return Workflow(
        steps=[build_step, test_step, deploy_step],
        error_handler=global_handler
    )

Workflow Recovery Strategies

@workflow
def self_healing_deployment():
    """Workflow that can recover from various failure scenarios"""
    
    # Pre-deployment backup
    backup_step = Step("create-backup").tool("backup-creator").inputs(
        environment="production"
    )
    
    # Main deployment
    deployment_step = Step("deploy-service").tool("deployer").inputs(
        service="critical-service"
    ).on_error(
        ErrorHandler()
        .on_failure([
            # Immediate rollback
            Step("emergency-rollback").tool("rollback-tool").inputs(
                backup_id="${create-backup.backup_id}"
            ),
            
            # Health check after rollback
            Step("verify-rollback").tool("health-checker"),
            
            # If rollback fails, escalate immediately
            Step("escalate-critical-failure").tool("incident-manager").inputs(
                severity="critical",
                auto_page=True
            ).condition("${emergency-rollback.status} == 'failed'")
        ])
    )
    
    # Post-deployment verification
    verification_step = Step("verify-deployment").tool("verification-suite").on_error(
        ErrorHandler().on_failure([
            # Verification failed - safe rollback
            Step("safe-rollback").tool("blue-green-rollback"),
            Step("analyze-failure").tool("log-analyzer").inputs(
                time_range="last-10m"
            )
        ])
    )
    
    return Workflow([backup_step, deployment_step, verification_step])

Error Context and Debugging

Rich Error Information

# Capture detailed error context
detailed_error_handling = Step("complex-operation").tool("data-processor").inputs(
    input_file="/large-dataset.csv"
).on_error(
    ErrorHandler().on_failure([
        # Capture system state
        Step("capture-system-metrics").tool("metrics-collector").inputs(
            metrics=["cpu", "memory", "disk_space", "network"],
            duration="5m"
        ),
        
        # Capture application logs
        Step("collect-application-logs").tool("log-collector").inputs(
            service="data-processor",
            time_range="last-15m",
            log_levels=["error", "warn"]
        ),
        
        # Capture environment information
        Step("environment-snapshot").tool("env-capturer").inputs(
            include=["env_vars", "running_processes", "open_files"]
        ),
        
        # Create debugging package
        Step("create-debug-package").tool("debug-packager").inputs(
            metrics="${capture-system-metrics.output}",
            logs="${collect-application-logs.output}",
            environment="${environment-snapshot.output}",
            error_details="${complex-operation.error}"
        )
    ])
)

Error Classification

# Classify and handle errors based on type
classified_error_handling = Step("service-deployment").tool("deployer").on_error(
    ErrorHandler()
    .on_error_type("NetworkError", [
        Step("check-network-connectivity").tool("network-tester"),
        Step("retry-with-different-endpoint").tool("deployer").inputs(
            endpoint="backup-endpoint"
        )
    ])
    .on_error_type("AuthenticationError", [
        Step("refresh-credentials").tool("auth-refresher"),
        Step("retry-deployment").tool("deployer")
    ])
    .on_error_type("ResourceLimitError", [
        Step("scale-resources").tool("resource-scaler").inputs(
            scale_factor=1.5
        ),
        Step("retry-with-more-resources").tool("deployer")
    ])
    .on_error_pattern(".*timeout.*", [  # Regex pattern matching
        Step("increase-timeout").tool("config-updater").inputs(
            timeout_multiplier=2
        ),
        Step("retry-deployment").tool("deployer")
    ])
)

Graceful Degradation

Feature Degradation

# Gracefully degrade functionality on errors
feature_deployment = Step("deploy-new-feature").tool("feature-deployer").inputs(
    feature_name="advanced-analytics"
).on_error(
    ErrorHandler().on_failure([
        # Disable the new feature
        Step("disable-feature-flag").tool("feature-flag-manager").inputs(
            feature="advanced-analytics",
            enabled=False
        ),
        
        # Ensure core functionality still works
        Step("verify-core-features").tool("core-feature-tester"),
        
        # Schedule retry for later
        Step("schedule-retry").tool("scheduler").inputs(
            job_name="retry-feature-deployment",
            schedule="+2h"  # Try again in 2 hours
        )
    ])
)

Partial Success Handling

# Handle partial successes gracefully
bulk_operation = Step("process-user-batch").tool("batch-processor").inputs(
    user_batch_size=1000
).on_error(
    ErrorHandler()
    .on_partial_failure([  # Some users processed, some failed
        Step("analyze-failed-users").tool("failure-analyzer").inputs(
            failed_items="${process-user-batch.failed_items}"
        ),
        
        Step("retry-failed-subset").tool("batch-processor").inputs(
            user_list="${analyze-failed-users.retryable_items}",
            batch_size=100  # Smaller batches for problematic items
        ),
        
        Step("quarantine-problematic-users").tool("quarantine-manager").inputs(
            user_list="${analyze-failed-users.problematic_items}"
        )
    ])
    .on_complete_failure([  # Nothing processed successfully
        Step("investigate-systematic-issue").tool("system-analyzer"),
        Step("alert-engineering-team").tool("slack").inputs(
            channel="#engineering-alerts"
        )
    ])
)

Monitoring and Alerting

Error Rate Monitoring

# Monitor error rates and adjust behavior
monitored_operation = Step("high-frequency-operation").tool("api-client").inputs(
    endpoint="/api/data"
).monitor_errors(
    error_rate_threshold=0.05,  # 5% error rate
    time_window="5m",
    
    # Actions when error rate is high
    on_high_error_rate=[
        Step("enable-circuit-breaker").tool("circuit-breaker-manager"),
        Step("scale-down-traffic").tool("traffic-manager").inputs(
            traffic_percentage=50
        )
    ],
    
    # Actions when error rate normalizes
    on_error_rate_normal=[
        Step("disable-circuit-breaker").tool("circuit-breaker-manager"),
        Step("restore-full-traffic").tool("traffic-manager")
    ]
)
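
Conceptually, the error rate is the fraction of failed calls observed inside the sliding time window; crossing error_rate_threshold triggers the on_high_error_rate steps, and dropping back below it triggers on_error_rate_normal. The sketch below illustrates that bookkeeping under assumed semantics; ErrorRateWindow is not part of kubiya_sdk.

# Illustrative sliding-window error-rate check (assumed semantics, not the SDK's)
import time
from collections import deque

class ErrorRateWindow:
    def __init__(self, threshold=0.05, window_seconds=300):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.samples = deque()  # (timestamp, is_error) pairs

    def record(self, is_error):
        now = time.time()
        self.samples.append((now, is_error))
        # Drop samples that have aged out of the window
        while self.samples and now - self.samples[0][0] > self.window_seconds:
            self.samples.popleft()

    def is_high(self):
        if not self.samples:
            return False
        errors = sum(1 for _, is_error in self.samples if is_error)
        return errors / len(self.samples) > self.threshold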

Intelligent Alerting

# Smart alerting based on error patterns
smart_alerting = Step("critical-business-operation").tool("payment-processor").on_error(
    ErrorHandler().configure_alerting(
        # Different alert levels based on error characteristics
        alert_rules=[
            {
                "condition": "error.type == 'PaymentFailure' and error.count > 10",
                "alert_level": "critical",
                "recipients": ["payments-team@company.com", "on-call-engineer"]
            },
            {
                "condition": "error.type == 'NetworkTimeout' and error.rate > 0.1",
                "alert_level": "warning", 
                "recipients": ["platform-team@company.com"]
            },
            {
                "condition": "error.duration > 300",  # 5 minutes
                "alert_level": "info",
                "recipients": ["engineering-team@company.com"]
            }
        ],
        
        # Suppress duplicate alerts
        suppression_window="10m",
        
        # Escalation rules
        escalation=[
            {"after": "15m", "to": ["engineering-manager@company.com"]},
            {"after": "30m", "to": ["cto@company.com"]}
        ]
    )
)

Testing Error Scenarios

Error Injection for Testing

# Inject errors for testing error handling
def create_chaos_testing_workflow():
    """Workflow that intentionally introduces errors for testing"""
    
    normal_operation = Step("normal-service-call").tool("service-client")
    
    # Chaos engineering - randomly inject errors
    chaos_step = Step("chaos-injection").tool("chaos-engineer").inputs(
        chaos_type="random",
        failure_probability=0.1,  # 10% chance of failure
        failure_types=["timeout", "network_error", "service_unavailable"]
    )
    
    # Test how the system handles the chaos
    test_recovery = Step("test-error-recovery").tool("recovery-tester").inputs(
        expected_recovery_time="30s",
        recovery_strategies=["retry", "fallback", "circuit_breaker"]
    )
    
    return Workflow([normal_operation, chaos_step, test_recovery])

Error Simulation

# Simulate specific error conditions
error_simulation_tests = [
    Step("simulate-database-down").tool("error-simulator").inputs(
        error_type="database_connection_error",
        duration="2m"
    ),
    
    Step("simulate-high-latency").tool("network-simulator").inputs(
        latency="5000ms",
        packet_loss="10%"
    ),
    
    Step("simulate-resource-exhaustion").tool("resource-simulator").inputs(
        memory_pressure=True,
        cpu_usage=95
    )
]

Best Practices

Error Handler Design

  • Handle errors at the appropriate level (step vs workflow)
  • Provide meaningful error messages and context
  • Implement proper logging and monitoring
  • Design for graceful degradation

Recovery Strategies

  • Plan recovery strategies before implementation
  • Test all error handling paths
  • Implement circuit breakers for external dependencies
  • Use exponential backoff for retries

Monitoring and Alerting

  • Monitor error rates and patterns
  • Implement intelligent alerting to reduce noise
  • Create runbooks for common error scenarios
  • Track error handling effectiveness

Testing

  • Test error scenarios regularly
  • Use chaos engineering practices
  • Validate error handling in staging environments
  • Document error handling behaviors