Workflow DSL Reference

The Kubiya Workflow DSL provides a fluent, chainable API for building workflows programmatically.

Workflow Builder

Creating a Workflow

from kubiya_workflow_sdk.dsl import workflow

# Basic workflow
wf = workflow("my-workflow")

# With description
wf = workflow("my-workflow").description("Process daily data")

# Chain workflow (default)
wf = workflow("my-workflow").type("chain")

# Graph workflow (explicit dependencies)
wf = workflow("my-workflow").type("graph")

Workflow Methods

.description(desc: str)

Set workflow description.
wf.description("Daily ETL pipeline")

.runner(name: str)

Specify the runner to execute on.
wf.runner("my-k8s-runner")  # Custom runner
wf.runner("kubiya-hosted")  # Default

.env(**variables)

Set environment variables.
wf.env(
    LOG_LEVEL="debug",
    API_URL="https://api.example.com"
)

.params(**parameters)

Define parameters with defaults.
wf.params(
    ENVIRONMENT="${ENVIRONMENT:-staging}",
    VERSION="${VERSION}",
    REPLICAS="3"
)

.schedule(cron: str)

Set cron schedule.
wf.schedule("0 2 * * *")  # Daily at 2 AM

.timeout(seconds: int)

Set workflow timeout.
wf.timeout(3600)  # 1 hour

Steps

Basic Steps

.step(name: str, command: str)

Add a simple command step.
wf.step("build", "docker build -t myapp:latest .")
wf.step("test", "pytest tests/")

.step(name: str).tool(tool_name_or_instance, args=None, timeout=None, **kwargs)

Run the step with a pre-registered tool (referenced by name) or a Tool class instance; the tool's configuration is applied to the step automatically. Parameters:
  • tool_name_or_instance: Either a string name of a pre-registered tool, or a Tool class instance from kubiya_workflow_sdk.tools.models
  • args: Optional dictionary of arguments to pass to the tool
  • timeout: Optional execution timeout in seconds
  • **kwargs: Additional configuration options
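When referencing a pre-registered tool, pass its name as a string. A minimal sketch, assuming a tool named "security-scanner" is already registered on the platform (the tool name and its arguments here are illustrative, not part of the SDK):

# Reference a pre-registered tool by name (tool name and args are illustrative)
wf.step("scan-image").tool(
    "security-scanner",
    args={"image": "myapp:latest"},  # arguments forwarded to the tool
    timeout=300                      # optional execution timeout in seconds
)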
Using Tool Class Instances (Recommended):
from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk.tools.models import Tool, Arg

# Define a tool instance
json_processor = Tool(
    name="json_processor",
    type="docker",
    image="python:3.11-slim",
    content="""#!/bin/bash
echo "Processing JSON data..."
echo "$json_data" | python -m json.tool
""",
    args=[
        Arg(name="json_data", type="str", description="JSON data to process", required=True),
        Arg(name="operation", type="str", description="Operation type", default="validate")
    ]
)

# Use the tool instance in a workflow
wf = (
    workflow("processor-workflow")
    .step("processor-step", callback=lambda s:
        s.description("JSON processor step")
         .tool(json_processor)
         .args(json_data='{"key": "value"}', operation="validate")
         .output("processing_summary")
    )
)
Benefits of Tool Instances:
  • Type Safety: Full IDE support with autocomplete and type checking
  • Automatic Configuration: Tool definition, arguments, environment, and dependencies are automatically extracted
  • Validation: Built-in parameter validation and type checking
  • Reusability: Tool instances can be shared across multiple workflows
Automatic Configuration Extraction: When using Tool instances, the step automatically extracts:
  • Tool name, type, and image
  • Script content and entrypoint
  • Argument definitions with types and validation
  • Environment variables and secrets
  • File mounts and volumes
  • Service dependencies
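As a rough sketch of what that extraction covers, a tool can carry its own runtime requirements and the step inherits them when the instance is passed to .tool(). The env and secrets field names below are assumptions about the Tool model and may differ in your SDK version:

from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk.tools.models import Tool, Arg

# `env` and `secrets` are assumed field names, shown for illustration only
db_backup = Tool(
    name="db_backup",
    type="docker",
    image="postgres:16-alpine",
    content="""#!/bin/sh
pg_dump "$DATABASE_URL" > "/backups/${backup_name}.sql"
""",
    args=[Arg(name="backup_name", type="str", description="Backup file name", required=True)],
    env=["DATABASE_URL"],    # environment variables the tool expects (assumed field)
    secrets=["PGPASSWORD"],  # secrets injected at runtime (assumed field)
)

# The step picks up the image, script, argument definitions, env, and secrets
wf = (
    workflow("backup-workflow")
    .step("nightly-backup").tool(db_backup).args(backup_name="nightly")
)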

Parallel Steps

.parallel_steps(name: str, items: list, command: str, max_concurrent: int = None)

Execute the command once per item, with max_concurrent limiting how many run at the same time.
wf.parallel_steps(
    "deploy-regions",
    items=["us-east-1", "eu-west-1", "ap-south-1"],
    command="deploy.sh ${ITEM}",
    max_concurrent=2
)

Sub-workflows

.sub_workflow(name: str, workflow: str, params: dict = None)

Execute another workflow as a step.
wf.sub_workflow(
    "run-tests",
    workflow="test-suite",
    params={"env": "staging"}
)

Executors

Use specific executors for steps:

Shell Executor

from kubiya_workflow_sdk.dsl import shell_executor

wf.step("backup").executor(
    shell_executor("pg_dump -h localhost -U postgres mydb > backup.sql")
)

Python Executor

from kubiya_workflow_sdk.dsl import python_executor

wf.step("process").executor(
    python_executor("""
import pandas as pd
df = pd.read_csv('data.csv')
print(f"Processed {len(df)} rows")
    """, 
    packages=["pandas", "numpy"])
)

Docker Executor

from kubiya_workflow_sdk.dsl import docker_executor

wf.step("scan").executor(
    docker_executor(
        image="aquasec/trivy:latest",
        command="image --severity HIGH myapp:latest"
    )
)

Inline Agent Executor

from kubiya_workflow_sdk.dsl import inline_agent_executor

wf.step("analyze").executor(
    inline_agent_executor(
        message="Analyze the test results and decide if we should deploy",
        runners=["kubiya-hosted"],
        ai_instructions="You are a deployment decision maker"
    )
)

Advanced Features

Lifecycle Handlers

wf.handlers(
    success="echo 'Workflow completed successfully'",
    failure="./scripts/rollback.sh",
    exit="./scripts/cleanup.sh"
)

Email Notifications

wf.notifications(
    mail_on_failure=True,
    mail_on_success=False,
    error_mail={
        "to": ["ops@example.com"],
        "subject": "Workflow Failed: ${WORKFLOW_NAME}"
    }
)

Queue Management

wf.queue("critical-jobs", max_active_runs=1)

Resource Management

wf.max_active_steps(5)  # Limit concurrent steps
wf.max_output_size(10485760)  # 10MB max output

Metadata

wf.tags("production", "etl", "daily")
wf.group("data-pipelines")

Complete Example

from kubiya_workflow_sdk.dsl import workflow, python_executor, shell_executor

# Build a complete data pipeline
pipeline = (
    workflow("data-pipeline")
    .description("Daily data processing pipeline")
    .runner("production-runner")
    .schedule("0 2 * * *")
    .env(
        AWS_REGION="us-east-1",
        LOG_LEVEL="info"
    )
    .params(
        DATE="${DATE:-$(date +%Y-%m-%d)}",
        BATCH_SIZE="1000"
    )
    
    # Extract data
    .step("extract", "aws s3 cp s3://data-lake/raw/${DATE}/ /tmp/data/ --recursive")
    
    # Process with Python
    .step("transform")
    .executor(python_executor("""
import pandas as pd
import glob
import os

date = os.getenv('DATE')
batch_size = int(os.getenv('BATCH_SIZE'))

# Process all files
for file in glob.glob('/tmp/data/*.csv'):
    df = pd.read_csv(file)
    # Transform logic here
    df.to_parquet(file.replace('.csv', '.parquet'))
    print(f"Processed {file}: {len(df)} rows")
    """, packages=["pandas", "pyarrow"]))
    
    # Load to warehouse
    .step("load", "aws s3 sync /tmp/data/ s3://data-warehouse/processed/${DATE}/")
    
    # Cleanup
    .step("cleanup", "rm -rf /tmp/data/")
    
    # Handlers
    .handlers(
        success="./notify.sh success",
        failure="./notify.sh failure && ./rollback.sh"
    )
    
    # Resource limits
    .timeout(7200)  # 2 hours
    .max_active_steps(3)
)

# Export as YAML
print(pipeline.to_yaml())

# Execute
from kubiya_workflow_sdk import execute_workflow
result = execute_workflow(pipeline.to_dict(), api_key="YOUR_KEY")

Complete Example with Tool Instances

Here’s a comprehensive example showing how to use Tool instances in workflows:
from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk.tools.models import Tool, Arg

# Define reusable tool instances
url_validator = Tool(
    name="url_validator",
    type="docker", 
    image="alpine:latest",
    content="""#!/bin/sh
echo "Validating URL: $url"
if echo "$url" | grep -q '^https\\?://'; then
    echo "✓ Valid URL format"
    if echo "$url" | grep -q '^https://'; then
        echo "✓ Secure (HTTPS)"
    else
        echo "⚠ Not secure (HTTP)"
    fi
else
    echo "✗ Invalid URL format"
    exit 1
fi
""",
    args=[
        Arg(name="url", type="str", description="URL to validate", required=True)
    ]
)

json_processor = Tool(
    name="json_processor", 
    type="docker",
    image="python:3.11-slim",
    content="""#!/bin/bash
echo "Processing JSON data..."
echo "Operation: $operation"

case "$operation" in
    "validate")
        echo "$json_data" | python -m json.tool > /dev/null 2>&1
        if [ $? -eq 0 ]; then
            echo "✓ Valid JSON"
        else
            echo "✗ Invalid JSON"
            exit 1
        fi
        ;;
    "pretty")
        echo "$json_data" | python -m json.tool
        ;;
    *)
        echo "Unknown operation: $operation"
        exit 1
        ;;
esac
""",
    args=[
        Arg(name="json_data", type="str", description="JSON data to process", required=True),
        Arg(name="operation", type="str", description="Operation: validate, pretty", default="validate")
    ]
)

# Create workflow using tool instances
validation_pipeline = (
    workflow("api-validation-pipeline")
    .description("Validate API endpoints and process JSON responses")
    .runner("production-runner")
    .params(
        API_URL="https://api.example.com/v1/data",
        JSON_PAYLOAD='{"service": "api", "version": "1.0"}'
    )
    
    # Step 1: Validate the API URL format
    .step("validate-url").tool(url_validator).args(
        url="${API_URL}"
    ).output("URL_VALIDATION")
    
    # Step 2: Validate JSON payload structure  
    .step("validate-json").tool(json_processor).args(
        json_data="${JSON_PAYLOAD}",
        operation="validate"
    ).output("JSON_VALIDATION")
    .depends("validate-url")
    
    # Step 3: Pretty print JSON for logging
    .step("format-json").tool(
        json_processor,
        timeout=60,
        args={"json_data": "${JSON_PAYLOAD}", "operation": "pretty"}
    ).output("FORMATTED_JSON")
    .depends("validate-json")
    
    # Step 4: Generate summary report
    .step("generate-report").shell("""
echo "=== API Validation Report ==="
echo "URL: ${API_URL}"
echo "URL Validation: ${URL_VALIDATION}"
echo "JSON Validation: ${JSON_VALIDATION}" 
echo "Formatted JSON:"
echo "${FORMATTED_JSON}"
echo "Report generated at: $(date)"
echo "All validations completed successfully ✓"
    """).depends("format-json")
    
    # Configure error handling
    .handlers(
        success="echo 'All validations passed!'",
        failure="echo 'Validation failed - check logs for details'"
    )
    .timeout(300)
)

# Export the workflow
print("Workflow YAML:")
print(validation_pipeline.to_yaml())

# Execute the workflow
from kubiya_workflow_sdk import execute_workflow
result = execute_workflow(validation_pipeline.to_dict(), api_key="YOUR_KEY")
Key Benefits Demonstrated:
  • Type Safety: Tool instances provide IDE autocomplete and validation
  • Reusability: Tools can be defined once and used multiple times
  • Maintainability: Tool logic is separate from workflow orchestration
  • Flexibility: Mix tool instances with shell commands and other executors
  • Error Handling: Built-in timeout and validation support

Validation

Validate workflow before execution:
validation = wf.validate()
if validation["valid"]:
    print("Workflow is valid!")
else:
    print(f"Errors: {validation['errors']}")
    print(f"Warnings: {validation['warnings']}")

Export Formats

# As dictionary
workflow_dict = wf.to_dict()

# As YAML
workflow_yaml = wf.to_yaml()

# As JSON
workflow_json = wf.to_json(indent=2)

Next Steps

  • Examples: See real-world examples of workflows in action
  • Advanced patterns: Learn advanced workflow techniques
  • Architecture: Understand the execution model
  • API Reference: Browse the complete API documentation