Skip to main content
The Control Plane Client SDK provides a comprehensive Python interface to Kubiya’s Control Plane services, enabling you to programmatically manage models, runtimes, skills, policies, agents, workers, and the context graph.
The Control Plane Client (ControlPlaneClient) is the modern SDK for accessing Kubiya’s Control Plane services. It provides service-oriented access to all platform capabilities including graph operations, cognitive memory, and infrastructure management.

Key Features

  • 🧠 Context Graph Operations: Intelligent search, semantic search, and graph queries
  • 💾 Cognitive Memory: Store and recall context across interactions
  • 👥 Multi-Agent Teams: Create and orchestrate collaborative agent teams
  • 📁 Project Organization: Organize agents and teams into structured projects
  • ⏰ Job Scheduling: Schedule recurring tasks with cron or webhook triggers
  • 🌍 Environment Management: Manage isolated runtime environments
  • 🗺️ Task Planning: AI-powered task decomposition and planning
  • 🔄 Context Resolution: Entity context with inheritance across layers
  • 🤖 Agent & Model Management: Access and configure LLMs and agents
  • 🛠️ Skills & Policies: Configure tools, capabilities, and access controls
  • 📊 Infrastructure Management: Monitor workers and execution infrastructure

Quick Start

from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# Use intelligent search
result = client.graph.intelligent_search(
    keywords="Find all production databases with high availability"
)

print(result["answer"])  # Natural language answer
for node in result["nodes"]:
    print(f"- {node['properties']['name']}")

Available Control Plane Services

The Control Plane Client provides access to these services:
ServicePurposeKey Methods
healthHealth checkscheck
graphContext Graph operationsintelligent_search, semantic_search, store_memory, recall_memory
datasetsDataset managementcreate_dataset, list_datasets, get_dataset, delete_dataset
ingestionGraph data ingestioningest_node, ingest_nodes_batch, ingest_relationship
modelsLLM model managementlist, get, get_default
runtimesAgent runtime configslist, get, create
contextContext resolution & managementget, update, delete, resolve
skillsTool sets and capabilitieslist, get, create
policiesPolicy managementlist, get, create
task_planningAI-powered task planningplan, plan_stream
agentsAgent managementlist, get, create, update, delete
teamsMulti-agent teamslist, get, create, update, delete, execute, execute_stream
projectsProject organizationlist, get, create, update, delete, add_agent, add_team
environmentsRuntime environmentslist, get, create, update, delete, get_worker_command
jobsScheduled & webhook jobslist, get, create, update, trigger, enable, disable
workersWorker managementlist, get
secretsSecrets managementlist, get, create
integrationsIntegration configslist, get

Client Initialization

Basic Initialization

from kubiya import ControlPlaneClient

# Basic initialization with API key
client = ControlPlaneClient(api_key="your-api-key")

Using Environment Variables

The recommended approach for secure credential management:
# Set environment variable
export KUBIYA_API_KEY="your-api-key"
from kubiya import ControlPlaneClient

# Reads KUBIYA_API_KEY from environment
client = ControlPlaneClient()

Advanced Configuration

from kubiya import ControlPlaneClient

client = ControlPlaneClient(
    api_key="your-api-key",
    base_url="https://control-plane.kubiya.ai",  # Control Plane API base URL (default)
    timeout=300,                                   # Request timeout in seconds (default: 300)
    max_retries=3,                                 # Number of retry attempts (default: 3)
    org_name="my-organization"                     # Optional organization name
)

Core Services

Context Graph Service

Query and explore your knowledge graph with AI-powered search:
# AI-powered intelligent search
result = client.graph.intelligent_search(
    keywords="Find all EC2 instances in us-east-1",
    integration="AWS"
)

print(result["answer"])
for node in result["nodes"]:
    print(f"- {node['properties']['name']}")

Dataset Management

Manage cognitive datasets for memory and knowledge:
# Create a dataset
dataset = client.datasets.create_dataset(
    name="production-knowledge",
    description="Production environment context",
    scope="org"
)

# List datasets
datasets = client.datasets.list_datasets()
for ds in datasets:
    print(f"Dataset: {ds['name']}")
Learn more about Dataset Management →

Graph Ingestion

