The Control Plane Client SDK provides a comprehensive Python interface to Kubiya’s Control Plane services, enabling you to programmatically manage models, runtimes, skills, policies, agents, workers, and the context graph.
The ControlPlaneClient is the modern entry point to these services, providing service-oriented access to all platform capabilities, including graph operations, cognitive memory, and infrastructure management.
Key Features
- 🧠 Context Graph Operations: Intelligent search, semantic search, and graph queries
- 💾 Cognitive Memory: Store and recall context across interactions
- 👥 Multi-Agent Teams: Create and orchestrate collaborative agent teams
- 📁 Project Organization: Organize agents and teams into structured projects
- ⏰ Job Scheduling: Schedule recurring tasks with cron or webhook triggers
- 🌍 Environment Management: Manage isolated runtime environments
- 🗺️ Task Planning: AI-powered task decomposition and planning
- 🔄 Context Resolution: Entity context with inheritance across layers
- 🤖 Agent & Model Management: Access and configure LLMs and agents
- 🛠️ Skills & Policies: Configure tools, capabilities, and access controls
- 📊 Infrastructure Management: Monitor workers and execution infrastructure
Quick Start
from kubiya import ControlPlaneClient
# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")
# Use intelligent search
result = client.graph.intelligent_search(
keywords="Find all production databases with high availability"
)
print(result["answer"]) # Natural language answer
for node in result["nodes"]:
print(f"- {node['properties']['name']}")
Available Control Plane Services
The Control Plane Client provides access to these services:
| Service | Purpose | Key Methods |
|---|---|---|
| health | Health checks | check |
| graph | Context Graph operations | intelligent_search, semantic_search, store_memory, recall_memory |
| datasets | Dataset management | create_dataset, list_datasets, get_dataset, delete_dataset |
| ingestion | Graph data ingestion | ingest_node, ingest_nodes_batch, ingest_relationship |
| models | LLM model management | list, get, get_default |
| runtimes | Agent runtime configs | list, get, create |
| context | Context resolution & management | get, update, delete, resolve |
| skills | Tool sets and capabilities | list, get, create |
| policies | Policy management | list, get, create |
| task_planning | AI-powered task planning | plan, plan_stream |
| agents | Agent management | list, get, create, update, delete |
| teams | Multi-agent teams | list, get, create, update, delete, execute, execute_stream |
| projects | Project organization | list, get, create, update, delete, add_agent, add_team |
| environments | Runtime environments | list, get, create, update, delete, get_worker_command |
| jobs | Scheduled & webhook jobs | list, get, create, update, trigger, enable, disable |
| workers | Worker management | list, get |
| secrets | Secrets management | list, get, create |
| integrations | Integration configs | list, get |
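Three of these services (health, secrets, and integrations) do not have dedicated sections below; the sketch that follows uses only the method names from the table above, with return shapes assumed for illustration:
# Health check for the Control Plane API
status = client.health.check()
print(status)
# List configured secrets and integrations (read-only listing; fields assumed)
for secret in client.secrets.list():
    print(secret)
for integration in client.integrations.list():
    print(integration)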
Client Initialization
Basic Initialization
from kubiya import ControlPlaneClient
# Basic initialization with API key
client = ControlPlaneClient(api_key="your-api-key")
Using Environment Variables
The recommended approach for secure credential management:
# Set environment variable
export KUBIYA_API_KEY="your-api-key"
from kubiya import ControlPlaneClient
# Reads KUBIYA_API_KEY from environment
client = ControlPlaneClient()
Advanced Configuration
from kubiya import ControlPlaneClient
client = ControlPlaneClient(
api_key="your-api-key",
base_url="https://control-plane.kubiya.ai", # Control Plane API base URL (default)
timeout=300, # Request timeout in seconds (default: 300)
max_retries=3, # Number of retry attempts (default: 3)
org_name="my-organization" # Optional organization name
)
Core Services
Context Graph Service
Query and explore your knowledge graph with AI-powered search:
# AI-powered intelligent search
result = client.graph.intelligent_search(
keywords="Find all EC2 instances in us-east-1",
integration="AWS"
)
print(result["answer"])
for node in result["nodes"]:
print(f"- {node['properties']['name']}")
Dataset Management
Manage cognitive datasets for memory and knowledge:
# Create a dataset
dataset = client.datasets.create_dataset(
name="production-knowledge",
description="Production environment context",
scope="org"
)
# List datasets
datasets = client.datasets.list_datasets()
for ds in datasets:
print(f"Dataset: {ds['name']}")
Learn more about Dataset Management →
Graph Ingestion
Import nodes and relationships into the context graph:
# Ingest nodes in batch
client.ingestion.ingest_nodes_batch(
nodes=[
{
"id": "server-1",
"labels": ["Server", "Production"],
"properties": {"name": "prod-server-01", "region": "us-east-1"}
},
{
"id": "db-1",
"labels": ["Database", "PostgreSQL"],
"properties": {"name": "prod-db-01", "size": "large"}
}
],
duplicate_handling="skip"
)
# Ingest relationships
client.ingestion.ingest_relationships_batch(
relationships=[
{
"source_id": "server-1",
"target_id": "db-1",
"relationship_type": "CONNECTS_TO",
"properties": {"port": 5432}
}
]
)
Learn more about Graph Ingestion →
Models Service
Manage LLM models and configurations:
# List available models
models = client.models.list_models()
for model in models:
print(f"Model: {model['name']} - Provider: {model['provider']}")
# Get model details
model = client.models.get_model("claude-3-opus-20240229")
print(f"Context window: {model['context_window']}")
Learn more about Models Service →
Runtimes Service
Configure agent runtime environments:
# List agent runtimes
runtimes = client.runtimes.list_runtimes()
for runtime in runtimes:
print(f"Runtime: {runtime['name']}")
# Get runtime configuration
runtime = client.runtimes.get_runtime("agno-runtime")
print(f"Runtime config: {runtime}")
Learn more about Runtimes Service →
Skills Service
Manage tools and capabilities:
# List available skills
skills = client.skills.list_skills()
for skill in skills:
print(f"Skill: {skill['name']} - Category: {skill['category']}")
# Get skill details
skill = client.skills.get_skill("kubernetes")
print(f"Tools: {skill['tools']}")
Learn more about Skills Service →
Policies Service
Manage security and access policies:
# List policies
policies = client.policies.list_policies()
for policy in policies:
print(f"Policy: {policy['name']}")
# Get policy details
policy = client.policies.get_policy("security-policy-01")
print(f"Enforcement: {policy['enforcement']}")
Learn more about Policies Service →
Agents Service
Manage Control Plane agents:
# List agents
agents = client.agents.list_agents()
for agent in agents:
print(f"Agent: {agent['name']} - Status: {agent['status']}")
# Get agent configuration
agent = client.agents.get_agent("devops-assistant")
print(f"Capabilities: {agent['capabilities']}")
Learn more about Agents Service →
Workers Service
Monitor and manage execution infrastructure:
# List workers
workers = client.workers.list_workers()
for worker in workers:
print(f"Worker: {worker['id']} - Status: {worker['status']}")
# Get worker status
worker = client.workers.get_worker("worker-123")
print(f"Health: {worker['health']}")
Learn more about Workers Service →
Teams Service
Create and manage multi-agent teams:
# Create a team
team = client.teams.create({
"name": "DevOps Team",
"description": "Infrastructure automation team",
"runtime": "default"
})
# Add agents to team
client.teams.add_agent(team_id=team['id'], agent_id="agent-uuid")
# Execute team task with streaming
for chunk in client.teams.execute_stream(
team_id=team['id'],
execution_data={
"prompt": "Deploy to production",
"worker_queue_id": "queue-uuid"
}
):
print(chunk.decode('utf-8'), end='', flush=True)
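For non-interactive use, the services table also lists a blocking execute method; the sketch below assumes it accepts the same execution_data payload as execute_stream:
# Blocking execution (payload shape assumed to mirror execute_stream)
result = client.teams.execute(
    team_id=team['id'],
    execution_data={
        "prompt": "Deploy to production",
        "worker_queue_id": "queue-uuid"
    }
)
print(result)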
Learn more about Teams Service →
Projects Service
Organize agents and teams into projects:
# Create a project
project = client.projects.create({
"name": "E-commerce Platform",
"description": "Main e-commerce project"
})
# Add agents to project
client.projects.add_agent(
project_id=project['id'],
agent_id="agent-uuid",
role="developer"
)
# Add team to project
client.projects.add_team(
project_id=project['id'],
team_id="team-uuid",
role="infrastructure"
)
# List project agents
agents = client.projects.list_agents(project_id=project['id'])
Learn more about Projects Service →
Environments Service
Manage runtime environments:
# Create environment
environment = client.environments.create({
"name": "Production",
"description": "Production environment",
"status": "active"
})
# Get worker registration command
worker_info = client.environments.get_worker_command(
environment_id=environment['id']
)
print(f"Worker command: {worker_info['command']}")
# List environments
environments = client.environments.list(status_filter="active")
Learn more about Environments Service →
Jobs Service
Schedule and automate tasks:
# Create a cron job
job = client.jobs.create({
"name": "Daily Backup",
"trigger_type": "cron",
"cron_schedule": "0 2 * * *", # Daily at 2 AM
"planning_mode": "predefined_agent",
"entity_type": "agent",
"entity_id": "backup-agent-uuid",
"prompt_template": "Perform database backup"
})
# Create a webhook job
webhook_job = client.jobs.create({
"name": "Deploy on Push",
"trigger_type": "webhook",
"planning_mode": "predefined_team",
"entity_type": "team",
"entity_id": "deployment-team-uuid",
"prompt_template": "Deploy {{branch}} to {{environment}}"
})
print(f"Webhook URL: {webhook_job['webhook_url']}")
# Manually trigger a job
result = client.jobs.trigger(
job_id=job['id'],
parameters={"database": "prod"}
)
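Jobs can also be paused and resumed using the enable and disable methods from the services table; a minimal sketch, with the argument name assumed:
# Temporarily pause the cron job, then resume it (argument name assumed)
client.jobs.disable(job_id=job['id'])
client.jobs.enable(job_id=job['id'])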
Learn more about Jobs Service →
Context Service
Manage entity context with inheritance:
# Update agent context
client.context.update(
entity_type="agent",
entity_id="agent-uuid",
context_data={
"region": "us-west-2",
"tier": "premium",
"tags": ["production", "critical"]
}
)
# Resolve context with inheritance
resolved = client.context.resolve(
entity_type="agent",
entity_id="agent-uuid"
)
print(f"Resolved context: {resolved}")
# Get entity-specific context
context = client.context.get(
entity_type="team",
entity_id="team-uuid"
)
Learn more about Context Service →
Task Planning Service
AI-powered task planning and decomposition:
# Generate a task plan
plan = client.task_planning.plan(
task_description="Deploy new microservice to production",
context={
"service_name": "payment-api",
"environment": "production"
}
)
print(f"Plan: {plan['title']}")
for i, step in enumerate(plan['steps'], 1):
print(f"{i}. {step['description']}")
# Stream plan generation
for update in client.task_planning.plan_stream(
task_description="Implement CI/CD pipeline"
):
print(update['data'], end='', flush=True)
Learn more about Task Planning Service →
Error Handling
The SDK provides specialized exceptions for different failure scenarios:
from kubiya.core.exceptions import (
APIError as KubiyaAPIError,
AuthenticationError as KubiyaAuthenticationError,
ConnectionError as KubiyaConnectionError,
TimeoutError as KubiyaTimeoutError,
)
from kubiya.resources.exceptions import (
GraphError,
ValidationError,
ModelError,
WorkerError,
)
# Handle graph errors
try:
result = client.graph.intelligent_search(keywords="test query")
except GraphError as e:
print(f"Graph operation failed: {e}")
# Handle authentication errors
try:
client = ControlPlaneClient(api_key="invalid-key")
client.health.check()
except KubiyaAuthenticationError as e:
print(f"Authentication failed: {e}")
# Handle API errors
try:
result = client.datasets.get_dataset("non-existent-dataset")
except KubiyaAPIError as e:
print(f"API error: {e.status_code}")
# Handle validation errors
try:
client.ingestion.ingest_node(
id="", # Invalid empty ID
labels=[],
properties={}
)
except ValidationError as e:
print(f"Validation failed: {e}")
# Handle timeout errors
try:
result = client.graph.intelligent_search(
keywords="complex query",
max_turns=10
)
except KubiyaTimeoutError as e:
print(f"Request timed out: {e}")
Learn more about Error Handling →
Best Practices
1. Use Environment Variables for Credentials
# Good - secure and flexible
from kubiya import ControlPlaneClient
client = ControlPlaneClient() # Uses KUBIYA_API_KEY from environment
# Avoid - hardcoding credentials
client = ControlPlaneClient(api_key="hardcoded-key") # Don't do this!
2. Handle Errors Appropriately
from kubiya.resources.exceptions import GraphError
try:
result = client.graph.intelligent_search(keywords="query")
# Process result
except GraphError as e:
print(f"Search failed: {e}")
# Implement retry or fallback logic
3. Use Batch Operations for Efficiency
# Good - batch import
client.ingestion.ingest_nodes_batch(
nodes=[
{"id": "node-1", "labels": ["Type1"], "properties": {}},
{"id": "node-2", "labels": ["Type2"], "properties": {}},
# ... more nodes
]
)
# Avoid - individual imports in a loop
# for node in nodes:
# client.ingestion.ingest_node(...) # Less efficient
4. Clean Up Resources
# Always clean up sessions when done
session_id = None
try:
result = client.graph.intelligent_search(keywords="query")
session_id = result["session_id"]
# Continue conversation...
follow_up = client.graph.intelligent_search(
keywords="follow-up query",
session_id=session_id
)
finally:
if session_id:
client.graph.delete_search_session(session_id)
5. Use Type Hints
from kubiya import ControlPlaneClient
from typing import List, Dict
def search_infrastructure(
client: ControlPlaneClient,
query: str
) -> List[Dict]:
"""Search infrastructure using intelligent search."""
result = client.graph.intelligent_search(keywords=query)
return result["nodes"]
Learn more about Best Practices →
Common Patterns
Pagination
# Paginate through results
def list_all_datasets(client: ControlPlaneClient):
    """Collect every dataset by paging through list_datasets."""
    skip = 0
    limit = 50
    all_datasets = []
    while True:
        page = client.datasets.list_datasets(skip=skip, limit=limit)
        if not page:
            break
        all_datasets.extend(page)
        skip += limit
    return all_datasets
Retry Logic
from time import sleep
from kubiya.resources.exceptions import APIError
def retry_operation(operation, max_attempts=3):
"""Retry an operation with exponential backoff."""
for attempt in range(max_attempts):
try:
return operation()
except APIError as e:
if attempt == max_attempts - 1:
raise
print(f"Attempt {attempt + 1} failed, retrying...")
sleep(2 ** attempt) # Exponential backoff
# Usage
result = retry_operation(
lambda: client.graph.intelligent_search(keywords="query")
)
Context Manager
from contextlib import contextmanager
@contextmanager
def control_plane_client(api_key: str):
"""Context manager for ControlPlaneClient."""
client = ControlPlaneClient(api_key=api_key)
try:
yield client
finally:
# Cleanup if needed
pass
# Usage
with control_plane_client("your-api-key") as client:
result = client.graph.get_stats()
print(f"Nodes: {result['node_count']}")
Architecture
The Control Plane Client SDK is organized into service modules:
ControlPlaneClient
├── graph # Context Graph operations
│ ├── intelligent_search()
│ ├── semantic_search()
│ ├── store_memory()
│ └── recall_memory()
├── datasets # Dataset management
│ ├── create_dataset()
│ ├── list_datasets()
│ └── get_dataset()
├── ingestion # Graph data ingestion
│ ├── ingest_node()
│ └── ingest_nodes_batch()
├── models # LLM model management
│ ├── list()
│ ├── get()
│ └── get_default()
├── runtimes # Agent runtime configs
│ ├── list()
│ └── get()
├── context # Context resolution & management
│ ├── get()
│ ├── update()
│ ├── delete()
│ └── resolve()
├── skills # Tool sets and capabilities
│ ├── list()
│ └── get()
├── policies # Policy management
│ ├── list()
│ └── get()
├── task_planning # AI-powered task planning
│ ├── plan()
│ └── plan_stream()
├── agents # Agent management
│ ├── list()
│ ├── get()
│ ├── create()
│ └── update()
├── teams # Multi-agent teams
│ ├── list()
│ ├── get()
│ ├── create()
│ ├── add_agent()
│ ├── execute()
│ └── execute_stream()
├── projects # Project organization
│ ├── list()
│ ├── get()
│ ├── create()
│ ├── add_agent()
│ └── add_team()
├── environments # Runtime environments
│ ├── list()
│ ├── get()
│ ├── create()
│ └── get_worker_command()
├── jobs # Scheduled & webhook jobs
│ ├── list()
│ ├── get()
│ ├── create()
│ ├── trigger()
│ ├── enable()
│ └── disable()
├── workers # Worker management
│ ├── list()
│ └── get()
├── secrets # Secrets management
│ ├── list()
│ └── get()
└── integrations # Integration configs
├── list()
└── get()
Next Steps