Skip to main content
Complete reference for all SDK classes, methods, and functions for the Control Plane Client.

ControlPlaneClient

The main entry point for interacting with the Kubiya Control Plane API.

Class: ControlPlaneClient

from kubiya import ControlPlaneClient

client = ControlPlaneClient(
    api_key: str = None,
    base_url: str = "https://control-plane.kubiya.ai",
    timeout: int = 300,
    max_retries: int = 3,
    org_name: str = None
)

Parameters

ParameterTypeDefaultDescription
api_keystrNoneKubiya API key. If not provided, reads from KUBIYA_API_KEY env var
base_urlstr"https://control-plane.kubiya.ai"Control Plane API endpoint URL
timeoutint300Request timeout in seconds
max_retriesint3Number of retries for failed requests
org_namestrNoneOrganization name (for multi-org accounts)

Services

The client provides access to these services:
  • client.health - Health Service
  • client.agents - Agents Service
  • client.teams - Teams Service
  • client.projects - Projects Service
  • client.environments - Environments Service
  • client.jobs - Jobs Service
  • client.graph - Context Graph Service
  • client.models - Models Service
  • client.runtimes - Runtimes Service
  • client.skills - Skills Service
  • client.policies - Policies Service
  • client.workers - Workers Service
  • client.secrets - Secrets Service
  • client.integrations - Integrations Service
  • client.context - Context Service
  • client.task_planning - Task Planning Service

Health Service

client.health.check()

Check health status of the Control Plane.
status = client.health.check()
print(status['status'])  # 'healthy'
Returns: Dictionary with health status

client.health.ready()

Check if Control Plane is ready to accept requests.

client.health.detailed()

Get detailed health including database, Redis, and Temporal status.

Agents Service

client.agents.list(skip=0, limit=100, status_filter=None)

List all agents in the organization.
agents = client.agents.list(limit=10)
for agent in agents:
    print(agent['name'], agent['agent_id'])
Parameters:
  • skip (int): Number of records to skip for pagination
  • limit (int): Maximum number of records to return
  • status_filter (str, optional): Filter by status
Returns: List of agent dictionaries

client.agents.get(agent_id)

Get a specific agent by ID.
agent = client.agents.get("agent-uuid")
Returns: Agent dictionary

client.agents.create(agent_data)

Create a new agent.
agent = client.agents.create({
    "name": "devops-helper",
    "model_id": "claude-sonnet-4"
})
Returns: Created agent dictionary

client.agents.update(agent_id, agent_data)

Update an existing agent.
updated = client.agents.update(agent_id, {"skill_ids": ["skill-123"]})
Returns: Updated agent dictionary

client.agents.delete(agent_id)

Delete an agent.

client.agents.execute(agent_id, execution_data)

Execute an agent task via Temporal workflow.
result = client.agents.execute(agent_id, {
    "worker_queue_id": "default",
    "prompt": "Deploy NGINX to EKS"
})
Returns: Execution details dictionary

Teams Service

client.teams.list(skip=0, limit=100, status_filter=None)

List all teams in the organization.
teams = client.teams.list()
for team in teams:
    print(team['name'])
Parameters:
  • skip (int): Number of records to skip
  • limit (int): Maximum number of records
  • status_filter (str, optional): Filter by status (‘active’, ‘inactive’, ‘archived’, ‘idle’)
Returns: List of team dictionaries

client.teams.get(team_id)

Get a specific team by ID.

client.teams.create(team_data)

Create a new team.
team = client.teams.create({
    "name": "Platform Engineering",
    "description": "Infrastructure automation team"
})

client.teams.update(team_id, team_data)

Update an existing team.

client.teams.delete(team_id)

Delete a team.

client.teams.add_agent(team_id, agent_id)

Add an agent to a team.

client.teams.remove_agent(team_id, agent_id)

Remove an agent from a team.

client.teams.execute(team_id, execution_data)

Execute a team task.
result = client.teams.execute(team_id, {
    "prompt": "Deploy to production",
    "worker_queue_id": "default"
})

client.teams.execute_stream(team_id, execution_data)

Execute a team task with streaming response.
for chunk in client.teams.execute_stream(team_id, execution_data):
    print(chunk)

Jobs Service

client.jobs.list(enabled=None, trigger_type=None)

List all jobs.
jobs = client.jobs.list(trigger_type='cron')
Parameters:
  • enabled (bool, optional): Filter by enabled status
  • trigger_type (str, optional): Filter by trigger type (‘cron’, ‘webhook’, ‘manual’)

client.jobs.get(job_id)

Get a specific job.

client.jobs.create(job_data)

Create a new job.
job = client.jobs.create({
    "name": "daily-backup",
    "trigger_type": "cron",
    "cron_schedule": "0 2 * * *",
    "cron_timezone": "UTC",
    "planning_mode": "predefined_agent",
    "entity_type": "agent",
    "entity_id": "agent-uuid",
    "prompt_template": "Run daily backup for {{environment}}"
})

client.jobs.update(job_id, job_data)

Update a job.

client.jobs.delete(job_id)

Delete a job.

client.jobs.trigger(job_id, parameters=None, config_override=None)

Manually trigger a job.
result = client.jobs.trigger(job_id, parameters={"environment": "production"})
Returns: Dictionary with workflow_id, execution_id, and status

client.jobs.enable(job_id)

Enable a job and unpause its schedule.

client.jobs.disable(job_id)

