The Kubiya Workflow DSL (Domain Specific Language) allows you to define complex automation workflows using Python. This approach gives you full programmatic control while maintaining the benefits of Kubiya’s execution engine.

Why Use Workflow DSL?

Version Control: Store workflows as code in your repository with full version history Type Safety: Python type hints and IDE support catch errors before runtime Reusability: Create reusable components and share workflows across teams Testing: Unit test your workflows before deploying to production

Basic Workflow Structure

from kubiya_sdk import Workflow, Step, Parallel
from kubiya_sdk.tools import KubectlTool, SlackTool

# Define a workflow
@workflow
def deploy_application(service_name: str, version: str, environment: str):
    """Deploy application with health checks and notifications."""
    
    # Pre-deployment validation
    validate = Step("validate-deployment").tool("deployment-validator").inputs(
        service=service_name,
        version=version,
        environment=environment
    )
    
    # Deploy the application
    deploy = Step("deploy-service").tool("kubernetes-deployer").inputs(
        service=service_name,
        image=f"{service_name}:{version}",
        namespace=environment
    ).depends_on(validate)
    
    # Health checks
    health_check = Step("health-check").tool("health-monitor").inputs(
        service=service_name,
        namespace=environment,
        timeout="5m"
    ).depends_on(deploy)
    
    # Notify team
    notify = Step("notify-success").tool("slack").inputs(
        channel="#deployments",
        message=f"✅ {service_name} v{version} deployed to {environment}"
    ).depends_on(health_check)
    
    return Workflow([validate, deploy, health_check, notify])

Core DSL Components

Steps

Steps are the building blocks of workflows:
from kubiya_sdk import Step

# Basic step
basic_step = Step("check-status").tool("kubernetes").command([
    "kubectl", "get", "pods", "-n", "${NAMESPACE}"
])

# Step with inputs
step_with_inputs = Step("scale-deployment").tool("kubernetes").inputs(
    deployment="${SERVICE_NAME}",
    replicas="${REPLICA_COUNT}",
    namespace="${ENVIRONMENT}"
)

# Step with conditions
conditional_step = Step("rollback").tool("kubernetes").command([
    "kubectl", "rollout", "undo", "deployment/${SERVICE_NAME}"
]).condition("${health_check.status} == 'failed'")

# Step with retry logic
retry_step = Step("flaky-operation").tool("external-api").retry(
    max_attempts=3,
    backoff="exponential",
    backoff_factor=2.0
)

Parallel Execution

Run multiple steps concurrently:
from kubiya_sdk import Parallel, Step

# Parallel health checks
health_checks = Parallel([
    Step("check-frontend").tool("http-client").url("https://app.com/health"),
    Step("check-api").tool("http-client").url("https://api.com/health"), 
    Step("check-database").tool("postgres-client").query("SELECT 1"),
    Step("check-cache").tool("redis-client").command("ping")
])

# Wait for all to complete before proceeding
deploy_step = Step("deploy").depends_on(health_checks)

Conditional Logic

Create branching workflows:
from kubiya_sdk import Branch, Condition

# Branch based on environment
environment_branch = Branch([
    # Production branch - requires approval
    Condition("${ENVIRONMENT} == 'production'").then([
        Step("request-approval").tool("approval-gate"),
        Step("deploy-production").tool("kubernetes-deployer")
    ]),
    
    # Staging branch - auto deploy
    Condition("${ENVIRONMENT} == 'staging'").then([
        Step("deploy-staging").tool("kubernetes-deployer")
    ]),
    
    # Default branch
    Condition.default().then([
        Step("deploy-development").tool("kubernetes-deployer")
    ])
])

Error Handling

Define error handling and recovery strategies:
from kubiya_sdk import Step, ErrorHandler

deploy_step = Step("deploy").tool("kubernetes-deployer").error_handler(
    ErrorHandler()
    .on_failure([
        Step("collect-logs").tool("log-collector"),
        Step("rollback").tool("kubernetes-rollback"),
        Step("notify-failure").tool("slack").inputs(
            channel="#incidents",
            message="🚨 Deployment failed: ${deploy.error}"
        )
    ])
    .on_timeout([
        Step("cancel-deployment").tool("kubernetes-cancel"),
        Step("investigate").tool("debug-collector")
    ])
)

Advanced Patterns

Data Transformation

Transform data between steps:
from kubiya_sdk import Transform

# Transform step outputs
transform_data = Transform(
    inputs=["${api_response.data}", "${config.settings}"],
    transform=lambda api_data, config: {
        "processed_data": api_data.get("results", []),
        "environment": config.get("env", "development"),
        "timestamp": datetime.now().isoformat()
    }
)

next_step = Step("process-data").inputs(
    data="${transform_data.processed_data}",
    env="${transform_data.environment}"
)

Dynamic Workflows

