Basic Step Structure
Simple Steps
from kubiya.dsl import workflow
# Minimal workflow: two shell-command steps added via the fluent .step() API.
wf = (
workflow("basic-steps")
# Simple shell command
.step("list-files", "ls -la /tmp")
# Command with parameters
# ${NAMESPACE} is presumably filled from workflow params at run time -- TODO confirm
.step("check-status", "kubectl get pods -n ${NAMESPACE}")
)
Steps with Callbacks
For advanced configuration, use the callback pattern:
# Callback form: the lambda receives a step builder `s` for chained configuration.
wf = (
workflow("advanced-steps")
.step("deploy", callback=lambda s:
s.shell("kubectl apply -f deployment.yaml")
# Chained metadata: human-readable description plus a named output variable.
.description("Deploy application to Kubernetes")
.output("DEPLOYMENT_RESULT")
)
)
Step Types
1. Shell Commands
Execute shell commands and scripts:
# Simple command
wf.step("simple", "echo 'Hello World'")
# Multi-line script
wf.step("script", callback=lambda s:
s.shell("""
#!/bin/bash
set -e
echo "Starting process..."
for i in {1..5}; do
echo "Processing item $i"
done
echo "Complete!"
""")
)
# With shell type specified
# NOTE(review): piping to `less` (an interactive pager) may block a
# non-interactive workflow run -- verify this example is intentional.
wf.step("bash-script", callback=lambda s:
s.shell("grep 'TODO' *.py | less")
.shell_type("bash")
)
2. Shell Scripts with Arguments
# Script step with named arguments: ${pattern} and ${file} inside the script
# are supplied by the chained .args() call.
wf.step("parameterized-script", callback=lambda s:
s.script("""
#!/bin/sh
echo "Processing data..."
grep "${pattern}" "${file}"
""")
.args(
pattern="ERROR",
file="application.log"
)
)
3. Python Code
Execute Python code directly:
# Inline Python step: the string is executed as a Python script by the runner.
wf.step("python-task", callback=lambda s:
s.python("""
import json
import sys
# Process data
data = {
"status": "success",
"count": 42,
"items": ["item1", "item2", "item3"]
}
# Output results
print(json.dumps(data, indent=2))
""")
)
# Python with variables.
# Fixed: the embedded script's for-loop body had lost its indentation
# (flattened during extraction), which would raise IndentationError when
# the step executes the script.
wf.step("python-with-vars", callback=lambda s:
    s.python("""
colors = ["red", "yellow", "green", "blue"]
for i, color in enumerate(colors, 1):
    print(f"{i}. Color: {color}")
""")
)
4. Docker Containers
Run steps in Docker containers:
# Docker step: `content` is the script executed inside the given image.
wf.step("containerized-python", callback=lambda s:
s.docker(
image="python:3.11-slim",
content="""
#!/usr/bin/env python3
import sys
print(f"Python version: {sys.version}")
print("Running in Docker!")
"""
)
)
# Alpine container with dependencies
# The script installs curl on demand before calling the health endpoint.
wf.step("alpine-task", callback=lambda s:
s.docker(
image="alpine:latest",
content="""
#!/bin/sh
set -e
if ! command -v curl >/dev/null 2>&1; then
apk add --no-cache curl
fi
curl -fsS https://api.example.com/health
"""
)
)
5. Kubiya API Calls
Interact with Kubiya platform APIs:
# Kubiya API step: relative URL, presumably resolved against the platform's
# API base -- TODO confirm. Response captured as RUNNERS_DATA.
wf.step("get-runners", callback=lambda s:
s.kubiya(
url="api/v3/runners",
method="GET"
).output("RUNNERS_DATA")
)
# POST with a JSON body.
wf.step("create-resource", callback=lambda s:
s.kubiya(
url="api/v3/resources",
method="POST",
body={"name": "resource-name", "type": "config"}
)
)
6. Custom Tool Definitions
Define custom tools inline:
# Inline tool definition: a named, docker-backed tool with declared args.
wf.step("custom-tool", callback=lambda s:
s.tool_def(
name="service_health_check",
description="Check service health endpoint",
type="docker",
image="alpine:latest",
content="""
#!/bin/sh
if ! command -v curl >/dev/null 2>&1; then
apk add --no-cache curl
fi
echo "Checking health of $SERVER_URL..."
if curl -fsS "$SERVER_URL" >/dev/null; then
echo "✅ Service is healthy"
exit 0
else
echo "❌ Service health check failed"
exit 1
fi
""",
# args appear to be exposed to the script as environment variables
# (the script reads $SERVER_URL) -- TODO confirm the mechanism.
args={
"SERVER_URL": "https://api.example.com/health"
}
)
)
Step Dependencies
Sequential Dependencies
Control the execution order:
# Linear build -> test -> push -> deploy chain; each .depends() names the
# step that must finish first.
wf = (
workflow("sequential-pipeline")
# Step 1: Build
.step("build", "docker build -t myapp:latest .")
# Step 2: Test (depends on build)
.step("test", callback=lambda s:
s.shell("docker run myapp:latest pytest tests/")
.depends("build")
)
# Step 3: Push (depends on test)
.step("push", callback=lambda s:
s.shell("docker push myapp:latest")
.depends("test")
)
# Step 4: Deploy (depends on push)
.step("deploy", callback=lambda s:
s.shell("kubectl apply -f deployment.yaml")
.depends("push")
)
)
Multiple Dependencies
A step can depend on multiple previous steps:
# Fan-in: the first three steps have no dependencies (so they may run in
# parallel); "build" declares all three and waits for each.
wf = (
workflow("multi-dependency")
# Independent steps
.step("unit-tests", "pytest tests/unit/")
.step("integration-tests", "pytest tests/integration/")
.step("lint", "flake8 .")
# This step waits for all three to complete
.step("build", callback=lambda s:
s.shell("docker build -t app .")
.depends("unit-tests")
.depends("integration-tests")
.depends("lint")
)
)
Step Outputs and Variables
Capturing Outputs
Capture step output for use in later steps:
# Output capture: .output() names a step's result; later steps reference it
# with {{VAR}} syntax (workflow params use ${VAR}).
wf = (
workflow("data-flow")
.params(INPUT_FILE="data.csv")
# Produce data
.step("count-lines", callback=lambda s:
s.shell("wc -l ${INPUT_FILE}")
.output("LINE_COUNT")
)
# Consume data
.step("report", callback=lambda s:
s.shell("echo 'Processed {{LINE_COUNT}} lines'")
.depends("count-lines")
)
)
Complex Data Flow
# Passing structured (JSON) data between steps through one output variable.
wf = (
workflow("advanced-data-flow")
# Step 1: Fetch data and output multiple variables
.step("fetch", callback=lambda s:
s.python("""
import json
data = {
"count": 100,
"status": "success",
"timestamp": "2024-01-01T00:00:00Z"
}
print(json.dumps(data))
""")
.output("API_RESPONSE")
)
# Step 2: Process using the output
# NOTE(review): {{API_RESPONSE}} is substituted textually into the script;
# JSON containing single quotes would break the loads() call -- verify.
.step("process", callback=lambda s:
s.python("""
import json
response = json.loads('{{API_RESPONSE}}')
print(f"Processing {response['count']} items")
print(f"Status: {response['status']}")
""")
.depends("fetch")
)
)
Step Configuration
Step Descriptions
Add descriptions for documentation:
# .description() attaches human-readable documentation to a step.
wf.step("deploy-production", callback=lambda s:
s.shell("kubectl apply -f deployment.yaml")
.description("Deploy application to production Kubernetes cluster")
)
Output Variables
Name output variables for use in subsequent steps:
# Named output consumed by a dependent step via {{APP_VERSION}}.
wf.step("get-version", callback=lambda s:
s.shell("cat version.txt")
.output("APP_VERSION")
)
wf.step("tag-image", callback=lambda s:
s.shell("docker tag app:latest app:{{APP_VERSION}}")
.depends("get-version")
)
Variable Interpolation
Using Workflow Parameters
# Workflow params declared with .params() are referenced as ${NAME} inside
# step commands.
wf = (
workflow("parameterized")
.params(
SERVICE_NAME="my-service",
ENVIRONMENT="staging",
REPLICAS="3"
)
.step("deploy", "kubectl apply -f deployment.yaml")
.step("scale", callback=lambda s:
s.shell("kubectl scale deployment ${SERVICE_NAME} --replicas=${REPLICAS} -n ${ENVIRONMENT}")
)
)
Using Step Outputs
# Step outputs use {{VAR}} interpolation (contrast with ${VAR} for params).
wf = (
workflow("output-usage")
# Produce output
.step("get-config", callback=lambda s:
s.shell("cat config.json")
.output("CONFIG")
)
# Use output with {{variable}} syntax
.step("apply-config", callback=lambda s:
s.shell("echo '{{CONFIG}}' | kubectl apply -f -")
.depends("get-config")
)
)
Complete Examples
Example 1: Multi-Step Data Processing
from kubiya import KubiyaClient, workflow

