Why Use Workflow DSL?
- Version Control: Store workflows as code in your repository with full version history
- Type Safety: Python type hints and IDE support catch errors before runtime
- Reusability: Create reusable components and share workflows across teams
- Testing: Unit test your workflows before deploying to production

Basic Workflow Structure
Copy
Ask AI
from kubiya import Workflow, Step, Parallel, workflow
from kubiya.tools import KubectlTool, SlackTool


# Define a workflow
@workflow
def deploy_application(service_name: str, version: str, environment: str):
    """Deploy application with health checks and notifications.

    Builds a linear chain of four steps (validate, deploy, health check,
    notify) in which each step depends on the one before it.

    Args:
        service_name: Name of the service; also used as the image name.
        version: Version to deploy; used as the image tag.
        environment: Target environment; passed to the deploy and
            health-check tools as the namespace.

    Returns:
        A Workflow containing the four steps in dependency order.
    """
    # Pre-deployment validation
    validate = Step("validate-deployment").tool("deployment-validator").inputs(
        service=service_name,
        version=version,
        environment=environment,
    )

    # Deploy the application
    deploy = Step("deploy-service").tool("kubernetes-deployer").inputs(
        service=service_name,
        image=f"{service_name}:{version}",
        namespace=environment,
    ).depends_on(validate)

    # Health checks
    health_check = Step("health-check").tool("health-monitor").inputs(
        service=service_name,
        namespace=environment,
        timeout="5m",
    ).depends_on(deploy)

    # Notify team
    notify = Step("notify-success").tool("slack").inputs(
        channel="#deployments",
        message=f"✅ {service_name} v{version} deployed to {environment}",
    ).depends_on(health_check)

    return Workflow([validate, deploy, health_check, notify])
Core DSL Components
Steps
Steps are the building blocks of workflows:
Copy
Ask AI
from kubiya import Step
# Basic step: a named step bound to a tool and a command
basic_step = (
    Step("check-status")
    .tool("kubernetes")
    .command(["kubectl", "get", "pods", "-n", "${NAMESPACE}"])
)

# Step with inputs: named values handed to the tool
step_with_inputs = (
    Step("scale-deployment")
    .tool("kubernetes")
    .inputs(
        deployment="${SERVICE_NAME}",
        replicas="${REPLICA_COUNT}",
        namespace="${ENVIRONMENT}",
    )
)

# Step with conditions: the condition expression references another step's status
conditional_step = (
    Step("rollback")
    .tool("kubernetes")
    .command(["kubectl", "rollout", "undo", "deployment/${SERVICE_NAME}"])
    .condition("${health_check.status} == 'failed'")
)

# Step with retry logic: up to three attempts with exponential backoff
retry_step = (
    Step("flaky-operation")
    .tool("external-api")
    .retry(max_attempts=3, backoff="exponential", backoff_factor=2.0)
)
Parallel Execution
Run multiple steps concurrently:
Copy
Ask AI
from kubiya import Parallel, Step
# Parallel health checks: four independent probes grouped into one unit
health_checks = Parallel(
    [
        Step("check-frontend").tool("http-client").url("https://app.com/health"),
        Step("check-api").tool("http-client").url("https://api.com/health"),
        Step("check-database").tool("postgres-client").query("SELECT 1"),
        Step("check-cache").tool("redis-client").command("ping"),
    ]
)

