Skip to main content
Get started with the Kubiya SDK by building and executing your first workflow.

Your First Workflow

Let’s create a simple workflow that demonstrates the core capabilities of the SDK.

Step 1: Initialize the Client

from kubiya import KubiyaClient

# Initialize with your API key
client = KubiyaClient(api_key="your-api-key")
Tip: If you’ve set the KUBIYA_API_KEY environment variable, you can omit the api_key argument and simply use client = KubiyaClient().

Step 2: Define a Workflow

from kubiya.dsl import workflow

# Create a simple workflow using the DSL
wf = (
    workflow("hello-kubiya")
        .description("My first Kubiya workflow")
        .step("greet", "echo 'Hello from Kubiya!'")
        .step("date", "date")
        .step("system-info", "uname -a")
)

Step 3: Execute the Workflow

# Execute with streaming to see real-time output
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)

Complete Example

Here’s the full code:
from kubiya import KubiyaClient
from kubiya.dsl import workflow

# Initialize client
client = KubiyaClient(api_key="your-api-key")

# Define workflow
wf = (
    workflow("hello-kubiya")
        .description("My first Kubiya workflow")
        .step("greet", "echo 'Hello from Kubiya!'")
        .step("date", "date")
        .step("system-info", "uname -a")
)

# Execute with streaming
print("🚀 Starting workflow execution...")
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(f"📋 {event}")
# Reached once the event stream ends without raising. This line alone does not
# verify that every step succeeded — NOTE(review): inspect the events for
# failures if success must be confirmed.
print("✅ Workflow completed!")

Multi-Step Workflow with Dependencies

Let’s create a more complex workflow with step dependencies:
from kubiya.dsl import workflow

wf = (
    workflow("data-processing")
        .description("Process data with multiple steps")
        .params(
            INPUT_FILE="data.csv",
            OUTPUT_DIR="/tmp/processed"
        )
        # Step 1: Create output directory
        .step("create-dir", "mkdir -p ${OUTPUT_DIR}")

        # Step 2: Process data (depends on step 1)
        .step("process", callback=lambda s:
            s.shell("cat ${INPUT_FILE} | wc -l > ${OUTPUT_DIR}/count.txt")
                .depends("create-dir")
        )

        # Step 3: Verify output (depends on step 2)
        .step("verify", callback=lambda s:
            s.shell("cat ${OUTPUT_DIR}/count.txt")
                .depends("process")
        )
)

# Execute (this snippet does not create `client`; it reuses the KubiyaClient
# instance initialized in the earlier examples)
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)

Using the Client SDK

The Client SDK provides direct access to Kubiya platform services:

Managing Agents

# List agents, passing limit=10 (presumably at most 10 are returned, not all)
agents = client.agents.list(limit=10)
for agent in agents:
    # .get() supplies a fallback label when an agent has no 'description' key
    print(f"Agent: {agent['name']} - {agent.get('description', 'No description')}")

# Create a new agent
agent = client.agents.create(
    name="devops-helper",
    description="AI assistant for DevOps tasks",
    llm_model="claude-sonnet-4",
    tools=["kubectl", "terraform"]
)
print(f"Created agent: {agent['uuid']}")

Managing Workflows

# Describe each step on its own, assemble the workflow definition from them,
# then execute it with parameters and print every streamed event.
build_step = {"name": "build", "command": "docker build -t myapp ."}
deploy_step = {"name": "deploy", "command": "kubectl apply -f deployment.yaml"}
workflow_def = {"name": "deploy-app", "steps": [build_step, deploy_step]}

events = client.workflows.execute(
    workflow_definition=workflow_def,
    parameters={"environment": "staging"},
    stream=True,
)
for event in events:
    print(f"Status: {event}")

Managing Secrets

# Create a secret
# The value below is a placeholder; avoid hard-coding real tokens in source files.
client.secrets.create(
    name="github-token",
    value="ghp_xxxxxxxxxxxx",
    description="GitHub API token"
)

# List secrets
secrets = client.secrets.list()
for secret in secrets:
    print(f"Secret: {secret['name']}")

# Get secret value
token = client.secrets.value("github-token")

Workflow with Python Code

Execute Python code directly in your workflows:
from kubiya.dsl import workflow

wf = (
    workflow("python-workflow")
        .description("Workflow with Python code")

        # Python step
        .step("calculate", callback=lambda s:
            s.python(
                """
colors = ["red", "yellow", "green", "blue"]
for i, color in enumerate(colors, 1):
    print(f"{i}. Color: {color}")
                """
            )
        )

        # Shell step declared to depend on the Python step; it echoes a fixed
        # message and does not read the Python step's output
        .step("summarize", callback=lambda s:
            s.shell("echo 'Processing complete!'")
                .depends("calculate")
        )
)