Generate workflows based on runtime data:
@workflow
def multi_environment_deployment(services: List[str], environments: List[str]):
    """Deploy multiple services to multiple environments."""
    
    steps = []
    
    for service in services:
        for env in environments:
            # Create deployment step for each service/environment combination
            deploy_step = Step(f"deploy-{service}-{env}").tool("kubernetes-deployer").inputs(
                service=service,
                environment=env,
                image=f"{service}:${BUILD_VERSION}"
            )
            
            # Add health check
            health_step = Step(f"health-{service}-{env}").tool("health-checker").inputs(
                service=service,
                environment=env
            ).depends_on(deploy_step)
            
            steps.extend([deploy_step, health_step])
    
    return Workflow(steps)

Workflow Composition

Compose larger workflows from smaller ones:
@workflow
def database_migration():
    """Database migration workflow."""
    return Workflow([
        Step("backup-database").tool("postgres-backup"),
        Step("run-migrations").tool("migrate"),
        Step("verify-schema").tool("schema-validator")
    ])

@workflow  
def application_deployment():
    """Application deployment workflow."""
    return Workflow([
        Step("build-image").tool("docker-build"),
        Step("push-image").tool("docker-push"),
        Step("deploy-app").tool("kubernetes-deploy")
    ])

@workflow
def full_deployment_pipeline(include_migration: bool = False):
    """Complete deployment pipeline."""
    
    steps = []
    
    # Add migration if requested
    if include_migration:
        migration = database_migration()
        steps.extend(migration.steps)
    
    # Add application deployment
    app_deployment = application_deployment()
    
    # If we have migration steps, make deployment depend on them
    if include_migration:
        app_deployment.steps[0].depends_on(steps[-1])  # Last migration step
    
    steps.extend(app_deployment.steps)
    
    return Workflow(steps)

Custom Tools Integration

Define and use custom tools:
from kubiya_sdk.tools import Tool, Arg

# Define custom tool
custom_analyzer = Tool(
    name="log-analyzer",
    description="Analyze application logs for errors",
    image="python:3.11-slim",
    requirements=["pandas", "regex"],
    args=[
        Arg("log_file", description="Path to log file", required=True),
        Arg("pattern", description="Error pattern to search", default="ERROR|FATAL"),
        Arg("time_range", description="Time range to analyze", default="1h")
    ],
    content="""
    import pandas as pd
    import re
    import sys
    from datetime import datetime, timedelta
    
    log_file = sys.argv[1]
    pattern = sys.argv[2]
    time_range = sys.argv[3]
    
    # Parse time range (simplified)
    hours = int(time_range.replace('h', ''))
    cutoff = datetime.now() - timedelta(hours=hours)
    
    errors = []
    with open(log_file, 'r') as f:
        for line in f:
            if re.search(pattern, line, re.IGNORECASE):
                errors.append(line.strip())
    
    print(f"Found {len(errors)} errors in the last {time_range}")
    for error in errors[-10:]:  # Show last 10
        print(f"  {error}")
    """
)

# Use custom tool in workflow
@workflow
def investigate_issues(service: str):
    """Investigate application issues."""
    
    # Collect logs
    collect_logs = Step("collect-logs").tool("log-collector").inputs(
        service=service,
        duration="1h",
        output="/tmp/service.log"
    )
    
    # Analyze logs with custom tool
    analyze = Step("analyze-logs").tool(custom_analyzer).inputs(
        log_file="/tmp/service.log",
        pattern="ERROR|EXCEPTION|FATAL",
        time_range="1h"
    ).depends_on(collect_logs)
    
    return Workflow([collect_logs, analyze])

Testing Workflows

Unit Testing

import unittest
from kubiya_sdk.testing import WorkflowTest

class TestDeploymentWorkflow(WorkflowTest):
    def test_successful_deployment(self):
        """Test successful deployment scenario."""
        
        workflow = deploy_application(
            service_name="frontend",
            version="v1.2.0", 
            environment="staging"
        )
        
        # Mock tool responses
        self.mock_tool("deployment-validator", success=True)
        self.mock_tool("kubernetes-deployer", success=True)
        self.mock_tool("health-monitor", success=True)
        self.mock_tool("slack", success=True)
        
        # Execute workflow
        result = self.execute_workflow(workflow)
        
        # Assertions
        self.assertTrue(result.success)
        self.assertEqual(len(result.completed_steps), 4)
        self.assertIn("validate-deployment", [s.name for s in result.completed_steps])
    
    def test_deployment_failure_rollback(self):
        """Test rollback on deployment failure."""
        
        workflow = deploy_application("frontend", "v1.2.0", "staging")
        
        # Mock deployment failure
        self.mock_tool("deployment-validator", success=True)
        self.mock_tool("kubernetes-deployer", success=False, error="Pod failed to start")
        
        result = self.execute_workflow(workflow)
        
        self.assertFalse(result.success)
        self.assertIn("rollback", [s.name for s in result.executed_steps])

Integration Testing

import pytest
from kubiya_sdk import Kubiya

@pytest.fixture
def client():
    return Kubiya(
        api_key=os.environ["KUBIYA_TEST_API_KEY"],
        base_url="https://test-api.kubiya.ai"
    )

