Basic Error Handling
Step-Level Error Handling
from kubiya_sdk import Step, ErrorHandler

# Basic error handling for a step
risky_deployment = Step("deploy-to-production").tool("kubernetes-deployer").inputs(
    service="payment-service",
    environment="production"
).on_error(
    ErrorHandler().on_failure([
        Step("notify-failure").tool("slack").inputs(
            channel="#incidents",
            message="🚨 Production deployment failed!"
        ),
        Step("rollback-deployment").tool("kubernetes-rollback").inputs(
            service="payment-service",
            environment="production"
        )
    ])
)
Multiple Error Types
# Handle different types of errors differently
database_operation = Step("complex-database-migration").tool("migration-runner").inputs(
    migration_file="complex_schema_change.sql",
    database="production"
).on_error(
    ErrorHandler()
    .on_timeout([
        Step("extend-maintenance-window").tool("maintenance-scheduler").inputs(
            duration="additional-30m"
        ),
        Step("retry-with-longer-timeout").tool("migration-runner").inputs(
            migration_file="complex_schema_change.sql",
            timeout="45m"
        )
    ])
    .on_failure([
        Step("restore-database-backup").tool("backup-restorer").inputs(
            backup_file="${pre_migration_backup.backup_file}"
        ),
        Step("alert-dba-team").tool("pager-duty").inputs(
            service="database-team",
            message="Critical: Migration rollback initiated"
        )
    ])
    .on_resource_limit([
        Step("scale-up-database").tool("database-scaler").inputs(
            cpu_cores=8,
            memory="16Gi"
        ),
        Step("retry-migration").tool("migration-runner").inputs(
            migration_file="complex_schema_change.sql"
        )
    ])
)
Retry Mechanisms
Simple Retry
# Step with automatic retry
flaky_external_api = Step("call-external-service").tool("http-client").inputs(
    url="https://unreliable-api.com/data",
    headers={"Authorization": "Bearer ${API_TOKEN}"}
).retry(
    max_attempts=3,
    backoff_strategy="exponential",
    initial_delay=5,  # seconds
    max_delay=60,
    retry_on_status=[500, 502, 503, 504]
)
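For intuition, the delay schedule these settings imply can be reproduced in a few lines of plain Python. The helper below is an illustrative sketch, not part of the SDK: each retry waits twice as long as the previous one, capped at max_delay.

# Illustrative only: the exponential backoff schedule implied by the settings above.
def backoff_delays(max_attempts=3, initial_delay=5, max_delay=60):
    """Return the wait time in seconds before each retry attempt."""
    return [min(initial_delay * (2 ** attempt), max_delay) for attempt in range(max_attempts)]

print(backoff_delays())  # [5, 10, 20] -- doubles each attempt, never exceeding max_delay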
Advanced Retry Logic
# Conditional retry with custom logic
intelligent_retry = Step("smart-deployment").tool("smart-deployer").inputs(
    service="user-service",
    strategy="blue-green"
).retry(
    max_attempts=5,
    backoff_strategy="custom",
    retry_condition="""
        ${error.type} == 'TemporaryError' or
        (${error.type} == 'ResourceError' and ${attempt_count} < 3)
    """,
    custom_backoff=lambda attempt: min(2 ** attempt * 10, 300),  # Cap at 5 minutes
    # Different action per retry attempt
    on_retry_attempt=[
        (1, Step("log-first-retry").tool("logger").inputs(level="info")),
        (3, Step("alert-operations").tool("slack").inputs(channel="#ops")),
        (5, Step("escalate-to-engineer").tool("pager-duty"))
    ]
)
Circuit Breaker Pattern
# Circuit breaker for frequently failing operations
circuit_breaker_step = Step("external-dependency").tool("http-client").inputs(
    url="https://flaky-service.com/api"
).circuit_breaker(
    failure_threshold=5,   # Open circuit after 5 failures
    recovery_timeout=60,   # Try to close circuit after 60 seconds
    success_threshold=3,   # Close circuit after 3 successful calls
    # Fallback when circuit is open
    fallback_action=Step("use-cached-data").tool("cache-reader").inputs(
        cache_key="fallback_data"
    )
)
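If the three thresholds are unfamiliar, the following SDK-independent sketch shows the state machine they describe (closed, open, half-open). The class and method names are illustrative, not the SDK's implementation.

# Illustrative sketch of the circuit breaker state machine configured above.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, success_threshold=3):
        self.failure_threshold = failure_threshold  # failures before the circuit opens
        self.recovery_timeout = recovery_timeout    # seconds to wait before probing again
        self.success_threshold = success_threshold  # successes needed to fully close
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def call(self, operation, fallback):
        if self.state == "open":
            if time.time() - self.opened_at < self.recovery_timeout:
                return fallback()          # circuit open: use the fallback immediately
            self.state = "half-open"       # recovery timeout elapsed: allow a probe call
        try:
            result = operation()
        except Exception:
            self.failures += 1
            self.successes = 0
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()
            return fallback()
        self.successes += 1
        self.failures = 0
        if self.state == "half-open" and self.successes >= self.success_threshold:
            self.state = "closed"          # enough successful probes: close the circuit
        return result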
Workflow-Level Error Handling
Global Error Handlers
from kubiya_sdk import Step, Workflow, GlobalErrorHandler

@workflow
def resilient_deployment_pipeline():
    """Deployment pipeline with comprehensive error handling"""
    # Define global error handler
    global_handler = GlobalErrorHandler()
    global_handler.on_any_failure([
        Step("capture-workflow-state").tool("state-capturer").inputs(
            workflow_id="${WORKFLOW_ID}",
            timestamp="${CURRENT_TIMESTAMP}"
        ),
        Step("notify-platform-team").tool("slack").inputs(
            channel="#platform",
            message="Workflow ${WORKFLOW_ID} encountered an error"
        )
    ])

    # Define workflow steps
    build_step = Step("build-application").tool("builder")
    test_step = Step("run-tests").tool("test-runner")
    deploy_step = Step("deploy-application").tool("deployer")

    return Workflow(
        steps=[build_step, test_step, deploy_step],
        error_handler=global_handler
    )
Workflow Recovery Strategies
@workflow
def self_healing_deployment():
    """Workflow that can recover from various failure scenarios"""
    # Pre-deployment backup
    backup_step = Step("create-backup").tool("backup-creator").inputs(
        environment="production"
    )

    # Main deployment
    deployment_step = Step("deploy-service").tool("deployer").inputs(
        service="critical-service"
    ).on_error(
        ErrorHandler()
        .on_failure([
            # Immediate rollback
            Step("emergency-rollback").tool("rollback-tool").inputs(
                backup_id="${backup_step.backup_id}"
            ),
            # Health check after rollback
            Step("verify-rollback").tool("health-checker"),
            # If rollback fails, escalate immediately
            Step("escalate-critical-failure").tool("incident-manager").inputs(
                severity="critical",
                auto_page=True
            ).condition("${emergency_rollback.status} == 'failed'")
        ])
    )

    # Post-deployment verification
    verification_step = Step("verify-deployment").tool("verification-suite").on_error(
        ErrorHandler().on_failure([
            # Verification failed - safe rollback
            Step("safe-rollback").tool("blue-green-rollback"),
            Step("analyze-failure").tool("log-analyzer").inputs(
                time_range="last-10m"
            )
        ])
    )

    return Workflow([backup_step, deployment_step, verification_step])
Error Context and Debugging
Rich Error Information
# Capture detailed error context
detailed_error_handling = Step("complex-operation").tool("data-processor").inputs(
    input_file="/large-dataset.csv"
).on_error(
    ErrorHandler().on_failure([
        # Capture system state
        Step("capture-system-metrics").tool("metrics-collector").inputs(
            metrics=["cpu", "memory", "disk_space", "network"],
            duration="5m"
        ),
        # Capture application logs
        Step("collect-application-logs").tool("log-collector").inputs(
            service="data-processor",
            time_range="last-15m",
            log_levels=["error", "warn"]
        ),
        # Capture environment information
        Step("environment-snapshot").tool("env-capturer").inputs(
            include=["env_vars", "running_processes", "open_files"]
        ),
        # Create debugging package
        Step("create-debug-package").tool("debug-packager").inputs(
            metrics="${capture-system-metrics.output}",
            logs="${collect-application-logs.output}",
            environment="${environment-snapshot.output}",
            error_details="${complex-operation.error}"
        )
    ])
)
Error Classification
# Classify and handle errors based on type
classified_error_handling = Step("service-deployment").tool("deployer").on_error(
    ErrorHandler()
    .on_error_type("NetworkError", [
        Step("check-network-connectivity").tool("network-tester"),
        Step("retry-with-different-endpoint").tool("deployer").inputs(
            endpoint="backup-endpoint"
        )
    ])
    .on_error_type("AuthenticationError", [
        Step("refresh-credentials").tool("auth-refresher"),
        Step("retry-deployment").tool("deployer")
    ])
    .on_error_type("ResourceLimitError", [
        Step("scale-resources").tool("resource-scaler").inputs(
            scale_factor=1.5
        ),
        Step("retry-with-more-resources").tool("deployer")
    ])
    .on_error_pattern(".*timeout.*", [  # Regex pattern matching
        Step("increase-timeout").tool("config-updater").inputs(
            timeout_multiplier=2
        ),
        Step("retry-deployment").tool("deployer")
    ])
)
Graceful Degradation
Feature Degradation
# Gracefully degrade functionality on errors
feature_deployment = Step("deploy-new-feature").tool("feature-deployer").inputs(
    feature_name="advanced-analytics"
).on_error(
    ErrorHandler().on_failure([
        # Disable the new feature
        Step("disable-feature-flag").tool("feature-flag-manager").inputs(
            feature="advanced-analytics",
            enabled=False
        ),
        # Ensure core functionality still works
        Step("verify-core-features").tool("core-feature-tester"),
        # Schedule retry for later
        Step("schedule-retry").tool("scheduler").inputs(
            job_name="retry-feature-deployment",
            schedule="+2h"  # Try again in 2 hours
        )
    ])
)
Partial Success Handling
# Handle partial successes gracefully
bulk_operation = Step("process-user-batch").tool("batch-processor").inputs(
    user_batch_size=1000
).on_error(
    ErrorHandler()
    .on_partial_failure([  # Some users processed, some failed
        Step("analyze-failed-users").tool("failure-analyzer").inputs(
            failed_items="${process-user-batch.failed_items}"
        ),
        Step("retry-failed-subset").tool("batch-processor").inputs(
            user_list="${analyze-failed-users.retryable_items}",
            batch_size=100  # Smaller batches for problematic items
        ),
        Step("quarantine-problematic-users").tool("quarantine-manager").inputs(
            user_list="${analyze-failed-users.problematic_items}"
        )
    ])
    .on_complete_failure([  # Nothing processed successfully
        Step("investigate-systematic-issue").tool("system-analyzer"),
        Step("alert-engineering-team").tool("slack").inputs(
            channel="#engineering-alerts"
        )
    ])
)
Monitoring and Alerting
Error Rate Monitoring
# Monitor error rates and adjust behavior
monitored_operation = Step("high-frequency-operation").tool("api-client").inputs(
    endpoint="/api/data"
).monitor_errors(
    error_rate_threshold=0.05,  # 5% error rate
    time_window="5m",
    # Actions when error rate is high
    on_high_error_rate=[
        Step("enable-circuit-breaker").tool("circuit-breaker-manager"),
        Step("scale-down-traffic").tool("traffic-manager").inputs(
            traffic_percentage=50
        )
    ],
    # Actions when error rate normalizes
    on_error_rate_normal=[
        Step("disable-circuit-breaker").tool("circuit-breaker-manager"),
        Step("restore-full-traffic").tool("traffic-manager")
    ]
)
Intelligent Alerting
# Smart alerting based on error patterns
smart_alerting = Step("critical-business-operation").tool("payment-processor").on_error(
    ErrorHandler().configure_alerting(
        # Different alert levels based on error characteristics
        alert_rules=[
            {
                "condition": "error.type == 'PaymentFailure' and error.count > 10",
                "alert_level": "critical",
                "recipients": ["payments-team@company.com", "on-call-engineer"]
            },
            {
                "condition": "error.type == 'NetworkTimeout' and error.rate > 0.1",
                "alert_level": "warning",
                "recipients": ["platform-team@company.com"]
            },
            {
                "condition": "error.duration > 300",  # 5 minutes
                "alert_level": "info",
                "recipients": ["engineering-team@company.com"]
            }
        ],
        # Suppress duplicate alerts
        suppression_window="10m",
        # Escalation rules
        escalation=[
            {"after": "15m", "to": ["engineering-manager@company.com"]},
            {"after": "30m", "to": ["cto@company.com"]}
        ]
    )
)
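The suppression_window setting deduplicates repeat alerts. The sketch below shows one common way such a window can be enforced with a per-alert-key timestamp cache; it is plain Python for illustration, not the SDK's implementation.

# Illustrative sketch: suppress duplicate alerts that fire within a time window.
import time

class AlertSuppressor:
    def __init__(self, suppression_window_seconds=600):  # 10 minutes, matching "10m" above
        self.window = suppression_window_seconds
        self.last_sent = {}  # alert key -> timestamp of last delivery

    def should_send(self, alert_key: str) -> bool:
        """Return True only if this alert key has not fired within the window."""
        now = time.time()
        if now - self.last_sent.get(alert_key, 0.0) < self.window:
            return False  # duplicate within the window: suppress it
        self.last_sent[alert_key] = now
        return True

suppressor = AlertSuppressor()
print(suppressor.should_send("PaymentFailure:critical"))  # True  -> deliver the alert
print(suppressor.should_send("PaymentFailure:critical"))  # False -> suppressed duplicate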
Testing Error Scenarios
Error Injection for Testing
# Inject errors for testing error handling
def create_chaos_testing_workflow():
    """Workflow that intentionally introduces errors for testing"""
    normal_operation = Step("normal-service-call").tool("service-client")

    # Chaos engineering - randomly inject errors
    chaos_step = Step("chaos-injection").tool("chaos-engineer").inputs(
        chaos_type="random",
        failure_probability=0.1,  # 10% chance of failure
        failure_types=["timeout", "network_error", "service_unavailable"]
    )

    # Test how the system handles the chaos
    test_recovery = Step("test-error-recovery").tool("recovery-tester").inputs(
        expected_recovery_time="30s",
        recovery_strategies=["retry", "fallback", "circuit_breaker"]
    )

    return Workflow([normal_operation, chaos_step, test_recovery])
Error Simulation
# Simulate specific error conditions
error_simulation_tests = [
    Step("simulate-database-down").tool("error-simulator").inputs(
        error_type="database_connection_error",
        duration="2m"
    ),
    Step("simulate-high-latency").tool("network-simulator").inputs(
        latency="5000ms",
        packet_loss="10%"
    ),
    Step("simulate-resource-exhaustion").tool("resource-simulator").inputs(
        memory_pressure=True,
        cpu_usage=95
    )
]
Best Practices
Error Handler Design
- Handle errors at the appropriate level (step vs workflow); a minimal sketch combining both levels follows this list
- Provide meaningful error messages and context
- Implement proper logging and monitoring
- Design for graceful degradation
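To make the first point concrete, here is a minimal sketch that pairs a step-level handler (local rollback concerns) with a workflow-level GlobalErrorHandler (cross-cutting notification), using only the constructs shown earlier in this guide. Step, tool, and channel names are placeholders.

from kubiya_sdk import Step, Workflow, ErrorHandler, GlobalErrorHandler

# Step-level: the deploy step knows how to roll itself back.
deploy = Step("deploy-api").tool("deployer").inputs(service="api").on_error(
    ErrorHandler().on_failure([
        Step("rollback-api").tool("rollback-tool").inputs(service="api")
    ])
)

# Workflow-level: any failure also triggers a single, shared notification.
global_handler = GlobalErrorHandler()
global_handler.on_any_failure([
    Step("notify-team").tool("slack").inputs(channel="#deployments")
])

pipeline = Workflow(steps=[deploy], error_handler=global_handler)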
Recovery Strategies
- Plan recovery strategies before implementation
- Test all error handling paths
- Implement circuit breakers for external dependencies
- Use exponential backoff for retries
Monitoring and Alerting
- Monitor error rates and patterns (the sketch after this list shows a simple sliding-window calculation)
- Implement intelligent alerting to reduce noise
- Create runbooks for common error scenarios
- Track error handling effectiveness
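As a concrete illustration of the error-rate threshold used earlier (5% over a 5-minute window), the helper below is a plain-Python sketch of a sliding-window error-rate calculation; it is not part of the SDK.

# Illustrative sketch: sliding-window error rate, as in error_rate_threshold=0.05 / time_window="5m".
import time
from collections import deque

class ErrorRateMonitor:
    def __init__(self, window_seconds=300, threshold=0.05):
        self.window = window_seconds
        self.threshold = threshold
        self.events = deque()  # (timestamp, was_error) pairs inside the window

    def record(self, was_error: bool):
        now = time.time()
        self.events.append((now, was_error))
        while self.events and now - self.events[0][0] > self.window:
            self.events.popleft()  # drop events older than the window

    def error_rate(self) -> float:
        if not self.events:
            return 0.0
        return sum(1 for _, err in self.events if err) / len(self.events)

    def is_unhealthy(self) -> bool:
        return self.error_rate() > self.threshold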
Testing
- Test error scenarios regularly (a small unit-test sketch follows this list)
- Use chaos engineering practices
- Validate error handling in staging environments
- Document error handling behaviors
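For example, a unit test can force a dependency to fail and assert that the fallback path behaves as intended. The sketch below uses plain Python and unittest.mock; the helper and its names are hypothetical and independent of the SDK.

# Illustrative sketch: force a failure and verify the fallback path is taken.
from unittest import mock

def fetch_with_fallback(client, cache):
    """Hypothetical helper: try the live service, fall back to cached data on error."""
    try:
        return client.get("/api/data")
    except ConnectionError:
        return cache.read("fallback_data")

def test_falls_back_to_cache_when_service_is_down():
    client = mock.Mock()
    client.get.side_effect = ConnectionError("service unavailable")  # injected failure
    cache = mock.Mock()
    cache.read.return_value = {"source": "cache"}

    assert fetch_with_fallback(client, cache) == {"source": "cache"}
    cache.read.assert_called_once_with("fallback_data")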