Deep dive into advanced workflow features, streaming, custom providers, and production deployment
Master the advanced features of Kubiya workflows for production-grade automation.
Execute shell commands in any container:
```python
result = step.shell(
    name="process-data",
    image="alpine:latest",
    command="""
    echo "Processing..."
    cat input.json | jq '.data[]' > output.json
    """,
    env={"API_KEY": "${secrets.API_KEY}"}
)
```
Run Python scripts with the full Python ecosystem available:
```python
analysis = step.python(
    name="analyze-metrics",
    image="python:3.11-slim",
    packages=["pandas", "numpy", "scikit-learn"],
    script="""
import pandas as pd
import numpy as np

df = pd.read_csv('/data/metrics.csv')
summary = df.describe()
print(summary.to_json())
    """,
    volumes={"/data": "metrics-volume"}
)
```
Run any containerized application:
```python
server = step.container(
    name="web-server",
    image="nginx:alpine",
    ports={"80": "8080"},
    volumes={
        "./config": "/etc/nginx/conf.d",
        "./static": "/usr/share/nginx/html"
    },
    health_check={
        "test": ["CMD", "curl", "-f", "http://localhost/health"],
        "interval": "30s",
        "retries": 3
    }
)
```
Embed AI decision-making:
```python
decision = step.inline_agent(
    name="deployment-decision",
    message="Analyze these metrics and decide if we should deploy: ${metrics}",
    runners=["kubiya-hosted"],
    llm_model="gpt-4",
    is_debug_mode=True,
    tools=[
        {
            "name": "check-metrics",
            "type": "docker",
            "image": "datadog/agent:latest",
            "content": "datadog-check.sh",
            "args": {"threshold": "0.95"}
        }
    ]
)
```
```python
from kubiya_workflow_sdk import Client
import json

client = Client(api_key="your-key")

# Stream workflow execution
for event in client.execute_workflow(workflow, stream=True):
    if event.type == "step.started":
        print(f"Starting: {event.data['step_name']}")
    elif event.type == "log":
        print(f"Log: {event.data['message']}")
    elif event.type == "step.completed":
        print(f"Completed: {event.data['step_name']}")

# For integration with the Vercel AI SDK (run inside an async function)
async for chunk in client.stream_workflow(workflow, format="vercel"):
    # Chunk format compatible with the Vercel AI SDK
    if chunk['type'] == '0':  # Text delta
        print(chunk['value'], end='')
    elif chunk['type'] == '8':  # Tool call
        tool_data = json.loads(chunk['value'])
        print(f"Tool: {tool_data['toolName']}")
```
```python
class WorkflowStreamProcessor:
    def __init__(self):
        self.steps_completed = 0
        self.logs = []

    async def process_stream(self, workflow):
        async for event in workflow.stream():
            if event.type == "step.completed":
                self.steps_completed += 1
                await self.notify_progress(self.steps_completed)
            elif event.type == "log":
                self.logs.append(event.data)
                await self.send_to_monitoring(event.data)

    async def notify_progress(self, completed):
        # Placeholder: plug in your own progress reporting (Slack, webhook, etc.)
        print(f"Steps completed: {completed}")

    async def send_to_monitoring(self, log_entry):
        # Placeholder: forward logs to your monitoring system
        print(f"Log: {log_entry}")
```
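A minimal usage sketch, assuming the workflow object you execute exposes the async `stream()` iterator used above; the wiring shown here is illustrative, not a documented API:

```python
import asyncio

# Drive the processor from a workflow's event stream
# (`workflow` is a workflow you have defined elsewhere in this guide).
processor = WorkflowStreamProcessor()
asyncio.run(processor.process_stream(workflow))
```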
```python
from kubiya_workflow_sdk.providers import BaseProvider
from typing import Dict, Any, AsyncGenerator

class CustomLLMProvider(BaseProvider):
    """Custom provider for your LLM"""

    def __init__(self, api_key: str, model: str = "custom-model"):
        self.api_key = api_key
        self.model = model

    async def compose(
        self,
        task: str,
        mode: str = "plan",
        context: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Generate workflow from natural language"""
        # Your LLM logic here
        workflow = await self._call_llm(task, context)
        return {
            "workflow": workflow,
            "metadata": {"model": self.model}
        }

    async def stream_compose(
        self,
        task: str,
        mode: str = "plan"
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream workflow generation"""
        async for chunk in self._stream_llm(task):
            yield {
                "type": "token",
                "content": chunk
            }
```
```python
from kubiya_workflow_sdk.providers import register_provider, get_provider

# Register your provider
register_provider("custom-llm", CustomLLMProvider)

# Use it (inside an async context)
provider = get_provider("custom-llm", api_key="...")
workflow = await provider.compose("Deploy my app")
```
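As a complement, here is a hedged sketch of consuming the streaming side of the provider defined above; the `asyncio` wrapper is illustrative, and `get_provider`/`stream_compose` are used exactly as they appear in this section rather than as a separately documented API:

```python
import asyncio
from kubiya_workflow_sdk.providers import get_provider

async def main():
    provider = get_provider("custom-llm", api_key="...")
    # Print tokens as the workflow definition is generated
    async for chunk in provider.stream_compose("Deploy my app", mode="plan"):
        if chunk["type"] == "token":
            print(chunk["content"], end="")

asyncio.run(main())
```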
Create a local runner from the Kubiya platform web interface, REST API, or CLI: give the runner a name and you will receive a deployment manifest. Apply that manifest on your cluster, and you can then reference the runner's name when executing workflows.
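For example, a minimal sketch of pointing an execution at that runner; `my-local-runner` is a placeholder for whatever name you gave it:

```python
from kubiya_workflow_sdk import Client

client = Client(api_key="your-key")
result = client.execute_workflow(
    workflow,
    runner="my-local-runner"  # The name you chose when creating the runner
)
```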
```python
# test_workflow.py
import pytest
from kubiya_workflow_sdk import Workflow, Client, Step

def test_data_pipeline():
    workflow = Workflow(
        name="test-pipeline",
        steps=[
            Step("fetch", "curl -o data.json https://api.example.com/data"),
            Step("process", "jq '.items[]' data.json > processed.json"),
            Step("validate", "python validate.py processed.json")
        ]
    )

    # Use client with your test runner
    client = Client(api_key="test-key")
    result = client.execute_workflow(
        workflow,
        runner="your-test-runner"  # Created via Kubiya platform
    )

    assert result.success
    assert len(result.steps) == 3
```
```python
# Enable debug mode for detailed logs
workflow = Workflow(
    name="debug-workflow",
    debug=True,
    log_level="DEBUG"
)

# Step-through debugging
for step in workflow.steps:
    print(f"Executing: {step.name}")
    result = step.execute(breakpoint=True)

    # Inspect state
    print(f"Output: {result.output}")
    print(f"Logs: {result.logs}")

    # Continue or modify
    if input("Continue? (y/n): ") != "y":
        break
```
```python
@workflow
def production_pipeline():
    try:
        data = step.fetch_critical_data(
            retry=3,
            retry_interval="exponential",
            timeout="5m",
            on_failure="continue"  # Don't fail entire workflow
        )
    except StepError as e:
        # Fallback logic
        data = step.fetch_cached_data()
        step.alert_team(f"Using cached data: {e}")
```
```python
# Add monitoring hooks
workflow.add_hook("before_step", lambda s: metrics.increment(f"step.{s.name}.started"))
workflow.add_hook("after_step", lambda s, r: metrics.histogram(f"step.{s.name}.duration", r.duration))

# Custom spans
with workflow.span("critical-section"):
    result = step.critical_operation()
```
```python
# Use Kubernetes secrets
step.database_operation(
    env={
        "DB_PASSWORD": "${k8s:secret/db-credentials/password}",
        "API_KEY": "${vault:secret/api-keys/external}"
    }
)

# Or environment variables
step.secure_operation(
    env={
        "TOKEN": "${env:SECURE_TOKEN}",
        "CERT": "${file:/secrets/cert.pem}"
    }
)
```
```python
# Cache step outputs
@step.cache(key="data-fetch-${date}", ttl="1h")
def fetch_expensive_data():
    return step.shell("./fetch-data.sh")

# Store artifacts
step.generate_report(
    output_artifacts={
        "report.pdf": "/reports/",
        "metrics.json": "/metrics/"
    }
)
```
```python
@workflow
def dynamic_pipeline(regions: list):
    # Generate steps dynamically
    results = []
    for region in regions:
        result = step.process_region(
            name=f"process-{region}",
            env={"REGION": region},
            parallel=True  # Run all regions in parallel
        )
        results.append(result)

    # Wait for all and aggregate
    step.aggregate_results(
        inputs=results,
        wait_for_all=True
    )
```
```python
@workflow
def conditional_deployment():
    test_result = step.run_tests()

    if test_result.exit_code == 0:
        if test_result.coverage > 80:
            step.deploy_to_production()
        else:
            step.deploy_to_staging()
            step.alert_team("Low coverage: ${test_result.coverage}%")
    else:
        step.rollback()
        step.create_incident()
```
```python
@workflow
def map_reduce_analysis():
    # Map phase - parallel processing
    chunks = step.split_data(chunks=10)

    processed = step.map(
        over=chunks,
        operation=lambda chunk: step.process_chunk(
            data=chunk,
            image="processor:latest"
        )
    )

    # Reduce phase
    result = step.reduce(
        data=processed,
        operation="merge",
        image="reducer:latest"
    )
```
Step Timeout
```python
# Increase timeout for long-running steps
step.long_operation(
    timeout="30m",      # Default is 5m
    grace_period="5m"   # Time to clean up after timeout
)
```
Memory Issues
```python
# Set appropriate memory limits
step.memory_intensive(
    resources={"limits": {"memory": "32Gi"}},
    swap_limit="64Gi"
)
```
Network Issues
```python
# Configure network policies
step.external_api_call(
    network_mode="host",
    dns_servers=["8.8.8.8", "8.8.4.4"],
    extra_hosts={"api.internal": "10.0.0.100"}
)
```
```python
# Maximize parallelism
workflow.configure(
    max_parallel_steps=20,
    queue_size=100
)

# Reuse containers for similar steps
workflow.configure(
    container_reuse=True,
    pool_size=10
)

# Layer caching for builds
step.build_image(
    cache_from=["registry/base:latest"],
    build_args={"BUILDKIT_INLINE_CACHE": "1"}
)
```
To go further, see the complete API documentation, real-world workflow patterns in the examples, and the troubleshooting guide for common issues and solutions.