Import nodes and relationships into the context graph:
# Ingest nodes in batch
client.ingestion.ingest_nodes_batch(
    nodes=[
        {
            "id": "server-1",
            "labels": ["Server", "Production"],
            "properties": {"name": "prod-server-01", "region": "us-east-1"}
        },
        {
            "id": "db-1",
            "labels": ["Database", "PostgreSQL"],
            "properties": {"name": "prod-db-01", "size": "large"}
        }
    ],
    duplicate_handling="skip"
)

# Ingest relationships
client.ingestion.ingest_relationships_batch(
    relationships=[
        {
            "source_id": "server-1",
            "target_id": "db-1",
            "relationship_type": "CONNECTS_TO",
            "properties": {"port": 5432}
        }
    ]
)
Learn more about Graph Ingestion →

Models Service

Manage LLM models and configurations:
# List available models
models = client.models.list_models()
for model in models:
    print(f"Model: {model['name']} - Provider: {model['provider']}")

# Get model details
model = client.models.get_model("claude-3-opus-20240229")
print(f"Context window: {model['context_window']}")
Learn more about Models Service →

Runtimes Service

Configure agent runtime environments:
# List agent runtimes
runtimes = client.runtimes.list_runtimes()
for runtime in runtimes:
    print(f"Runtime: {runtime['name']}")

# Get runtime configuration
runtime = client.runtimes.get_runtime("agno-runtime")
print(f"Runtime config: {runtime}")
Learn more about Runtimes Service →

Skills Service

Manage tools and capabilities:
# List available skills
skills = client.skills.list_skills()
for skill in skills:
    print(f"Skill: {skill['name']} - Category: {skill['category']}")

# Get skill details
skill = client.skills.get_skill("kubernetes")
print(f"Tools: {skill['tools']}")
Learn more about Skills Service →

Policies Service

Manage security and access policies:
# List policies
policies = client.policies.list_policies()
for policy in policies:
    print(f"Policy: {policy['name']}")

# Get policy details
policy = client.policies.get_policy("security-policy-01")
print(f"Enforcement: {policy['enforcement']}")
Learn more about Policies Service →

Agents Service

Manage Control Plane agents:
# List agents
agents = client.agents.list_agents()
for agent in agents:
    print(f"Agent: {agent['name']} - Status: {agent['status']}")

# Get agent configuration
agent = client.agents.get_agent("devops-assistant")
print(f"Capabilities: {agent['capabilities']}")
Learn more about Agents Service →

Workers Service

Monitor and manage execution infrastructure:
# List workers
workers = client.workers.list_workers()
for worker in workers:
    print(f"Worker: {worker['id']} - Status: {worker['status']}")

# Get worker status
worker = client.workers.get_worker("worker-123")
print(f"Health: {worker['health']}")
Learn more about Workers Service →

Teams Service

Create and manage multi-agent teams:
# Create a team
team = client.teams.create({
    "name": "DevOps Team",
    "description": "Infrastructure automation team",
    "runtime": "default"
})

# Add agents to team
client.teams.add_agent(team_id=team['id'], agent_id="agent-uuid")

# Execute team task with streaming
for chunk in client.teams.execute_stream(
    team_id=team['id'],
    execution_data={
        "prompt": "Deploy to production",
        "worker_queue_id": "queue-uuid"
    }
):
    print(chunk.decode('utf-8'), end='', flush=True)
Learn more about Teams Service →

Projects Service

Organize agents and teams into projects:
# Create a project
project = client.projects.create({
    "name": "E-commerce Platform",
    "description": "Main e-commerce project"
})

# Add agents to project
client.projects.add_agent(
    project_id=project['id'],
    agent_id="agent-uuid",
    role="developer"
)

# Add team to project
client.projects.add_team(
    project_id=project['id'],
    team_id="team-uuid",
    role="infrastructure"
)

# List project agents
agents = client.projects.list_agents(project_id=project['id'])
Learn more about Projects Service →

Environments Service

Manage runtime environments:
# Create environment
environment = client.environments.create({
    "name": "Production",
    "description": "Production environment",
    "status": "active"
})

# Get worker registration command
worker_info = client.environments.get_worker_command(
    environment_id=environment['id']
)
print(f"Worker command: {worker_info['command']}")

