Get started with the Kubiya SDK by building and executing your first workflow.
## Your First Workflow
Let’s create a simple workflow that demonstrates the core capabilities of the SDK.
### Step 1: Initialize the Client

```python
from kubiya import KubiyaClient

# Initialize with your API key
client = KubiyaClient(api_key="your-api-key")
```
If you’ve set the `KUBIYA_API_KEY` environment variable, you can simply use `client = KubiyaClient()` with no arguments.
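For example, a minimal sketch of environment-based initialization (in practice you would export the variable in your shell rather than set it in code):

```python
import os

# Typically exported in your shell; set here only for illustration
os.environ["KUBIYA_API_KEY"] = "your-api-key"

from kubiya import KubiyaClient

client = KubiyaClient()  # picks up KUBIYA_API_KEY automatically
```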
### Step 2: Define a Workflow

```python
from kubiya.dsl import workflow

# Create a simple workflow using the DSL
wf = (
    workflow("hello-kubiya")
    .description("My first Kubiya workflow")
    .step("greet", "echo 'Hello from Kubiya!'")
    .step("date", "date")
    .step("system-info", "uname -a")
)
```
### Step 3: Execute the Workflow

```python
# Execute with streaming to see real-time output
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)
```
## Complete Example

Here’s the full code:

```python
from kubiya import KubiyaClient
from kubiya.dsl import workflow

# Initialize client
client = KubiyaClient(api_key="your-api-key")

# Define workflow
wf = (
    workflow("hello-kubiya")
    .description("My first Kubiya workflow")
    .step("greet", "echo 'Hello from Kubiya!'")
    .step("date", "date")
    .step("system-info", "uname -a")
)

# Execute with streaming
print("🚀 Starting workflow execution...")
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(f"📋 {event}")
print("✅ Workflow completed!")
```
## Multi-Step Workflow with Dependencies

Let’s create a more complex workflow with step dependencies:

```python
from kubiya.dsl import workflow

wf = (
    workflow("data-processing")
    .description("Process data with multiple steps")
    .params(
        INPUT_FILE="data.csv",
        OUTPUT_DIR="/tmp/processed"
    )
    # Step 1: Create output directory
    .step("create-dir", "mkdir -p ${OUTPUT_DIR}")
    # Step 2: Process data (depends on step 1)
    .step("process", callback=lambda s:
        s.shell("cat ${INPUT_FILE} | wc -l > ${OUTPUT_DIR}/count.txt")
        .depends("create-dir")
    )
    # Step 3: Verify output (depends on step 2)
    .step("verify", callback=lambda s:
        s.shell("cat ${OUTPUT_DIR}/count.txt")
        .depends("process")
    )
)

# Execute
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)
```
## Using the Client SDK

The Client SDK provides direct access to Kubiya platform services:

### Managing Agents

```python
# List all agents
agents = client.agents.list(limit=10)
for agent in agents:
    print(f"Agent: {agent['name']} - {agent.get('description', 'No description')}")

# Create a new agent
agent = client.agents.create(
    name="devops-helper",
    description="AI assistant for DevOps tasks",
    llm_model="claude-sonnet-4",
    tools=["kubectl", "terraform"]
)
print(f"Created agent: {agent['uuid']}")
```
### Managing Workflows

```python
# Execute a workflow with parameters
workflow_def = {
    "name": "deploy-app",
    "steps": [
        {"name": "build", "command": "docker build -t myapp ."},
        {"name": "deploy", "command": "kubectl apply -f deployment.yaml"}
    ]
}

for event in client.workflows.execute(
    workflow_definition=workflow_def,
    parameters={"environment": "staging"},
    stream=True
):
    print(f"Status: {event}")
```
### Managing Secrets

```python
# Create a secret
client.secrets.create(
    name="github-token",
    value="ghp_xxxxxxxxxxxx",
    description="GitHub API token"
)

# List secrets
secrets = client.secrets.list()
for secret in secrets:
    print(f"Secret: {secret['name']}")

# Get secret value
token = client.secrets.value("github-token")
```
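A stored secret can then feed directly into a workflow. Here is a minimal sketch combining the two APIs above (the `GITHUB_TOKEN` parameter name and the repository URL are illustrative):

```python
from kubiya.dsl import workflow

# Fetch the stored token and pass it to a workflow as a parameter
token = client.secrets.value("github-token")

wf = (
    workflow("clone-private-repo")
    .description("Use a stored secret as a workflow parameter")
    .params(GITHUB_TOKEN=token)  # illustrative parameter name
    .step("clone", "git clone https://${GITHUB_TOKEN}@github.com/org/private-repo.git")
)
```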
## Workflow with Python Code

Execute Python code directly in your workflows:

```python
from kubiya.dsl import workflow

wf = (
    workflow("python-workflow")
    .description("Workflow with Python code")
    # Python step
    .step("calculate", callback=lambda s:
        s.python(
            """
colors = ["red", "yellow", "green", "blue"]
for i, color in enumerate(colors, 1):
    print(f"{i}. Color: {color}")
"""
        )
    )
    # Shell step using Python output
    .step("summarize", callback=lambda s:
        s.shell("echo 'Processing complete!'")
        .depends("calculate")
    )
)

for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)
```
## Workflow with Outputs and Variables

Capture step outputs and use them in subsequent steps:

```python
from kubiya.dsl import workflow

wf = (
    workflow("data-pipeline")
    .description("Pipeline with outputs")
    # Step 1: Produce data
    .step("fetch-data", callback=lambda s:
        s.shell("echo '42'")
        .output("RESULT_VAR")
    )
    # Step 2: Use the output
    .step("process-data", callback=lambda s:
        s.python("print(int('{{ RESULT_VAR }}') * 2)")
        .depends("fetch-data")
    )
)

for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)
```
## Advanced Example: CI/CD Pipeline

Here’s a complete CI/CD workflow:

```python
from kubiya.dsl import workflow

cicd_pipeline = (
    workflow("ci-cd-pipeline")
    .description("Complete CI/CD pipeline")
    .params(
        BRANCH="main",
        SERVICE_NAME="my-service",
        VERSION="v1.0.0"
    )
    # Clone repository
    .step("checkout", "git clone -b ${BRANCH} https://github.com/org/repo.git")
    # Run tests
    .step("test", callback=lambda s:
        s.shell("cd repo && pytest tests/ -v")
        .depends("checkout")
    )
    # Build Docker image
    .step("build", callback=lambda s:
        s.shell("cd repo && docker build -t ${SERVICE_NAME}:${VERSION} .")
        .depends("test")
    )
    # Push to registry
    .step("push", callback=lambda s:
        s.shell("docker push ${SERVICE_NAME}:${VERSION}")
        .depends("build")
    )
    # Deploy to Kubernetes
    .step("deploy", callback=lambda s:
        s.shell("kubectl set image deployment/${SERVICE_NAME} ${SERVICE_NAME}=${SERVICE_NAME}:${VERSION}")
        .depends("push")
    )
    # Verify deployment
    .step("verify", callback=lambda s:
        s.shell("kubectl rollout status deployment/${SERVICE_NAME}")
        .depends("deploy")
    )
)

# Execute the pipeline
for event in client.execute_workflow(cicd_pipeline.to_dict(), stream=True):
    print(event)
```
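Because the pipeline is fully parameterized, the same definition can be reused across branches and versions. Assuming `execute_workflow` accepts a `parameters` argument like `client.workflows.execute` shown earlier (an assumption, not confirmed above), overriding the defaults might look like this:

```python
# Override default params at execution time
# (the `parameters` kwarg is an assumption mirroring client.workflows.execute)
for event in client.execute_workflow(
    cicd_pipeline.to_dict(),
    parameters={"BRANCH": "release", "VERSION": "v1.1.0"},
    stream=True
):
    print(event)
```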
## Using Docker Containers

Run steps in Docker containers:

```python
from kubiya.dsl import workflow

wf = (
    workflow("docker-workflow")
    .description("Workflow using Docker containers")
    .step("python-task", callback=lambda s:
        s.docker(
            image="python:3.11-slim",
            content="""
#!/usr/bin/env python3
import sys
print(f"Python version: {sys.version}")
print("Running in Docker!")
"""
        )
    )
)

for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)
```
## Async Execution

Use the async client for non-blocking execution:

```python
import asyncio

from kubiya import StreamingKubiyaClient
from kubiya.dsl import workflow

async def main():
    wf = (
        workflow("async-workflow")
        .description("Async workflow execution")
        .step("task1", "echo 'Task 1'")
        .step("task2", "echo 'Task 2'")
    )

    async with StreamingKubiyaClient(api_key="your-api-key") as client:
        async for event in client.execute_workflow_stream(wf.to_dict()):
            print(f"Event: {event}")

# Run the async workflow
asyncio.run(main())
```
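The async client is most useful when you need several workflows in flight at once. Here is a minimal sketch using `asyncio.gather` with the same `execute_workflow_stream` API; the `run_one` helper is illustrative, and it assumes a single client instance can stream multiple workflows concurrently:

```python
import asyncio

from kubiya import StreamingKubiyaClient
from kubiya.dsl import workflow

async def run_one(client, name):
    # Illustrative helper: build a one-step workflow and stream its events
    wf = workflow(name).step("task", f"echo 'Running {name}'")
    async for event in client.execute_workflow_stream(wf.to_dict()):
        print(f"[{name}] {event}")

async def run_all():
    # Assumes one client can stream several workflows concurrently
    async with StreamingKubiyaClient(api_key="your-api-key") as client:
        await asyncio.gather(
            run_one(client, "wf-a"),
            run_one(client, "wf-b"),
        )

asyncio.run(run_all())
```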
## Best Practices

Use clear, descriptive names for workflows and steps:

```python
workflow("deploy-production-api")  # Good
workflow("deploy")                 # Too vague
```

Include descriptions for workflows and complex steps:

```python
wf = (
    workflow("data-pipeline")
    .description("ETL pipeline for customer data processing")
)
```

Always include error handling in production workflows:

```python
try:
    for event in client.execute_workflow(wf.to_dict(), stream=True):
        print(event)
except Exception as e:
    print(f"Workflow failed: {e}")
    # Handle the error appropriately
```
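For transient failures you can take this one step further and retry. A minimal sketch, assuming errors surface as exceptions as shown above (the retry policy itself is illustrative, not an SDK feature):

```python
import time

MAX_ATTEMPTS = 3  # illustrative retry policy
for attempt in range(1, MAX_ATTEMPTS + 1):
    try:
        for event in client.execute_workflow(wf.to_dict(), stream=True):
            print(event)
        break  # success, stop retrying
    except Exception as e:
        print(f"Attempt {attempt} failed: {e}")
        if attempt == MAX_ATTEMPTS:
            raise
        time.sleep(2 ** attempt)  # simple exponential backoff
```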
Make workflows reusable with parameters:

```python
wf = (
    workflow("deploy")
    .params(
        ENVIRONMENT="staging",
        VERSION="latest"
    )
)
```
## Next Steps