# Wait for all to complete before proceeding: the deploy step depends on the
# whole parallel group rather than on any single check
deploy_step = Step("deploy").depends_on(health_checks)
Conditional Logic
Create branching workflows:
Copy
Ask AI
from kubiya import Branch, Condition
# Branch based on environment: one Condition per target, plus a default
environment_branch = Branch(
    [
        # Production branch - requires approval before the deploy step
        Condition("${ENVIRONMENT} == 'production'").then(
            [
                Step("request-approval").tool("approval-gate"),
                Step("deploy-production").tool("kubernetes-deployer"),
            ]
        ),
        # Staging branch - auto deploy
        Condition("${ENVIRONMENT} == 'staging'").then(
            [Step("deploy-staging").tool("kubernetes-deployer")]
        ),
        # Default branch
        Condition.default().then(
            [Step("deploy-development").tool("kubernetes-deployer")]
        ),
    ]
)
Error Handling
Define error handling and recovery strategies:
Copy
Ask AI
from kubiya import Step, ErrorHandler
# Deploy step with an attached handler: one list of recovery steps for
# failures and a separate list for timeouts
deploy_step = (
    Step("deploy")
    .tool("kubernetes-deployer")
    .error_handler(
        ErrorHandler()
        .on_failure(
            [
                Step("collect-logs").tool("log-collector"),
                Step("rollback").tool("kubernetes-rollback"),
                Step("notify-failure").tool("slack").inputs(
                    channel="#incidents",
                    message="🚨 Deployment failed: ${deploy.error}",
                ),
            ]
        )
        .on_timeout(
            [
                Step("cancel-deployment").tool("kubernetes-cancel"),
                Step("investigate").tool("debug-collector"),
            ]
        )
    )
)
Advanced Patterns
Data Transformation
Transform data between steps:
Copy
Ask AI
from datetime import datetime  # required by the transform lambda below

from kubiya import Transform

# Transform step outputs: the two inputs are passed positionally to the
# lambda, which returns the dict exposed to later steps
transform_data = Transform(
    inputs=["${api_response.data}", "${config.settings}"],
    transform=lambda api_data, config: {
        "processed_data": api_data.get("results", []),
        "environment": config.get("env", "development"),
        "timestamp": datetime.now().isoformat(),
    },
)

# Downstream step reading the transformed values by key
next_step = Step("process-data").inputs(
    data="${transform_data.processed_data}",
    env="${transform_data.environment}",
)
Dynamic Workflows
Generate workflows based on runtime data:
Copy
Ask AI
from typing import List


@workflow
def multi_environment_deployment(services: List[str], environments: List[str]):
    """Deploy multiple services to multiple environments.

    Creates one deploy step and one dependent health-check step for every
    (service, environment) combination.

    Args:
        services: Names of the services to deploy.
        environments: Names of the target environments.

    Returns:
        A Workflow containing a deploy step followed by its health-check
        step for each combination, in iteration order.
    """
    steps = []

    for service in services:
        for env in environments:
            # Create deployment step for each service/environment combination
            deploy_step = Step(f"deploy-{service}-{env}").tool("kubernetes-deployer").inputs(
                service=service,
                environment=env,
                # Doubled braces emit the literal "${BUILD_VERSION}" placeholder;
                # a single pair would make the f-string evaluate BUILD_VERSION
                # as a Python name and raise NameError.
                image=f"{service}:${{BUILD_VERSION}}",
            )

            # Add health check
            health_step = Step(f"health-{service}-{env}").tool("health-checker").inputs(
                service=service,
                environment=env,
            ).depends_on(deploy_step)

            steps.extend([deploy_step, health_step])

    return Workflow(steps)
Workflow Composition
Compose larger workflows from smaller ones:
Copy
Ask AI
@workflow
def database_migration():
    """Build the database migration workflow.

    Returns:
        A Workflow with three steps: backup, run migrations, verify schema.
    """
    migration_steps = [
        Step("backup-database").tool("postgres-backup"),
        Step("run-migrations").tool("migrate"),
        Step("verify-schema").tool("schema-validator"),
    ]
    return Workflow(migration_steps)
@workflow
def application_deployment():
    """Build the application deployment workflow.

    Returns:
        A Workflow with three steps: build image, push image, deploy app.
    """
    deployment_steps = [
        Step("build-image").tool("docker-build"),
        Step("push-image").tool("docker-push"),
        Step("deploy-app").tool("kubernetes-deploy"),
    ]
    return Workflow(deployment_steps)
