Basic Step Structure
Creating a Simple Step
Copy
Ask AI
from kubiya_sdk import Step
# Smallest useful step: a name, a tool, and the inputs passed to that tool.
check_status = (
    Step("check-service-status")
    .tool("http-client")
    .inputs(
        url="https://api.myservice.com/health",
        method="GET",
        timeout=30,
    )
)
Step with Multiple Inputs
Copy
Ask AI
# Deployment step with several inputs, including a nested env-var mapping.
deploy_app = (
    Step("deploy-application")
    .tool("kubernetes-deployer")
    .inputs(
        service_name="user-service",
        image="user-service:v1.2.0",
        namespace="production",
        replicas=3,
        port=8080,
        env_vars={
            "DATABASE_URL": "${database.connection_string}",
            "REDIS_URL": "${cache.redis_url}",
            "LOG_LEVEL": "info",
        },
    )
)
Step Configuration
Step Names and Descriptions
Copy
Ask AI
# Descriptive step name paired with a human-readable description.
validate_deployment = (
    Step(
        name="validate-deployment-prerequisites",
        description="Verify all prerequisites are met before deployment",
    )
    .tool("deployment-validator")
    .inputs(
        environment="production",
        service="user-service",
        checks=["health_endpoints", "database_migrations", "feature_flags"],
    )
)
Step Timeouts and Retries
Copy
Ask AI
# Long-running migration: set a custom timeout on the step.
long_running_task = (
    Step("database-migration")
    .tool("migration-runner")
    .inputs(
        migration_script="001_add_user_preferences.sql",
        database="production",
    )
    .timeout(minutes=30)  # 30-minute timeout
)
# Call to an external API, configured with a retry policy.
flaky_operation = (
    Step("external-api-call")
    .tool("http-client")
    .inputs(
        url="https://external-api.com/data",
        headers={"Authorization": "Bearer ${api_token}"},
    )
    .retry(
        max_attempts=3,
        backoff_strategy="exponential",
        initial_delay=5,  # seconds
        max_delay=60,  # presumably seconds as well — TODO confirm
    )
)
Step Dependencies
Sequential Dependencies
Copy
Ask AI
# Ordered chain: build -> push -> deploy. Each step names its predecessor
# through depends_on().
build_image = (
    Step("build-docker-image")
    .tool("docker-builder")
    .inputs(
        dockerfile_path="./Dockerfile",
        image_name="myapp",
        tag="${BUILD_VERSION}",
    )
)

push_image = (
    Step("push-to-registry")
    .tool("docker-pusher")
    .inputs(
        image="myapp:${BUILD_VERSION}",
        registry="gcr.io/myproject",
    )
    .depends_on(build_image)
)

deploy_service = (
    Step("deploy-to-kubernetes")
    .tool("kubectl")
    .inputs(
        image="gcr.io/myproject/myapp:${BUILD_VERSION}",
        namespace="production",
    )
    .depends_on(push_image)
)
Multiple Dependencies
Copy
Ask AI
# Two test steps with no dependency on each other ...
integration_tests = (
    Step("run-integration-tests")
    .tool("test-runner")
    .inputs(
        test_suite="integration",
        environment="staging",
    )
)

load_tests = (
    Step("run-load-tests")
    .tool("load-tester")
    .inputs(
        target_url="https://staging.myapp.com",
        concurrent_users=100,
    )
)