Disable a job and pause its schedule.

client.jobs.get_executions(job_id, limit=50, offset=0)

Get execution history for a job.

Context Graph Service

client.graph.health()

Check health of the context graph service.

client.graph.list_nodes(integration=None, skip=0, limit=100)

List nodes in the context graph.
nodes = client.graph.list_nodes(integration="aws", limit=50)
print(f"Found {nodes['count']} nodes")

client.graph.get_node(node_id, integration=None)

Get a specific node by ID.

client.graph.search_nodes(search_data, integration=None, skip=0, limit=100)

Search nodes using structured query.
results = client.graph.search_nodes({
    "labels": ["EC2Instance"],
    "properties": {"state": "running"}
})

client.graph.search_nodes_by_text(text_query, integration=None, skip=0, limit=100)

Search nodes using text query.
results = client.graph.search_nodes_by_text({
    "property_name": "name",
    "search_text": "production"
})

client.graph.get_relationships(node_id, direction=None, relationship_type=None, integration=None, skip=0, limit=100)

Get relationships for a node.
rels = client.graph.get_relationships(
    node_id="...",
    direction="outgoing",
    relationship_type="HAS_POLICY"
)

client.graph.get_subgraph(node_id, depth=1, integration=None)

Get a subgraph starting from a node.
subgraph = client.graph.get_subgraph(node_id, depth=2)

client.graph.list_labels(integration=None, skip=0, limit=100)

Get all node labels (types) in the graph.

client.graph.list_relationship_types(integration=None, skip=0, limit=100)

Get all relationship types.

client.graph.get_stats(integration=None, skip=0, limit=100)

Get graph statistics.

client.graph.execute_query(query)

Execute a custom Cypher query.
results = client.graph.execute_query({
    "query": "MATCH (n:EC2Instance) WHERE n.state = 'running' RETURN n LIMIT 10"
})

client.graph.list_integrations(skip=0, limit=100)

Get all integrations in the context graph.

Projects Service

client.projects.list(skip=0, limit=100)

List all projects.

client.projects.get(project_id)

Get a specific project.

client.projects.create(project_data)

Create a new project.

client.projects.update(project_id, project_data)

Update a project.

client.projects.delete(project_id)

Delete a project.

Environments Service

client.environments.list(skip=0, limit=100)

List all environments.

client.environments.get(environment_id)

Get a specific environment.

client.environments.create(environment_data)

Create a new environment.

client.environments.update(environment_id, environment_data)

Update an environment.

client.environments.delete(environment_id)

Delete an environment.

Models Service

client.models.list()

List available LLM models.
models = client.models.list()
for model in models:
    print(model['name'])

client.models.get(model_id)

Get a specific model.

client.models.get_default()

Get the default model.

Skills Service

client.skills.list()

List all skills.

client.skills.get(skill_id)

Get a specific skill.

client.skills.associate(entity_id, skill_ids)

Associate skills with an entity.
client.skills.associate(
    entity_id="agent-uuid",
    skill_ids=["skill-1", "skill-2"]
)

Policies Service

client.policies.list()

List all policies.

client.policies.get(policy_id)

Get a specific policy.

client.policies.create(policy_data)

Create a new policy.

client.policies.update(policy_id, policy_data)

Update a policy.

client.policies.delete(policy_id)

Delete a policy.

Secrets Service

client.secrets.list()

List secrets (metadata only).

client.secrets.get_value(name)

Get a secret value.
value = client.secrets.get_value("github-token")

Integrations Service

client.integrations.list()

List all integrations.

client.integrations.get_integration_credentials(vendor, id)

Get credentials for an integration.
creds = client.integrations.get_integration_credentials(vendor="aws", id="aws")

Task Planning Service

client.task_planning.plan(task_description, context=None)

Generate a task plan from a description.
plan = client.task_planning.plan(
    task_description="Deploy a new microservice to production",
    context={"environment": "production"}
)

client.task_planning.plan_stream(task_description, context=None)

Generate a task plan with streaming updates.
for update in client.task_planning.plan_stream("Deploy microservice"):
    print(update)

Workers Service

client.workers.list()

List all workers.

client.workers.get(worker_id)

Get a specific worker.

Context Service

client.context.resolve(entity_id)

Resolve entity context with inheritance.

Exceptions

Control Plane Exceptions

from kubiya.resources.exceptions import (
    ControlPlaneError,
    GraphError,
    AgentError,
    TeamError,
    JobError,
    ModelError,
    SkillError,
    PolicyError,
    EnvironmentError,
    WorkerError,
    TaskPlanningError
)

Example Error Handling

from kubiya.resources.exceptions import GraphError, AgentError

try:
    nodes = client.graph.list_nodes()
except GraphError as e:
    print(f"Graph operation failed: {e}")

try:
    agent = client.agents.get("invalid-uuid")
except AgentError as e:
    print(f"Agent operation failed: {e}")

Environment Variables

VariableDescriptionDefault
KUBIYA_API_KEYAPI key for authenticationNone
KUBIYA_CONTROL_PLANE_BASE_URLControl Plane API URLhttps://control-plane.kubiya.ai

Type Hints

The SDK provides type hints for better IDE support:
from typing import Dict, List, Any
from kubiya import ControlPlaneClient

def list_agents(client: ControlPlaneClient) -> List[Dict[str, Any]]:
    """List agents with proper type hints"""
    return client.agents.list()

Next Steps