@workflow
def full_deployment_pipeline(include_migration: bool = False):
    """Assemble the complete pipeline, optionally starting with the migration.

    Args:
        include_migration: When true, the database migration steps come
            first and the first application step is made to depend on the
            last migration step.

    Returns:
        A Workflow holding the migration steps (if requested) followed by
        the application deployment steps.
    """
    pipeline_steps = []

    # Optional migration stage
    if include_migration:
        pipeline_steps.extend(database_migration().steps)

    app_deployment = application_deployment()

    # Chain the first deployment step onto the last migration step
    if include_migration:
        app_deployment.steps[0].depends_on(pipeline_steps[-1])

    pipeline_steps.extend(app_deployment.steps)
    return Workflow(pipeline_steps)
Custom Tools Integration
Define and use custom tools:
Copy
Ask AI
from kubiya.tools import Tool, Arg
# Define custom tool: a Python script packaged with its image, requirements,
# and argument declarations.
#
# The embedded script reads its three arguments positionally from sys.argv
# and prints how many lines match `pattern`, followed by the last ten matches.
# NOTE(review): `cutoff` is computed but never applied, so matches are not
# actually restricted to `time_range`; filtering needs the log timestamp
# format, which is not specified here.
# NOTE(review): the script imports pandas without using it and uses `re`
# rather than the declared `regex` requirement; confirm whether both
# requirements are needed.
custom_analyzer = Tool(
    name="log-analyzer",
    description="Analyze application logs for errors",
    image="python:3.11-slim",
    requirements=["pandas", "regex"],
    args=[
        Arg("log_file", description="Path to log file", required=True),
        Arg("pattern", description="Error pattern to search", default="ERROR|FATAL"),
        Arg("time_range", description="Time range to analyze", default="1h"),
    ],
    # The script body must stay at column 0 inside the string, and its nested
    # blocks must be indented, for it to be valid Python when executed.
    content="""
import pandas as pd
import re
import sys
from datetime import datetime, timedelta
log_file = sys.argv[1]
pattern = sys.argv[2]
time_range = sys.argv[3]
# Parse time range (simplified)
hours = int(time_range.replace('h', ''))
cutoff = datetime.now() - timedelta(hours=hours)
errors = []
with open(log_file, 'r') as f:
    for line in f:
        if re.search(pattern, line, re.IGNORECASE):
            errors.append(line.strip())
print(f"Found {len(errors)} errors in the last {time_range}")
for error in errors[-10:]: # Show last 10
    print(f" {error}")
""",
)
# Use custom tool in workflow
@workflow
def investigate_issues(service: str):
    """Collect a service's logs, then analyze them with the custom tool.

    Args:
        service: Name of the service whose logs are collected.

    Returns:
        A Workflow with the collection step followed by the analysis step.
    """
    # Collect logs into a fixed path that the analysis step reads back
    log_collection = (
        Step("collect-logs")
        .tool("log-collector")
        .inputs(
            service=service,
            duration="1h",
            output="/tmp/service.log",
        )
    )

    # Analyze logs with custom tool, once collection has finished
    log_analysis = (
        Step("analyze-logs")
        .tool(custom_analyzer)
        .inputs(
            log_file="/tmp/service.log",
            pattern="ERROR|EXCEPTION|FATAL",
            time_range="1h",
        )
        .depends_on(log_collection)
    )

    return Workflow([log_collection, log_analysis])