# ... and a production deploy that lists both of them as dependencies.
production_deploy = (
    Step("deploy-to-production")
    .tool("kubernetes-deployer")
    .inputs(
        service="myapp",
        environment="production",
    )
    .depends_on([integration_tests, load_tests])
)
Conditional Steps
Simple Conditions
Copy
Ask AI
# Rollback step guarded by a condition on the health check's status.
rollback_step = (
    Step("rollback-deployment")
    .tool("kubernetes-rollback")
    .inputs(
        service="myapp",
        namespace="production",
        target_revision="previous",
    )
    .condition("${health_check.status} == 'failed'")
)
# Notification guarded by a condition on the ENVIRONMENT variable.
staging_notification = (
    Step("notify-staging-team")
    .tool("slack")
    .inputs(
        channel="#staging-deploys",
        message="🚀 Deployment to staging completed",
    )
    .condition("${ENVIRONMENT} == 'staging'")
)
Complex Conditions
Copy
Ask AI
# Condition combining three checks with `and`: canary status, error rate,
# and response time.
canary_promotion = (
    Step("promote-canary")
    .tool("traffic-manager")
    .inputs(
        service="myapp",
        traffic_split=100,  # Full traffic
    )
    .condition(
        # Adjacent literals are joined into a single string; the leading and
        # trailing newlines of the expression are kept as-is.
        "\n"
        "${canary_deploy.status} == 'success' and\n"
        "${health_metrics.error_rate} < 0.01 and\n"
        "${health_metrics.response_time} < 200\n"
    )
)
Step Outputs and Variables
Capturing Step Outputs
Copy
Ask AI
# Backup step that declares named outputs for later steps to reference.
database_backup = (
    Step("backup-database")
    .tool("pg_dump")
    .inputs(
        database="production",
        backup_location="/backups/",
    )
    .outputs(
        [
            "backup_file_path",
            "backup_size",
            "backup_timestamp",
        ]
    )
)
# Verification step: reads the backup step's `backup_file_path` output via
# ${...} interpolation and depends on that step.
verify_backup = (
    Step("verify-backup")
    .tool("backup-verifier")
    .inputs(
        backup_file="${database_backup.backup_file_path}",
        expected_min_size="1GB",
    )
    .depends_on(database_backup)
)
Variable Interpolation
Copy
Ask AI
# Workflow variables referenced with ${NAME} inside input values.
deploy_step = (
    Step("deploy-service")
    .tool("helm")
    .inputs(
        chart="./helm-chart",
        release_name="${SERVICE_NAME}-${ENVIRONMENT}",
        values={
            "image.repository": "${DOCKER_REGISTRY}/${SERVICE_NAME}",
            "image.tag": "${BUILD_VERSION}",
            "replicaCount": "${REPLICA_COUNT}",
            "environment": "${ENVIRONMENT}",
        },
    )
)
Step Types and Patterns
HTTP API Steps
Copy
Ask AI
# REST call: POST with auth/content-type headers and a JSON-style body.
api_call = (
    Step("call-external-api")
    .tool("http-client")
    .inputs(
        url="https://api.external-service.com/v1/data",
        method="POST",
        headers={
            "Authorization": "Bearer ${api_token}",
            "Content-Type": "application/json",
        },
        body={
            "query": "user_data",
            "filters": ["active", "verified"],
        },
        timeout=60,
        expected_status_codes=[200, 201],
    )
)
Database Operations
Copy
Ask AI
# Query step: the user id is supplied through `parameters` ($1 placeholder)
# rather than being spliced into the SQL text.
user_lookup = (
    Step("lookup-user")
    .tool("postgres-client")
    .inputs(
        query="SELECT id, email, status FROM users WHERE id = $1",
        parameters=["${USER_ID}"],
        connection_string="${DATABASE_URL}",
    )
)
# Migration step using the flyway tool.
run_migration = (
    Step("apply-migration")
    .tool("flyway")
    .inputs(
        migration_location="classpath:db/migration",
        database_url="${DATABASE_URL}",
        migrate_command="migrate",
    )
)
File System Operations
Copy
Ask AI
# Copy a config template to its destination, mapping {{...}} placeholders
# to workflow variables.
copy_config = (
    Step("copy-configuration")
    .tool("file-manager")
    .inputs(
        source_path="/templates/app.conf.template",
        destination_path="/app/config/app.conf",
        replacements={
            "{{DATABASE_HOST}}": "${DATABASE_HOST}",
            "{{API_KEY}}": "${API_KEY}",
        },
    )
)
# Archive the application log directory; the archive name embeds the build
# version and a timestamp variable.
backup_logs = (
    Step("archive-logs")
    .tool("tar")
    .inputs(
        source_directory="/var/log/myapp",
        archive_name="logs-${BUILD_VERSION}-${TIMESTAMP}.tar.gz",
        destination="/backups/logs/",
    )
)
Kubernetes Operations
Copy
Ask AI
# kubectl step with action "apply"; namespace and context come from
# workflow variables.
k8s_deploy = (
    Step("deploy-to-k8s")
    .tool("kubectl")
    .inputs(
        action="apply",
        manifest_file="deployment.yaml",
        namespace="${ENVIRONMENT}",
        context="${K8S_CONTEXT}",
    )
)
# kubectl step with action "scale"; the replica count is a workflow variable.
scale_deployment = (
    Step("scale-service")
    .tool("kubectl")
    .inputs(
        action="scale",
        resource_type="deployment",
        resource_name="${SERVICE_NAME}",
        replicas="${TARGET_REPLICAS}",
        namespace="${ENVIRONMENT}",
    )
)
Advanced Step Features
Dynamic Step Generation
Copy
Ask AI
def create_service_deployment_steps(services):
    """Generate build, test, and deploy steps for multiple services.

    For every name in ``services`` three steps are created and chained:
    the test step depends on the build step, and the deploy step depends
    on the test step. No dependency is declared between the steps of
    different services.

    Args:
        services: Iterable of service names. Each name is embedded in the
            step names and in the Dockerfile path
            (``./services/<name>/Dockerfile``).

    Returns:
        A flat list ordered ``build, test, deploy`` for each service, in
        the order the services were supplied. Empty if ``services`` is
        empty.
    """
    steps = []
    for service in services:
        # Build step
        build_step = Step(f"build-{service}").tool("docker-builder").inputs(
            service_name=service,
            dockerfile_path=f"./services/{service}/Dockerfile",
        )

        # Test step: runs after the build
        test_step = Step(f"test-{service}").tool("test-runner").inputs(
            service=service,
            test_type="unit",
        ).depends_on(build_step)

        # Deploy step: runs after the tests. "${ENVIRONMENT}" is a plain
        # string on purpose -- it is a workflow variable, not a Python one.
        deploy_step = Step(f"deploy-{service}").tool("kubernetes-deployer").inputs(
            service=service,
            environment="${ENVIRONMENT}",
        ).depends_on(test_step)

        steps.extend([build_step, test_step, deploy_step])

    return steps
