The Kubiya Workflow DSL (Domain-Specific Language) lets you describe automation workflows as Python code. You get a fluent, chainable API for defining steps, data flow, scheduling, and reliability policies, while Kubiya handles execution on your runners and infrastructure.

You can already build automations with tools like GitHub Actions, but those workflows are tightly coupled to a single repository and configured in YAML. Kubiya workflows are Python-native, reusable across projects and repositories, and integrated with Kubiya agents, tools, and the Control Plane. The same workflow definition can be triggered from the CLI, API, or UI, run on different runners, and orchestrate many external systems, not just your Git provider.

Use the Workflow DSL when you want version-controlled automation that lives alongside your application code but is portable across environments, can call Kubiya agents and tools as first-class steps, and gives you centralized observability and controls. Typical use cases include CI/CD pipelines, infrastructure automation, incident runbooks, data processing jobs, and any other repeatable workflow you want under Kubiya’s governance.

Key Features

  • Python-native DSL – Define workflows using familiar Python syntax
  • Chain and graph workflows – Model simple linear pipelines or complex dependency graphs
  • Rich executors – Run shell, Python, Docker build/run, HTTP/SSH calls, tools, and agents
  • First-class data flow – Pass parameters, env vars, secrets, and step outputs between steps
  • Operational controls – Configure scheduling, queues, timeouts, retries, and notifications
  • Testable and reviewable – Store workflows in Git and exercise them in unit/integration tests

Quick Start

Simple Workflow

from kubiya.dsl import chain

# A simple sequential workflow ("chain" type)
wf = (
    chain("hello-world")
        .description("My first Kubiya workflow")
        .step("greet", "echo 'Hello from Kubiya!'")
)

# Convert to a definition you can execute with Kubiya
workflow_def = wf.to_dict()
chain(...) creates a sequential workflow where steps run in the order you define them. For more complex topologies you can use graph(...) and explicit dependencies (see below).
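To make the chainable style concrete, here is a minimal sketch of a fluent builder in plain Python. `MiniChain` is a hypothetical stand-in written for illustration only: it is not the Kubiya implementation, and the dictionary shape it returns is an assumption, not the real output of to_dict().

```python
class MiniChain:
    """Toy fluent builder (hypothetical; not part of the Kubiya SDK)."""

    def __init__(self, name):
        self._definition = {"name": name, "type": "chain", "steps": []}

    def description(self, text):
        self._definition["description"] = text
        return self  # returning self is what makes the calls chainable

    def step(self, name, command):
        # Steps are appended in call order, which is the execution order of a chain
        self._definition["steps"].append({"name": name, "command": command})
        return self

    def to_dict(self):
        # Shallow copy: appending to the returned step list does not affect the builder
        return {**self._definition, "steps": list(self._definition["steps"])}


toy = (
    MiniChain("hello-world")
    .description("My first workflow")
    .step("greet", "echo 'Hello!'")
)
toy_def = toy.to_dict()
print(toy_def["steps"][0]["name"])
```

Because every configuration method returns the builder itself, a whole workflow reads as one expression, which is the property the DSL examples on this page rely on.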

Multi-step deployment workflow

from kubiya.dsl import chain

deploy_service = (
    chain("deploy-service")
        .description("Build, test, and deploy a service to Kubernetes")
        .params(
            SERVICE_NAME="payments-api",
            IMAGE_TAG="v1.0.0",
            ENVIRONMENT="staging",
        )

        .step("build-image", "docker build -t ${SERVICE_NAME}:${IMAGE_TAG} .")
        .step("unit-tests", "pytest tests/ -q")
        .step("deploy", callback=lambda s:
            s.shell(
                "kubectl set image deployment/${SERVICE_NAME} "
                "${SERVICE_NAME}=${SERVICE_NAME}:${IMAGE_TAG} -n ${ENVIRONMENT}"
            )
            .output("DEPLOYMENT_STATUS")
        )
)
This example shows a typical real-world CI/CD pipeline: build an image, run tests, and roll out a deployment, while capturing deployment status into DEPLOYMENT_STATUS for later reporting.

Core Concepts

1. Workflows

A workflow is a container for steps plus all of the operational configuration around them.
from kubiya.dsl import graph

wf = (
    graph("nightly-data-refresh")
        .description("Nightly ETL pipeline for analytics")
        .type("graph")  # explicit, for clarity
        .schedule("0 2 * * *")  # run every night at 02:00
        .env(LOG_LEVEL="info", DATA_DIR="/mnt/data")
        .params(
            DATE="`date '+%Y-%m-%d'`",
            BATCH_SIZE="1000",
        )
        .runner("kubiya-hosted-runner")
        .queue("etl-jobs", max_active_runs=3)
)
Key workflow-level capabilities include:
  • Type – chain for simple linear flows, graph when you want explicit dependencies
  • Scheduling – Cron-style schedules for periodic workflows
  • Environment and parameters – Shared configuration for all steps in the workflow
  • Queues and concurrency – Control how many workflow runs and steps can be active at once
  • Runners – Choose where the workflow executes (e.g., a specific worker pool)
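As a reading aid for the schedule string, the sketch below splits a standard five-field cron expression into named fields using only the Python standard library. The helper `describe_cron` is hypothetical and is not part of the DSL; which time zone the scheduler applies is not stated here.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Split a five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# The schedule from the nightly-data-refresh example: minute 0, hour 2, every day
fields = describe_cron("0 2 * * *")
print(fields)
```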

2. Steps and executors

Steps are the building blocks of a workflow. Each step describes what should run (shell, Python, Docker, HTTP, tool, agent, etc.) and how it should behave (env, timeouts, retries, dependencies).
# Simple shell step
wf.step("check-status", "kubectl get pods -n monitoring")

# Step with advanced configuration
wf.step("deploy", callback=lambda s:
    s.shell("kubectl apply -f deployment.yaml")
        .env(ENVIRONMENT="production")
        .timeout(600)
        .retries(2)
        .output("DEPLOYMENT_RESULT")
)

3. Dependencies and workflow shape

In chain workflows, steps run sequentially in the order they are declared. In graph workflows, you can create complex dependency graphs using depends(...).
from kubiya.dsl import graph

pipeline = (
    graph("build-and-test")
        .step("build", "docker build -t app .")
        .step("lint", "ruff check .")
        .step("unit-tests", "pytest tests/unit")
        .step("integration-tests", callback=lambda s:
            s.shell("pytest tests/integration")
                .depends("build", "unit-tests")
        )
        .step("publish", callback=lambda s:
            s.shell("docker push app:latest")
                .depends("integration-tests", "lint")
        )
)
This lets you model fan-out/fan-in patterns, optional checks, and more sophisticated pipelines than a simple linear chain.
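To see which orderings the dependency declarations above permit, the following sketch feeds the same step names into Python's standard-library `graphlib.TopologicalSorter` and groups steps into waves that could run concurrently. This only illustrates the graph's shape; how Kubiya actually schedules steps is not described here, so treat the wave grouping as an assumption.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on (mirrors the build-and-test example)
dependencies = {
    "build": set(),
    "lint": set(),
    "unit-tests": set(),
    "integration-tests": {"build", "unit-tests"},
    "publish": {"integration-tests", "lint"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()  # also raises CycleError if the graph has a cycle

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)

print(waves)
# → [['build', 'lint', 'unit-tests'], ['integration-tests'], ['publish']]
```

The first wave is the fan-out (three independent steps), and publish is the fan-in point that waits for both the integration tests and the linter.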

4. Parameters, outputs, and data flow

Parameters and outputs make it easy to pass data between steps without hard-coding values.
data_workflow = (
    chain("daily-report")
        .params(INPUT_FILE="data.csv")

        # Step that produces output
        .step("count-rows", callback=lambda s:
            s.shell("wc -l ${INPUT_FILE} | awk '{print $1}'")
                .output("ROW_COUNT")
        )

        # Step that uses the output
        .step("summarize", callback=lambda s:
            s.shell("echo 'Processed {{ROW_COUNT}} rows for ${INPUT_FILE}'")
                .depends("count-rows")
        )
)
Use ${PARAM_NAME} to reference workflow parameters and {{OUTPUT_NAME}} to reference values that earlier steps captured with .output(...).
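The sketch below shows how the two placeholder styles could be resolved into a final command string. The `render` function is hypothetical and is not Kubiya's resolver; the resolution order (outputs first, then parameters) and the choice to leave unknown names untouched are both assumptions made for the example.

```python
import re
from string import Template


def render(command, params, outputs):
    """Illustrative resolver: fill {{OUTPUT}} placeholders, then ${PARAM} placeholders."""

    def replace_output(match):
        name = match.group(1)
        # Leave the placeholder as-is when no step has produced that output
        return str(outputs.get(name, match.group(0)))

    with_outputs = re.sub(r"\{\{\s*(\w+)\s*\}\}", replace_output, command)
    # safe_substitute leaves unknown ${...} placeholders untouched instead of raising
    return Template(with_outputs).safe_substitute(params)


rendered = render(
    "echo 'Processed {{ROW_COUNT}} rows for ${INPUT_FILE}'",
    params={"INPUT_FILE": "data.csv"},
    outputs={"ROW_COUNT": "1000"},
)
print(rendered)
# → echo 'Processed 1000 rows for data.csv'
```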

Step Types

The DSL ships with several built-in executors so you can describe most real-world automation tasks without custom glue code.

Shell commands

wf.step("list-tmp", "ls -la /tmp")

wf.step("scripted-task", callback=lambda s:
    s.shell(
        """
        #!/usr/bin/env bash
        echo "Starting process..."
        for i in {1..5}; do
          echo "Step $i"
        done
        """
    )
)
Use shell steps for quick checks, small scripts, or delegating work to existing CLI tools.

Python code

wf.step("python-task", callback=lambda s:
    s.python(
        """
        import json

        data = {"status": "success", "count": 42}
        print(json.dumps(data))
        """
    )
    .output("TASK_RESULT")
)
Python steps are great for lightweight data processing, API orchestration, or glue code that would be cumbersome in shell.
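To try a script like this locally before putting it in a workflow, the sketch below runs it in a child interpreter and parses the JSON it prints. This is roughly what capturing a step's standard output amounts to, but it is a local approximation and not how Kubiya executes Python steps. Whether the executor strips the common indentation of a triple-quoted script is not stated here, so the example uses `textwrap.dedent` explicitly.

```python
import json
import subprocess
import sys
import textwrap

# dedent removes the common leading whitespace so the script is valid top-level Python
script = textwrap.dedent(
    """
    import json

    data = {"status": "success", "count": 42}
    print(json.dumps(data))
    """
)

# Run the script in a child interpreter and capture what it prints
completed = subprocess.run(
    [sys.executable, "-c", script],
    capture_output=True,
    text=True,
    check=True,
)

# A downstream consumer can parse the captured text back into structured data
task_result = json.loads(completed.stdout)
print(task_result["count"])
```

Printing a single JSON document keeps the captured output easy to parse in later steps.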

Native Docker build and run

Build images and run containers as first-class workflow steps.
wf.step("build-image", callback=lambda s:
    s.docker_build(
        image="myorg/payments-api:${IMAGE_TAG}",
        git={"url": "https://github.com/myorg/payments-api", "ref": "main"},
        dockerfile="Dockerfile",
        push=True,
    )
)

wf.step("run-migrations", callback=lambda s:
    s.docker_run(
        image="myorg/payments-api:${IMAGE_TAG}",
        command=["python", "manage.py", "migrate"],
        env={"ENVIRONMENT": "${ENVIRONMENT}"},
        memory="1Gi",
        cpu_limit="1",
    )
    .depends("build-image")
)
These steps let you standardize how images are built and run, optionally delegating execution to Kubernetes with resource limits and scheduling hints.

HTTP and SSH executors

# Call an external API as part of a workflow
wf.step("notify-webhook", callback=lambda s:
    s.http(
        url="https://hooks.example.com/deployments",
        method="POST",
        headers={"Content-Type": "application/json"},
        body={"service": "payments-api", "status": "deployed"},
    )
)

# Run a command over SSH on a remote host
wf.step("restart-service", callback=lambda s:
    s.ssh(
        host="bastion.example.com",
        user="deploy",
        command="sudo systemctl restart payments-api",
    )
)
Use these executors when you need to integrate with legacy systems, on-prem services, or existing SSH-based automation.
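For comparison, this is the request the notify-webhook step describes, built with Python's standard `urllib.request` and inspected without being sent. Serializing the body as JSON is an assumption based on the Content-Type header; how the HTTP executor encodes the body is not specified here.

```python
import json
import urllib.request

payload = {"service": "payments-api", "status": "deployed"}

request = urllib.request.Request(
    url="https://hooks.example.com/deployments",
    data=json.dumps(payload).encode("utf-8"),  # request bodies must be bytes
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Inspect the request without sending it
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Building the request locally like this is a quick way to check the payload shape a webhook receiver will see.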

Agents and LLM-powered steps

You can embed Kubiya agents or inline agents directly into workflows to handle free-form tasks like incident analysis, ticket triage, or runbook execution.
wf.step("summarize-incident", callback=lambda s:
    s.inline_agent(
        message="Summarize the incident based on {{LOG_OUTPUT}}",
        agent_name="sre-incident-analyzer",
        ai_instructions="You are an on-call SRE. Provide a concise summary and next actions.",
        runners=["sre-agents"],
        llm_model="gpt-4o",
    )
    .depends("collect-logs")
    .output("INCIDENT_SUMMARY")
)
This pattern is useful when you want AI assistance as a formal step in your automation, rather than a separate manual process.

Tools and bounded services

Define tools inline or reference existing tools, and attach temporary services like databases or caches for testing.
wf.step("health-checker", callback=lambda s:
    s.tool_def(
        name="service-health-checker",
        description="Check HTTP health endpoint for a service",
        type="docker",
        image="curlimages/curl:latest",
        content="""
        #!/bin/sh
        curl -fsS "$SERVICE_URL/health" || exit 1
        """,
        args={"SERVICE_URL": "https://api.example.com"},
    )
)
For more advanced scenarios you can use with_database, with_cache, or with_message_queue to spin up temporary dependencies for tests and data migrations.

Advanced Features

Conditional and guarded execution

Use preconditions to control when a step should run, based on parameters, outputs, or external checks.
from kubiya.dsl import chain

wf = (
    chain("conditional-deploy")
        .params(ENVIRONMENT="staging")

        .step("build", "docker build -t app .")

        # Only run security scan in production
        .step("security-scan", callback=lambda s:
            s.shell("trivy image app:latest")
                .preconditions("${ENVIRONMENT} == 'production'")
        )
)

Error handling, retries, and continue-on

Configure how steps behave on failure so workflows are resilient but still predictable.
wf = (
    chain("resilient-pipeline")
        .step("risky-operation", callback=lambda s:
            s.shell("./deploy.sh")
                .retry(limit=3, interval_sec=30, backoff=2.0)
        )

        # Run rollback if deploy fails, but keep the workflow marked as failed
        .step("rollback", callback=lambda s:
            s.shell("./rollback.sh")
                .depends("risky-operation")
                .continue_on(failure=True)
        )
)
You can also use repeat(...) for polling patterns (for example, waiting until a condition in an external system becomes true).
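To reason about how long a retry policy can delay a workflow, the sketch below computes the wait before each retry for the values used above. It assumes the delay grows as interval_sec * backoff ** n for retry number n starting at 0; the exact formula Kubiya applies is not documented here, and `retry_delays` is a hypothetical helper.

```python
def retry_delays(limit, interval_sec, backoff):
    """Delay before each retry, assuming delay_n = interval_sec * backoff ** n."""
    return [interval_sec * backoff**attempt for attempt in range(limit)]


delays = retry_delays(limit=3, interval_sec=30, backoff=2.0)
print(delays)       # → [30.0, 60.0, 120.0]
print(sum(delays))  # → 210.0
```

Under that assumption, three retries add up to 210 seconds of waiting on top of the run time of each attempt, which is worth comparing against any step or workflow timeout.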

Tool definitions with services

Inline tools can be paired with temporary services such as databases to build realistic ephemeral environments.
wf.step("run-db-migrations", callback=lambda s:
    s.tool_def(
        name="db-migrator",
        description="Run database migrations against a temporary Postgres instance",
        type="docker",
        image="alpine:latest",
        content="""
        #!/bin/sh
        alembic upgrade head
        """,
        args={},
    )
    .with_database(db_type="postgres")
)
This is especially useful for integration tests and sandbox environments.

Complete Example

The example below shows a realistic CI/CD workflow that builds a Docker image, runs tests inside a container, deploys to Kubernetes, and notifies a webhook.
from kubiya.dsl import chain

cicd_pipeline = (
    chain("ci-cd-pipeline")
        .description("CI/CD pipeline with Docker build/run, tests, and deployment")
        .params(
            SERVICE_NAME="payments-api",
            IMAGE_TAG="v1.0.0",
            ENVIRONMENT="staging",
        )
        .queue("cicd-jobs", max_active_runs=5)

        # Step 1: Build image from source and push
        .step("build-image", callback=lambda s:
            s.docker_build(
                image="myorg/${SERVICE_NAME}:${IMAGE_TAG}",
                git={"url": "https://github.com/myorg/payments-api", "ref": "main"},
                push=True,
            )
        )

        # Step 2: Run tests inside a container
        .step("test", callback=lambda s:
            s.docker_run(
                image="myorg/${SERVICE_NAME}:${IMAGE_TAG}",
                command=["pytest", "tests", "-q"],
                env={"ENVIRONMENT": "test"},
                memory="1Gi",
                cpu_limit="1",
            )
            .depends("build-image")
        )

        # Step 3: Deploy to Kubernetes
        .step("deploy", callback=lambda s:
            s.shell(
                "kubectl set image deployment/${SERVICE_NAME} "
                "${SERVICE_NAME}=myorg/${SERVICE_NAME}:${IMAGE_TAG} -n ${ENVIRONMENT}"
            )
            .depends("test")
            .output("DEPLOYMENT_STATUS")
        )

        # Step 4: Notify external system
        .step("notify", callback=lambda s:
            s.http(
                url="https://hooks.example.com/deployments",
                method="POST",
                headers={"Content-Type": "application/json"},
                body={
                    "service": "${SERVICE_NAME}",
                    "environment": "${ENVIRONMENT}",
                    "status": "{{DEPLOYMENT_STATUS}}",
                },
            )
            .depends("deploy")
        )
)

# Convert to a definition for execution (SDK, CLI, or Control Plane)
workflow_def = cicd_pipeline.to_dict()

DSL Methods Reference

Workflow methods

Method – Description
.description(text) – Set a human-readable description for the workflow
.type("chain" | "graph") – Explicitly set the workflow type (sequential vs dependency graph)
.schedule(cron) – Attach a cron schedule for periodic runs
.env(**variables) – Define shared environment variables for all steps
.params(**parameters) – Define workflow parameters with default values
.with_files(files) – Attach in-memory files (e.g., configs, scripts) to the workflow
.dotenv(*files) – Load environment from one or more .env files
.step(name, command=None, callback=fn) – Add a step (either simple command or fully configured via callback)
.parallel_steps(name, items, command, max_concurrent=None) – Run the same command in parallel across multiple items
.sub_workflow(name, workflow, params=None) – Call another workflow as a sub-workflow step
.get_secret_step(name, secret_name, **kwargs) – Add a step that retrieves a secret by name
.runner(name) – Set which runner / worker group should execute the workflow
.queue(name, max_active_runs=None) – Assign the workflow to a queue and cap concurrent runs
.max_active_runs(limit) – Limit how many runs of this workflow may be active at once
.max_active_steps(limit) – Limit how many steps may run in parallel
.skip_if_successful(skip=True) – Skip execution if a successful run already exists for the period
.timeout(seconds) – Set a maximum runtime for the workflow
.cleanup_timeout(seconds) – Set a timeout for cleanup logic after the workflow ends
.delay(seconds) – Delay workflow start after it is triggered
.max_output_size(bytes) – Limit the size of captured output
.handlers(success=None, failure=None, exit=None, cancel=None) – Register lifecycle hooks that run on workflow events
.notifications(...) – Configure email notifications on success/failure
.tags(*tags) – Attach tags for search and organization
.group(name) – Assign the workflow to a logical group
.preconditions(*conditions) – Define workflow-level preconditions that must be satisfied before running
.to_dict() – Convert the workflow to a Python dictionary
.to_json(indent=2) – Convert the workflow definition to JSON
.to_yaml() – Convert the workflow definition to YAML
.compile(indent=2) – Compile to JSON (alias for to_json)
.validate() – Perform basic validation and return errors/warnings

Step configuration methods

Method – Description
.description(text) – Set a description for the step
.shell(command, **config) – Run a shell command with optional configuration (env, shell type, etc.)
.python(script) – Run a Python script inside the step
.docker(image, command=None, content=None) – Run a Docker container with an optional script or command
.docker_build(image, **config) – Build a Docker image (optionally from git) with advanced configuration
.docker_run(image, **config) – Run a container with resource limits, env, volumes, and Kubernetes options
.http(url, method="GET", headers=None, body=None) – Call an HTTP endpoint
.ssh(host, user, command, port=22, key_file=None) – Run a command over SSH on a remote host
.kubiya(url, method="GET", **config) – Call Kubiya APIs as part of a workflow step
.llm_completion(...) – Run an LLM completion (e.g., summarization, classification) as a step
.inline_agent(...) – Configure and run an inline agent for a single step
.agent(...) – Invoke an existing agent by name as a workflow step
.tool_def(...) – Define a tool inline (image, content, arguments, services)
.tool(name_or_tool, args=None, timeout=None, **kwargs) – Use an existing tool by name or Tool instance
.jq(query) – Process JSON output using a jq-style query
.args(**arguments) – Provide arguments for tool or executor configuration
.depends(*step_names) – Declare dependencies on other steps
.parallel(items, max_concurrent=None) – Run the same step across multiple items in parallel
.output(name) – Capture the step’s output into a named variable
.stdout(path) – Redirect standard output to a file
.stderr(path) – Redirect standard error to a file
.env(variables=None, **kwargs) – Attach environment variables to the step
.dir(path) – Set the working directory for the command
.shell_type(shell) – Use a specific shell (e.g., bash, sh)
.id(identifier) – Set a stable identifier for programmatic referencing
.preconditions(*conditions) – Add preconditions that must be met before the step runs
.retry(...) – Configure retry policy (limit, intervals, backoff, exit codes, etc.)
.repeat(...) – Configure repeat/polling behavior for the step
.continue_on(...) – Control when the workflow should continue despite failures
.timeout(seconds) – Set a timeout for the step
.retries(count) – Set a simple retry count
.signal_on_stop(signal) – Choose which signal to send when stopping the step
.mail_on_error(send=True) – Enable email notification when the step fails
.with_service(...) – Attach a bounded service (e.g., helper container) to a tool step
.with_database(...) – Attach a temporary database service for the step
.with_cache(...) – Attach a cache service (e.g., Redis) for the step
.with_message_queue(...) – Attach a message queue service (e.g., RabbitMQ) for the step

Best Practices

The DSL is flexible enough to describe very large workflows. These guidelines help keep things readable and maintainable as they grow.
Each step should do one thing well:
# Good – separate concerns
.step("build", "docker build -t app .")
.step("test", "pytest tests/")
.step("deploy", "kubectl apply -f deployment.yaml")

# Avoid – too much in one step
.step("build-test-deploy", "docker build && pytest && kubectl apply")
Prefer descriptive names over positional ones:
# Good – clear purpose
.step("validate-deployment-prerequisites", ...)
.step("execute-database-migration", ...)

# Avoid – vague names
.step("step1", ...)
.step("do-stuff", ...)
Make workflows reusable across environments and services:
wf = (
    chain("deploy")
        .params(
            ENVIRONMENT="staging",  # Default value
            VERSION="latest",
            REPLICAS="3",
        )
)
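The idea behind parameter defaults can be sketched in plain Python: each run starts from the declared defaults and overlays only the values that differ for that environment. `resolve_params` is a hypothetical helper, and how overrides are actually supplied at trigger time (CLI, API, or UI) is not covered here.

```python
DEFAULTS = {"ENVIRONMENT": "staging", "VERSION": "latest", "REPLICAS": "3"}


def resolve_params(defaults, overrides):
    """Overlay run-time overrides on the defaults, rejecting unknown names."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Catch typos such as ENVIROMENT early instead of silently ignoring them
        raise KeyError(f"unknown parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


prod = resolve_params(DEFAULTS, {"ENVIRONMENT": "production", "REPLICAS": "5"})
print(prod)
```

Keeping every environment-specific value in parameters means the same workflow definition can serve staging and production without edits to the steps.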
Document intent so others (and future you) can understand the workflow quickly:
wf = (
    chain("complex-pipeline")
        .description("CI/CD pipeline with security scanning and rollback")
)

wf.step("deploy", callback=lambda s:
    s.shell("kubectl apply -f deployment.yaml")
        .description("Deploy application to Kubernetes cluster")
)

Next Steps