> ## Documentation Index
> Fetch the complete documentation index at: https://docs.kubiya.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Steps & Dependencies

> Master workflow steps, dependencies, and execution control

Steps are the fundamental building blocks of Kubiya workflows. Each step represents a single operation or task that can be executed independently or as part of a larger workflow with dependencies, retries, and data flow.

This page focuses on how to define different kinds of steps, control their execution, and wire them together.

## Basic Step Structure

### Simple steps

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("basic-steps")
        # Simple shell command
        .step("list-files", "ls -la /tmp")

        # Command with parameters
        .step("check-status", "kubectl get pods -n ${NAMESPACE}")
)
```

This pattern is ideal for quick, sequential flows where you do not need advanced configuration.

### Steps with callbacks

For richer configuration, use the callback pattern. The callback receives a step builder that lets you choose an executor, set environment variables, timeouts, outputs, and more.

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("advanced-steps")
        .step("deploy", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml")
                .description("Deploy application to Kubernetes")
                .env(ENVIRONMENT="production")
                .timeout(600)
                .retries(2)
                .output("DEPLOYMENT_RESULT")
        )
)
```

### Using executor helpers

In addition to configuring steps via callbacks, you can use executor helper functions to build reusable, well-typed steps and then attach them to workflows.

```python theme={null}
from kubiya.dsl import chain, docker_run_executor

run_tests_step = docker_run_executor(
    "run-tests",
    image="python:3.11",
    command=["pytest", "tests", "-q"],
    env={"ENVIRONMENT": "test"},
    memory="1Gi",
    cpu_limit="1",
)

wf = (
    chain("tests-with-helper")
        .add_step(run_tests_step)
)
```

Executor helpers construct `Step` objects you can reuse across workflows, while `.add_step(...)` attaches them to a specific workflow.

## Step Types

### 1. Shell commands

Execute shell commands and scripts:

```python theme={null}
# Simple command
wf.step("simple", "echo 'Hello World'")

# Multi-line script
wf.step("script", callback=lambda s:
    s.shell("""
#!/bin/bash
set -e
echo "Starting process..."
for i in {1..5}; do
    echo "Processing item $i"
done
echo "Complete!"
    """)
)

# With shell type specified
wf.step("bash-script", callback=lambda s:
    s.shell("grep 'TODO' *.py | less")
        .shell_type("bash")
)
```

### 2. Shell scripts with environment variables

```python theme={null}
wf.step("parameterized-script", callback=lambda s:
    s.shell(
        """
#!/bin/sh
echo "Processing data..."
grep "${PATTERN}" "${FILE}"
        """
    )
    .env(
        PATTERN="ERROR",
        FILE="application.log",
    )
)
```

### 3. Python code

Execute Python code directly:

```python theme={null}
wf.step("python-task", callback=lambda s:
    s.python("""
import json
import sys

# Process data
data = {
    "status": "success",
    "count": 42,
    "items": ["item1", "item2", "item3"]
}

# Output results
print(json.dumps(data, indent=2))
    """)
)

# Python with variables
wf.step("python-with-vars", callback=lambda s:
    s.python("""
colors = ["red", "yellow", "green", "blue"]
for i, color in enumerate(colors, 1):
    print(f"{i}. Color: {color}")
    """)
)
```

### 4. Docker containers

Run steps in Docker containers when you want clean, isolated environments for builds, tests, or tooling.

```python theme={null}
wf.step("containerized-python", callback=lambda s:
    s.docker(
        image="python:3.11-slim",
        content="""
#!/usr/bin/env python3
import sys
print(f"Python version: {sys.version}")
print("Running in Docker!")
        """
    )
)

# Alpine container with dependencies
wf.step("alpine-task", callback=lambda s:
    s.docker(
        image="alpine:latest",
        content="""
#!/bin/sh
set -e
if ! command -v curl >/dev/null 2>&1; then
    apk add --no-cache curl
fi
curl -fsS https://api.example.com/health
        """
    )
)
```

### 5. Kubiya API calls

Interact with Kubiya platform APIs as part of a workflow:

```python theme={null}
wf.step("get-runners", callback=lambda s:
    s.kubiya(
        url="api/v3/runners",
        method="GET"
    ).output("RUNNERS_DATA")
)

wf.step("create-resource", callback=lambda s:
    s.kubiya(
        url="api/v3/resources",
        method="POST",
        body={"name": "resource-name", "type": "config"}
    )
)
```