Testing Workflows
Unit Testing
Copy
Ask AI
import unittest
from kubiya.testing import WorkflowTest
class TestDeploymentWorkflow(WorkflowTest):
    """Unit tests for the deploy_application workflow, run against mocked tools."""

    def test_successful_deployment(self):
        """Test successful deployment scenario."""
        workflow = deploy_application(
            service_name="frontend",
            version="v1.2.0",
            environment="staging",
        )

        # Mock tool responses: one mock per tool the workflow's steps use
        self.mock_tool("deployment-validator", success=True)
        self.mock_tool("kubernetes-deployer", success=True)
        self.mock_tool("health-monitor", success=True)
        self.mock_tool("slack", success=True)

        # Execute workflow
        result = self.execute_workflow(workflow)

        # Assertions: all four steps of deploy_application completed
        self.assertTrue(result.success)
        self.assertEqual(len(result.completed_steps), 4)
        self.assertIn("validate-deployment", [s.name for s in result.completed_steps])

    def test_deployment_failure_rollback(self):
        """Test that a failed deployment fails the workflow and halts the chain.

        deploy_application declares no rollback step or error handler, so
        asserting that a "rollback" step ran could never pass against it.
        This checks what that workflow can demonstrate: the deploy step ran,
        and the steps that depend on it did not complete.
        """
        workflow = deploy_application("frontend", "v1.2.0", "staging")

        # Mock deployment failure
        self.mock_tool("deployment-validator", success=True)
        self.mock_tool("kubernetes-deployer", success=False, error="Pod failed to start")

        result = self.execute_workflow(workflow)

        executed = [s.name for s in result.executed_steps]
        completed = [s.name for s in result.completed_steps]

        self.assertFalse(result.success)
        self.assertIn("deploy-service", executed)
        # NOTE(review): assumes steps that depend on a failed step are not run
        # to completion; confirm against the WorkflowTest runner.
        self.assertNotIn("health-check", completed)
        self.assertNotIn("notify-success", completed)
Integration Testing
Copy
Ask AI
import os  # needed for os.environ in the fixture below

import pytest
from kubiya import Kubiya


@pytest.fixture
def client():
    """Return a Kubiya client pointed at the test API.

    Raises:
        KeyError: If the KUBIYA_TEST_API_KEY environment variable is not set.
    """
    return Kubiya(
        api_key=os.environ["KUBIYA_TEST_API_KEY"],
        base_url="https://test-api.kubiya.ai",
    )
def test_workflow_end_to_end(client):
    """Test complete workflow execution.

    Registers the deployment workflow, executes it while waiting for the
    result, and checks the final status. The registered workflow is deleted
    even when execution or an assertion fails, so a failing run does not
    leave a test workflow behind.
    """
    # Create test workflow
    deployment = deploy_application("test-service", "v1.0.0", "test")

    # Register with Kubiya
    registered = client.workflows.register(deployment)

    try:
        # Execute workflow
        execution = client.workflows.execute(
            workflow_id=registered.id,
            inputs={},
            wait=True,
            timeout=300,  # 5 minutes
        )

        # Verify results
        assert execution.status == "completed"
        assert execution.error is None
    finally:
        # Cleanup runs on success and on failure alike
        client.workflows.delete(registered.id)
Best Practices
Workflow Design
- Keep Steps Atomic: Each step should do one specific thing
- Use Clear Names: Step and workflow names should be descriptive
- Handle Errors: Always include error handling and rollback strategies
- Add Documentation: Use docstrings and comments liberally
- Test Thoroughly: Write both unit and integration tests
Performance Optimization
- Parallel Execution: Use parallel steps when operations are independent
- Resource Management: Set appropriate resource limits for tools
- Caching: Cache expensive operations when possible
- Timeouts: Set reasonable timeouts for all operations
Security Considerations
- Secrets Management: Never hardcode sensitive values
- Input Validation: Validate all workflow inputs
- Least Privilege: Use minimal required permissions
- Audit Logging: Ensure all operations are logged
Real-World Example
Here’s a complete production deployment workflow:
Copy
Ask AI
from kubiya import workflow, Workflow, Step, Parallel, Branch, Condition
from datetime import datetime


