Get started with the Control Plane SDK to manage agents, integrations, and secrets. This guide uses the current ControlPlaneClient and minimal, factual examples.
Prerequisites
Before you begin, ensure you have:
Python 3.8 or higher installed
pip package manager
A Kubiya API key from the Kubiya platform
Installation
Install the Kubiya SDK using pip:
Or using poetry:
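The install commands were elided above. Assuming the PyPI package name matches the import package (`kubiya`) — verify against the official docs — a sketch:

```shell
# With pip (hypothetical package name)
pip install kubiya

# Or with poetry
poetry add kubiya
```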
Setup
Set your API key as an environment variable:
export KUBIYA_API_KEY="your-api-key-here"
Or use a .env file:
# .env
KUBIYA_API_KEY=your-api-key-here
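If you want to fail fast when the key is missing, before any client is created, a minimal standard-library sketch — the helper name is ours, not part of the SDK:

```python
import os

def require_api_key(env=os.environ):
    """Return KUBIYA_API_KEY or fail fast with a clear message."""
    key = env.get("KUBIYA_API_KEY")
    if not key:
        raise RuntimeError("KUBIYA_API_KEY is not set; export it or add it to .env")
    return key
```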
Initialize the Client
from kubiya import ControlPlaneClient
# Initialize with environment variable
client = ControlPlaneClient()
# Or pass the API key directly
client = ControlPlaneClient(api_key="your-api-key-here")
Using environment variables is recommended for security. Never commit API keys to version control.
Create and Manage an Agent
Minimal agent creation:
agent_data = {
    "name": "devops-helper",
    "model_id": "claude-sonnet-4"
}
created = client.agents.create(agent_data)
print(created["agent_id"])
Required fields for create:
name : Agent display name
model_id : Model identifier (e.g., claude-sonnet-4)
Attach skills and integrations later via update() using IDs. Keep creation minimal until the agent is working end-to-end.
List agents:
agents = client.agents.list(limit=10)
for a in agents:
    print(a["name"], a.get("agent_id"))
Update an agent (attach skills/integrations by IDs if needed):
update_data = {
    "skill_ids": ["skill-123"]
}
updated = client.agents.update(created["agent_id"], update_data)
print(updated["agent_id"])
Execute an Agent
Execution requires a worker_queue_id and a prompt.
payload = {
    "worker_queue_id": "default",
    "prompt": "Deploy NGINX to EKS"
}
result = client.agents.execute(created["agent_id"], payload)
print(result.get("status"))
Required fields for execute:
worker_queue_id : Queue name (e.g., default)
prompt : What the agent should do (task description)
Prefer executing by known agent_id. If you only have a name, first list() and resolve the agent_id to avoid name collisions.
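The name-to-id resolution described above can be sketched as a small helper (hypothetical, not part of the SDK); pass it the output of client.agents.list():

```python
def resolve_agent_id(agents, name):
    """Return the agent_id for an exact name match; fail on ambiguity."""
    matches = [a for a in agents if a.get("name") == name]
    if not matches:
        raise LookupError(f"no agent named {name!r}")
    if len(matches) > 1:
        raise LookupError(f"{len(matches)} agents named {name!r}; use an explicit agent_id")
    return matches[0]["agent_id"]
```

Failing loudly on duplicates is the point: silently picking the first match is exactly the name-collision hazard the guidance warns about.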
Delete an Agent
resp = client.agents.delete(created["agent_id"])
print("deleted")
Integrations and Secrets
List integrations and get credentials:
integrations = client.integrations.list()
creds = client.integrations.get_integration_credentials(vendor="aws", id="aws")
Some vendors (e.g., github_app, jira) may use a fixed id like "0". Check the list output and use the exact vendor and id returned.
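Resolving the exact vendor and id from the list output can be sketched like this (a hypothetical helper; it assumes each entry carries vendor and id keys — check the real list output):

```python
def find_integration(integrations, vendor):
    """Return the (vendor, id) pair for the first matching integration."""
    for item in integrations:
        if item.get("vendor") == vendor:
            return item["vendor"], item["id"]
    raise LookupError(f"no integration found for vendor {vendor!r}")
```

Then pass the resolved pair to get_integration_credentials rather than hardcoding an id.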
List secrets and get a secret value:
secrets = client.secrets.list()
value = client.secrets.get_value("github-token")
Best practices:
Store sensitive values in Secrets and reference them at runtime
list() returns metadata only; use get_value(name) to fetch the actual secret
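The runtime-reference pattern can be sketched as follows; pass client.secrets.get_value as the lookup so the token never lands in code or config (the helper itself is ours, not an SDK feature):

```python
def auth_headers(get_value, secret_name="github-token"):
    """Build request headers from a secret fetched at call time."""
    token = get_value(secret_name)
    return {"Authorization": f"Bearer {token}"}
```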
Verify Setup
Quick health check:
status = client.health.check()
print(status)
Expected output: a small dict indicating service health. If the call fails, verify your base_url, API key, and network access.
Workflow DSL
Use the Workflow DSL to define and execute multi-step workflows.
Execute a workflow definition:
# Execute an inline workflow definition (dict)
for event in client.workflows.execute(
    workflow_definition={
        # Name and ordered steps
        "name": "deploy-app",
        "steps": [
            {"name": "build", "command": "docker build -t myapp ."},  # build step
            {"name": "deploy", "command": "kubectl apply -f deployment.yaml"}  # deploy step
        ]
    },
    parameters={"environment": "staging"},  # optional runtime parameters
    stream=True
):
    print(event)
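When streaming, you often want the last reported status rather than every event; a sketch assuming events are dicts that may carry a status key (inspect real events to confirm their shape):

```python
def final_status(events):
    """Consume a stream of workflow events and return the last status seen."""
    status = None
    for event in events:
        if isinstance(event, dict) and "status" in event:
            status = event["status"]
    return status
```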
Define workflows using the DSL:
from kubiya.dsl import workflow
wf = (
    workflow("hello-kubiya")  # workflow id
    .description("My first Kubiya workflow")
    .step("greet", "echo 'Hello from Kubiya!'")  # shell step
    .step("date", "date")
    .step("system-info", "uname -a")
)
for event in client.execute_workflow(wf.to_dict(), stream=True):  # execute with streaming
    print(event)
Advanced step configuration:
from kubiya.dsl import workflow
wf = workflow("data-processing").params(
    INPUT_FILE="data.csv",
    OUTPUT_DIR="/tmp/processed"
)
wf.step("create-dir", "mkdir -p ${OUTPUT_DIR}")  # prepare output dir
wf.step("process", callback=lambda s:
    s.shell("cat ${INPUT_FILE} | wc -l > ${OUTPUT_DIR}/count.txt")  # count lines
    .depends("create-dir")  # ensure directory exists first
)
wf.step("verify", callback=lambda s:
    s.shell("cat ${OUTPUT_DIR}/count.txt")  # print result
    .depends("process")  # run after processing
)
for event in client.execute_workflow(wf.to_dict(), stream=True):  # execute and stream events
    print(event)
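The depends() calls above form a DAG; the execution order they imply can be checked with the standard library (graphlib requires Python 3.9+):

```python
from graphlib import TopologicalSorter

# Edges mirror the depends() declarations: process needs create-dir, verify needs process
deps = {
    "create-dir": set(),
    "process": {"create-dir"},
    "verify": {"process"},
}
order = list(TopologicalSorter(deps).static_order())
print(order)  # the chain admits only one order
```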
Best Practices
Always use clear, descriptive names for workflows and steps: workflow("deploy-production-api")  # Good
workflow("deploy")  # Too vague
Include descriptions for workflows and complex steps: wf = (
    workflow("data-pipeline")
    .description("ETL pipeline for customer data processing")
)
Add error handling around agent operations: try:
    result = client.agents.execute(agent_id, payload)
    print(result)
except Exception as e:
    print(f"Agent execution failed: {e}")
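Beyond a bare try/except, transient failures often warrant a retry. A minimal wrapper (ours, not an SDK feature) that you can point at client.agents.execute:

```python
import time

def execute_with_retry(execute, agent_id, payload, attempts=3, delay=1.0):
    """Call execute(agent_id, payload), retrying failed attempts with a fixed delay."""
    last_exc = None
    for i in range(attempts):
        try:
            return execute(agent_id, payload)
        except Exception as exc:
            last_exc = exc
            if i < attempts - 1:
                time.sleep(delay)
    raise last_exc
```

Usage: execute_with_retry(client.agents.execute, agent_id, payload). Consider exponential backoff and retrying only known-transient errors in production.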
Make workflows reusable with parameters: wf = (
    workflow("deploy")
    .params(
        ENVIRONMENT="staging",
        VERSION="latest"
    )
)
Next Steps