### 6. Tools and bounded services

Define custom tools inline and, when needed, attach temporary services like databases or caches for more realistic environments.

```python theme={null}
wf.step("custom-tool", callback=lambda s:
    s.tool_def(
        name="service_health_check",
        description="Check service health endpoint",
        type="docker",
        image="alpine:latest",
        content="""
#!/bin/sh
if ! command -v curl >/dev/null 2>&1; then
    apk add --no-cache curl
fi
echo "Checking health of $SERVER_URL..."
if curl -fsS "$SERVER_URL" >/dev/null; then
    echo "✅ Service is healthy"
    exit 0
else
    echo "❌ Service health check failed"
    exit 1
fi
        """,
        args={
            "SERVER_URL": "https://api.example.com/health"
        }
    )
)

# Attach a temporary Postgres database while the tool runs
wf.step("db-migrations", callback=lambda s:
    s.tool_def(
        name="db_migrator",
        description="Run database migrations against a temporary database",
        type="docker",
        image="alpine:latest",
        content="""
#!/bin/sh
alembic upgrade head
        """,
        args={},
    )
    .with_database(db_type="postgres")
)
```

## HTTP, SSH, and agent steps

Beyond shell, Python, Docker, and tools, the DSL includes executors for HTTP calls, SSH commands, and AI agents. These are especially useful when you want workflows to orchestrate external systems or intelligent automation.

### HTTP and SSH

```python theme={null}
# Call an external API as part of a workflow
wf.step("notify-webhook", callback=lambda s:
    s.http(
        url="https://hooks.example.com/deployments",
        method="POST",
        headers={"Content-Type": "application/json"},
        body={"service": "payments-api", "status": "deployed"},
    )
)

# Run a command over SSH on a remote host
wf.step("restart-service", callback=lambda s:
    s.ssh(
        host="bastion.example.com",
        user="deploy",
        command="sudo systemctl restart payments-api",
    )
)
```

Use these executors when you need to integrate with legacy systems, on-prem services, or existing SSH-based automation.

### Inline agents and LLM completion

```python theme={null}
# Inline agent to summarize an incident
wf.step("summarize-incident", callback=lambda s:
    s.inline_agent(
        message="Summarize the incident based on {{LOG_OUTPUT}}",
        agent_name="sre-incident-analyzer",
        ai_instructions="You are an on-call SRE. Provide a concise summary and next actions.",
        runners=["sre-agents"],
        llm_model="gpt-4o",
    )
    .depends("collect-logs")
    .output("INCIDENT_SUMMARY")
)

# Direct LLM completion for a small transformation
wf.step("summarize-text", callback=lambda s:
    s.llm_completion(
        model="gpt-4o",
        prompt="Summarize the following content in one paragraph: {{RAW_TEXT}}",
        json_mode=False,
    )
    .output("SUMMARY")
)
```

Agent and LLM steps are useful when you want AI-powered behavior to be an explicit part of your automation instead of a separate manual process.

## Step dependencies and parallelism

### Sequential dependencies

Control the execution order in a workflow:

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("sequential-pipeline")
        # Step 1: Build
        .step("build", "docker build -t myapp:latest .")

        # Step 2: Test (depends on build)
        .step("test", callback=lambda s:
            s.shell("docker run myapp:latest pytest tests/")
                .depends("build")
        )

        # Step 3: Push (depends on test)
        .step("push", callback=lambda s:
            s.shell("docker push myapp:latest")
                .depends("test")
        )

        # Step 4: Deploy (depends on push)
        .step("deploy", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml")
                .depends("push")
        )
)
```

### Multiple dependencies

A step can depend on multiple previous steps:

```python theme={null}
from kubiya.dsl import graph

wf = (
    graph("multi-dependency")
        # Independent steps
        .step("unit-tests", "pytest tests/unit/")
        .step("integration-tests", "pytest tests/integration/")
        .step("lint", "flake8 .")

        # This step waits for all three to complete
        .step("build", callback=lambda s:
            s.shell("docker build -t app .")
                .depends("unit-tests")
                .depends("integration-tests")
                .depends("lint")
        )
)
```

In a **chain** workflow, steps typically run sequentially in the order they are declared, and explicit dependencies are mostly for clarity. In a **graph** workflow, dependencies like the ones above define the execution order and fan-in/fan-out behavior.

### Parallel steps

You can also run a single step across many items in parallel.

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("parallel-processing")
        .parallel_steps(
            "process-file",
            items=["file1.csv", "file2.csv", "file3.csv"],
            command="python process_file.py ${ITEM}",
            max_concurrent=2,
        )
)
```

