The Kubiya SDK enables you to build custom automations, integrate with existing applications, and create intelligent workflows programmatically. Perfect for platform engineers and developers who want to embed automation capabilities into their systems.

Installation

Prerequisites

  • Python 3.8+ (Python 3.10+ recommended)
  • pip or poetry for package management
  • API access to Kubiya platform

Install the SDK

# Install from PyPI
pip install kubiya-sdk

# Or with optional dependencies
pip install kubiya-sdk[async,dev]

Verify Installation

import kubiya_sdk
print(f"Kubiya SDK version: {kubiya_sdk.__version__}")

Authentication

Get Your API Key

  1. Visit compose.kubiya.ai
  2. Go to Settings → API Keys
  3. Generate a new key for your application

Set Up Authentication

# Add to your shell profile or .env file
export KUBIYA_API_KEY="your-api-key-here"
export KUBIYA_ORG="your-organization"  # Optional
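
With these variables set, the client picks up your credentials automatically; you can also pass them explicitly (a minimal sketch, the explicit keyword arguments match the CI/CD example later in this guide):
import os

from kubiya_sdk import Kubiya

# Reads KUBIYA_API_KEY (and KUBIYA_ORG, if set) from the environment
client = Kubiya()

# Or pass credentials explicitly, e.g. when secrets come from a vault
client = Kubiya(
    api_key=os.environ["KUBIYA_API_KEY"],
    organization=os.environ.get("KUBIYA_ORG")
)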

Your First Automation

Simple Workflow Execution

Let's start with a basic example that checks system health:
from kubiya_sdk import Kubiya

# Initialize the client
client = Kubiya()

# Generate and execute a workflow from natural language
result = client.compose(
    goal="Check the health of all services in production",
    mode="act",  # Execute immediately
    stream=False  # Get final result
)

print(f"Workflow Status: {result.status}")
print(f"Execution ID: {result.execution_id}")

# Access workflow results
if result.status == "completed":
    for step in result.steps:
        print(f"Step: {step.name} - Status: {step.status}")
        if step.outputs:
            print(f"Output: {step.outputs}")

Streaming Workflow Generation

For real-time feedback during workflow generation and execution:
from kubiya_sdk import Kubiya

client = Kubiya()

# Stream workflow generation
for event in client.compose(
    goal="Deploy frontend service v2.1.0 to staging",
    mode="act",
    stream=True
):
    if event.type == "workflow_generated":
        print(f"Generated workflow: {event.data.workflow.name}")
    elif event.type == "step_started":
        print(f"Starting: {event.data.step_name}")
    elif event.type == "step_completed":
        print(f"Completed: {event.data.step_name}")
    elif event.type == "execution_completed":
        print(f"Workflow finished: {event.data.status}")
        break

Building Custom Workflows

Workflow Definition DSL

Create workflows using Python's expressive syntax:
from kubiya_sdk import Kubiya, Workflow, Step, Parallel
from kubiya_sdk.tools import KubectlTool, SlackTool, DatadogTool

def create_deployment_workflow():
    """Creates a safe deployment workflow with monitoring"""
    
    workflow = Workflow(
        name="safe-microservice-deployment",
        description="Deploy microservice with health checks and rollback"
    )
    
    # Pre-deployment validation
    pre_checks = Parallel([
        Step("validate-config").tool(KubectlTool()).command([
            "kubectl", "apply", "--dry-run=client", "-f", "${CONFIG_FILE}"
        ]),
        Step("check-dependencies").tool(KubectlTool()).command([
            "kubectl", "get", "pods", "-l", "app=${DEPENDENCY_SERVICE}"
        ]),
        Step("verify-no-incidents").tool(DatadogTool()).query(
            "sum:incident.active{env:production}"
        )
    ])
    
    # Deployment step with conditions
    deployment = Step("deploy-service").tool(KubectlTool()).command([
        "kubectl", "apply", "-f", "${CONFIG_FILE}"
    ]).condition(
        lambda ctx: ctx.pre_checks.all_successful()
    )
    
    # Health monitoring
    health_check = Step("health-monitoring").tool("health-checker").config({
        "service": "${SERVICE_NAME}",
        "timeout": "5m",
        "success_threshold": 3,
        "failure_threshold": 1
    })
    
    # Rollback on failure
    rollback = Step("automatic-rollback").tool(KubectlTool()).command([
        "kubectl", "rollout", "undo", "deployment/${SERVICE_NAME}"
    ]).condition(
        lambda ctx: ctx.health_check.failed()
    )
    
    # Notification
    notify = Step("notify-team").tool(SlackTool()).message(
        channel="#deployments",
        text="๐Ÿš€ Deployment of ${SERVICE_NAME} ${STATUS}: ${RESULT_SUMMARY}"
    )
    
    # Wire up the workflow
    workflow.add_steps([
        pre_checks,
        deployment.depends_on(pre_checks),
        health_check.depends_on(deployment),
        rollback.depends_on(health_check),
        notify.depends_on([health_check, rollback])
    ])
    
    return workflow

# Create and register the workflow
client = Kubiya()
workflow = create_deployment_workflow()
client.workflows.create(workflow)

Parameterized Workflows

Make workflows reusable with parameters:
from kubiya_sdk import Kubiya, Workflow, Step, Parameter

def database_backup_workflow():
    """Parameterized database backup workflow"""
    
    workflow = Workflow(
        name="database-backup",
        description="Backup database with configurable retention",
        parameters=[
            Parameter("database_name", type="string", required=True),
            Parameter("environment", type="string", default="staging"),
            Parameter("retention_days", type="int", default=30),
            Parameter("notify_channel", type="string", default="#ops")
        ]
    )
    
    # Backup step using parameters
    backup = Step("create-backup").tool("pg-dump").config({
        "host": "${environment}.db.company.com",
        "database": "${database_name}",
        "output_file": "backup_${database_name}_${timestamp}.sql.gz",
        "compression": True
    })
    
    # Upload to cloud storage
    upload = Step("upload-backup").tool("aws-s3").config({
        "bucket": "company-database-backups",
        "key": "${environment}/${database_name}/${backup.output_file}",
        "storage_class": "GLACIER"
    }).depends_on(backup)
    
    # Set retention policy
    lifecycle = Step("set-lifecycle").tool("aws-s3-lifecycle").config({
        "bucket": "company-database-backups",
        "prefix": "${environment}/${database_name}/",
        "delete_after_days": "${retention_days}"
    }).depends_on(upload)
    
    # Notification
    notify = Step("notify-completion").tool("slack").config({
        "channel": "${notify_channel}",
        "message": "โœ… Database backup completed: ${database_name} (${environment})"
    }).depends_on(lifecycle)
    
    workflow.add_steps([backup, upload, lifecycle, notify])
    return workflow

# Execute with parameters
client = Kubiya()
client.workflows.execute(
    "database-backup",
    parameters={
        "database_name": "user_service_db",
        "environment": "production", 
        "retention_days": 90,
        "notify_channel": "#database-ops"
    }
)

Advanced SDK Features

Async/Await Support

For high-performance applications, use async operations:
import asyncio
from kubiya_sdk import AsyncKubiya

async def parallel_deployments():
    """Deploy multiple services concurrently"""
    
    client = AsyncKubiya()
    
    # Define multiple deployment tasks
    services = ["frontend", "api", "worker", "scheduler"]
    deployment_tasks = [
        client.compose(
            goal=f"Deploy {service} to production",
            mode="act",
            stream=False
        )
        for service in services
    ]

    # Execute all deployments concurrently
    results = await asyncio.gather(*deployment_tasks)

    # Process results
    for service, result in zip(services, results):
        print(f"{service}: {result.status}")
        
    return results

# Run async workflow
results = asyncio.run(parallel_deployments())

Context Management

Access Kubiya's context graph for intelligent automation:
from kubiya_sdk import Kubiya

client = Kubiya()

# Query infrastructure context
context = client.context.query(
    resource_type="kubernetes_service",
    environment="production",
    filters={"team": "payments"}
)

for service in context.resources:
    print(f"Service: {service.name}")
    print(f"Replicas: {service.metadata.replicas}")
    print(f"Dependencies: {service.relationships.depends_on}")
    
    # Check service health based on context
    health = client.compose(
        goal=f"Check health of {service.name} service",
        context={"target_service": service.name},
        mode="plan"  # Generate workflow without executing
    )
    
    if health.workflow:
        print(f"Health check steps: {len(health.workflow.steps)}")

Error Handling & Retries

Build robust automations with comprehensive error handling:
from kubiya_sdk import Kubiya, WorkflowError
from kubiya_sdk.retry import exponential_backoff

client = Kubiya()

@exponential_backoff(max_retries=3, base_delay=1.0)
def robust_deployment(service_name, version):
    """Deployment with automatic retry logic"""
    
    try:
        result = client.compose(
            goal=f"Deploy {service_name} version {version} with health checks",
            mode="act",
            timeout=1200  # 20 minute timeout
        )
        
        if result.status == "completed":
            return result
        elif result.status == "failed":
            # Analyze failure and decide if retry is appropriate
            if "temporary" in result.error_message.lower():
                raise WorkflowError("Temporary failure, will retry")
            else:
                raise WorkflowError(f"Permanent failure: {result.error_message}")
                
    except Exception as e:
        print(f"Deployment attempt failed: {e}")
        raise

# Usage with error handling
try:
    result = robust_deployment("payment-service", "v2.1.0")
    print(f"Deployment successful: {result.execution_id}")
except WorkflowError as e:
    print(f"Deployment failed after retries: {e}")
    # Trigger rollback or alert
    client.compose(
        goal="Rollback payment-service to last stable version",
        mode="act"
    )

Custom Tools Integration

Create custom tools for your specific needs:
import requests

from kubiya_sdk import Kubiya, Tool, Parameter, StepResult, Workflow, Step

class CustomMonitoringTool(Tool):
    """Custom tool for company-specific monitoring"""
    
    name = "company-monitor"
    description = "Monitor internal services and metrics"
    
    parameters = [
        Parameter("service", type="string", required=True),
        Parameter("metric", type="string", required=True),
        Parameter("threshold", type="float", default=80.0)
    ]
    
    def execute(self, context):
        """Execute the monitoring check"""
        service = context.parameters["service"]
        metric = context.parameters["metric"]
        threshold = context.parameters["threshold"]
        
        # Custom monitoring logic here
        current_value = self.get_metric_value(service, metric)
        
        result = StepResult(
            success=current_value < threshold,
            outputs={
                "current_value": current_value,
                "threshold": threshold,
                "status": "healthy" if current_value < threshold else "unhealthy"
            },
            logs=[
                f"Checking {metric} for {service}",
                f"Current value: {current_value}",
                f"Threshold: {threshold}"
            ]
        )
        
        return result
    
    def get_metric_value(self, service, metric):
        """Custom logic to retrieve metric value"""
        # Implement your monitoring system integration
        response = requests.get(
            f"https://monitoring.company.com/api/metrics",
            params={"service": service, "metric": metric}
        )
        
        return response.json()["value"]

# Register custom tool
client = Kubiya()
client.tools.register(CustomMonitoringTool())

# Use in workflow
workflow = Workflow("service-monitoring")
check = Step("monitor-service").tool("company-monitor").parameters({
    "service": "payment-api",
    "metric": "cpu_utilization",
    "threshold": 75.0
})
workflow.add_step(check)

Integration Patterns

Web Application Integration

Integrate Kubiya into your web applications:
from flask import Flask, request, jsonify
from kubiya_sdk import Kubiya

app = Flask(__name__)
client = Kubiya()

@app.route('/api/deploy', methods=['POST'])
def trigger_deployment():
    """API endpoint to trigger deployments"""
    
    data = request.get_json(silent=True) or {}
    service = data.get('service')
    version = data.get('version')
    environment = data.get('environment', 'staging')
    
    # Validate request
    if not service or not version:
        return jsonify({"error": "Service and version required"}), 400
    
    try:
        # Generate deployment workflow
        result = client.compose(
            goal=f"Deploy {service} version {version} to {environment}",
            mode="plan",  # Generate workflow, don't execute yet
            stream=False
        )
        
        return jsonify({
            "workflow_id": result.workflow_id,
            "steps": len(result.workflow.steps),
            "estimated_duration": result.estimated_duration,
            "execute_url": f"/api/execute/{result.workflow_id}"
        })
        
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/execute/<workflow_id>', methods=['POST'])
def execute_workflow(workflow_id):
    """Execute a previously generated workflow"""
    
    try:
        # Execute the workflow
        execution = client.workflows.execute(workflow_id)
        
        return jsonify({
            "execution_id": execution.id,
            "status": execution.status,
            "monitor_url": f"/api/status/{execution.id}"
        })
        
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/status/<execution_id>')
def get_execution_status(execution_id):
    """Get execution status and results"""
    
    try:
        execution = client.executions.get(execution_id)
        
        return jsonify({
            "status": execution.status,
            "progress": execution.progress_percent,
            "current_step": execution.current_step,
            "logs": execution.logs[-10:],  # Last 10 log entries
            "completed": execution.status in ["completed", "failed"]
        })
        
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

CI/CD Pipeline Integration

Use the SDK in your CI/CD pipelines:
#!/usr/bin/env python3
"""
CI/CD Pipeline Integration Script
Usage: python deploy_pipeline.py --service frontend --version $CI_COMMIT_TAG
"""

import argparse
import sys
import os
from kubiya_sdk import Kubiya

def main():
    parser = argparse.ArgumentParser(description='Deploy service via Kubiya')
    parser.add_argument('--service', required=True, help='Service name')
    parser.add_argument('--version', required=True, help='Version to deploy')
    parser.add_argument('--environment', default='staging', help='Target environment')
    parser.add_argument('--wait', action='store_true', help='Wait for completion')
    
    args = parser.parse_args()
    
    # Initialize Kubiya client
    client = Kubiya(
        api_key=os.environ['KUBIYA_API_KEY'],
        organization=os.environ.get('KUBIYA_ORG')
    )
    
    try:
        # Generate and execute deployment
        result = client.compose(
            goal=f"""
            Deploy {args.service} version {args.version} to {args.environment}:
            1. Validate deployment prerequisites  
            2. Execute rolling deployment with health checks
            3. Run smoke tests after deployment
            4. Send notification to team
            """,
            mode="act",
            stream=args.wait  # Stream events only when waiting for completion
        )
        
        if args.wait:
            # Stream execution progress
            for event in result:
                if event.type == "step_started":
                    print(f"โณ {event.data.step_name}")
                elif event.type == "step_completed":
                    print(f"โœ… {event.data.step_name}")
                elif event.type == "step_failed":
                    print(f"โŒ {event.data.step_name}: {event.data.error}")
                elif event.type == "execution_completed":
                    if event.data.status == "completed":
                        print(f"๐ŸŽ‰ Deployment successful!")
                        sys.exit(0)
                    else:
                        print(f"๐Ÿ’ฅ Deployment failed: {event.data.error}")
                        sys.exit(1)
        else:
            print(f"Deployment initiated: {result.execution_id}")
            print(f"Monitor at: https://compose.kubiya.ai/executions/{result.execution_id}")
            
    except Exception as e:
        print(f"Failed to deploy: {e}")
        sys.exit(1)

if __name__ == '__main__':
    main()

Testing Your Automations

Unit Testing Workflows

import unittest
from unittest.mock import patch
from kubiya_sdk import Workflow, Step
from kubiya_sdk.testing import WorkflowTestCase

class TestDeploymentWorkflow(WorkflowTestCase):
    def setUp(self):
        self.workflow = Workflow("test-deployment")
        self.workflow.add_step(
            Step("deploy").tool("kubectl").command([
                "kubectl", "apply", "-f", "${CONFIG_FILE}"
            ])
        )
    
    @patch('kubiya_sdk.runners.KubernetesRunner')
    def test_successful_deployment(self, mock_runner):
        """Test successful deployment scenario"""
        
        # Mock successful kubectl response
        mock_runner.return_value.execute.return_value.success = True
        mock_runner.return_value.execute.return_value.outputs = {
            "deployment": "frontend",
            "status": "applied"
        }
        
        # Execute workflow
        result = self.execute_workflow(
            self.workflow,
            parameters={"CONFIG_FILE": "test-deployment.yaml"}
        )
        
        # Assertions
        self.assertTrue(result.success)
        self.assertEqual(result.status, "completed")
        self.assertEqual(len(result.executed_steps), 1)
    
    def test_deployment_validation(self):
        """Test workflow parameter validation"""
        
        with self.assertRaises(ValueError):
            self.execute_workflow(self.workflow, parameters={})  # Missing CONFIG_FILE

if __name__ == '__main__':
    unittest.main()

Production Safety: Always test your SDK integrations in staging environments first. Use parameterized workflows to avoid hardcoded production values in your code.
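
One way to put this into practice is to generate a workflow in "plan" mode, inspect the steps, and execute only after review (a sketch combining the compose modes and workflows.execute call shown earlier; the goal text is illustrative):
from kubiya_sdk import Kubiya

client = Kubiya()

# Generate the workflow without executing anything
plan = client.compose(
    goal="Restart the checkout service in staging",
    mode="plan",
    stream=False
)

# Review the generated steps before committing to execution
for step in plan.workflow.steps:
    print(f"Would run: {step.name}")

# Execute only after review, as in the web integration example
client.workflows.execute(plan.workflow_id)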

What's Next?

🎉 You're now ready to build with the Kubiya SDK!

Pro tip: Start with simple, read-only automations (status checks, metrics gathering) before building complex workflows. This helps you understand the SDK patterns and build confidence with the platform. A minimal starting point is sketched below.
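
For instance, a read-only health summary makes a safe first automation (a minimal sketch mirroring the first example in this guide; the goal text is illustrative):
from kubiya_sdk import Kubiya

client = Kubiya()

# Read-only automation: gathers status without changing anything
result = client.compose(
    goal="Summarize pod health and recent restarts in the staging namespace",
    mode="act",
    stream=False
)

print(f"Status: {result.status}")
if result.status == "completed":
    for step in result.steps:
        print(f"{step.name}: {step.outputs}")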