Practical examples of automated workflows using containers and AI
from kubiya import workflow, step

@workflow
def deploy_app():
    # Run tests in Node.js container
    test = step("test").docker(
        image="node:18",
        command="npm test"
    )

    # Build Docker image
    build = step("build").docker(
        image="docker:latest",
        command="docker build -t myapp:latest ."
    ).depends("test")

    # Deploy to Kubernetes
    step("deploy").docker(
        image="kubectl:latest",
        command="kubectl apply -f deployment.yaml"
    ).depends("build")
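
    # A possible extension, not part of the example above: a notification step that
    # always runs, reusing the .continue_on_failure() pattern from the error-handling
    # example further down. $SLACK_WEBHOOK_URL is a placeholder.
    step("notify").docker(
        image="curlimages/curl:latest",
        command="curl -X POST -d 'deploy_app finished' $SLACK_WEBHOOK_URL"
    ).depends("deploy").continue_on_failure(True)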

@workflow
def etl_pipeline():
    # Extract data from database
    extract = step("extract").docker(
        image="postgres:15",
        command="pg_dump -h $DB_HOST -t users > users.sql"
    )

    # Transform with Python
    transform = step("transform").docker(
        image="python:3.11",
        script="""
import pandas as pd

# 'connection' is assumed to be a database connection configured for this step
df = pd.read_sql_query("SELECT * FROM users", connection)
df['processed'] = df['created_at'].dt.date
df.to_csv('transformed_users.csv')
"""
    ).depends("extract")

    # Load to data warehouse
    step("load").docker(
        image="snowflake/snowcli:latest",
        command="snow stage copy transformed_users.csv @warehouse"
    ).depends("transform")

@workflow
def ai_code_review(pr_number: int):
    # Fetch PR changes; the diff is printed to stdout so it can be captured as PR_CHANGES
    fetch = step("fetch-pr").docker(
        image="alpine/git:latest",
        command=f"git diff main..pr-{pr_number}"
    ).output("PR_CHANGES")

    # AI agent analyzes the code
    review = step("analyze-code").inline_agent(
        message="Review this code for quality, bugs, and best practices: ${PR_CHANGES}",
        agent_name="code-reviewer",
        tools=[
            {
                "name": "security-scanner",
                "image": "securecodewarrior/semgrep:latest",
                "command": "semgrep --config=auto ."
            }
        ]
    ).depends("fetch-pr").output("REVIEW_RESULTS")

    # Post review comments
    step("post-review").docker(
        image="github/hub:latest",
        command="hub api repos/:owner/:repo/pulls/${pr_number}/reviews -f body='${REVIEW_RESULTS}'"
    ).depends("analyze-code")

from kubiya_workflow_sdk.tools import Tool, Arg

# Define custom tools
url_validator = Tool(
    name="url_validator",
    description="Validate URL format and connectivity",
    type="docker",
    image="curlimages/curl:latest",
    content="""
#!/bin/sh
echo "Validating URL: $url"
if echo "$url" | grep -q '^https\\?://'; then
    echo "✓ Valid URL format"
    curl -s --head "$url" && echo "✓ URL is accessible"
else
    echo "✗ Invalid URL format"
    exit 1
fi
""",
    args=[
        Arg(name="url", type="str", description="URL to validate", required=True)
    ]
)

json_processor = Tool(
    name="json_processor",
    description="Process and validate JSON data",
    type="docker",
    image="python:3.11-alpine",
    content="""
#!/bin/sh
python3 -c "
import json
import sys

try:
    data = json.loads('$json_data')
    if '$operation' == 'validate':
        print('✓ Valid JSON')
        print(f'Keys: {list(data.keys())}')
    elif '$operation' == 'pretty':
        print(json.dumps(data, indent=2))
except json.JSONDecodeError as e:
    print(f'✗ Invalid JSON: {e}')
    sys.exit(1)
"
""",
    args=[
        Arg(name="json_data", type="str", description="JSON data to process", required=True),
        Arg(name="operation", type="str", description="Operation type",
            default="validate", options=["validate", "pretty"])
    ]
)
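
How these Tool objects get attached to an agent is not shown on this page. For comparison, here is a minimal sketch of the same URL check as a one-off workflow step, built only from constructs used in the examples above; the ${url} parameter follows the same substitution pattern as ${pr_number} earlier, and the workflow name is illustrative.

@workflow
def check_endpoint(url: str):
    # Sketch: the url_validator logic as a plain docker step
    step("validate-url").docker(
        image="curlimages/curl:latest",
        command='curl -sf --head "${url}"'
    ).output("URL_STATUS")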

@workflow
def incident_response():
    # Detect issue from monitoring (curl image so the alert query can run)
    detect = step("detect").docker(
        image="curlimages/curl:latest",
        script="curl -s http://prometheus:9090/api/v1/alerts"
    ).output("ALERTS")

    # AI agent determines severity and actions
    assess = step("assess-incident").inline_agent(
        message="Analyze these alerts and recommend actions: ${ALERTS}",
        agent_name="incident-commander",
        tools=[
            {
                "name": "check-logs",
                "image": "elasticsearch/elasticsearch:8.0",
                "command": "curl -X GET 'localhost:9200/_search'"
            },
            {
                "name": "check-resources",
                "image": "kubernetes/kubectl:latest",
                "command": "kubectl top nodes"
            }
        ]
    ).depends("detect").output("RESPONSE_PLAN")

    # Execute remediation
    step("remediate").docker(
        image="ansible/ansible:latest",
        command="ansible-playbook fix-${RESPONSE_PLAN.issue_type}.yml"
    ).depends("assess-incident")

@workflow
def process_files():
    # List files to process; the names are printed to stdout so they can be captured
    list_files = step("list").shell(
        "cd /data && ls *.csv"
    ).output("FILE_LIST")

    # Process each file in parallel
    step("process").parallel(
        items="${FILE_LIST}",
        max_concurrent=3,
        docker_config={
            "image": "python:3.11",
            "script": """
import pandas as pd
import sys

# Each parallel run is assumed to receive one item from FILE_LIST as its first argument
filename = sys.argv[1]
df = pd.read_csv(f'/data/{filename}')
df['processed'] = True
df.to_csv(f'/output/processed_{filename}')
"""
        }
    ).depends("list")
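
    # A possible fan-in after the parallel fan-out above (a sketch; it assumes the
    # steps share the /output volume, which this page does not spell out)
    step("combine").docker(
        image="python:3.11",
        script="""
import glob
import pandas as pd

# Concatenate everything the parallel step produced into a single file
frames = [pd.read_csv(path) for path in glob.glob('/output/processed_*.csv')]
pd.concat(frames).to_csv('/output/combined.csv', index=False)
"""
    ).depends("process")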

@workflow
def database_migration():
    # Backup database
    backup = step("backup").docker(
        image="postgres:15",
        command="pg_dump production_db > backup_$(date +%Y%m%d).sql"
    )

    # Test migration on copy
    test = step("test-migration").docker(
        image="postgres:15",
        script="""
# Create test database from backup
createdb test_migration
psql test_migration < backup_*.sql

# Run migration
psql test_migration < migration.sql

# Validate
psql test_migration -c "SELECT COUNT(*) FROM users;"
"""
    ).depends("backup")

    # Apply to production only if test passes
    step("migrate-prod").docker(
        image="postgres:15",
        command="psql production_db < migration.sql"
    ).depends("test-migration").preconditions("${test_migration.exit_code} == 0")
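
    # Optional follow-up check (a sketch reusing the psql pattern from the test step)
    step("verify-prod").docker(
        image="postgres:15",
        command='psql production_db -c "SELECT COUNT(*) FROM users;"'
    ).depends("migrate-prod")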

@workflow
def resilient_workflow():
    # Step with retry logic
    fetch = step("fetch-data").docker(
        image="curlimages/curl:latest",
        command="curl -f https://api.example.com/data"
    ).retry(limit=3, backoff="exponential")

    # Process data with timeout
    process = step("process").docker(
        image="python:3.11",
        command="python process_data.py"
    ).depends("fetch-data").timeout("10m")

    # Cleanup that always runs
    step("cleanup").shell(
        "rm -rf /tmp/workspace/*"
    ).continue_on_failure(True)

@workflow
def multi_env_deploy(environment: str):
    # Build once
    build = step("build").docker(
        image="docker:latest",
        command="docker build -t myapp:${GIT_SHA} ."
    )

    # Deploy to staging (always)
    deploy_staging = step("deploy-staging").docker(
        image="kubectl:latest",
        command="kubectl apply -f staging.yaml"
    ).depends("build")

    # Deploy to production (only if staging succeeds and env is prod)
    step("deploy-production").docker(
        image="kubectl:latest",
        command="kubectl apply -f production.yaml"
    ).depends("deploy-staging").preconditions(
        "${deploy_staging.exit_code} == 0",
        "${environment} == 'production'"
    )

Best practices:

Pin image versions: Use specific tags (node:18.17, not node:latest)
Handle failures gracefully: Add retry logic and cleanup steps that always run
Pass data between steps: Use .output() to capture results and reference them in later steps (see the sketch after this list)
Leverage parallel execution: Independent steps will run simultaneously by default
Include AI where it adds value: Use inline agents for analysis, decision-making, and dynamic responses
Test incrementally: Start with simple workflows and add complexity gradually
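
As a minimal sketch that combines several of these tips using only constructs shown above (the metrics endpoint is a placeholder, and chaining .retry() with .output() on a single step is an assumption):

@workflow
def nightly_metrics():
    # Pinned image, retry with backoff, and stdout captured for the next step
    fetch = step("fetch-metrics").docker(
        image="curlimages/curl:8.5.0",
        command="curl -sf https://api.example.com/metrics"
    ).retry(limit=3, backoff="exponential").output("METRICS")

    # Consume the captured output in a later step
    step("report").docker(
        image="alpine:3.19",
        command="echo 'Metrics received: ${METRICS}'"
    ).depends("fetch-metrics")

    # Cleanup that runs even if earlier steps fail
    step("cleanup").shell("rm -rf /tmp/metrics/*").continue_on_failure(True)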