For more fine-grained control, you can call `.parallel(...)` on an individual step to specify a list of items or a reference to a variable containing them.

## Step outputs and variables

Steps can expose parts of their result as named outputs. Later steps can then
reference those outputs instead of re-running the same work or scraping logs.
This is how you pass values through the workflow graph in a controlled way.

### Capturing outputs

Use `.output(NAME)` on a step to capture its primary output under a
descriptive name. Any downstream step can then interpolate that value using
the `{{NAME}}` syntax.

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("data-flow")
        .params(INPUT_FILE="data.csv")

        # Produce data
        .step("count-lines", callback=lambda s:
            s.shell("wc -l ${INPUT_FILE}")
                .output("LINE_COUNT")
        )

        # Consume data
        .step("report", callback=lambda s:
            s.shell("echo 'Processed {{LINE_COUNT}} lines'")
                .depends("count-lines")
        )
)
```

### Complex data flow

For richer scenarios, you can emit structured data (for example JSON) from one
step and parse it in another. This keeps complex logic in regular Python while
still using the DSL to orchestrate when and how each piece runs.

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("advanced-data-flow")
        # Step 1: Fetch data and output multiple variables
        .step("fetch", callback=lambda s:
            s.python("""
import json
data = {
    "count": 100,
    "status": "success",
    "timestamp": "2024-01-01T00:00:00Z"
}
print(json.dumps(data))
            """)
            .output("API_RESPONSE")
        )

        # Step 2: Process using the output
        .step("process", callback=lambda s:
            s.python("""
import json
response = json.loads('{{API_RESPONSE}}')
print(f"Processing {response['count']} items")
print(f"Status: {response['status']}")
            """)
            .depends("fetch")
        )
)
```

## Step configuration

### Step descriptions

Add descriptions for documentation and observability:

```python theme={null}
wf.step("deploy-production", callback=lambda s:
    s.shell("kubectl apply -f deployment.yaml")
        .description("Deploy application to production Kubernetes cluster")
)
```

### Output variables

Name output variables for use in subsequent steps:

```python theme={null}
wf.step("get-version", callback=lambda s:
    s.shell("cat version.txt")
        .output("APP_VERSION")
)

wf.step("tag-image", callback=lambda s:
    s.shell("docker tag app:latest app:{{APP_VERSION}}")
        .depends("get-version")
)
```

## Variable interpolation

Kubiya workflows support two main kinds of interpolation:

* `${PARAM}` pulls in workflow parameters or environment variables that are
  defined at the workflow level.
* `{{OUTPUT}}` pulls in values produced by earlier steps via `.output(...)`.

Interpolation happens on the workflow side before the command or script is
executed, so what the container sees is a fully rendered string.

### Using workflow parameters

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("parameterized")
        .params(
            SERVICE_NAME="my-service",
            ENVIRONMENT="staging",
            REPLICAS="3"
        )

        .step("deploy", "kubectl apply -f deployment.yaml")

        .step("scale", callback=lambda s:
            s.shell("kubectl scale deployment ${SERVICE_NAME} --replicas=${REPLICAS} -n ${ENVIRONMENT}")
        )
)
```

### Using step outputs

Here the first step saves the raw configuration as `CONFIG`, and the second
step injects that value directly into a shell pipeline using the `{{CONFIG}}`
placeholder.

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("output-usage")
        # Produce output
        .step("get-config", callback=lambda s:
            s.shell("cat config.json")
                .output("CONFIG")
        )

        # Use output with {{variable}} syntax
        .step("apply-config", callback=lambda s:
            s.shell("echo '{{CONFIG}}' | kubectl apply -f -")
                .depends("get-config")
        )
)
```

## Control flow and reliability

Steps support a rich set of controls for retries, timeouts, and “continue even on failure” behavior.

### Retry policies