# Generate the build/test/deploy steps for three services.
services = [
    "user-service",
    "payment-service",
    "notification-service",
]
deployment_steps = create_service_deployment_steps(services)
Step Templates
Copy
Ask AI
# Reusable step template
def create_health_check_step(service_name, endpoint="/health"):
    """Build an HTTP GET health-check step for one service.

    Args:
        service_name: Used in the step name (``health-check-<name>``) and
            as the host prefix of the URL (``https://<name>.myapp.com``).
        endpoint: Path appended directly to the host; defaults to
            ``"/health"``. It is concatenated as-is, so it should start
            with ``/``.

    Returns:
        The configured step for the ``http-client`` tool.
    """
    # NOTE(review): the POST example for the same tool passes
    # `expected_status_codes` (plural, a list) -- confirm which spelling
    # the http-client tool accepts.
    return Step(f"health-check-{service_name}").tool("http-client").inputs(
        url=f"https://{service_name}.myapp.com{endpoint}",
        method="GET",
        expected_status_code=200,
        timeout=30,
    )
# Instantiate the template: once with the default endpoint, once with a
# custom one.
frontend_health = create_health_check_step("frontend-service")
api_health = create_health_check_step("api-service", endpoint="/api/health")
Step Hooks and Callbacks
Copy
Ask AI
# Deployment step with hooks attached; each hook is itself a Step.
deployment_step = (
    Step("deploy-application")
    .tool("kubernetes-deployer")
    .inputs(
        service="myapp",
        environment="production",
    )
    .on_start(
        # Before the step starts
        Step("pre-deployment-notification")
        .tool("slack")
        .inputs(
            channel="#deployments",
            message="🚀 Starting deployment of myapp to production",
        )
    )
    .on_success(
        # On successful completion
        Step("post-deployment-notification")
        .tool("slack")
        .inputs(
            channel="#deployments",
            message="✅ Successfully deployed myapp to production",
        )
    )
    .on_failure(
        # On failure
        Step("deployment-failure-alert")
        .tool("slack")
        .inputs(
            channel="#incidents",
            message="🚨 Deployment of myapp to production failed!",
        )
    )
)
Error Handling in Steps
Step-Level Error Handling
Copy
Ask AI
# Bulk update with an explicit error handler: log, then run a compensating
# query.
risky_operation = (
    Step("risky-database-operation")
    .tool("postgres-client")
    .inputs(
        query="UPDATE users SET status = 'migrated' WHERE created_at < '2023-01-01'",
        connection_string="${DATABASE_URL}",
    )
    .on_error(
        # Actions to take if this step fails
        actions=[
            Step("log-error")
            .tool("logger")
            .inputs(
                level="error",
                message="Database migration failed: ${error.message}",
            ),
            # NOTE(review): this resets every row whose status is 'migrated'
            # to 'active', not only rows changed by the failed update, and
            # assumes their previous status was 'active' -- confirm.
            Step("rollback-changes")
            .tool("postgres-client")
            .inputs(
                query="UPDATE users SET status = 'active' WHERE status = 'migrated'",
                connection_string="${DATABASE_URL}",
            ),
        ],
        continue_workflow=False,  # Stop workflow on error
    )
)
Graceful Degradation
Copy
Ask AI
# Primary call with a short timeout; on error a secondary service is used
# as the fallback and the workflow continues.
primary_service_call = (
    Step("call-primary-service")
    .tool("http-client")
    .inputs(
        url="https://primary-service.com/api/data",
        timeout=10,
    )
    .on_error(
        # Fallback to the secondary service
        fallback=(
            Step("call-secondary-service")
            .tool("http-client")
            .inputs(
                url="https://secondary-service.com/api/data",
                timeout=30,
            )
        ),
        continue_workflow=True,
    )
)
Step Performance and Monitoring
Step Metrics
Copy
Ask AI
# Step with monitoring: a list of metrics plus per-metric alert rules.
monitored_step = (
    Step("performance-critical-operation")
    .tool("data-processor")
    .inputs(
        input_file="/data/large_dataset.csv",
        output_file="/processed/results.json",
    )
    .monitor(
        metrics=["duration", "memory_usage", "cpu_usage"],
        alerts=[
            {"metric": "duration", "threshold": "5m", "action": "notify"},
            {"metric": "memory_usage", "threshold": "2GB", "action": "alert"},
        ],
    )
)
Step Logging
Copy
Ask AI
# Step with debug-level logging and output logging switched on.
logged_step = (
    Step("important-business-operation")
    .tool("business-processor")
    .inputs(
        transaction_id="${TRANSACTION_ID}",
        amount="${AMOUNT}",
        currency="${CURRENCY}",
    )
    .log_level("debug")
    .log_outputs(True)
    .inputs_logging(
        # Keep sensitive input fields out of the logs
        exclude_fields=["credit_card", "ssn", "password"],
    )
)
Best Practices
Step Naming
- Use descriptive, action-oriented names
- Include the purpose and context
- Use consistent naming conventions
- Avoid abbreviations
Step Design
- Keep steps focused on single responsibilities
- Make steps idempotent when possible, so retries and re-runs are safe
- Use appropriate timeouts
- Handle errors gracefully
Variable Usage
- Use meaningful variable names
- Document variable purposes
- Validate inputs when possible
- Use defaults for optional parameters
Dependencies
- Minimize unnecessary dependencies
- Use parallel execution when possible
- Consider failure scenarios
- Document dependency relationships