client = KubiyaClient()

# Pipeline: create an output directory, enumerate the raw CSV files, clean
# each one with pandas (drop NA rows and duplicates), then list the results.
# Fixed: the two f-strings in the embedded script were garbled during
# extraction ("processed_(unknown)" / "Processed (unknown)") -- restored to
# reference {filename}; indentation inside the embedded Python script was
# also restored (the flattened loop body would raise IndentationError).
wf = (
    workflow("data-processing-pipeline")
    .description("Process and transform data files")
    .params(
        INPUT_DIR="/data/raw",
        OUTPUT_DIR="/data/processed"
    )
    # Step 1: Create output directory
    .step("setup", "mkdir -p ${OUTPUT_DIR}")
    # Step 2: List input files (captured as FILE_LIST)
    .step("list-files", callback=lambda s:
        s.shell("ls -1 ${INPUT_DIR}/*.csv")
        .output("FILE_LIST")
        .depends("setup")
    )
    # Step 3: Process files with Python
    # NOTE(review): the script reads INPUT_DIR/OUTPUT_DIR via os.getenv --
    # assumes workflow params are exported as environment variables; verify.
    .step("process", callback=lambda s:
        s.python("""
import os
import pandas as pd

input_dir = os.getenv('INPUT_DIR')
output_dir = os.getenv('OUTPUT_DIR')

# Process each CSV file
for filename in os.listdir(input_dir):
    if filename.endswith('.csv'):
        df = pd.read_csv(os.path.join(input_dir, filename))
        # Clean data
        df_clean = df.dropna()
        df_clean = df_clean.drop_duplicates()
        # Save processed file
        output_path = os.path.join(output_dir, f"processed_{filename}")
        df_clean.to_csv(output_path, index=False)
        print(f"Processed {filename}: {len(df)} -> {len(df_clean)} rows")
""")
        .depends("list-files")
    )
    # Step 4: Generate summary
    .step("summarize", callback=lambda s:
        s.shell("ls -lh ${OUTPUT_DIR}")
        .depends("process")
    )
)

