Follow these best practices to build reliable, maintainable, and performant applications with the Kubiya SDK.
Overview
This guide covers best practices across:
- Authentication & Security: Protect credentials and handle permissions
- Performance: Optimize API usage and reduce latency
- Error Handling: Build resilient applications
- Resource Management: Clean up resources properly
- Code Organization: Structure your code effectively
- Testing: Ensure reliability through testing
Authentication & Security
Use Environment Variables for Credentials
# ✅ GOOD - Environment variables
import os
from kubiya import ControlPlaneClient
api_key = os.getenv("KUBIYA_API_KEY")
client = ControlPlaneClient(api_key=api_key)
# ❌ BAD - Hardcoded credentials
client = ControlPlaneClient(api_key="hardcoded-key-here") # Don't do this!
Use .env Files for Local Development
# .env
KUBIYA_API_KEY=your-api-key-here
KUBIYA_BASE_URL=https://control-plane.kubiya.ai
from dotenv import load_dotenv
from kubiya import ControlPlaneClient
import os
load_dotenv()
client = ControlPlaneClient(
    api_key=os.getenv("KUBIYA_API_KEY"),
    base_url=os.getenv("KUBIYA_BASE_URL")
)
Never Log Sensitive Data
import logging
from kubiya import ControlPlaneClient
logger = logging.getLogger(__name__)
client = ControlPlaneClient(api_key="your-api-key")
# ❌ BAD - Logging sensitive data
secret = client.secrets.get_value(name="api-token")
logger.info(f"Got secret: {secret['value']}") # NEVER LOG SECRETS!
# ✅ GOOD - Log metadata only
secret = client.secrets.get_value(name="api-token")
logger.info(f"Retrieved secret: {secret['name']}")
Clear Sensitive Data from Memory
from kubiya import ControlPlaneClient
import gc
client = ControlPlaneClient(api_key="your-api-key")
# Get secret
secret = client.secrets.get_value(name="api-token")
token = secret['value']
# Use token
# ... perform operations ...
# Clear from memory
token = None
secret = None
gc.collect()
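Note that this only shortens the window in which the secret stays resident: Python strings are immutable, so the value cannot be reliably overwritten in place, and gc.collect() does not guarantee immediate reclamation. Keep secret values in the narrowest scope possible.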
Performance Optimization
Use Batch Operations
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
# ❌ BAD - Individual operations in loop
for node in nodes:
    client.ingestion.ingest_node(
        id=node['id'],
        labels=node['labels'],
        properties=node['properties']
    )

# ✅ GOOD - Batch operation
client.ingestion.ingest_nodes_batch(nodes=nodes)
Implement Caching
from kubiya import ControlPlaneClient
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
class CachedClient:
    """Wrapper with caching for frequently accessed data."""

    def __init__(self, client: ControlPlaneClient, ttl_seconds: int = 300):
        self.client = client
        self.ttl_seconds = ttl_seconds
        self._cache: Dict[str, tuple[Any, datetime]] = {}

    def list_models(self, use_cache: bool = True):
        """List models with caching."""
        cache_key = "models_list"

        if use_cache and cache_key in self._cache:
            data, expiry = self._cache[cache_key]
            if datetime.utcnow() < expiry:
                return data

        # Fetch from API
        models = self.client.models.list()

        # Cache result
        expiry = datetime.utcnow() + timedelta(seconds=self.ttl_seconds)
        self._cache[cache_key] = (models, expiry)
        return models
# Usage
from kubiya import ControlPlaneClient
base_client = ControlPlaneClient(api_key="your-api-key")
cached_client = CachedClient(base_client, ttl_seconds=300)
# First call hits API
models1 = cached_client.list_models()
# Second call uses cache (within TTL)
models2 = cached_client.list_models()
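When freshness matters more than latency, bypass the cache for a single call:

models_fresh = cached_client.list_models(use_cache=False)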
Paginate Large Result Sets
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
def fetch_all_memories(client: ControlPlaneClient, batch_size: int = 100):
    """Fetch all memories with pagination."""
    all_memories = []
    skip = 0

    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break

        all_memories.extend(batch)
        skip += batch_size

        # Process batch immediately instead of loading all into memory
        # process_batch(batch)

    return all_memories
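Because fetch_all_memories builds the entire list in memory, a generator variant can be preferable for large datasets. The sketch below reuses the same list_memories pagination; the iter_memories name and the dict element type are assumptions made for illustration.

from typing import Any, Dict, Iterator
from kubiya import ControlPlaneClient

def iter_memories(client: ControlPlaneClient, batch_size: int = 100) -> Iterator[Dict[str, Any]]:
    """Yield memories one at a time so the full result set is never held in memory."""
    skip = 0
    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break
        for memory in batch:
            yield memory
        skip += batch_size

# Usage: process memories as they stream in
# for memory in iter_memories(client):
#     process_memory(memory)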
Use Async Storage for Large Data
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
# ✅ GOOD - Async for large content
job = client.graph.store_memory_async(
    dataset_id="logs",
    context=large_log_data,  # Multiple MB of data
    metadata={"source": "application"}
)

# Continue with other work while storage completes
# ...

# ❌ BAD - Blocking for large content
memory = client.graph.store_memory(
    dataset_id="logs",
    context=large_log_data  # Blocks until complete
)
Error Handling
Catch Specific Exceptions
from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    AuthenticationError as KubiyaAuthenticationError,
    TimeoutError as KubiyaTimeoutError,
    APIError as KubiyaAPIError
)
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

try:
    result = client.graph.intelligent_search(keywords="query")
except GraphError as e:
    # Handle graph-specific errors
    print(f"Graph error: {e}")
except KubiyaAuthenticationError as e:
    # Handle auth errors
    print(f"Authentication failed: {e}")
except KubiyaTimeoutError as e:
    # Handle timeouts
    print(f"Request timed out: {e}")
except KubiyaAPIError as e:
    # Handle general API errors
    print(f"API error: {e}")
Implement Retry Logic
from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError
)
import time

def retry_operation(operation, max_attempts=3, backoff_factor=2):
    """Retry operation with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except (KubiyaConnectionError, KubiyaTimeoutError):
            if attempt == max_attempts - 1:
                raise
            delay = backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = retry_operation(
    lambda: client.graph.intelligent_search(keywords="query")
)
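The same policy can also be packaged as a decorator. This is a minimal sketch built on the exception aliases above; the with_retries name and the added random jitter are illustrative choices, not part of the SDK.

import random
import time
from functools import wraps
from kubiya.core.exceptions import (
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError
)

def with_retries(max_attempts: int = 3, backoff_factor: float = 2.0):
    """Retry the decorated callable on transient errors with jittered exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (KubiyaConnectionError, KubiyaTimeoutError):
                    if attempt == max_attempts - 1:
                        raise
                    # Jitter spreads out concurrent clients retrying at the same moment
                    time.sleep(backoff_factor ** attempt + random.uniform(0, 1))
        return wrapper
    return decorator

# Usage
# @with_retries(max_attempts=5)
# def search(client, keywords):
#     return client.graph.intelligent_search(keywords=keywords)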
Clean Up Resources
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
session_id = None

try:
    result = client.graph.intelligent_search(keywords="query")
    session_id = result["session_id"]
    # Use session...
finally:
    # Always clean up
    if session_id:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            pass  # Ignore cleanup errors
Resource Management
Use Context Managers
from contextlib import contextmanager
from kubiya import ControlPlaneClient
@contextmanager
def temp_dataset(client: ControlPlaneClient, name: str):
    """Context manager for temporary dataset."""
    dataset = client.datasets.create_dataset(name=name, scope="user")
    try:
        yield dataset
    finally:
        # Automatic cleanup
        try:
            client.datasets.delete_dataset(dataset_id=dataset['id'])
        except Exception:
            pass

# Usage
client = ControlPlaneClient(api_key="your-api-key")

with temp_dataset(client, "temp-data") as dataset:
    # Use dataset
    client.graph.store_memory(
        dataset_id=dataset['id'],
        context="temporary data"
    )
# Dataset automatically deleted
Clean Up Search Sessions
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
# Track sessions for cleanup
active_sessions = []
try:
    # Create searches
    result1 = client.graph.intelligent_search(keywords="query1")
    active_sessions.append(result1["session_id"])

    result2 = client.graph.intelligent_search(keywords="query2")
    active_sessions.append(result2["session_id"])
finally:
    # Clean up all sessions
    for session_id in active_sessions:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            pass
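If searches are created in many places, the tracking list can be replaced with a small context manager so cleanup lives in one spot. This is a sketch built on the intelligent_search and delete_search_session calls shown above; the search_session name is illustrative.

from contextlib import contextmanager
from kubiya import ControlPlaneClient

@contextmanager
def search_session(client: ControlPlaneClient, keywords: str, **search_kwargs):
    """Run an intelligent search and always delete its session afterwards."""
    result = client.graph.intelligent_search(keywords=keywords, **search_kwargs)
    try:
        yield result
    finally:
        session_id = result.get("session_id")
        if session_id:
            try:
                client.graph.delete_search_session(session_id)
            except Exception:
                pass  # Ignore cleanup errors

# Usage
# with search_session(client, "query1") as result:
#     print(result["answer"])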
Monitor Dataset Status
from kubiya import ControlPlaneClient
import time
client = ControlPlaneClient(api_key="your-api-key")
# Create dataset
dataset = client.datasets.create_dataset(name="large-dataset", scope="org")
# Store large amount of data
for data in large_data_batches:
    client.graph.store_memory_async(
        dataset_id=dataset['id'],
        context=data
    )

# Wait for processing to complete
while True:
    status = client.datasets.get_dataset_status(dataset_id=dataset['id'])
    if status['status'] == 'ready':
        break
    time.sleep(5)

# Now safe to query
results = client.graph.semantic_search(
    query="test",
    filters={"dataset_id": dataset['id']}
)
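The while True loop above will spin forever if the dataset never becomes ready. In automation, a bounded variant of the same polling is safer; this sketch assumes only the get_dataset_status call shown above, and the helper name and timeout defaults are illustrative.

import time
from kubiya import ControlPlaneClient

def wait_for_dataset_ready(
    client: ControlPlaneClient,
    dataset_id: str,
    timeout_seconds: int = 600,
    poll_interval: int = 5
) -> bool:
    """Poll dataset status until it is 'ready'; return False if the timeout is reached."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = client.datasets.get_dataset_status(dataset_id=dataset_id)
        if status['status'] == 'ready':
            return True
        time.sleep(poll_interval)
    return False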
Code Organization
Use Type Hints
from kubiya import ControlPlaneClient
from typing import Any, Dict, List, Optional

def search_infrastructure(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """Search infrastructure with type hints."""
    return client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )

def batch_import_nodes(
    client: ControlPlaneClient,
    nodes: List[Dict[str, Any]],
    dataset_id: str
) -> Dict[str, Any]:
    """Import nodes with type hints."""
    return client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=dataset_id
    )
Create Reusable Functions
from kubiya import ControlPlaneClient
from typing import List, Dict
def create_infrastructure_dataset(
    client: ControlPlaneClient,
    environment: str
) -> Dict:
    """Create dataset for specific environment."""
    return client.datasets.create_dataset(
        name=f"{environment}-infrastructure",
        description=f"Infrastructure data for {environment} environment",
        scope="org"
    )

def import_aws_resources(
    client: ControlPlaneClient,
    region: str,
    dataset_id: str
) -> Dict:
    """Import AWS resources from region."""
    # Fetch resources
    nodes = fetch_aws_resources(region)

    # Import to graph
    return client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=dataset_id,
        duplicate_handling="update"
    )
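fetch_aws_resources is left undefined above. A hypothetical placeholder is sketched below; the only assumption carried over from the ingestion examples earlier in this guide is that each node uses the id/labels/properties shape.

from typing import Dict, List

def fetch_aws_resources(region: str) -> List[Dict]:
    """Hypothetical helper: return nodes in the shape ingest_nodes_batch expects."""
    # Replace with real discovery logic (cloud API calls, inventory exports, etc.)
    return [
        {
            "id": f"i-0abc123-{region}",  # placeholder identifier
            "labels": ["EC2Instance"],
            "properties": {"region": region, "state": "running"}
        }
    ]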
Use Configuration Classes
from dataclasses import dataclass
from kubiya import ControlPlaneClient
from typing import Optional
@dataclass
class SearchConfig:
    """Configuration for intelligent search."""
    max_turns: int = 5
    temperature: float = 0.7
    enable_semantic_search: bool = True
    enable_cypher_queries: bool = False
    integration: Optional[str] = None

def search_with_config(
    client: ControlPlaneClient,
    query: str,
    config: SearchConfig
) -> dict:
    """Search using configuration object."""
    return client.graph.intelligent_search(
        keywords=query,
        max_turns=config.max_turns,
        temperature=config.temperature,
        enable_semantic_search=config.enable_semantic_search,
        enable_cypher_queries=config.enable_cypher_queries,
        integration=config.integration
    )

# Usage
client = ControlPlaneClient(api_key="your-api-key")
config = SearchConfig(max_turns=10, temperature=0.5)
result = search_with_config(client, "test query", config)
Testing
Write Unit Tests
import pytest
from kubiya import ControlPlaneClient
from unittest.mock import Mock
@pytest.fixture
def mock_client():
    """Create mock client for testing."""
    client = Mock(spec=ControlPlaneClient)
    client.graph.intelligent_search.return_value = {
        "answer": "Test answer",
        "nodes": [],
        "confidence": "high"
    }
    return client

def test_search_function(mock_client):
    """Test function using mocked client."""
    result = search_infrastructure(mock_client, "test query")
    assert result["answer"] == "Test answer"
    mock_client.graph.intelligent_search.assert_called_once()
Test Error Handling
import pytest
from unittest.mock import Mock
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

def test_error_handling():
    """Test that graph errors from the SDK propagate to callers."""
    client = Mock(spec=ControlPlaneClient)
    # Simulate the SDK raising a graph error (assumes GraphError accepts a message)
    client.graph.intelligent_search.side_effect = GraphError("search failed")

    with pytest.raises(GraphError):
        search_infrastructure(client, "test query")
Use Test Fixtures
import pytest
from kubiya import ControlPlaneClient
@pytest.fixture(scope="session")
def client():
"""Provide test client."""
return ControlPlaneClient(api_key="test-key")
@pytest.fixture
def test_dataset(client):
"""Create and cleanup test dataset."""
dataset = client.datasets.create_dataset(
name="test-dataset",
scope="user"
)
yield dataset
# Cleanup
client.datasets.delete_dataset(dataset_id=dataset['id'])
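The session-scoped fixture above uses a placeholder key, so any test that actually reaches the API will fail authentication. For integration tests, one option is to read the key from the environment and skip when it is absent; KUBIYA_TEST_API_KEY below is an assumed variable name, not an SDK convention.

import os
import pytest
from kubiya import ControlPlaneClient

@pytest.fixture(scope="session")
def live_client():
    """Provide a real client for integration tests, skipping when no key is configured."""
    api_key = os.getenv("KUBIYA_TEST_API_KEY")
    if not api_key:
        pytest.skip("KUBIYA_TEST_API_KEY not set")
    return ControlPlaneClient(api_key=api_key)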
Monitoring & Logging
Log Important Operations
import logging
from kubiya import ControlPlaneClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = ControlPlaneClient(api_key="your-api-key")
def import_infrastructure(region: str):
    """Import infrastructure with logging."""
    logger.info(f"Starting infrastructure import for region: {region}")

    try:
        nodes = fetch_resources(region)
        logger.info(f"Fetched {len(nodes)} resources")

        result = client.ingestion.ingest_nodes_batch(nodes=nodes)
        logger.info(
            f"Import completed: {result['summary']['success']} succeeded, "
            f"{result['summary']['failed']} failed"
        )
        return result
    except Exception as e:
        logger.error(f"Import failed: {e}", exc_info=True)
        raise
Track Metrics
from kubiya import ControlPlaneClient
from datetime import datetime
import time
class MetricsTracker:
    """Track SDK operation metrics."""

    def __init__(self):
        self.operations = []

    def track_operation(self, operation_name: str, duration: float, success: bool):
        """Record operation metrics."""
        self.operations.append({
            "name": operation_name,
            "duration": duration,
            "success": success,
            "timestamp": datetime.utcnow().isoformat()
        })

    def get_stats(self):
        """Get operation statistics."""
        if not self.operations:
            return {}

        return {
            "total_operations": len(self.operations),
            "success_rate": sum(1 for op in self.operations if op['success']) / len(self.operations),
            "average_duration": sum(op['duration'] for op in self.operations) / len(self.operations)
        }

# Usage
metrics = MetricsTracker()
client = ControlPlaneClient(api_key="your-api-key")

start = time.time()
try:
    result = client.graph.intelligent_search(keywords="query")
    metrics.track_operation("intelligent_search", time.time() - start, True)
except Exception:
    metrics.track_operation("intelligent_search", time.time() - start, False)

print(metrics.get_stats())
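Wrapping every call in try/except gets repetitive. A small context manager built on the MetricsTracker above records duration and outcome automatically; the timed_operation name is illustrative.

from contextlib import contextmanager
import time

@contextmanager
def timed_operation(metrics: MetricsTracker, operation_name: str):
    """Record the duration and success of the wrapped block in the MetricsTracker."""
    start = time.time()
    try:
        yield
    except Exception:
        metrics.track_operation(operation_name, time.time() - start, False)
        raise
    else:
        metrics.track_operation(operation_name, time.time() - start, True)

# Usage
# with timed_operation(metrics, "intelligent_search"):
#     client.graph.intelligent_search(keywords="query")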
Documentation
Document Functions
from kubiya import ControlPlaneClient
from typing import Any, Dict, Optional

def search_and_analyze(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """
    Search and analyze infrastructure using intelligent search.

    Args:
        client: Control Plane client instance
        query: Natural language search query
        integration: Optional integration filter (e.g., "AWS", "GitHub")

    Returns:
        Dictionary containing:
        - answer: Natural language answer
        - nodes: List of matching nodes
        - analysis: Additional analysis results

    Raises:
        GraphError: If search operation fails
        AuthenticationError: If API key is invalid

    Example:
        >>> client = ControlPlaneClient(api_key="your-key")
        >>> result = search_and_analyze(client, "production databases")
        >>> print(result["answer"])
    """
    result = client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )
    return {
        "answer": result["answer"],
        "nodes": result["nodes"],
        "analysis": analyze_nodes(result["nodes"])
    }
Add Code Comments for Complex Logic
from kubiya import ControlPlaneClient
def complex_data_migration(client: ControlPlaneClient):
    """Migrate data between datasets with validation."""
    # Step 1: Fetch all memories from source dataset
    # Using pagination to handle large datasets efficiently
    memories = []
    skip = 0
    batch_size = 100

    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break
        memories.extend(batch)
        skip += batch_size

    # Step 2: Transform memories for target dataset
    # Filter out test data and normalize format
    transformed = [
        transform_memory(m) for m in memories
        if not m.get('metadata', {}).get('is_test', False)
    ]

    # Step 3: Batch import to target dataset
    # Use async to avoid blocking on large imports
    for i in range(0, len(transformed), 100):
        batch = transformed[i:i+100]
        client.graph.store_memory_async(
            dataset_id="target-dataset",
            context=batch
        )
)