@workflow
def production_deployment_pipeline(
    service_name: str,
    version: str,
    rollout_strategy: str = "canary",
    approval_required: bool = True
):
    """
    Production deployment pipeline with safety checks.

    Stages, each depending on the previous one: parallel pre-deployment
    checks, an optional approval gate, a strategy-specific deployment,
    parallel post-deployment checks, and a completion notification. A
    rollback step is attached to the failure of the post-deployment checks.

    Args:
        service_name: Name of the service to deploy
        version: Version tag to deploy
        rollout_strategy: Deployment strategy (canary, blue-green, rolling)
        approval_required: Whether manual approval is required

    Returns:
        A Workflow containing all stages plus the rollback step.
    """
    # Pre-deployment checks
    pre_checks = Parallel([
        Step("security-scan").tool("security-scanner").inputs(
            image=f"{service_name}:{version}"
        ),
        Step("performance-test").tool("load-tester").inputs(
            service=service_name,
            version=version,
            environment="staging"
        ),
        Step("integration-test").tool("test-runner").inputs(
            suite="integration",
            target=f"{service_name}:{version}"
        )
    ])

    # Approval gate (conditional)
    approval_branch = Branch([
        Condition("${approval_required} == true").then([
            Step("request-approval").tool("approval-system").inputs(
                approvers=["@platform-team", "@security-team"],
                timeout="2h",
                details={
                    "service": service_name,
                    "version": version,
                    "strategy": rollout_strategy,
                    # Plain strings (not f-strings): these placeholders are
                    # passed through unchanged rather than evaluated in Python
                    "security_scan": "${security-scan.status}",
                    "performance_test": "${performance-test.status}"
                }
            )
        ]),
        Condition.default().then([])  # No approval needed
    ]).depends_on(pre_checks)

    # Deployment strategy selection; anything other than canary or
    # blue-green falls through to the rolling deployment
    deployment_branch = Branch([
        Condition("${rollout_strategy} == 'canary'").then([
            Step("canary-deploy").tool("canary-deployer").inputs(
                service=service_name,
                version=version,
                traffic_split=[10, 50, 100],
                success_threshold=99.5
            )
        ]),
        Condition("${rollout_strategy} == 'blue-green'").then([
            Step("blue-green-deploy").tool("blue-green-deployer").inputs(
                service=service_name,
                version=version,
                switch_threshold=99.9
            )
        ]),
        Condition.default().then([
            Step("rolling-deploy").tool("rolling-deployer").inputs(
                service=service_name,
                version=version,
                max_unavailable=1
            )
        ])
    ]).depends_on(approval_branch)

    # Post-deployment verification
    post_checks = Parallel([
        Step("health-check").tool("health-monitor").inputs(
            service=service_name,
            environment="production",
            duration="5m"
        ),
        Step("metrics-check").tool("metrics-validator").inputs(
            service=service_name,
            baseline_version="previous",
            current_version=version,
            metrics=["latency", "error_rate", "throughput"]
        ),
        Step("smoke-test").tool("smoke-tester").inputs(
            service=service_name,
            environment="production"
        )
    ]).depends_on(deployment_branch)

    # Notification
    notify = Step("notify-completion").tool("slack").inputs(
        channel="#deployments",
        message=f"🚀 {service_name} v{version} deployed to production via {rollout_strategy}"
    ).depends_on(post_checks)

    # Error handling - rollback on failure
    # NOTE(review): this keyword form of error_handler differs from the
    # ErrorHandler().on_failure([...]) form used in the Error Handling
    # section; confirm which form the DSL supports.
    rollback = Step("emergency-rollback").tool("rollback-deployer").inputs(
        service=service_name,
        environment="production",
        reason="Deployment verification failed"
    ).error_handler(
        on_failure_of=[post_checks]
    )

    return Workflow([
        pre_checks,
        approval_branch,
        deployment_branch,
        post_checks,
        notify,
        rollback
    ])
Next Steps
- Client SDK - Learn the Kubiya Python client for API access
- Tools SDK - Create custom tools for your workflows
- Examples - Browse real-world workflow examples
- DSL Reference - Complete DSL syntax reference