# Execute, streaming events as they arrive
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)
Example 2: Complex Dependencies
# Diamond-shaped dependency graph: three parallel builds -> tests -> pushes
# -> a single deploy -> a containerized health check.
wf = (
workflow("complex-deployment")
.description("Multi-service deployment with health checks")
.params(VERSION="v1.0.0", ENVIRONMENT="staging")
# Build all services (can run in parallel)
.step("build-api", "docker build -t api:${VERSION} ./api")
.step("build-frontend", "docker build -t frontend:${VERSION} ./frontend")
.step("build-worker", "docker build -t worker:${VERSION} ./worker")
# Run tests (depend on builds)
.step("test-api", callback=lambda s:
s.shell("docker run api:${VERSION} pytest tests/")
.depends("build-api")
)
.step("test-frontend", callback=lambda s:
s.shell("docker run frontend:${VERSION} npm test")
.depends("build-frontend")
)
# Push images (depend on tests)
.step("push-api", callback=lambda s:
s.shell("docker push api:${VERSION}")
.depends("test-api")
)
.step("push-frontend", callback=lambda s:
s.shell("docker push frontend:${VERSION}")
.depends("test-frontend")
)
# NOTE(review): push-worker depends only on build-worker -- the worker image
# is pushed untested, unlike api/frontend; confirm this is intentional.
.step("push-worker", callback=lambda s:
s.shell("docker push worker:${VERSION}")
.depends("build-worker")
)
# Deploy (depends on all pushes)
.step("deploy-all", callback=lambda s:
s.shell("kubectl apply -f k8s/ -n ${ENVIRONMENT}")
.depends("push-api")
.depends("push-frontend")
.depends("push-worker")
)
# Health check (depends on deployment)
# Assumes ${ENVIRONMENT} is interpolated inside docker content -- TODO confirm.
.step("health-check", callback=lambda s:
s.docker(
image="alpine:latest",
content="""
#!/bin/sh
apk add --no-cache curl
sleep 30 # Wait for pods to be ready
curl -f http://api.${ENVIRONMENT}.svc.cluster.local/health || exit 1
curl -f http://frontend.${ENVIRONMENT}.svc.cluster.local/health || exit 1
echo "✅ All services healthy"
"""
)
.depends("deploy-all")
)
)
Best Practices
Keep Steps Atomic
Each step should have a single, clear purpose:
# (Fragment: these .step() calls chain onto a workflow builder; not
# standalone Python.)
# Good - separate concerns
.step("build", "docker build -t app .")
.step("test", "pytest tests/")
.step("push", "docker push app:latest")
# Avoid - doing too much
.step("build-and-test-and-push", "docker build && pytest && docker push")
Use Dependencies Wisely
Only add dependencies when truly needed:
# (Fragment: chained onto a workflow builder; not standalone Python.)
# Good - clear dependencies
.step("build", "docker build -t app .")
.step("test", callback=lambda s:
s.shell("pytest tests/")
.depends("build") # Needs the build artifact
)
# Avoid - unnecessary dependencies (steps can run in parallel)
.step("lint", "flake8 .")
.step("format", callback=lambda s:
s.shell("black .")
.depends("lint") # Unnecessary - these can run in parallel
)
Name Outputs Clearly
Use descriptive names for output variables:
# (Fragment: chained onto a workflow builder; not standalone Python.)
# Good - clear purpose
.step("get-version", callback=lambda s:
s.shell("cat version.txt")
.output("APP_VERSION")
)
# Avoid - vague names
.step("get-version", callback=lambda s:
s.shell("cat version.txt")
.output("OUTPUT")
)
Add Descriptions
Document complex steps:
# (Fragment: chained onto a workflow builder; not standalone Python.)
.step("migrate-database", callback=lambda s:
s.shell("./migrate.sh")
.description("Apply database migrations to production schema")
)
Next Steps
Examples: browse real-world workflow patterns.