Installation
Prerequisites
- Python 3.8+ (Python 3.10+ recommended)
- pip or poetry for package management
- API access to the Kubiya platform
Install the SDK
# Install from PyPI
pip install kubiya-sdk
# Or with optional dependencies
pip install kubiya-sdk[async,dev]
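If you manage dependencies with Poetry (listed in the prerequisites), the equivalent commands are:
# Add the SDK to a Poetry-managed project
poetry add kubiya-sdk
# With the same optional extras as above
poetry add "kubiya-sdk[async,dev]"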
Verify Installation
import kubiya_sdk
print(f"Kubiya SDK version: {kubiya_sdk.__version__}")
Authentication
Get Your API Key
- Visit compose.kubiya.ai
- Go to Settings → API Keys
- Generate a new key for your application
Set Up Authentication
# Add to your shell profile or .env file
export KUBIYA_API_KEY="your-api-key-here"
export KUBIYA_ORG="your-organization" # Optional
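If you keep these variables in a .env file, one common pattern is to load them before constructing the client; a minimal sketch assuming the python-dotenv package (a separate install, not part of the SDK):
# pip install python-dotenv  -- assumed helper package, not part of kubiya-sdk
from dotenv import load_dotenv
from kubiya_sdk import Kubiya

load_dotenv()      # reads KUBIYA_API_KEY / KUBIYA_ORG from .env into the environment
client = Kubiya()  # the client then picks up the key from the environment, as below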
Your First Automation
Simple Workflow Execution
Let's start with a basic example that checks system health:
from kubiya_sdk import Kubiya
# Initialize the client
client = Kubiya()
# Generate and execute a workflow from natural language
result = client.compose(
    goal="Check the health of all services in production",
    mode="act",    # Execute immediately
    stream=False   # Get final result
)
print(f"Workflow Status: {result.status}")
print(f"Execution ID: {result.execution_id}")
# Access workflow results
if result.status == "completed":
    for step in result.steps:
        print(f"Step: {step.name} - Status: {step.status}")
        if step.outputs:
            print(f"Output: {step.outputs}")
Streaming Workflow Generation
For real-time feedback during workflow generation and execution:
from kubiya_sdk import Kubiya
client = Kubiya()
# Stream workflow generation
for event in client.compose(
    goal="Deploy frontend service v2.1.0 to staging",
    mode="act",
    stream=True
):
    if event.type == "workflow_generated":
        print(f"Generated workflow: {event.data.workflow.name}")
    elif event.type == "step_started":
        print(f"Starting: {event.data.step_name}")
    elif event.type == "step_completed":
        print(f"Completed: {event.data.step_name}")
    elif event.type == "execution_completed":
        print(f"Workflow finished: {event.data.status}")
        break
Building Custom Workflows
Workflow Definition DSL
Create workflows using Python's expressive syntax:
from kubiya_sdk import Kubiya, Workflow, Step, Parallel
from kubiya_sdk.tools import KubectlTool, SlackTool, DatadogTool

client = Kubiya()

def create_deployment_workflow():
    """Creates a safe deployment workflow with monitoring"""
    workflow = Workflow(
        name="safe-microservice-deployment",
        description="Deploy microservice with health checks and rollback"
    )

    # Pre-deployment validation
    pre_checks = Parallel([
        Step("validate-config").tool(KubectlTool()).command([
            "kubectl", "apply", "--dry-run=client", "-f", "${CONFIG_FILE}"
        ]),
        Step("check-dependencies").tool(KubectlTool()).command([
            "kubectl", "get", "pods", "-l", "app=${DEPENDENCY_SERVICE}"
        ]),
        Step("verify-no-incidents").tool(DatadogTool()).query(
            "sum:incident.active{env:production}"
        )
    ])

    # Deployment step with conditions
    deployment = Step("deploy-service").tool(KubectlTool()).command([
        "kubectl", "apply", "-f", "${CONFIG_FILE}"
    ]).condition(
        lambda ctx: ctx.pre_checks.all_successful()
    )

    # Health monitoring
    health_check = Step("health-monitoring").tool("health-checker").config({
        "service": "${SERVICE_NAME}",
        "timeout": "5m",
        "success_threshold": 3,
        "failure_threshold": 1
    })

    # Rollback on failure
    rollback = Step("automatic-rollback").tool(KubectlTool()).command([
        "kubectl", "rollout", "undo", "deployment/${SERVICE_NAME}"
    ]).condition(
        lambda ctx: ctx.health_check.failed()
    )

    # Notification
    notify = Step("notify-team").tool(SlackTool()).message(
        channel="#deployments",
        text="🚀 Deployment of ${SERVICE_NAME} ${STATUS}: ${RESULT_SUMMARY}"
    )

    # Wire up the workflow
    workflow.add_steps([
        pre_checks,
        deployment.depends_on(pre_checks),
        health_check.depends_on(deployment),
        rollback.depends_on(health_check),
        notify.depends_on([health_check, rollback])
    ])

    return workflow

# Create and register the workflow
workflow = create_deployment_workflow()
client.workflows.create(workflow)
Parameterized Workflows
Make workflows reusable with parameters:
from kubiya_sdk import Kubiya, Workflow, Step, Parameter

client = Kubiya()

def database_backup_workflow():
    """Parameterized database backup workflow"""
    workflow = Workflow(
        name="database-backup",
        description="Backup database with configurable retention",
        parameters=[
            Parameter("database_name", type="string", required=True),
            Parameter("environment", type="string", default="staging"),
            Parameter("retention_days", type="int", default=30),
            Parameter("notify_channel", type="string", default="#ops")
        ]
    )

    # Backup step using parameters
    backup = Step("create-backup").tool("pg-dump").config({
        "host": "${environment}.db.company.com",
        "database": "${database_name}",
        "output_file": "backup_${database_name}_${timestamp}.sql.gz",
        "compression": True
    })

    # Upload to cloud storage
    upload = Step("upload-backup").tool("aws-s3").config({
        "bucket": "company-database-backups",
        "key": "${environment}/${database_name}/${backup.output_file}",
        "storage_class": "GLACIER"
    }).depends_on(backup)

    # Set retention policy
    lifecycle = Step("set-lifecycle").tool("aws-s3-lifecycle").config({
        "bucket": "company-database-backups",
        "prefix": "${environment}/${database_name}/",
        "delete_after_days": "${retention_days}"
    }).depends_on(upload)

    # Notification
    notify = Step("notify-completion").tool("slack").config({
        "channel": "${notify_channel}",
        "message": "✅ Database backup completed: ${database_name} (${environment})"
    }).depends_on(lifecycle)

    workflow.add_steps([backup, upload, lifecycle, notify])
    return workflow

# Register the workflow, then execute it with parameters
client.workflows.create(database_backup_workflow())
client.workflows.execute(
    "database-backup",
    parameters={
        "database_name": "user_service_db",
        "environment": "production",
        "retention_days": 90,
        "notify_channel": "#database-ops"
    }
)
Advanced SDK Features
Async/Await Support
For high-performance applications, use async operations:
import asyncio
from kubiya_sdk import AsyncKubiya

async def parallel_deployments():
    """Deploy multiple services concurrently"""
    client = AsyncKubiya()
    services = ["frontend", "api", "worker", "scheduler"]

    # Define one deployment task per service
    deployment_tasks = [
        client.compose(
            goal=f"Deploy {service} to production",
            mode="act",
            stream=False
        )
        for service in services
    ]

    # Execute all deployments concurrently
    results = await asyncio.gather(*deployment_tasks)

    # Process the results
    for service, result in zip(services, results):
        print(f"{service}: {result.status}")

    return results

# Run the async workflow
results = asyncio.run(parallel_deployments())
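If one deployment failing should not abort the others, asyncio.gather can return exceptions instead of raising the first one; a sketch reusing the names from the function above:

# Inside parallel_deployments(): collect per-service failures instead of failing fast
results = await asyncio.gather(*deployment_tasks, return_exceptions=True)
for service, result in zip(services, results):
    if isinstance(result, Exception):
        print(f"{service}: failed with {result}")
    else:
        print(f"{service}: {result.status}")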
Context Management
Access Kubiya's context graph for intelligent automation:
from kubiya_sdk import Kubiya

client = Kubiya()

# Query infrastructure context
context = client.context.query(
    resource_type="kubernetes_service",
    environment="production",
    filters={"team": "payments"}
)

for service in context.resources:
    print(f"Service: {service.name}")
    print(f"Replicas: {service.metadata.replicas}")
    print(f"Dependencies: {service.relationships.depends_on}")

    # Check service health based on context
    health = client.compose(
        goal=f"Check health of {service.name} service",
        context={"target_service": service.name},
        mode="plan"  # Generate the workflow without executing it
    )

    if health.workflow:
        print(f"Health check steps: {len(health.workflow.steps)}")
Error Handling & Retries
Build robust automations with comprehensive error handling:
from kubiya_sdk import Kubiya, WorkflowError
from kubiya_sdk.retry import exponential_backoff

client = Kubiya()

@exponential_backoff(max_retries=3, base_delay=1.0)
def robust_deployment(service_name, version):
    """Deployment with automatic retry logic"""
    try:
        result = client.compose(
            goal=f"Deploy {service_name} version {version} with health checks",
            mode="act",
            timeout=1200  # 20-minute timeout
        )

        if result.status == "completed":
            return result
        elif result.status == "failed":
            # Analyze the failure and decide whether a retry is appropriate
            if "temporary" in result.error_message.lower():
                raise WorkflowError("Temporary failure, will retry")
            else:
                raise WorkflowError(f"Permanent failure: {result.error_message}")
    except Exception as e:
        print(f"Deployment attempt failed: {e}")
        raise

# Usage with error handling
try:
    result = robust_deployment("payment-service", "v2.1.0")
    print(f"Deployment successful: {result.execution_id}")
except WorkflowError as e:
    print(f"Deployment failed after retries: {e}")
    # Trigger rollback or alert
    client.compose(
        goal="Rollback payment-service to last stable version",
        mode="act"
    )
Custom Tools Integration
Create custom tools for your specific needs:
import requests

from kubiya_sdk import Kubiya, Tool, Parameter, StepResult, Workflow, Step

client = Kubiya()

class CustomMonitoringTool(Tool):
    """Custom tool for company-specific monitoring"""

    name = "company-monitor"
    description = "Monitor internal services and metrics"
    parameters = [
        Parameter("service", type="string", required=True),
        Parameter("metric", type="string", required=True),
        Parameter("threshold", type="float", default=80.0)
    ]

    def execute(self, context):
        """Execute the monitoring check"""
        service = context.parameters["service"]
        metric = context.parameters["metric"]
        threshold = context.parameters["threshold"]

        # Custom monitoring logic here
        current_value = self.get_metric_value(service, metric)

        return StepResult(
            success=current_value < threshold,
            outputs={
                "current_value": current_value,
                "threshold": threshold,
                "status": "healthy" if current_value < threshold else "unhealthy"
            },
            logs=[
                f"Checking {metric} for {service}",
                f"Current value: {current_value}",
                f"Threshold: {threshold}"
            ]
        )

    def get_metric_value(self, service, metric):
        """Custom logic to retrieve the metric value"""
        # Implement your monitoring system integration here
        response = requests.get(
            "https://monitoring.company.com/api/metrics",
            params={"service": service, "metric": metric}
        )
        return response.json()["value"]

# Register the custom tool
client.tools.register(CustomMonitoringTool())

# Use it in a workflow
workflow = Workflow("service-monitoring")
check = Step("monitor-service").tool("company-monitor").parameters({
    "service": "payment-api",
    "metric": "cpu_utilization",
    "threshold": 75.0
})
workflow.add_step(check)
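From here the workflow can be registered and run like any other; a short sketch assuming the same workflows.create / workflows.execute calls used earlier in this guide:

# Register the monitoring workflow, then run it (no parameters needed here)
client.workflows.create(workflow)
execution = client.workflows.execute("service-monitoring")
print(f"Monitoring run status: {execution.status}")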
Integration Patterns
Web Application Integration
Integrate Kubiya into your web applications:
from flask import Flask, request, jsonify
from kubiya_sdk import Kubiya

app = Flask(__name__)
client = Kubiya()

@app.route('/api/deploy', methods=['POST'])
def trigger_deployment():
    """API endpoint to trigger deployments"""
    data = request.json
    service = data.get('service')
    version = data.get('version')
    environment = data.get('environment', 'staging')

    # Validate the request
    if not service or not version:
        return jsonify({"error": "Service and version required"}), 400

    try:
        # Generate the deployment workflow
        result = client.compose(
            goal=f"Deploy {service} version {version} to {environment}",
            mode="plan",  # Generate the workflow, don't execute it yet
            stream=False
        )
        return jsonify({
            "workflow_id": result.workflow_id,
            "steps": len(result.workflow.steps),
            "estimated_duration": result.estimated_duration,
            "execute_url": f"/api/execute/{result.workflow_id}"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/execute/<workflow_id>', methods=['POST'])
def execute_workflow(workflow_id):
    """Execute a previously generated workflow"""
    try:
        execution = client.workflows.execute(workflow_id)
        return jsonify({
            "execution_id": execution.id,
            "status": execution.status,
            "monitor_url": f"/api/status/{execution.id}"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/status/<execution_id>')
def get_execution_status(execution_id):
    """Get execution status and results"""
    try:
        execution = client.executions.get(execution_id)
        return jsonify({
            "status": execution.status,
            "progress": execution.progress_percent,
            "current_step": execution.current_step,
            "logs": execution.logs[-10:],  # Last 10 log entries
            "completed": execution.status in ["completed", "failed"]
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)
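To exercise these endpoints locally, a quick sketch using the requests library (assuming Flask's default development address, http://127.0.0.1:5000):

import requests

BASE = "http://127.0.0.1:5000"

# Plan a deployment, then execute the generated workflow
plan = requests.post(f"{BASE}/api/deploy", json={
    "service": "frontend", "version": "v2.1.0", "environment": "staging"
}).json()

run = requests.post(f"{BASE}{plan['execute_url']}").json()
print(run["status"], run["monitor_url"])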
CI/CD Pipeline Integration
Use the SDK in your CI/CD pipelines:
#!/usr/bin/env python3
"""
CI/CD Pipeline Integration Script
Usage: python deploy_pipeline.py --service frontend --version $CI_COMMIT_TAG
"""
import argparse
import os
import sys

from kubiya_sdk import Kubiya

def main():
    parser = argparse.ArgumentParser(description='Deploy service via Kubiya')
    parser.add_argument('--service', required=True, help='Service name')
    parser.add_argument('--version', required=True, help='Version to deploy')
    parser.add_argument('--environment', default='staging', help='Target environment')
    parser.add_argument('--wait', action='store_true', help='Wait for completion')
    args = parser.parse_args()

    # Initialize the Kubiya client
    client = Kubiya(
        api_key=os.environ['KUBIYA_API_KEY'],
        organization=os.environ.get('KUBIYA_ORG')
    )

    try:
        # Generate and execute the deployment
        result = client.compose(
            goal=f"""
            Deploy {args.service} version {args.version} to {args.environment}:
            1. Validate deployment prerequisites
            2. Execute rolling deployment with health checks
            3. Run smoke tests after deployment
            4. Send notification to team
            """,
            mode="act",
            stream=args.wait
        )

        if args.wait:
            # Stream execution progress
            for event in result:
                if event.type == "step_started":
                    print(f"⏳ {event.data.step_name}")
                elif event.type == "step_completed":
                    print(f"✅ {event.data.step_name}")
                elif event.type == "step_failed":
                    print(f"❌ {event.data.step_name}: {event.data.error}")
                elif event.type == "execution_completed":
                    if event.data.status == "completed":
                        print("🎉 Deployment successful!")
                        sys.exit(0)
                    else:
                        print(f"💥 Deployment failed: {event.data.error}")
                        sys.exit(1)
        else:
            print(f"Deployment initiated: {result.execution_id}")
            print(f"Monitor at: https://compose.kubiya.ai/executions/{result.execution_id}")
    except Exception as e:
        print(f"Failed to deploy: {e}")
        sys.exit(1)

if __name__ == '__main__':
    main()
Testing Your Automations
Unit Testing Workflows
import unittest
from unittest.mock import patch

from kubiya_sdk import Workflow, Step
from kubiya_sdk.testing import WorkflowTestCase

class TestDeploymentWorkflow(WorkflowTestCase):
    def setUp(self):
        self.workflow = Workflow("test-deployment")
        self.workflow.add_step(
            Step("deploy").tool("kubectl").command([
                "kubectl", "apply", "-f", "${CONFIG_FILE}"
            ])
        )

    @patch('kubiya_sdk.runners.KubernetesRunner')
    def test_successful_deployment(self, mock_runner):
        """Test the successful deployment scenario"""
        # Mock a successful kubectl response
        mock_runner.return_value.execute.return_value.success = True
        mock_runner.return_value.execute.return_value.outputs = {
            "deployment": "frontend",
            "status": "applied"
        }

        # Execute the workflow
        result = self.execute_workflow(
            self.workflow,
            parameters={"CONFIG_FILE": "test-deployment.yaml"}
        )

        # Assertions
        self.assertTrue(result.success)
        self.assertEqual(result.status, "completed")
        self.assertEqual(len(result.executed_steps), 1)

    def test_deployment_validation(self):
        """Test workflow parameter validation"""
        with self.assertRaises(ValueError):
            self.execute_workflow(self.workflow, parameters={})  # Missing CONFIG_FILE

if __name__ == '__main__':
    unittest.main()
Production Safety: Always test your SDK integrations in staging environments first. Use parameterized workflows to avoid hardcoded production values in your code.
What's Next?
🎉 You're now ready to build with the Kubiya SDK! Continue your journey:
- Explore Core Concepts: understand the platform architecture powering your automations
- How-To Guides: learn advanced SDK patterns and best practices
- API Reference: complete SDK API documentation and examples
- Example Library: browse real-world automation examples and templates
Pro tip: Start with simple, read-only automations (status checks, metrics gathering) before building complex workflows. This helps you understand the SDK patterns and build confidence with the platform.
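For example, a minimal read-only starter using the same compose call shown throughout this guide; with mode="plan" the workflow is only generated, never executed:

from kubiya_sdk import Kubiya

client = Kubiya()

# Read-only: generate a status-check workflow without running anything
plan = client.compose(
    goal="Summarize the status of all services in staging",
    mode="plan"
)
if plan.workflow:
    for step in plan.workflow.steps:
        print(f"Planned step: {step.name}")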