Steps are the fundamental building blocks of Kubiya workflows. Each step represents a single operation or task that can be executed independently or as part of a larger workflow with dependencies.

Basic Step Structure

Simple Steps

from kubiya.dsl import workflow

wf = (
    workflow("basic-steps")
        # Simple shell command
        .step("list-files", "ls -la /tmp")

        # Command with parameters
        .step("check-status", "kubectl get pods -n ${NAMESPACE}")
)
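
The ${NAMESPACE} reference in check-status resolves from a workflow parameter (covered under Variable Interpolation below). A minimal sketch supplying it; the parameter name and default value here are illustrative:
wf = (
    workflow("basic-steps")
        .params(NAMESPACE="default")  # provides the value for ${NAMESPACE}
        .step("check-status", "kubectl get pods -n ${NAMESPACE}")
)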

Steps with Callbacks

For advanced configuration, use the callback pattern:
wf = (
    workflow("advanced-steps")
        .step("deploy", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml")
                .description("Deploy application to Kubernetes")
                .output("DEPLOYMENT_RESULT")
        )
)

Step Types

1. Shell Commands

Execute shell commands and scripts:
# Simple command
wf.step("simple", "echo 'Hello World'")

# Multi-line script
wf.step("script", callback=lambda s:
    s.shell("""
#!/bin/bash
set -e
echo "Starting process..."
for i in {1..5}; do
    echo "Processing item $i"
done
echo "Complete!"
    """)
)

# With shell type specified
wf.step("bash-script", callback=lambda s:
    s.shell("grep 'TODO' *.py | less")
        .shell_type("bash")
)

2. Shell Scripts with Arguments

wf.step("parameterized-script", callback=lambda s:
    s.script("""
#!/bin/sh
echo "Processing data..."
grep "${pattern}" "${file}"
    """)
    .args(
        pattern="ERROR",
        file="application.log"
    )
)

3. Python Code

Execute Python code directly:
wf.step("python-task", callback=lambda s:
    s.python("""
import json
import sys

# Process data
data = {
    "status": "success",
    "count": 42,
    "items": ["item1", "item2", "item3"]
}

# Output results
print(json.dumps(data, indent=2))
    """)
)

# Python with variables
wf.step("python-with-vars", callback=lambda s:
    s.python("""
colors = ["red", "yellow", "green", "blue"]
for i, color in enumerate(colors, 1):
    print(f"{i}. Color: {color}")
    """)
)
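
Workflow parameters can also be read from inside a Python step; the data-processing example later on this page does this with os.getenv, and here is a minimal sketch under that same assumption:
wf.step("python-with-env", callback=lambda s:
    s.python("""
import os

# Assumes workflow params are exposed to the step as environment variables,
# as in the data-processing pipeline example below
environment = os.getenv('ENVIRONMENT', 'staging')
print(f"Target environment: {environment}")
    """)
)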

4. Docker Containers

Run steps in Docker containers:
wf.step("containerized-python", callback=lambda s:
    s.docker(
        image="python:3.11-slim",
        content="""
#!/usr/bin/env python3
import sys
print(f"Python version: {sys.version}")
print("Running in Docker!")
        """
    )
)

# Alpine container with dependencies
wf.step("alpine-task", callback=lambda s:
    s.docker(
        image="alpine:latest",
        content="""
#!/bin/sh
set -e
if ! command -v curl >/dev/null 2>&1; then
    apk add --no-cache curl
fi
curl -fsS https://api.example.com/health
        """
    )
)
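
Container steps compose with the other step modifiers; for example, capturing the container's stdout, assuming .output applies to docker steps as it does to shell steps:
wf.step("python-version", callback=lambda s:
    s.docker(
        image="python:3.11-slim",
        content="""
#!/usr/bin/env python3
import platform
print(platform.python_version())
        """
    ).output("PYTHON_VERSION")  # later steps can reference {{PYTHON_VERSION}}
)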

5. Kubiya API Calls

Interact with Kubiya platform APIs:
wf.step("get-runners", callback=lambda s:
    s.kubiya(
        url="api/v3/runners",
        method="GET"
    ).output("RUNNERS_DATA")
)

wf.step("create-resource", callback=lambda s:
    s.kubiya(
        url="api/v3/resources",
        method="POST",
        body={"name": "resource-name", "type": "config"}
    )
)
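
A captured response can then be consumed downstream like any other output. A sketch that parses RUNNERS_DATA in a follow-up step, assuming the endpoint returns a JSON array:
wf.step("summarize-runners", callback=lambda s:
    s.python("""
import json

runners = json.loads('''{{RUNNERS_DATA}}''')
print(f"Found {len(runners)} runners")
    """)
    .depends("get-runners")
)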

6. Custom Tool Definitions

Define custom tools inline:
wf.step("custom-tool", callback=lambda s:
    s.tool_def(
        name="service_health_check",
        description="Check service health endpoint",
        type="docker",
        image="alpine:latest",
        content="""
#!/bin/sh
if ! command -v curl >/dev/null 2>&1; then
    apk add --no-cache curl
fi
echo "Checking health of $SERVER_URL..."
if curl -fsS "$SERVER_URL" >/dev/null; then
    echo "✅ Service is healthy"
    exit 0
else
    echo "❌ Service health check failed"
    exit 1
fi
        """,
        args={
            "SERVER_URL": "https://api.example.com/health"
        }
    )
)
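
Tool arguments do not have to be hard-coded. A sketch driving them from workflow parameters, assuming ${PARAM} interpolation applies inside args as it does in shell commands:
wf = (
    workflow("health-checks")
        .params(SERVICE_URL="https://api.example.com/health")
        .step("custom-tool", callback=lambda s:
            s.tool_def(
                name="service_health_check",
                description="Check service health endpoint",
                type="docker",
                image="alpine:latest",
                content="...",  # same health-check script as above
                args={"SERVER_URL": "${SERVICE_URL}"}  # resolved from workflow params
            )
        )
)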

Step Dependencies

Sequential Dependencies

Control the execution order:
wf = (
    workflow("sequential-pipeline")
        # Step 1: Build
        .step("build", "docker build -t myapp:latest .")

        # Step 2: Test (depends on build)
        .step("test", callback=lambda s:
            s.shell("docker run myapp:latest pytest tests/")
                .depends("build")
        )

        # Step 3: Push (depends on test)
        .step("push", callback=lambda s:
            s.shell("docker push myapp:latest")
                .depends("test")
        )

        # Step 4: Deploy (depends on push)
        .step("deploy", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml")
                .depends("push")
        )
)

Multiple Dependencies

A step can depend on multiple previous steps:
wf = (
    workflow("multi-dependency")
        # Independent steps
        .step("unit-tests", "pytest tests/unit/")
        .step("integration-tests", "pytest tests/integration/")
        .step("lint", "flake8 .")

        # This step waits for all three to complete
        .step("build", callback=lambda s:
            s.shell("docker build -t app .")
                .depends("unit-tests")
                .depends("integration-tests")
                .depends("lint")
        )
)
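
The inverse pattern, fan-out, needs no extra syntax: several steps that each depend on the same predecessor run independently once it completes, as the best practices below note. A minimal sketch (step names and commands are illustrative):
wf = (
    workflow("fan-out")
        .step("fetch", "curl -fsS -o data.json https://api.example.com/data")

        # Both of these run after fetch, independently of each other
        .step("validate", callback=lambda s:
            s.shell("python validate.py data.json")
                .depends("fetch")
        )
        .step("archive", callback=lambda s:
            s.shell("cp data.json /archive/")
                .depends("fetch")
        )
)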

Step Outputs and Variables

Capturing Outputs

Capture step output for use in later steps. Captured outputs are referenced with {{VARIABLE}} syntax, while workflow parameters use ${PARAM}:
wf = (
    workflow("data-flow")
        .params(INPUT_FILE="data.csv")

        # Produce data
        .step("count-lines", callback=lambda s:
            s.shell("wc -l ${INPUT_FILE}")
                .output("LINE_COUNT")
        )

        # Consume data
        .step("report", callback=lambda s:
            s.shell("echo 'Processed {{LINE_COUNT}} lines'")
                .depends("count-lines")
        )
)

Complex Data Flow

wf = (
    workflow("advanced-data-flow")
        # Step 1: Fetch data and output multiple variables
        .step("fetch", callback=lambda s:
            s.python("""
import json
data = {
    "count": 100,
    "status": "success",
    "timestamp": "2024-01-01T00:00:00Z"
}
print(json.dumps(data))
            """)
            .output("API_RESPONSE")
        )

        # Step 2: Process using the output
        .step("process", callback=lambda s:
            s.python("""
import json
response = json.loads('{{API_RESPONSE}}')
print(f"Processing {response['count']} items")
print(f"Status: {response['status']}")
            """)
            .depends("fetch")
        )
)

Step Configuration

Step Descriptions

Add descriptions for documentation:
wf.step("deploy-production", callback=lambda s:
    s.shell("kubectl apply -f deployment.yaml")
        .description("Deploy application to production Kubernetes cluster")
)

Output Variables

Name output variables for use in subsequent steps:
wf.step("get-version", callback=lambda s:
    s.shell("cat version.txt")
        .output("APP_VERSION")
)

wf.step("tag-image", callback=lambda s:
    s.shell("docker tag app:latest app:{{APP_VERSION}}")
        .depends("get-version")
)

Variable Interpolation

Using Workflow Parameters

wf = (
    workflow("parameterized")
        .params(
            SERVICE_NAME="my-service",
            ENVIRONMENT="staging",
            REPLICAS="3"
        )

        .step("deploy", "kubectl apply -f deployment.yaml")

        .step("scale", callback=lambda s:
            s.shell("kubectl scale deployment ${SERVICE_NAME} --replicas=${REPLICAS} -n ${ENVIRONMENT}")
        )
)

Using Step Outputs

wf = (
    workflow("output-usage")
        # Produce output
        .step("get-config", callback=lambda s:
            s.shell("cat config.json")
                .output("CONFIG")
        )

        # Use output with {{variable}} syntax
        .step("apply-config", callback=lambda s:
            s.shell("echo '{{CONFIG}}' | kubectl apply -f -")
                .depends("get-config")
        )
)

Complete Examples

Example 1: Multi-Step Data Processing

from kubiya import KubiyaClient
from kubiya.dsl import workflow

client = KubiyaClient()

wf = (
    workflow("data-processing-pipeline")
        .description("Process and transform data files")
        .params(
            INPUT_DIR="/data/raw",
            OUTPUT_DIR="/data/processed"
        )

        # Step 1: Create output directory
        .step("setup", "mkdir -p ${OUTPUT_DIR}")

        # Step 2: List input files
        .step("list-files", callback=lambda s:
            s.shell("ls -1 ${INPUT_DIR}/*.csv")
                .output("FILE_LIST")
                .depends("setup")
        )

        # Step 3: Process files with Python
        .step("process", callback=lambda s:
            s.python("""
import os
import pandas as pd  # assumes pandas is available in the runner's Python environment

input_dir = os.getenv('INPUT_DIR')
output_dir = os.getenv('OUTPUT_DIR')

# Process each CSV file
for filename in os.listdir(input_dir):
    if filename.endswith('.csv'):
        df = pd.read_csv(os.path.join(input_dir, filename))

        # Clean data
        df_clean = df.dropna()
        df_clean = df_clean.drop_duplicates()

        # Save processed file
        output_path = os.path.join(output_dir, f"processed_{filename}")
        df_clean.to_csv(output_path, index=False)

        print(f"Processed {filename}: {len(df)} -> {len(df_clean)} rows")
            """)
            .depends("list-files")
        )

        # Step 4: Generate summary
        .step("summarize", callback=lambda s:
            s.shell("ls -lh ${OUTPUT_DIR}")
                .depends("process")
        )
)

# Execute
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)

Example 2: Complex Dependencies

wf = (
    workflow("complex-deployment")
        .description("Multi-service deployment with health checks")
        .params(VERSION="v1.0.0", ENVIRONMENT="staging")

        # Build all services (can run in parallel)
        .step("build-api", "docker build -t api:${VERSION} ./api")
        .step("build-frontend", "docker build -t frontend:${VERSION} ./frontend")
        .step("build-worker", "docker build -t worker:${VERSION} ./worker")

        # Run tests (depend on builds)
        .step("test-api", callback=lambda s:
            s.shell("docker run api:${VERSION} pytest tests/")
                .depends("build-api")
        )
        .step("test-frontend", callback=lambda s:
            s.shell("docker run frontend:${VERSION} npm test")
                .depends("build-frontend")
        )

        # Push images (depend on tests)
        .step("push-api", callback=lambda s:
            s.shell("docker push api:${VERSION}")
                .depends("test-api")
        )
        .step("push-frontend", callback=lambda s:
            s.shell("docker push frontend:${VERSION}")
                .depends("test-frontend")
        )
        .step("push-worker", callback=lambda s:
            s.shell("docker push worker:${VERSION}")
                .depends("build-worker")
        )

        # Deploy (depends on all pushes)
        .step("deploy-all", callback=lambda s:
            s.shell("kubectl apply -f k8s/ -n ${ENVIRONMENT}")
                .depends("push-api")
                .depends("push-frontend")
                .depends("push-worker")
        )

        # Health check (depends on deployment)
        .step("health-check", callback=lambda s:
            s.docker(
                image="alpine:latest",
                content="""
#!/bin/sh
apk add --no-cache curl
sleep 30  # Wait for pods to be ready
curl -f http://api.${ENVIRONMENT}.svc.cluster.local/health || exit 1
curl -f http://frontend.${ENVIRONMENT}.svc.cluster.local/health || exit 1
echo "✅ All services healthy"
                """
            )
            .depends("deploy-all")
        )
)

Best Practices

Each step should have a single, clear purpose:
# Good - separate concerns
.step("build", "docker build -t app .")
.step("test", "pytest tests/")
.step("push", "docker push app:latest")

# Avoid - doing too much
.step("build-and-test-and-push", "docker build && pytest && docker push")
Only add dependencies when truly needed:
# Good - clear dependencies
.step("build", "docker build -t app .")
.step("test", callback=lambda s:
    s.shell("pytest tests/")
        .depends("build")  # Needs the build artifact
)

# Avoid - unnecessary dependencies (steps can run in parallel)
.step("lint", "flake8 .")
.step("format", callback=lambda s:
    s.shell("black .")
        .depends("lint")  # Unnecessary - these can run in parallel
)

Use descriptive names for output variables:
# Good - clear purpose
.step("get-version", callback=lambda s:
    s.shell("cat version.txt")
        .output("APP_VERSION")
)

# Avoid - vague names
.step("get-version", callback=lambda s:
    s.shell("cat version.txt")
        .output("OUTPUT")
)

Document complex steps:
.step("migrate-database", callback=lambda s:
    s.shell("./migrate.sh")
        .description("Apply database migrations to production schema")
)

Next Steps

Examples: browse real-world workflow patterns.