Steps are the fundamental building blocks of Kubiya workflows. Each step represents a single operation or task that can be executed independently or as part of a larger workflow with dependencies, retries, and data flow. This page focuses on how to define different kinds of steps, control their execution, and wire them together.

Basic Step Structure

Simple steps

from kubiya.dsl import chain

wf = (
    chain("basic-steps")
        # Simple shell command
        .step("list-files", "ls -la /tmp")

        # Command with parameters
        .step("check-status", "kubectl get pods -n ${NAMESPACE}")
)
This pattern is ideal for quick, sequential flows where you do not need advanced configuration.

Steps with callbacks

For richer configuration, use the callback pattern. The callback receives a step builder that lets you choose an executor, set environment variables, timeouts, outputs, and more.
from kubiya.dsl import chain

wf = (
    chain("advanced-steps")
        .step("deploy", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml")
                .description("Deploy application to Kubernetes")
                .env(ENVIRONMENT="production")
                .timeout(600)
                .retries(2)
                .output("DEPLOYMENT_RESULT")
        )
)

Using executor helpers

In addition to configuring steps via callbacks, you can use executor helper functions to build reusable, well-typed steps and then attach them to workflows.
from kubiya.dsl import chain, docker_run_executor

run_tests_step = docker_run_executor(
    "run-tests",
    image="python:3.11",
    command=["pytest", "tests", "-q"],
    env={"ENVIRONMENT": "test"},
    memory="1Gi",
    cpu_limit="1",
)

wf = (
    chain("tests-with-helper")
        .add_step(run_tests_step)
)
Executor helpers construct Step objects you can reuse across workflows, while .add_step(...) attaches them to a specific workflow.
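For example, the same step object can be attached to more than one workflow. A minimal sketch reusing the run_tests_step defined above (the workflow names here are illustrative):

from kubiya.dsl import chain

# The same reusable step attached to two different workflows
ci_wf = (
    chain("ci-tests")
        .add_step(run_tests_step)
)

nightly_wf = (
    chain("nightly-tests")
        .add_step(run_tests_step)
)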

Step Types

1. Shell commands

Execute shell commands and scripts:
# Simple command
wf.step("simple", "echo 'Hello World'")

# Multi-line script
wf.step("script", callback=lambda s:
    s.shell("""
#!/bin/bash
set -e
echo "Starting process..."
for i in {1..5}; do
    echo "Processing item $i"
done
echo "Complete!"
    """)
)

# With shell type specified
wf.step("bash-script", callback=lambda s:
    s.shell("grep 'TODO' *.py | less")
        .shell_type("bash")
)

2. Shell scripts with environment variables

wf.step("parameterized-script", callback=lambda s:
    s.shell(
        """
#!/bin/sh
echo "Processing data..."
grep "${PATTERN}" "${FILE}"
        """
    )
    .env(
        PATTERN="ERROR",
        FILE="application.log",
    )
)

3. Python code

Execute Python code directly:
wf.step("python-task", callback=lambda s:
    s.python("""
import json
import sys

# Process data
data = {
    "status": "success",
    "count": 42,
    "items": ["item1", "item2", "item3"]
}

# Output results
print(json.dumps(data, indent=2))
    """)
)

# Python with variables
wf.step("python-with-vars", callback=lambda s:
    s.python("""
colors = ["red", "yellow", "green", "blue"]
for i, color in enumerate(colors, 1):
    print(f"{i}. Color: {color}")
    """)
)

4. Docker containers

Run steps in Docker containers when you want clean, isolated environments for builds, tests, or tooling.
wf.step("containerized-python", callback=lambda s:
    s.docker(
        image="python:3.11-slim",
        content="""
#!/usr/bin/env python3
import sys
print(f"Python version: {sys.version}")
print("Running in Docker!")
        """
    )
)

# Alpine container with dependencies
wf.step("alpine-task", callback=lambda s:
    s.docker(
        image="alpine:latest",
        content="""
#!/bin/sh
set -e
if ! command -v curl >/dev/null 2>&1; then
    apk add --no-cache curl
fi
curl -fsS https://api.example.com/health
        """
    )
)

5. Kubiya API calls

Interact with Kubiya platform APIs as part of a workflow:
wf.step("get-runners", callback=lambda s:
    s.kubiya(
        url="api/v3/runners",
        method="GET"
    ).output("RUNNERS_DATA")
)

wf.step("create-resource", callback=lambda s:
    s.kubiya(
        url="api/v3/resources",
        method="POST",
        body={"name": "resource-name", "type": "config"}
    )
)

6. Tools and bounded services

Define custom tools inline and, when needed, attach temporary services like databases or caches for more realistic environments.
wf.step("custom-tool", callback=lambda s:
    s.tool_def(
        name="service_health_check",
        description="Check service health endpoint",
        type="docker",
        image="alpine:latest",
        content="""
#!/bin/sh
if ! command -v curl >/dev/null 2>&1; then
    apk add --no-cache curl
fi
echo "Checking health of $SERVER_URL..."
if curl -fsS "$SERVER_URL" >/dev/null; then
    echo "✅ Service is healthy"
    exit 0
else
    echo "❌ Service health check failed"
    exit 1
fi
        """,
        args={
            "SERVER_URL": "https://api.example.com/health"
        }
    )
)

# Attach a temporary Postgres database while the tool runs
wf.step("db-migrations", callback=lambda s:
    s.tool_def(
        name="db_migrator",
        description="Run database migrations against a temporary database",
        type="docker",
        image="alpine:latest",
        content="""
#!/bin/sh
alembic upgrade head
        """,
        args={},
    )
    .with_database(db_type="postgres")
)

HTTP, SSH, and agent steps

Beyond shell, Python, Docker, and tools, the DSL includes executors for HTTP calls, SSH commands, and AI agents. These are especially useful when you want workflows to orchestrate external systems or intelligent automation.

HTTP and SSH

# Call an external API as part of a workflow
wf.step("notify-webhook", callback=lambda s:
    s.http(
        url="https://hooks.example.com/deployments",
        method="POST",
        headers={"Content-Type": "application/json"},
        body={"service": "payments-api", "status": "deployed"},
    )
)

# Run a command over SSH on a remote host
wf.step("restart-service", callback=lambda s:
    s.ssh(
        host="bastion.example.com",
        user="deploy",
        command="sudo systemctl restart payments-api",
    )
)
Use these executors when you need to integrate with legacy systems, on-prem services, or existing SSH-based automation.

Inline agents and LLM completion

# Inline agent to summarize an incident
wf.step("summarize-incident", callback=lambda s:
    s.inline_agent(
        message="Summarize the incident based on {{LOG_OUTPUT}}",
        agent_name="sre-incident-analyzer",
        ai_instructions="You are an on-call SRE. Provide a concise summary and next actions.",
        runners=["sre-agents"],
        llm_model="gpt-4o",
    )
    .depends("collect-logs")
    .output("INCIDENT_SUMMARY")
)

# Direct LLM completion for a small transformation
wf.step("summarize-text", callback=lambda s:
    s.llm_completion(
        model="gpt-4o",
        prompt="Summarize the following content in one paragraph: {{RAW_TEXT}}",
        json_mode=False,
    )
    .output("SUMMARY")
)
Agent and LLM steps are useful when you want AI-powered behavior to be an explicit part of your automation instead of a separate manual process.

Step dependencies and parallelism

Sequential dependencies

Control the execution order in a workflow:
from kubiya.dsl import chain

wf = (
    chain("sequential-pipeline")
        # Step 1: Build
        .step("build", "docker build -t myapp:latest .")

        # Step 2: Test (depends on build)
        .step("test", callback=lambda s:
            s.shell("docker run myapp:latest pytest tests/")
                .depends("build")
        )

        # Step 3: Push (depends on test)
        .step("push", callback=lambda s:
            s.shell("docker push myapp:latest")
                .depends("test")
        )

        # Step 4: Deploy (depends on push)
        .step("deploy", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml")
                .depends("push")
        )
)

Multiple dependencies

A step can depend on multiple previous steps:
from kubiya.dsl import graph

wf = (
    graph("multi-dependency")
        # Independent steps
        .step("unit-tests", "pytest tests/unit/")
        .step("integration-tests", "pytest tests/integration/")
        .step("lint", "flake8 .")

        # This step waits for all three to complete
        .step("build", callback=lambda s:
            s.shell("docker build -t app .")
                .depends("unit-tests")
                .depends("integration-tests")
                .depends("lint")
        )
)
In a chain workflow, steps typically run sequentially in the order they are declared, and explicit dependencies are mostly for clarity. In a graph workflow, dependencies like the ones above define the execution order and fan-in/fan-out behavior.

Parallel steps

You can also run a single step across many items in parallel.
from kubiya.dsl import chain

wf = (
    chain("parallel-processing")
        .parallel_steps(
            "process-file",
            items=["file1.csv", "file2.csv", "file3.csv"],
            command="python process_file.py ${ITEM}",
            max_concurrent=2,
        )
)
For more fine-grained control, you can call .parallel(...) on an individual step to specify a list of items or a reference to a variable containing them.
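A minimal sketch of the per-step form; the exact argument shapes accepted by .parallel(...) (a literal list versus a reference to a variable holding the items) may differ in your SDK version, so treat this as an assumption to verify:

from kubiya.dsl import chain

wf = (
    chain("per-step-parallel")
        # Fan a single step out over each item
        .step("process-file", callback=lambda s:
            s.shell("python process_file.py ${ITEM}")
                # Assumed usage: pass a literal list of items here,
                # or a reference to a variable containing them
                .parallel(["file1.csv", "file2.csv", "file3.csv"])
        )
)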

Step outputs and variables

Steps can expose parts of their result as named outputs. Later steps can then reference those outputs instead of re-running the same work or scraping logs. This is how you pass values through the workflow graph in a controlled way.

Capturing outputs

Use .output(NAME) on a step to capture its primary output under a descriptive name. Any downstream step can then interpolate that value using the {{NAME}} syntax.
from kubiya.dsl import chain

wf = (
    chain("data-flow")
        .params(INPUT_FILE="data.csv")

        # Produce data
        .step("count-lines", callback=lambda s:
            s.shell("wc -l ${INPUT_FILE}")
                .output("LINE_COUNT")
        )

        # Consume data
        .step("report", callback=lambda s:
            s.shell("echo 'Processed {{LINE_COUNT}} lines'")
                .depends("count-lines")
        )
)

Complex data flow

For richer scenarios, you can emit structured data (for example JSON) from one step and parse it in another. This keeps complex logic in regular Python while still using the DSL to orchestrate when and how each piece runs.
from kubiya.dsl import chain

wf = (
    chain("advanced-data-flow")
        # Step 1: Fetch data and output multiple variables
        .step("fetch", callback=lambda s:
            s.python("""
import json
data = {
    "count": 100,
    "status": "success",
    "timestamp": "2024-01-01T00:00:00Z"
}
print(json.dumps(data))
            """)
            .output("API_RESPONSE")
        )

        # Step 2: Process using the output
        .step("process", callback=lambda s:
            s.python("""
import json
response = json.loads('{{API_RESPONSE}}')
print(f"Processing {response['count']} items")
print(f"Status: {response['status']}")
            """)
            .depends("fetch")
        )
)

Step configuration

Step descriptions

Add descriptions for documentation and observability:
wf.step("deploy-production", callback=lambda s:
    s.shell("kubectl apply -f deployment.yaml")
        .description("Deploy application to production Kubernetes cluster")
)

Output variables

Name output variables for use in subsequent steps:
wf.step("get-version", callback=lambda s:
    s.shell("cat version.txt")
        .output("APP_VERSION")
)

wf.step("tag-image", callback=lambda s:
    s.shell("docker tag app:latest app:{{APP_VERSION}}")
        .depends("get-version")
)

Variable interpolation

Kubiya workflows support two main kinds of interpolation:
  • ${PARAM} pulls in workflow parameters or environment variables that are defined at the workflow level.
  • {{OUTPUT}} pulls in values produced by earlier steps via .output(...).
Interpolation happens on the workflow side before the command or script is executed, so what the container sees is a fully rendered string.

Using workflow parameters

from kubiya.dsl import chain

wf = (
    chain("parameterized")
        .params(
            SERVICE_NAME="my-service",
            ENVIRONMENT="staging",
            REPLICAS="3"
        )

        .step("deploy", "kubectl apply -f deployment.yaml")

        .step("scale", callback=lambda s:
            s.shell("kubectl scale deployment ${SERVICE_NAME} --replicas=${REPLICAS} -n ${ENVIRONMENT}")
        )
)

Using step outputs

Here the first step saves the raw configuration as CONFIG, and the second step injects that value directly into a shell pipeline using the {{CONFIG}} placeholder.
from kubiya.dsl import chain

wf = (
    chain("output-usage")
        # Produce output
        .step("get-config", callback=lambda s:
            s.shell("cat config.json")
                .output("CONFIG")
        )

        # Use output with {{variable}} syntax
        .step("apply-config", callback=lambda s:
            s.shell("echo '{{CONFIG}}' | kubectl apply -f -")
                .depends("get-config")
        )
)

Control flow and reliability

Steps support a rich set of controls for retries, timeouts, and “continue even on failure” behavior.

Retry policies

from kubiya.dsl import chain

wf = (
    chain("retry-example")
        .step("deploy", callback=lambda s:
            s.shell("./deploy.sh")
                .retry(
                    limit=3,
                    interval_sec=30,
                    max_interval_sec=300,
                    backoff=2.0,
                    exit_codes=[1, 2],
                )
        )
)
Use retries for transient errors such as flaky networks or temporarily unavailable services.

Repeat / polling

wf = (
    chain("wait-for-service")
        .step("poll-health", callback=lambda s:
            s.shell("curl -fsS https://api.example.com/health || exit 1")
                .repeat(interval_sec=30, limit=10)
        )
)
repeat is useful when you want to poll an external system until it reaches a desired state.

Continue-on and timeouts

wf = (
    chain("deploy-with-rollback")
        .step("deploy", callback=lambda s:
            s.shell("./deploy.sh")
                .timeout(900)
                # Let the workflow continue to rollback even if deploy fails,
                # while keeping the workflow marked as failed
                .continue_on(failure=True)
        )

        # Always attempt rollback after deploy
        .step("rollback", callback=lambda s:
            s.shell("./rollback.sh")
                .depends("deploy")
        )
)
You can also use .signal_on_stop(...), .mail_on_error(...), and .retries(...) for smaller adjustments to how a step behaves at runtime.
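A minimal sketch combining these smaller knobs. The method names come from this page, but the argument forms for .signal_on_stop(...) and .mail_on_error(...) are assumptions, so check them against your SDK version:

from kubiya.dsl import chain

wf = (
    chain("cleanup-job")
        .step("archive-logs", callback=lambda s:
            s.shell("./archive_logs.sh")
                .timeout(300)
                .retries(2)                    # simple retry count, as used earlier on this page
                .signal_on_stop("SIGTERM")     # assumed: signal sent to the process when the step is stopped
                .mail_on_error(True)           # assumed: send a notification email if the step fails
        )
)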

Complete examples

Example 1: Multi-step data processing

This example models a simple batch ETL pipeline. It shows how to use workflow parameters, a mix of shell and Python steps, and explicit dependencies to coordinate a multi-stage data flow.
  • The workflow takes INPUT_DIR and OUTPUT_DIR parameters so you can reuse the same definition across environments or datasets.
  • setup prepares the target directory up front so later steps can assume it exists.
  • list-files discovers input CSV files and exposes the list through an output (FILE_LIST), which is a typical pattern when you want to inspect or log what is about to be processed.
  • process uses a Python step with pandas to perform the actual data cleaning and transformation. Because it depends on list-files, it runs only after discovery has completed.
  • summarize is a lightweight shell step that gives operators a quick view of the generated artifacts, making the pipeline easier to debug.
Use this pattern whenever you have a small number of discrete ETL phases and you want orchestration (dependencies, retries, scheduling) separated from the data-processing code itself.
from kubiya.dsl import chain

wf = (
    chain("data-processing-pipeline")
        .description("Process and transform data files")
        .params(
            INPUT_DIR="/data/raw",
            OUTPUT_DIR="/data/processed"
        )

        # Step 1: Create output directory
        .step("setup", "mkdir -p ${OUTPUT_DIR}")

        # Step 2: List input files
        .step("list-files", callback=lambda s:
            s.shell("ls -1 ${INPUT_DIR}/*.csv")
                .output("FILE_LIST")
                .depends("setup")
        )

        # Step 3: Process files with Python
        .step("process", callback=lambda s:
            s.python("""
import os
import pandas as pd

input_dir = os.getenv('INPUT_DIR')
output_dir = os.getenv('OUTPUT_DIR')

# Process each CSV file
for filename in os.listdir(input_dir):
    if filename.endswith('.csv'):
        df = pd.read_csv(os.path.join(input_dir, filename))

        # Clean data
        df_clean = df.dropna()
        df_clean = df_clean.drop_duplicates()

        # Save processed file
        output_path = os.path.join(output_dir, f"processed_{filename}")
        df_clean.to_csv(output_path, index=False)

        print(f"Processed {filename}: {len(df)} -> {len(df_clean)} rows")
            """)
            .depends("list-files")
        )

        # Step 4: Generate summary
        .step("summarize", callback=lambda s:
            s.shell("ls -lh ${OUTPUT_DIR}")
                .depends("process")
        )
)

Example 2: Complex dependencies

This example demonstrates why graph workflows are useful for non-trivial CI/CD pipelines. Multiple services are built in parallel, tests fan out, artifacts are pushed only after successful validation, and a final deployment plus health check gates the end of the workflow.
  • build-* steps can run concurrently because they have no dependencies.
  • test-* depend on their respective builds, so a broken build stops that service’s tests from running unnecessarily.
  • push-* steps ensure only tested images are published; the worker image is pushed directly after build when there are no tests.
  • deploy-all waits on all pushes to complete, which is where you would also typically add approvals, notifications, or stricter retry/timeout policies.
  • health-check runs from a dedicated container to validate the deployed endpoints and fail fast if something is wrong, giving a clear last point in the graph to attach alerts or rollbacks.
Use this structure when you have many related services and want to maximize parallelism without sacrificing correctness or observability.
from kubiya.dsl import graph

wf = (
    graph("complex-deployment")
        .description("Multi-service deployment with health checks")
        .params(VERSION="v1.0.0", ENVIRONMENT="staging")

        # Build all services (can run in parallel)
        .step("build-api", "docker build -t api:${VERSION} ./api")
        .step("build-frontend", "docker build -t frontend:${VERSION} ./frontend")
        .step("build-worker", "docker build -t worker:${VERSION} ./worker")

        # Run tests (depend on builds)
        .step("test-api", callback=lambda s:
            s.shell("docker run api:${VERSION} pytest tests/")
                .depends("build-api")
        )
        .step("test-frontend", callback=lambda s:
            s.shell("docker run frontend:${VERSION} npm test")
                .depends("build-frontend")
        )

        # Push images (depend on tests)
        .step("push-api", callback=lambda s:
            s.shell("docker push api:${VERSION}")
                .depends("test-api")
        )
        .step("push-frontend", callback=lambda s:
            s.shell("docker push frontend:${VERSION}")
                .depends("test-frontend")
        )
        .step("push-worker", callback=lambda s:
            s.shell("docker push worker:${VERSION}")
                .depends("build-worker")
        )

        # Deploy (depends on all pushes)
        .step("deploy-all", callback=lambda s:
            s.shell("kubectl apply -f k8s/ -n ${ENVIRONMENT}")
                .depends("push-api")
                .depends("push-frontend")
                .depends("push-worker")
        )

        # Health check (depends on deployment)
        .step("health-check", callback=lambda s:
            s.docker(
                image="alpine:latest",
                content="""
#!/bin/sh
apk add --no-cache curl
sleep 30  # Wait for pods to be ready
curl -f http://api.${ENVIRONMENT}.svc.cluster.local/health || exit 1
curl -f http://frontend.${ENVIRONMENT}.svc.cluster.local/health || exit 1
echo "✅ All services healthy"
                """
            )
            .depends("deploy-all")
        )
)

Best Practices

Each step should have a single, clear purpose:
# Good - separate concerns
.step("build", "docker build -t app .")
.step("test", "pytest tests/")
.step("push", "docker push app:latest")

# Avoid - doing too much
.step("build-and-test-and-push", "docker build && pytest && docker push")
Only add dependencies when truly needed:
# Good - clear dependencies
.step("build", "docker build -t app .")
.step("test", callback=lambda s:
    s.shell("pytest tests/")
        .depends("build")  # Needs the build artifact
)

# Avoid - unnecessary dependencies (steps can run in parallel)
.step("lint", "flake8 .")
.step("format", callback=lambda s:
    s.shell("black .")
        .depends("lint")  # Unnecessary - these can run in parallel
)
Use descriptive names for output variables:
# Good - clear purpose
.step("get-version", callback=lambda s:
    s.shell("cat version.txt")
        .output("APP_VERSION")
)

# Avoid - vague names
.step("get-version", callback=lambda s:
    s.shell("cat version.txt")
        .output("OUTPUT")
)
Document complex steps:
.step("migrate-database", callback=lambda s:
    s.shell("./migrate.sh")
        .description("Apply database migrations to production schema")
)

Next Steps

Examples

Browse real-world workflow patterns