```python theme={null}
from kubiya.dsl import chain

wf = (
    chain("retry-example")
        .step("deploy", callback=lambda s:
            s.shell("./deploy.sh")
                .retry(
                    limit=3,
                    interval_sec=30,
                    max_interval_sec=300,
                    backoff=2.0,
                    exit_codes=[1, 2],
                )
        )
)
```

Use retries for transient errors such as flaky networks or temporarily unavailable services.

### Repeat / polling

```python theme={null}
wf = (
    chain("wait-for-service")
        .step("poll-health", callback=lambda s:
            s.shell("curl -fsS https://api.example.com/health || exit 1")
                .repeat(interval_sec=30, limit=10)
        )
)
```

`repeat` is useful when you want to poll an external system until it reaches a desired state.

### Continue-on and timeouts

```python theme={null}
wf = (
    chain("deploy-with-rollback")
        .step("deploy", callback=lambda s:
            s.shell("./deploy.sh")
                .timeout(900)
        )

        # Always attempt rollback on failure, but keep workflow marked as failed
        .step("rollback", callback=lambda s:
            s.shell("./rollback.sh")
                .depends("deploy")
                .continue_on(failure=True)
        )
)
```

You can also use `.signal_on_stop(...)`, `.mail_on_error(...)`, and `.retries(...)` for smaller adjustments to how a step behaves at runtime.

## Complete examples

### Example 1: Multi-step data processing

This example models a simple batch ETL pipeline. It shows how to use workflow
parameters, a mix of shell and Python steps, and explicit dependencies to
coordinate a multi-stage data flow.

* The workflow takes `INPUT_DIR` and `OUTPUT_DIR` parameters so you can reuse
  the same definition across environments or datasets.
* `setup` prepares the target directory up front so later steps can assume it
  exists.
* `list-files` discovers input CSV files and exposes the list through an
  output (`FILE_LIST`), which is a typical pattern when you want to inspect or
  log what is about to be processed.
* `process` uses a Python step with `pandas` to perform the actual data
  cleaning and transformation. Because it depends on `list-files`, it runs only
  after discovery has completed.
* `summarize` is a lightweight shell step that gives operators a quick view of
  the generated artifacts, making the pipeline easier to debug.

Use this pattern whenever you have a small number of discrete ETL phases and
you want orchestration (dependencies, retries, scheduling) separated from the
data-processing code itself.

```python theme={null}
from kubiya.dsl import chain

wf = (
        chain("data-processing-pipeline")
                .description("Process and transform data files")
                .params(
                        INPUT_DIR="/data/raw",
                        OUTPUT_DIR="/data/processed"
                )

                # Step 1: Create output directory
                .step("setup", "mkdir -p ${OUTPUT_DIR}")

                # Step 2: List input files
                .step("list-files", callback=lambda s:
                        s.shell("ls -1 ${INPUT_DIR}/*.csv")
                                .output("FILE_LIST")
                                .depends("setup")
                )

                # Step 3: Process files with Python
                .step("process", callback=lambda s:
                        s.python("""
import os
import pandas as pd

input_dir = os.getenv('INPUT_DIR')
output_dir = os.getenv('OUTPUT_DIR')

# Process each CSV file
for filename in os.listdir(input_dir):
        if filename.endswith('.csv'):
                df = pd.read_csv(os.path.join(input_dir, filename))

                # Clean data
                df_clean = df.dropna()
                df_clean = df_clean.drop_duplicates()

                # Save processed file
                output_path = os.path.join(output_dir, f"processed_{filename}")
                df_clean.to_csv(output_path, index=False)

                print(f"Processed {filename}: {len(df)} -> {len(df_clean)} rows")
                        """)
                        .depends("list-files")
                )

                # Step 4: Generate summary
                .step("summarize", callback=lambda s:
                        s.shell("ls -lh ${OUTPUT_DIR}")
                                .depends("process")
                )
)
```

### Example 2: Complex dependencies

This example demonstrates why graph workflows are useful for non-trivial CI/CD
pipelines. Multiple services are built in parallel, tests fan out, artifacts
are pushed only after successful validation, and a final deployment plus
health check gates the end of the workflow.

* `build-*` steps can run concurrently because they have no dependencies.
* `test-*` depend on their respective builds, so a broken build stops that
  service’s tests from running unnecessarily.
* `push-*` steps ensure only tested images are published; the worker image is
  pushed directly after build when there are no tests.
