Get started with the Control Plane SDK to manage agents, integrations, and secrets. This guide uses the current ControlPlaneClient and keeps every example minimal.

Prerequisites

Before you begin, ensure you have:
  • Python 3.8 or higher installed
  • pip package manager
  • A Kubiya API key from the Kubiya platform

Installation

Install the Kubiya SDK using pip:
pip install kubiya-sdk
Or using poetry:
poetry add kubiya-sdk

Setup

Configure API Key

Set your API key as an environment variable:
export KUBIYA_API_KEY="your-api-key-here"
Or use a .env file:
# .env
KUBIYA_API_KEY=your-api-key-here
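A minimal sketch of failing fast when the key is missing, using only the standard library. The helper name require_api_key is hypothetical and not part of the SDK; it only reads the KUBIYA_API_KEY variable described above and does not parse a .env file itself.

```python
import os
from typing import Mapping, Optional


def require_api_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Return KUBIYA_API_KEY or raise a clear error.

    Hypothetical helper for illustration; not part of the Kubiya SDK.
    """
    # Default to the real process environment; a mapping can be passed for tests.
    source = os.environ if env is None else env
    key = source.get("KUBIYA_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "KUBIYA_API_KEY is not set; export it or add it to your .env file"
        )
    return key
```

Calling require_api_key() once at startup surfaces a missing key before any client call is made.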

Initialize the Client

from kubiya import ControlPlaneClient

# Initialize with environment variable
client = ControlPlaneClient()

# Or pass API key directly
client = ControlPlaneClient(api_key="your-api-key-here")
Using environment variables is recommended for security. Never commit API keys to version control.

Create and Manage an Agent

Minimal agent creation:
agent_data = {
    "name": "devops-helper",
    "model_id": "claude-sonnet-4"
}

created = client.agents.create(agent_data)
print(created["agent_id"])
Required fields for create:
  • name: Agent display name
  • model_id: Model identifier as named by the provider (e.g., claude-sonnet-4)
Attach skills and integrations later via update() using IDs. Keep creation minimal until the agent is working end-to-end.
List agents:
agents = client.agents.list(limit=10)
for a in agents:
    print(a["name"], a.get("agent_id"))
Update an agent (attach skills/integrations by IDs if needed):
update_data = {
    "skill_ids": ["skill-123"]
}

updated = client.agents.update(created["agent_id"], update_data)
print(updated["agent_id"])

Execute an Agent

Execution requires a worker_queue_id and a prompt.
payload = {
    "worker_queue_id": "default",
    "prompt": "Deploy NGINX to EKS"
}

result = client.agents.execute(created["agent_id"], payload)
print(result.get("status"))
Required fields for execute:
  • worker_queue_id: Queue name (e.g., default)
  • prompt: What the agent should do (task description)
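The two required fields can be checked locally before the payload is sent. This is a sketch only: missing_execute_fields is a hypothetical helper, not an SDK function, and it assumes both fields are non-empty strings.

```python
from typing import Any, Dict, List

# Field names taken from the required-fields list for execute.
REQUIRED_EXECUTE_FIELDS = ("worker_queue_id", "prompt")


def missing_execute_fields(payload: Dict[str, Any]) -> List[str]:
    """Return the required execute fields that are absent or empty.

    Hypothetical helper; assumes both fields are non-empty strings.
    """
    missing = []
    for field in REQUIRED_EXECUTE_FIELDS:
        value = payload.get(field)
        if not isinstance(value, str) or not value.strip():
            missing.append(field)
    return missing


payload = {"worker_queue_id": "default", "prompt": "Deploy NGINX to EKS"}
print(missing_execute_fields(payload))  # []
```

An empty list means the payload carries both fields; anything else names what to fix before calling execute.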
Prefer executing by a known agent_id. If you only have a name, call list() first and resolve the agent_id, since names can collide.
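Resolving a name to an agent_id might look like the sketch below. resolve_agent_id is a hypothetical helper, not part of the SDK; it assumes each listed agent is a dict with "name" and "agent_id" keys, as in the list example earlier in this guide.

```python
from typing import Any, Dict, Iterable


def resolve_agent_id(agents: Iterable[Dict[str, Any]], name: str) -> str:
    """Return the agent_id of the single agent called `name`.

    Hypothetical helper; raises LookupError when the name is unknown
    or when more than one agent shares it.
    """
    matches = [
        a["agent_id"]
        for a in agents
        if a.get("name") == name and a.get("agent_id")
    ]
    if not matches:
        raise LookupError(f"no agent named {name!r}")
    if len(matches) > 1:
        raise LookupError(
            f"{len(matches)} agents named {name!r}; pass an agent_id explicitly"
        )
    return matches[0]
```

For example, resolve_agent_id(client.agents.list(limit=10), "devops-helper") would return the id to pass to execute. Note that a limited list call may not include every agent, so a collision outside the returned page would go unnoticed.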

Delete an Agent

resp = client.agents.delete(created["agent_id"])
print("deleted:", resp)  # show the service response rather than an unconditional message

Integrations and Secrets

List integrations and get credentials:
integrations = client.integrations.list()
creds = client.integrations.get_integration_credentials(vendor="aws", id="aws")
Some vendors (e.g., github_app, jira) may use a fixed id like "0". Check the list output and use the exact vendor and id returned.
List secrets and get a secret value:
secrets = client.secrets.list()
value = client.secrets.get_value("github-token")
Best practices:
  • Store sensitive values in Secrets and reference them at runtime
  • list() returns metadata only; use get_value(name) to fetch the actual secret

Verify Setup

Quick health check:
status = client.health.check()
print(status)
Expected output: a small dict indicating service health. If the call fails, verify your base_url, API key, and network access.
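Transient network failures can make a single health check misleading. The sketch below retries any zero-argument callable a few times before giving up; check_with_retry and its parameters are hypothetical and not part of the SDK.

```python
import time
from typing import Any, Callable, Optional


def check_with_retry(
    check: Callable[[], Any],
    attempts: int = 3,
    delay_seconds: float = 1.0,
) -> Any:
    """Call `check` until it succeeds or `attempts` is exhausted.

    Hypothetical helper; returns whatever `check` returns.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            return check()
        except Exception as exc:  # broad on purpose: SDK error types are not assumed
            last_error = exc
            if attempt < attempts:
                time.sleep(delay_seconds)  # wait before the next try
    raise RuntimeError(
        f"health check failed after {attempts} attempts"
    ) from last_error
```

With the client from this guide, the call would be check_with_retry(client.health.check).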

Workflow DSL

Use the Workflow DSL to define and execute multi-step workflows. Execute a workflow definition:
# Execute an inline workflow definition (dict)
for event in client.workflows.execute(
  workflow_definition={
    # Name and ordered steps
    "name": "deploy-app",
    "steps": [
      {"name": "build", "command": "docker build -t myapp ."},  # build step
      {"name": "deploy", "command": "kubectl apply -f deployment.yaml"}  # deploy step
    ]
  },
  parameters={"environment": "staging"},  # optional runtime parameters
  stream=True
):
  print(event)
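Before sending an inline definition, a quick local check can catch obvious mistakes. The sketch below inspects the dict shape used above ("name" plus a "steps" list of {"name", "command"} entries). workflow_definition_problems is a hypothetical helper, and the rules it applies (unique step names, a command on every step) are assumptions, not the SDK's own validation.

```python
from typing import Any, Dict, List


def workflow_definition_problems(definition: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in an inline workflow dict.

    Hypothetical helper; the checks are assumptions, not SDK rules.
    """
    problems = []
    if not definition.get("name"):
        problems.append("workflow has no name")
    steps = definition.get("steps") or []
    if not steps:
        problems.append("workflow has no steps")
    seen = set()
    for index, step in enumerate(steps):
        step_name = step.get("name")
        if not step_name:
            problems.append(f"step {index} has no name")
        elif step_name in seen:
            problems.append(f"duplicate step name: {step_name}")
        else:
            seen.add(step_name)
        if not step.get("command"):
            problems.append(f"step {index} has no command")
    return problems


definition = {
    "name": "deploy-app",
    "steps": [
        {"name": "build", "command": "docker build -t myapp ."},
        {"name": "deploy", "command": "kubectl apply -f deployment.yaml"},
    ],
}
print(workflow_definition_problems(definition))  # []
```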
Define workflows using the DSL:
from kubiya.dsl import workflow

wf = (
  workflow("hello-kubiya")  # workflow id
    .description("My first Kubiya workflow")
    .step("greet", "echo 'Hello from Kubiya!'")  # shell step
    .step("date", "date")
    .step("system-info", "uname -a")
)

for event in client.execute_workflow(wf.to_dict(), stream=True):  # execute with streaming
  print(event)
Advanced step configuration:
from kubiya.dsl import workflow

wf = workflow("data-processing").params(
  INPUT_FILE="data.csv",
  OUTPUT_DIR="/tmp/processed"
)

wf.step("create-dir", "mkdir -p ${OUTPUT_DIR}")  # prepare output dir

wf.step("process", callback=lambda s:
    s.shell("cat ${INPUT_FILE} | wc -l > ${OUTPUT_DIR}/count.txt")  # count lines
      .depends("create-dir")  # ensure directory exists first
)

wf.step("verify", callback=lambda s:
    s.shell("cat ${OUTPUT_DIR}/count.txt")  # print result
      .depends("process")  # run after processing
)

for event in client.execute_workflow(wf.to_dict(), stream=True):  # execute and stream events
  print(event)

Best Practices

Always use clear, descriptive names for workflows and steps:
workflow("deploy-production-api")  # Good
workflow("deploy")  # Too vague
Include descriptions for workflows and complex steps:
wf = (
    workflow("data-pipeline")
        .description("ETL pipeline for customer data processing")
)
Add error handling around agent operations:
try:
    result = client.agents.execute(agent_id, payload)
    print(result)
except Exception as e:
    print(f"Agent execution failed: {e}")
Make workflows reusable with parameters:
wf = (
    workflow("deploy")
        .params(
            ENVIRONMENT="staging",
            VERSION="latest"
        )
)

Next Steps