def test_workflow_end_to_end(client):
    """Test complete workflow execution."""
    
    # Create test workflow
    workflow = deploy_application("test-service", "v1.0.0", "test")
    
    # Register with Kubiya
    registered = client.workflows.register(workflow)
    
    # Execute workflow
    execution = client.workflows.execute(
        workflow_id=registered.id,
        inputs={},
        wait=True,
        timeout=300  # 5 minutes
    )
    
    # Verify results
    assert execution.status == "completed"
    assert execution.error is None
    
    # Cleanup
    client.workflows.delete(registered.id)

Best Practices

Workflow Design

  1. Keep Steps Atomic: Each step should do one specific thing
  2. Use Clear Names: Step and workflow names should be descriptive
  3. Handle Errors: Always include error handling and rollback strategies
  4. Add Documentation: Use docstrings and comments liberally
  5. Test Thoroughly: Write both unit and integration tests

Performance Optimization

  1. Parallel Execution: Use parallel steps when operations are independent
  2. Resource Management: Set appropriate resource limits for tools
  3. Caching: Cache expensive operations when possible
  4. Timeouts: Set reasonable timeouts for all operations

Security Considerations

  1. Secrets Management: Never hardcode sensitive values
  2. Input Validation: Validate all workflow inputs
  3. Least Privilege: Use minimal required permissions
  4. Audit Logging: Ensure all operations are logged

Real-World Example

Here’s a complete production deployment workflow:
from kubiya_sdk import workflow, Step, Parallel, Branch, Condition
from datetime import datetime

@workflow
def production_deployment_pipeline(
    service_name: str,
    version: str,
    rollout_strategy: str = "canary",
    approval_required: bool = True
):
    """
    Production deployment pipeline with safety checks.
    
    Args:
        service_name: Name of the service to deploy
        version: Version tag to deploy
        rollout_strategy: Deployment strategy (canary, blue-green, rolling)
        approval_required: Whether manual approval is required
    """
    
    # Pre-deployment checks
    pre_checks = Parallel([
        Step("security-scan").tool("security-scanner").inputs(
            image=f"{service_name}:{version}"
        ),
        Step("performance-test").tool("load-tester").inputs(
            service=service_name,
            version=version,
            environment="staging"
        ),
        Step("integration-test").tool("test-runner").inputs(
            suite="integration",
            target=f"{service_name}:{version}"
        )
    ])
    
    # Approval gate (conditional)
    approval_branch = Branch([
        Condition("${approval_required} == true").then([
            Step("request-approval").tool("approval-system").inputs(
                approvers=["@platform-team", "@security-team"],
                timeout="2h",
                details={
                    "service": service_name,
                    "version": version,
                    "strategy": rollout_strategy,
                    "security_scan": "${security-scan.status}",
                    "performance_test": "${performance-test.status}"
                }
            )
        ]),
        Condition.default().then([])  # No approval needed
    ]).depends_on(pre_checks)
    
    # Deployment strategy selection
    deployment_branch = Branch([
        Condition("${rollout_strategy} == 'canary'").then([
            Step("canary-deploy").tool("canary-deployer").inputs(
                service=service_name,
                version=version,
                traffic_split=[10, 50, 100],
                success_threshold=99.5
            )
        ]),
        Condition("${rollout_strategy} == 'blue-green'").then([
            Step("blue-green-deploy").tool("blue-green-deployer").inputs(
                service=service_name,
                version=version,
                switch_threshold=99.9
            )
        ]),
        Condition.default().then([
            Step("rolling-deploy").tool("rolling-deployer").inputs(
                service=service_name,
                version=version,
                max_unavailable=1
            )
        ])
    ]).depends_on(approval_branch)
    
    # Post-deployment verification
    post_checks = Parallel([
        Step("health-check").tool("health-monitor").inputs(
            service=service_name,
            environment="production",
            duration="5m"
        ),
        Step("metrics-check").tool("metrics-validator").inputs(
            service=service_name,
            baseline_version="previous",
            current_version=version,
            metrics=["latency", "error_rate", "throughput"]
        ),
        Step("smoke-test").tool("smoke-tester").inputs(
            service=service_name,
            environment="production"
        )
    ]).depends_on(deployment_branch)
    
    # Notification
    notify = Step("notify-completion").tool("slack").inputs(
        channel="#deployments",
        message=f"🚀 {service_name} v{version} deployed to production via {rollout_strategy}"
    ).depends_on(post_checks)
    
    # Error handling - rollback on failure
    rollback = Step("emergency-rollback").tool("rollback-deployer").inputs(
        service=service_name,
        environment="production",
        reason="Deployment verification failed"
    ).error_handler(
        on_failure_of=[post_checks]
    )
    
    return Workflow([
        pre_checks,
        approval_branch, 
        deployment_branch,
        post_checks,
        notify,
        rollback
    ])

Next Steps

  • Client SDK - Learn the Kubiya Python client for API access
  • Tools SDK - Create custom tools for your workflows
  • Examples - Browse real-world workflow examples
  • DSL Reference - Complete DSL syntax reference