* `deploy-all` waits on all pushes to complete, which is where you would also
  typically add approvals, notifications, or stricter retry/timeout policies.
* `health-check` runs from a dedicated container to validate the deployed
  endpoints and fail fast if something is wrong, giving a clear last point in
  the graph to attach alerts or rollbacks.

Use this structure when you have many related services and want to maximize
parallelism without sacrificing correctness or observability.

```python theme={null}
from kubiya.dsl import graph

wf = (
    graph("complex-deployment")
        .description("Multi-service deployment with health checks")
        .params(VERSION="v1.0.0", ENVIRONMENT="staging")

        # Build all services (can run in parallel)
        .step("build-api", "docker build -t api:${VERSION} ./api")
        .step("build-frontend", "docker build -t frontend:${VERSION} ./frontend")
        .step("build-worker", "docker build -t worker:${VERSION} ./worker")

        # Run tests (depend on builds)
        .step("test-api", callback=lambda s:
            s.shell("docker run api:${VERSION} pytest tests/")
                .depends("build-api")
        )
        .step("test-frontend", callback=lambda s:
            s.shell("docker run frontend:${VERSION} npm test")
                .depends("build-frontend")
        )

        # Push images (depend on tests)
        .step("push-api", callback=lambda s:
            s.shell("docker push api:${VERSION}")
                .depends("test-api")
        )
        .step("push-frontend", callback=lambda s:
            s.shell("docker push frontend:${VERSION}")
                .depends("test-frontend")
        )
        .step("push-worker", callback=lambda s:
            s.shell("docker push worker:${VERSION}")
                .depends("build-worker")
        )

        # Deploy (depends on all pushes)
        .step("deploy-all", callback=lambda s:
            s.shell("kubectl apply -f k8s/ -n ${ENVIRONMENT}")
                .depends("push-api")
                .depends("push-frontend")
                .depends("push-worker")
        )

        # Health check (depends on deployment)
        .step("health-check", callback=lambda s:
            s.docker(
                image="alpine:latest",
                content="""
#!/bin/sh
apk add --no-cache curl
sleep 30  # Wait for pods to be ready
curl -f http://api.${ENVIRONMENT}.svc.cluster.local/health || exit 1
curl -f http://frontend.${ENVIRONMENT}.svc.cluster.local/health || exit 1
echo "✅ All services healthy"
                """
            )
            .depends("deploy-all")
        )
)
```

## Best Practices

<AccordionGroup>
  <Accordion title="Keep Steps Atomic" icon="atom">
    Each step should have a single, clear purpose:

    ```python theme={null}
    # Good - separate concerns
    .step("build", "docker build -t app .")
    .step("test", "pytest tests/")
    .step("push", "docker push app:latest")

    # Avoid - doing too much
    .step("build-and-test-and-push", "docker build && pytest && docker push")
    ```
  </Accordion>

  <Accordion title="Use Dependencies Wisely" icon="diagram-project">
    Only add dependencies when truly needed:

    ```python theme={null}
    # Good - clear dependencies
    .step("build", "docker build -t app .")
    .step("test", callback=lambda s:
        s.shell("pytest tests/")
            .depends("build")  # Needs the build artifact
    )

    # Avoid - unnecessary dependencies (steps can run in parallel)
    .step("lint", "flake8 .")
    .step("format", callback=lambda s:
        s.shell("black .")
            .depends("lint")  # Unnecessary - these can run in parallel
    )
    ```
  </Accordion>

  <Accordion title="Name Outputs Clearly" icon="tag">
    Use descriptive names for output variables:

    ```python theme={null}
    # Good - clear purpose
    .step("get-version", callback=lambda s:
        s.shell("cat version.txt")
            .output("APP_VERSION")
    )

    # Avoid - vague names
    .step("get-version", callback=lambda s:
        s.shell("cat version.txt")
            .output("OUTPUT")
    )
    ```
  </Accordion>

  <Accordion title="Add Descriptions" icon="message">
    Document complex steps:

    ```python theme={null}
    .step("migrate-database", callback=lambda s:
        s.shell("./migrate.sh")
            .description("Apply database migrations to production schema")
    )
    ```
  </Accordion>
</AccordionGroup>

## Next Steps

<Card title="Examples" icon="lightbulb" href="/sdk/examples">
  Browse real-world workflow patterns
</Card>