# List environments
environments = client.environments.list(status_filter="active")
Learn more about Environments Service →

Jobs Service

Schedule and automate tasks:
# Create a cron job
job = client.jobs.create({
    "name": "Daily Backup",
    "trigger_type": "cron",
    "cron_schedule": "0 2 * * *",  # Daily at 2 AM
    "planning_mode": "predefined_agent",
    "entity_type": "agent",
    "entity_id": "backup-agent-uuid",
    "prompt_template": "Perform database backup"
})

# Create a webhook job
webhook_job = client.jobs.create({
    "name": "Deploy on Push",
    "trigger_type": "webhook",
    "planning_mode": "predefined_team",
    "entity_type": "team",
    "entity_id": "deployment-team-uuid",
    "prompt_template": "Deploy {{branch}} to {{environment}}"
})
print(f"Webhook URL: {webhook_job['webhook_url']}")

# Manually trigger a job
result = client.jobs.trigger(
    job_id=job['id'],
    parameters={"database": "prod"}
)
Learn more about Jobs Service →

Context Service

Manage entity context with inheritance:
# Update agent context
client.context.update(
    entity_type="agent",
    entity_id="agent-uuid",
    context_data={
        "region": "us-west-2",
        "tier": "premium",
        "tags": ["production", "critical"]
    }
)

# Resolve context with inheritance
resolved = client.context.resolve(
    entity_type="agent",
    entity_id="agent-uuid"
)
print(f"Resolved context: {resolved}")

# Get entity-specific context
context = client.context.get(
    entity_type="team",
    entity_id="team-uuid"
)
Learn more about Context Service →

Task Planning Service

AI-powered task planning and decomposition:
# Generate a task plan
plan = client.task_planning.plan(
    task_description="Deploy new microservice to production",
    context={
        "service_name": "payment-api",
        "environment": "production"
    }
)

print(f"Plan: {plan['title']}")
for i, step in enumerate(plan['steps'], 1):
    print(f"{i}. {step['description']}")

# Stream plan generation
for update in client.task_planning.plan_stream(
    task_description="Implement CI/CD pipeline"
):
    print(update['data'], end='', flush=True)
Learn more about Task Planning Service →

Error Handling

The SDK provides specialized exceptions for different failure scenarios:
from kubiya.core.exceptions import (
    APIError as KubiyaAPIError,
    AuthenticationError as KubiyaAuthenticationError,
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError,
)
from kubiya.resources.exceptions import (
    GraphError,
    ValidationError,
    ModelError,
    WorkerError,
)

# Handle graph errors
try:
    result = client.graph.intelligent_search(keywords="test query")
except GraphError as e:
    print(f"Graph operation failed: {e}")

# Handle authentication errors
try:
    client = ControlPlaneClient(api_key="invalid-key")
    client.health.check()
except KubiyaAuthenticationError as e:
    print(f"Authentication failed: {e}")

# Handle API errors
try:
    result = client.datasets.get_dataset("non-existent-dataset")
except KubiyaAPIError as e:
    print(f"API error: {e.status_code}")

# Handle validation errors
try:
    client.ingestion.ingest_node(
        id="",  # Invalid empty ID
        labels=[],
        properties={}
    )
except ValidationError as e:
    print(f"Validation failed: {e}")

# Handle timeout errors
try:
    result = client.graph.intelligent_search(
        keywords="complex query",
        max_turns=10
    )
except KubiyaTimeoutError as e:
    print(f"Request timed out: {e}")
Learn more about Error Handling →

Best Practices

1. Use Environment Variables for Credentials

# Good - secure and flexible
from kubiya import ControlPlaneClient

client = ControlPlaneClient()  # Uses KUBIYA_API_KEY from environment

# Avoid - hardcoding credentials
client = ControlPlaneClient(api_key="hardcoded-key")  # Don't do this!

2. Handle Errors Appropriately

from kubiya.resources.exceptions import GraphError

try:
    result = client.graph.intelligent_search(keywords="query")
    # Process result
except GraphError as e:
    print(f"Search failed: {e}")
    # Implement retry or fallback logic

3. Use Batch Operations for Efficiency

