SDK API Reference
Complete reference for all SDK classes, methods, and functions.
Client
The main entry point for interacting with Kubiya API.
Class: Client
from kubiya_workflow_sdk import Client
client = Client(
api_key: str = None,
api_url: str = None,
org_name: str = None,
timeout: int = 30,
retry_count: int = 3,
verify_ssl: bool = True,
proxy: str = None
)
Parameters
Parameter | Type | Default | Description |
---|
api_key | str | None | Kubiya API key. If not provided, reads from KUBIYA_API_KEY env var |
api_url | str | "https://api.kubiya.ai" | API endpoint URL |
org_name | str | None | Organization name for multi-org accounts |
timeout | int | 30 | Request timeout in seconds |
retry_count | int | 3 | Number of retries for failed requests |
verify_ssl | bool | True | Whether to verify SSL certificates |
proxy | str | None | HTTP/SOCKS proxy URL |
Methods
execute_workflow(workflow, params=None, stream=False, runner=None)
Execute a workflow.
result = client.execute_workflow(
workflow=my_workflow,
params={"env": "production"},
stream=True,
runner="my-custom-runner" # Optional: specify a runner created via Kubiya platform
)
Parameters:
workflow
: Workflow object or dict
params
: Dict of parameters to pass to workflow
stream
: Boolean to enable streaming response
runner
: String name of the runner (created via Kubiya platform). Defaults to “kubiya-hosted”
Returns: ExecutionResult
or generator of StreamEvent
Runners must be created through the Kubiya platform (web interface or API). The platform will provide a Kubernetes manifest or Helm chart to deploy the runner in your infrastructure.
list_workflows()
List all workflows in the organization.
workflows = client.list_workflows()
for wf in workflows:
print(f"{wf.name}: {wf.description}")
get_workflow(name)
Get a specific workflow by name.
workflow = client.get_workflow("data-pipeline")
test_connection()
Test API connectivity and authentication.
if client.test_connection():
print("Connected successfully!")
Workflow
Class: Workflow
from kubiya_workflow_sdk import Workflow
workflow = Workflow(
name: str,
description: str = None,
version: str = "1.0.0",
steps: List[Step] = None,
parameters: Dict[str, Parameter] = None,
runner: str = "kubiya-hosted",
tags: List[str] = None,
metadata: Dict[str, Any] = None
)
Methods
add_step(step)
Add a step to the workflow.
workflow.add_step(
Step(name="process", image="python:3.11", command="process.py")
)
to_dict()
Convert workflow to dictionary representation.
workflow_dict = workflow.to_dict()
to_yaml()
Convert workflow to YAML string.
yaml_string = workflow.to_yaml()
print(yaml_string)
validate()
Validate workflow structure and dependencies.
errors = workflow.validate()
if errors:
print(f"Validation errors: {errors}")
execute(client=None, **kwargs)
Execute the workflow.
result = workflow.execute(
client=client,
params={"env": "staging"}
)
Step
Class: Step
from kubiya_workflow_sdk import Step
step = Step(
name: str,
image: str = None,
command: str = None,
script: str = None,
env: Dict[str, str] = None,
volumes: Dict[str, str] = None,
resources: Dict[str, Any] = None,
depends: List[str] = None,
retry: RetryPolicy = None,
timeout: str = "5m",
when: str = None,
outputs: Dict[str, str] = None
)
Properties
Property | Type | Description |
---|
name | str | Unique step identifier |
image | str | Docker image to use |
command | str | Command to execute |
script | str | Script content to run |
env | Dict[str, str] | Environment variables |
volumes | Dict[str, str] | Volume mounts |
resources | Dict[str, Any] | Resource limits/requests |
depends | List[str] | Step dependencies |
retry | RetryPolicy | Retry configuration |
timeout | str | Execution timeout |
when | str | Conditional execution |
outputs | Dict[str, str] | Output mappings |
DSL Functions
workflow
Decorator
from kubiya_workflow_sdk.dsl import workflow
@workflow(
name: str = None,
description: str = None,
runner: str = "kubiya-hosted",
version: str = "1.0.0"
)
def my_workflow():
# Workflow logic
pass
step
Functions
step.shell(command, **kwargs)
Execute a shell command.
result = step.shell(
"echo 'Hello World'",
name="greeting",
image="alpine:latest"
)
step.python(script, **kwargs)
Execute Python code.
output = step.python(
"""
import pandas as pd
df = pd.read_csv('data.csv')
print(df.describe())
""",
name="analyze",
packages=["pandas", "numpy"]
)
step.container(image, **kwargs)
Run a container with advanced options.
step.container(
image="nginx:alpine",
name="webserver",
ports={"80": "8080"},
health_check={
"test": ["CMD", "curl", "-f", "http://localhost/"],
"interval": "30s"
}
)
step.inline_agent(message, **kwargs)
Execute an inline AI agent.
analysis = step.inline_agent(
message="Analyze these logs: ${logs}",
runners=["kubiya-hosted"],
llm_model="gpt-4",
tools=[{
"name": "parse-logs",
"type": "docker",
"image": "log-parser:latest"
}]
)
Providers
Function: get_provider(name, **kwargs)
Get a workflow provider instance.
from kubiya_workflow_sdk.providers import get_provider
provider = get_provider(
"adk",
api_key="...",
model="gemini-1.5-pro"
)
Class: BaseProvider
Base class for custom providers.
from kubiya_workflow_sdk.providers import BaseProvider
class CustomProvider(BaseProvider):
async def compose(self, task: str, mode: str = "plan", **kwargs):
# Implementation
pass
async def execute_workflow(self, workflow, **kwargs):
# Implementation
pass
Streaming
Class: StreamEvent
@dataclass
class StreamEvent:
type: str # "step.started", "log", "step.completed", etc.
timestamp: float
step_name: str = None
data: Dict[str, Any] = None
error: str = None
Stream Event Types
Type | Description | Data Fields |
---|
workflow.started | Workflow execution started | workflow_id , name |
step.started | Step execution started | step_name , image |
log | Log output | message , level |
step.completed | Step finished | step_name , exit_code , duration |
workflow.completed | Workflow finished | workflow_id , status |
error | Error occurred | message , step_name |
Errors
Exception Classes
KubiyaError
Base exception for all SDK errors.
from kubiya_workflow_sdk.errors import KubiyaError
try:
client.execute_workflow(workflow)
except KubiyaError as e:
print(f"Kubiya error: {e}")
AuthenticationError
Raised for authentication failures.
from kubiya_workflow_sdk.errors import AuthenticationError
try:
client = Client(api_key="invalid")
except AuthenticationError as e:
print("Invalid API key")
ValidationError
Raised for workflow validation failures.
from kubiya_workflow_sdk.errors import ValidationError
try:
workflow.validate()
except ValidationError as e:
print(f"Validation failed: {e.errors}")
ExecutionError
Raised for workflow execution failures.
from kubiya_workflow_sdk.errors import ExecutionError
try:
result = workflow.execute()
except ExecutionError as e:
print(f"Execution failed at step: {e.step_name}")
Utilities
Function: load_workflow(path)
Load workflow from YAML file.
from kubiya_workflow_sdk.utils import load_workflow
workflow = load_workflow("workflows/pipeline.yaml")
Function: save_workflow(workflow, path)
Save workflow to YAML file.
from kubiya_workflow_sdk.utils import save_workflow
save_workflow(workflow, "workflows/pipeline.yaml")
Function: merge_workflows(*workflows)
Merge multiple workflows into one.
from kubiya_workflow_sdk.utils import merge_workflows
combined = merge_workflows(etl_workflow, ml_workflow)
Type Definitions
RetryPolicy
from kubiya_workflow_sdk.types import RetryPolicy
retry = RetryPolicy(
max_attempts: int = 3,
backoff: str = "exponential", # "constant", "linear", "exponential"
initial_delay: str = "1s",
max_delay: str = "5m"
)
ResourceSpec
from kubiya_workflow_sdk.types import ResourceSpec
resources = ResourceSpec(
requests={"cpu": "1", "memory": "2Gi"},
limits={"cpu": "2", "memory": "4Gi"},
gpus=1 # Optional GPU request
)
Parameter
from kubiya_workflow_sdk.types import Parameter
param = Parameter(
name="environment",
type="string",
default="staging",
choices=["dev", "staging", "production"],
required=False,
description="Target environment"
)
Examples
Basic Workflow
from kubiya_workflow_sdk import Client, Workflow, Step
# Create workflow
workflow = Workflow(
name="hello-world",
steps=[
Step(name="greet", image="alpine", command="echo 'Hello!'")
]
)
# Execute
client = Client()
result = client.execute_workflow(workflow)
print(result.output)
Streaming Execution
# Stream execution events
for event in client.execute_workflow(workflow, stream=True):
if event.type == "log":
print(event.data["message"])
elif event.type == "step.completed":
print(f"Step {event.step_name} completed")
Error Handling
from kubiya_workflow_sdk.errors import ExecutionError, StepError
try:
result = workflow.execute()
except StepError as e:
print(f"Step {e.step_name} failed: {e.message}")
# Handle step failure
except ExecutionError as e:
print(f"Workflow failed: {e}")
# Handle workflow failure
Version Compatibility
SDK Version | API Version | Python Version |
---|
2.0.x | v2 | 3.8+ |
1.5.x | v1 | 3.7+ |
1.0.x | v1 | 3.6+ |
Next Steps
Responses are generated using AI and may contain mistakes.