for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)

Workflow with Outputs and Variables

Capture step outputs and use them in subsequent steps:
from kubiya.dsl import workflow

wf = (
    workflow("data-pipeline")
        .description("Pipeline with outputs")

        # Step 1: Produce data
        .step("fetch-data", callback=lambda s:
            s.shell("echo '42'")
                .output("RESULT_VAR")
        )

        # Step 2: Use the output
        # NOTE(review): the output is referenced as {{RESULT_VAR}} here, while
        # the parameter examples on this page use ${VAR} — confirm which
        # templating syntax applies to step outputs.
        .step("process-data", callback=lambda s:
            s.python("print(int('{{RESULT_VAR}}') * 2)")
                .depends("fetch-data")
        )
)

for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)

Advanced Example: CI/CD Pipeline

Here’s a complete CI/CD workflow:
from kubiya.dsl import workflow

# Linear pipeline: checkout -> test -> build -> push -> deploy -> verify.
# Every step after "checkout" names the previous step in .depends().
cicd_pipeline = (
    workflow("ci-cd-pipeline")
        .description("Complete CI/CD pipeline")
        .params(
            BRANCH="main",
            SERVICE_NAME="my-service",
            VERSION="v1.0.0"
        )

        # Clone repository
        .step("checkout", "git clone -b ${BRANCH} https://github.com/org/repo.git")

        # Run tests
        .step("test", callback=lambda s:
            s.shell("cd repo && pytest tests/ -v")
                .depends("checkout")
        )

        # Build Docker image
        .step("build", callback=lambda s:
            s.shell("cd repo && docker build -t ${SERVICE_NAME}:${VERSION} .")
                .depends("test")
        )

        # Push to registry
        .step("push", callback=lambda s:
            s.shell("docker push ${SERVICE_NAME}:${VERSION}")
                .depends("build")
        )

        # Deploy to Kubernetes
        .step("deploy", callback=lambda s:
            s.shell("kubectl set image deployment/${SERVICE_NAME} ${SERVICE_NAME}=${SERVICE_NAME}:${VERSION}")
                .depends("push")
        )

        # Verify deployment
        .step("verify", callback=lambda s:
            s.shell("kubectl rollout status deployment/${SERVICE_NAME}")
                .depends("deploy")
        )
)

# Execute the pipeline
for event in client.execute_workflow(cicd_pipeline.to_dict(), stream=True):
    print(event)

Using Docker Containers

Run steps in Docker containers:
from kubiya.dsl import workflow

wf = (
    workflow("docker-workflow")
        .description("Workflow using Docker containers")

        .step("python-task", callback=lambda s:
            s.docker(
                image="python:3.11-slim",
                # NOTE(review): the string opens with a newline, so the shebang
                # lands on its second line rather than its first — confirm the
                # runner tolerates this.
                content="""
#!/usr/bin/env python3
import sys
print(f"Python version: {sys.version}")
print("Running in Docker!")
                """
            )
        )
)

for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)

Async Execution

Use the async client for non-blocking execution:
from kubiya import StreamingKubiyaClient
from kubiya.dsl import workflow
import asyncio

async def main() -> None:
    """Build a two-step workflow and print each event streamed from its execution."""
    wf = (
        workflow("async-workflow")
            .description("Async workflow execution")
            .step("task1", "echo 'Task 1'")
            .step("task2", "echo 'Task 2'")
    )

    # The client is used as an async context manager, so it is exited when
    # the block ends, including when iteration raises.
    async with StreamingKubiyaClient(api_key="your-api-key") as client:
        async for event in client.execute_workflow_stream(wf.to_dict()):
            print(f"Event: {event}")

# Run the async workflow
asyncio.run(main())

Best Practices

Always use clear, descriptive names for workflows and steps:
workflow("deploy-production-api")  # Good
workflow("deploy")  # Too vague
Include descriptions for workflows and complex steps:
wf = (
    workflow("data-pipeline")
        .description("ETL pipeline for customer data processing")
)
Always include error handling in production workflows:
try:
    for event in client.execute_workflow(wf.to_dict(), stream=True):
        print(event)
except Exception as e:
    # Broad catch shown for brevity. NOTE(review): narrow this to the SDK's
    # own exception types if it provides them.
    print(f"Workflow failed: {e}")
    # Handle the error appropriately (e.g. retry, alert, or re-raise)
Make workflows reusable with parameters:
wf = (
    workflow("deploy")
        .params(
            ENVIRONMENT="staging",
            VERSION="latest"
        )
)

Next Steps