# Good - batch import
client.ingestion.ingest_nodes_batch(
    nodes=[
        {"id": "node-1", "labels": ["Type1"], "properties": {}},
        {"id": "node-2", "labels": ["Type2"], "properties": {}},
        # ... more nodes
    ]
)

# Avoid - individual imports in a loop
# for node in nodes:
#     client.ingestion.ingest_node(...)  # Less efficient

4. Clean Up Resources

# Always clean up sessions when done
session_id = None
try:
    result = client.graph.intelligent_search(keywords="query")
    session_id = result["session_id"]

    # Continue conversation...
    follow_up = client.graph.intelligent_search(
        keywords="follow-up query",
        session_id=session_id
    )
finally:
    if session_id:
        client.graph.delete_search_session(session_id)

5. Use Type Hints

from kubiya import ControlPlaneClient
from typing import List, Dict

def search_infrastructure(
    client: ControlPlaneClient,
    query: str
) -> List[Dict]:
    """Search infrastructure using intelligent search."""
    result = client.graph.intelligent_search(keywords=query)
    return result["nodes"]
Learn more about Best Practices →

Common Patterns

Pagination

# Paginate through results
def list_all_datasets(client: ControlPlaneClient):
    skip = 0
    limit = 50

    while True:
        datasets = client.datasets.list_datasets(skip=skip, limit=limit)
        if not datasets:
            break

        for dataset in datasets:
            print(f"Dataset: {dataset['name']}")

        skip += limit

Retry Logic

from time import sleep
from kubiya.resources.exceptions import APIError

def retry_operation(operation, max_attempts=3):
    """Retry an operation with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except APIError as e:
            if attempt == max_attempts - 1:
                raise
            print(f"Attempt {attempt + 1} failed, retrying...")
            sleep(2 ** attempt)  # Exponential backoff

# Usage
result = retry_operation(
    lambda: client.graph.intelligent_search(keywords="query")
)

Context Manager

from contextlib import contextmanager

@contextmanager
def control_plane_client(api_key: str):
    """Context manager for ControlPlaneClient."""
    client = ControlPlaneClient(api_key=api_key)
    try:
        yield client
    finally:
        # Cleanup if needed
        pass

# Usage
with control_plane_client("your-api-key") as client:
    result = client.graph.get_stats()
    print(f"Nodes: {result['node_count']}")

Architecture

The Control Plane Client SDK is organized into service modules:
ControlPlaneClient
├── graph              # Context Graph operations
│   ├── intelligent_search()
│   ├── semantic_search()
│   ├── store_memory()
│   └── recall_memory()
├── datasets           # Dataset management
│   ├── create_dataset()
│   ├── list_datasets()
│   └── get_dataset()
├── ingestion          # Graph data ingestion
│   ├── ingest_node()
│   └── ingest_nodes_batch()
├── models             # LLM model management
│   ├── list()
│   ├── get()
│   └── get_default()
├── runtimes           # Agent runtime configs
│   ├── list()
│   └── get()
├── context            # Context resolution & management
│   ├── get()
│   ├── update()
│   ├── delete()
│   └── resolve()
├── skills             # Tool sets and capabilities
│   ├── list()
│   └── get()
├── policies           # Policy management
│   ├── list()
│   └── get()
├── task_planning      # AI-powered task planning
│   ├── plan()
│   └── plan_stream()
├── agents             # Agent management
│   ├── list()
│   ├── get()
│   ├── create()
│   └── update()
├── teams              # Multi-agent teams
│   ├── list()
│   ├── get()
│   ├── create()
│   ├── add_agent()
│   ├── execute()
│   └── execute_stream()
├── projects           # Project organization
│   ├── list()
│   ├── get()
│   ├── create()
│   ├── add_agent()
│   └── add_team()
├── environments       # Runtime environments
│   ├── list()
│   ├── get()
│   ├── create()
│   └── get_worker_command()
├── jobs               # Scheduled & webhook jobs
│   ├── list()
│   ├── get()
│   ├── create()
│   ├── trigger()
│   ├── enable()
│   └── disable()
├── workers            # Worker management
│   ├── list()
│   └── get()
├── secrets            # Secrets management
│   ├── list()
│   └── get()
└── integrations       # Integration configs
    ├── list()
    └── get()

Next Steps