Follow these best practices to build reliable, maintainable, and performant applications with the Kubiya SDK.

Overview

This guide covers best practices across:
  • Authentication & Security: Protect credentials and handle permissions
  • Performance: Optimize API usage and reduce latency
  • Error Handling: Build resilient applications
  • Resource Management: Clean up resources properly
  • Code Organization: Structure your code effectively
  • Testing: Ensure reliability through testing
  • Monitoring & Logging: Observe and measure SDK operations
  • Documentation: Document functions and complex logic

Authentication & Security

Use Environment Variables for Credentials

# ✅ GOOD - Environment variables
import os
from kubiya import ControlPlaneClient

api_key = os.getenv("KUBIYA_API_KEY")
client = ControlPlaneClient(api_key=api_key)

# ❌ BAD - Hardcoded credentials
client = ControlPlaneClient(api_key="hardcoded-key-here")  # Don't do this!

Use .env Files for Local Development

# .env
KUBIYA_API_KEY=your-api-key-here
KUBIYA_BASE_URL=https://control-plane.kubiya.ai

from dotenv import load_dotenv
from kubiya import ControlPlaneClient
import os

load_dotenv()

client = ControlPlaneClient(
    api_key=os.getenv("KUBIYA_API_KEY"),
    base_url=os.getenv("KUBIYA_BASE_URL")
)

Never Log Sensitive Data

import logging
from kubiya import ControlPlaneClient

logger = logging.getLogger(__name__)
client = ControlPlaneClient(api_key="your-api-key")

# ❌ BAD - Logging sensitive data
secret = client.secrets.get_value(name="api-token")
logger.info(f"Got secret: {secret['value']}")  # NEVER LOG SECRETS!

# ✅ GOOD - Log metadata only
secret = client.secrets.get_value(name="api-token")
logger.info(f"Retrieved secret: {secret['name']}")

Clear Sensitive Data from Memory

from kubiya import ControlPlaneClient
import gc

client = ControlPlaneClient(api_key="your-api-key")

# Get secret
secret = client.secrets.get_value(name="api-token")
token = secret['value']

# Use token
# ... perform operations ...

# Clear from memory
token = None
secret = None
gc.collect()
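
If a secret is only needed briefly, retrieval and cleanup can be bundled in a small context manager so the value is dereferenced as soon as the block exits. A sketch built on secrets.get_value as used above; note that Python cannot guarantee the string is wiped from memory, only that references are dropped:

from contextlib import contextmanager
import gc

from kubiya import ControlPlaneClient

@contextmanager
def short_lived_secret(client: ControlPlaneClient, name: str):
    """Yield a secret value, then drop references and trigger collection."""
    secret = client.secrets.get_value(name=name)
    try:
        yield secret['value']
    finally:
        secret = None
        gc.collect()

# Usage
# with short_lived_secret(client, "api-token") as token:
#     use_token(token)  # use_token is a placeholder for your own code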

Performance Optimization

Use Batch Operations

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# ❌ BAD - Individual operations in loop
for node in nodes:
    client.ingestion.ingest_node(
        id=node['id'],
        labels=node['labels'],
        properties=node['properties']
    )

# ✅ GOOD - Batch operation
client.ingestion.ingest_nodes_batch(nodes=nodes)
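
For very large node lists, it can still help to split the payload into chunks before calling ingest_nodes_batch so individual requests stay bounded. The chunk size below is an arbitrary example, not an SDK limit:

# Submit nodes in bounded chunks to keep request payloads manageable
CHUNK_SIZE = 500  # example value; tune to your payload sizes

for i in range(0, len(nodes), CHUNK_SIZE):
    client.ingestion.ingest_nodes_batch(nodes=nodes[i:i + CHUNK_SIZE])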

Implement Caching

from kubiya import ControlPlaneClient
from datetime import datetime, timedelta
from typing import Optional, Dict, Any

class CachedClient:
    """Wrapper with caching for frequently accessed data."""

    def __init__(self, client: ControlPlaneClient, ttl_seconds: int = 300):
        self.client = client
        self.ttl_seconds = ttl_seconds
        self._cache: Dict[str, tuple[Any, datetime]] = {}

    def list_models(self, use_cache: bool = True):
        """List models with caching."""
        cache_key = "models_list"

        if use_cache and cache_key in self._cache:
            data, expiry = self._cache[cache_key]
            if datetime.utcnow() < expiry:
                return data

        # Fetch from API
        models = self.client.models.list()

        # Cache result
        expiry = datetime.utcnow() + timedelta(seconds=self.ttl_seconds)
        self._cache[cache_key] = (models, expiry)

        return models

# Usage
from kubiya import ControlPlaneClient

base_client = ControlPlaneClient(api_key="your-api-key")
cached_client = CachedClient(base_client, ttl_seconds=300)

# First call hits API
models1 = cached_client.list_models()

# Second call uses cache (within TTL)
models2 = cached_client.list_models()

Paginate Large Result Sets

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

def fetch_all_memories(client: ControlPlaneClient, batch_size: int = 100):
    """Fetch all memories with pagination."""
    all_memories = []
    skip = 0

    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)

        if not batch:
            break

        all_memories.extend(batch)
        skip += batch_size

        # Alternatively, process each page here as it arrives instead of
        # accumulating everything (see the generator sketch below):
        # process_batch(batch)

    return all_memories
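
If the full result set is too large to hold in memory, a generator variant of the same loop lets callers handle each page as it arrives. A sketch using the same list_memories pagination parameters:

from typing import Any, Dict, Iterator, List

from kubiya import ControlPlaneClient

def iter_memories(client: ControlPlaneClient, batch_size: int = 100) -> Iterator[List[Dict[str, Any]]]:
    """Yield memories one page at a time so the full set is never held in memory."""
    skip = 0
    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break
        yield batch
        skip += batch_size

# Usage: process each page as it arrives
# for page in iter_memories(client):
#     process_batch(page)  # process_batch is a placeholder for your own logic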

Use Async Storage for Large Data

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# ✅ GOOD - Async for large content
job = client.graph.store_memory_async(
    dataset_id="logs",
    context=large_log_data,  # Multiple MB of data
    metadata={"source": "application"}
)

# Continue with other work while storage completes
# ...

# ❌ BAD - Blocking for large content
memory = client.graph.store_memory(
    dataset_id="logs",
    context=large_log_data  # Blocks until complete
)

Error Handling

Catch Specific Exceptions

from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    AuthenticationError as KubiyaAuthenticationError,
    TimeoutError as KubiyaTimeoutError,
    APIError as KubiyaAPIError
)
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

try:
    result = client.graph.intelligent_search(keywords="query")

except GraphError as e:
    # Handle graph-specific errors
    print(f"Graph error: {e}")

except KubiyaAuthenticationError as e:
    # Handle auth errors
    print(f"Authentication failed: {e}")

except KubiyaTimeoutError as e:
    # Handle timeouts
    print(f"Request timed out: {e}")

except KubiyaAPIError as e:
    # Handle general API errors
    print(f"API error: {e}")

Implement Retry Logic

from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError
)
import time

def retry_operation(operation, max_attempts=3, backoff_factor=2):
    """Retry operation with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return operation()

        except (KubiyaConnectionError, KubiyaTimeoutError) as e:
            if attempt == max_attempts - 1:
                raise

            delay = backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)

# Usage
client = ControlPlaneClient(api_key="your-api-key")

result = retry_operation(
    lambda: client.graph.intelligent_search(keywords="query")
)
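
The same pattern can be packaged as a decorator so retry behavior is applied consistently across call sites. A sketch reusing the exception aliases imported above:

import functools
import time

from kubiya.core.exceptions import (
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError
)

def with_retries(max_attempts: int = 3, backoff_factor: int = 2):
    """Apply exponential-backoff retries to any callable."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (KubiyaConnectionError, KubiyaTimeoutError):
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(backoff_factor ** attempt)
        return wrapper
    return decorator

# Usage
@with_retries(max_attempts=3)
def search(client, query):
    return client.graph.intelligent_search(keywords=query)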

Clean Up Resources

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")
session_id = None

try:
    result = client.graph.intelligent_search(keywords="query")
    session_id = result["session_id"]

    # Use session...

finally:
    # Always clean up
    if session_id:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            pass  # Ignore cleanup errors

Resource Management

Use Context Managers

from contextlib import contextmanager
from kubiya import ControlPlaneClient

@contextmanager
def temp_dataset(client: ControlPlaneClient, name: str):
    """Context manager for temporary dataset."""
    dataset = client.datasets.create_dataset(name=name, scope="user")

    try:
        yield dataset
    finally:
        # Automatic cleanup
        try:
            client.datasets.delete_dataset(dataset_id=dataset['id'])
        except Exception:
            pass

# Usage
client = ControlPlaneClient(api_key="your-api-key")

with temp_dataset(client, "temp-data") as dataset:
    # Use dataset
    client.graph.store_memory(
        dataset_id=dataset['id'],
        context="temporary data"
    )
# Dataset automatically deleted

Clean Up Search Sessions

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Track sessions for cleanup
active_sessions = []

try:
    # Create searches
    result1 = client.graph.intelligent_search(keywords="query1")
    active_sessions.append(result1["session_id"])

    result2 = client.graph.intelligent_search(keywords="query2")
    active_sessions.append(result2["session_id"])

finally:
    # Clean up all sessions
    for session_id in active_sessions:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            pass
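
When a search result is only needed within one block, a small context manager can guarantee the session is deleted afterwards. A sketch built on the intelligent_search and delete_search_session calls shown above:

from contextlib import contextmanager

from kubiya import ControlPlaneClient

@contextmanager
def tracked_search(client: ControlPlaneClient, keywords: str, **kwargs):
    """Run an intelligent search and always clean up its session."""
    result = client.graph.intelligent_search(keywords=keywords, **kwargs)
    try:
        yield result
    finally:
        try:
            client.graph.delete_search_session(result["session_id"])
        except Exception:
            pass  # Ignore cleanup errors

# Usage
# with tracked_search(client, "query1") as result:
#     print(result["answer"])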

Monitor Dataset Status

from kubiya import ControlPlaneClient
import time

client = ControlPlaneClient(api_key="your-api-key")

# Create dataset
dataset = client.datasets.create_dataset(name="large-dataset", scope="org")

# Store a large amount of data (large_data_batches is a placeholder for your own batches)
for data in large_data_batches:
    client.graph.store_memory_async(
        dataset_id=dataset['id'],
        context=data
    )

# Wait for processing to complete, bounded by a timeout so we never poll forever
deadline = time.time() + 600  # example cap: 10 minutes

while True:
    status = client.datasets.get_dataset_status(dataset_id=dataset['id'])

    if status['status'] == 'ready':
        break

    if time.time() > deadline:
        raise TimeoutError("Dataset did not become ready within the expected time")

    time.sleep(5)

# Now safe to query
results = client.graph.semantic_search(
    query="test",
    filters={"dataset_id": dataset['id']}
)

Code Organization

Use Type Hints

from kubiya import ControlPlaneClient
from typing import Any, List, Dict, Optional

def search_infrastructure(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """Search infrastructure with type hints."""
    return client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )

def batch_import_nodes(
    client: ControlPlaneClient,
    nodes: List[Dict[str, Any]],
    dataset_id: str
) -> Dict[str, Any]:
    """Import nodes with type hints."""
    return client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=dataset_id
    )

Create Reusable Functions

from kubiya import ControlPlaneClient
from typing import List, Dict

def create_infrastructure_dataset(
    client: ControlPlaneClient,
    environment: str
) -> Dict:
    """Create dataset for specific environment."""
    return client.datasets.create_dataset(
        name=f"{environment}-infrastructure",
        description=f"Infrastructure data for {environment} environment",
        scope="org"
    )

def import_aws_resources(
    client: ControlPlaneClient,
    region: str,
    dataset_id: str
) -> Dict:
    """Import AWS resources from region."""
    # Fetch resources (fetch_aws_resources is a placeholder for your own discovery code)
    nodes = fetch_aws_resources(region)

    # Import to graph
    return client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=dataset_id,
        duplicate_handling="update"
    )

Use Configuration Classes

from dataclasses import dataclass
from kubiya import ControlPlaneClient
from typing import Optional

@dataclass
class SearchConfig:
    """Configuration for intelligent search."""
    max_turns: int = 5
    temperature: float = 0.7
    enable_semantic_search: bool = True
    enable_cypher_queries: bool = False
    integration: Optional[str] = None

def search_with_config(
    client: ControlPlaneClient,
    query: str,
    config: SearchConfig
) -> dict:
    """Search using configuration object."""
    return client.graph.intelligent_search(
        keywords=query,
        max_turns=config.max_turns,
        temperature=config.temperature,
        enable_semantic_search=config.enable_semantic_search,
        enable_cypher_queries=config.enable_cypher_queries,
        integration=config.integration
    )

# Usage
client = ControlPlaneClient(api_key="your-api-key")
config = SearchConfig(max_turns=10, temperature=0.5)
result = search_with_config(client, "test query", config)

Testing

Write Unit Tests

import pytest
from kubiya import ControlPlaneClient
from unittest.mock import Mock

@pytest.fixture
def mock_client():
    """Create mock client for testing."""
    client = Mock(spec=ControlPlaneClient)
    client.graph.intelligent_search.return_value = {
        "answer": "Test answer",
        "nodes": [],
        "confidence": "high"
    }
    return client

def test_search_function(mock_client):
    """Test function using mocked client."""
    # search_infrastructure is the typed helper defined under "Use Type Hints"
    result = search_infrastructure(mock_client, "test query")

    assert result["answer"] == "Test answer"
    mock_client.graph.intelligent_search.assert_called_once()

Test Error Handling

import pytest
from unittest.mock import Mock

from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

def test_error_handling():
    """Test that errors are surfaced correctly."""
    # Simulate a failing search by making the mocked client raise GraphError
    mock_client = Mock(spec=ControlPlaneClient)
    mock_client.graph.intelligent_search.side_effect = GraphError("search failed")

    with pytest.raises(GraphError):
        mock_client.graph.intelligent_search(keywords="query")

Use Test Fixtures

import pytest
from kubiya import ControlPlaneClient

@pytest.fixture(scope="session")
def client():
    """Provide test client."""
    return ControlPlaneClient(api_key="test-key")

@pytest.fixture
def test_dataset(client):
    """Create and cleanup test dataset."""
    dataset = client.datasets.create_dataset(
        name="test-dataset",
        scope="user"
    )

    yield dataset

    # Cleanup
    client.datasets.delete_dataset(dataset_id=dataset['id'])

Monitoring & Logging

Log Important Operations

import logging
from kubiya import ControlPlaneClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

client = ControlPlaneClient(api_key="your-api-key")

def import_infrastructure(region: str):
    """Import infrastructure with logging."""
    logger.info(f"Starting infrastructure import for region: {region}")

    try:
        nodes = fetch_resources(region)
        logger.info(f"Fetched {len(nodes)} resources")

        result = client.ingestion.ingest_nodes_batch(nodes=nodes)

        logger.info(
            f"Import completed: {result['summary']['success']} succeeded, "
            f"{result['summary']['failed']} failed"
        )

        return result

    except Exception as e:
        logger.error(f"Import failed: {e}", exc_info=True)
        raise

Track Metrics

from kubiya import ControlPlaneClient
from datetime import datetime
import time

class MetricsTracker:
    """Track SDK operation metrics."""

    def __init__(self):
        self.operations = []

    def track_operation(self, operation_name: str, duration: float, success: bool):
        """Record operation metrics."""
        self.operations.append({
            "name": operation_name,
            "duration": duration,
            "success": success,
            "timestamp": datetime.utcnow().isoformat()
        })

    def get_stats(self):
        """Get operation statistics."""
        if not self.operations:
            return {}

        return {
            "total_operations": len(self.operations),
            "success_rate": sum(1 for op in self.operations if op['success']) / len(self.operations),
            "average_duration": sum(op['duration'] for op in self.operations) / len(self.operations)
        }

# Usage
metrics = MetricsTracker()
client = ControlPlaneClient(api_key="your-api-key")

start = time.time()
try:
    result = client.graph.intelligent_search(keywords="query")
    metrics.track_operation("intelligent_search", time.time() - start, True)
except Exception:
    metrics.track_operation("intelligent_search", time.time() - start, False)

print(metrics.get_stats())
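
To avoid repeating the timing boilerplate around every call, the tracker can be wrapped in a small context manager. A sketch building on the MetricsTracker class above:

from contextlib import contextmanager
import time

@contextmanager
def tracked(metrics: MetricsTracker, operation_name: str):
    """Time a block and record success or failure on the tracker."""
    start = time.time()
    try:
        yield
        metrics.track_operation(operation_name, time.time() - start, True)
    except Exception:
        metrics.track_operation(operation_name, time.time() - start, False)
        raise

# Usage
# with tracked(metrics, "intelligent_search"):
#     client.graph.intelligent_search(keywords="query")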

Documentation

Document Functions

from kubiya import ControlPlaneClient
from typing import Any, List, Dict, Optional

def search_and_analyze(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """
    Search and analyze infrastructure using intelligent search.

    Args:
        client: Control Plane client instance
        query: Natural language search query
        integration: Optional integration filter (e.g., "AWS", "GitHub")

    Returns:
        Dictionary containing:
        - answer: Natural language answer
        - nodes: List of matching nodes
        - analysis: Additional analysis results

    Raises:
        GraphError: If search operation fails
        AuthenticationError: If API key is invalid

    Example:
        >>> client = ControlPlaneClient(api_key="your-key")
        >>> result = search_and_analyze(client, "production databases")
        >>> print(result["answer"])
    """
    result = client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )

    return {
        "answer": result["answer"],
        "nodes": result["nodes"],
        "analysis": analyze_nodes(result["nodes"])
    }

Add Code Comments for Complex Logic

from kubiya import ControlPlaneClient

def complex_data_migration(client: ControlPlaneClient):
    """Migrate data between datasets with validation."""

    # Step 1: Fetch all memories from source dataset
    # Using pagination to handle large datasets efficiently
    memories = []
    skip = 0
    batch_size = 100

    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break

        memories.extend(batch)
        skip += batch_size

    # Step 2: Transform memories for target dataset
    # Filter out test data and normalize format
    transformed = [
        transform_memory(m) for m in memories
        if not m.get('metadata', {}).get('is_test', False)
    ]

    # Step 3: Batch import to target dataset
    # Use async to avoid blocking on large imports
    for i in range(0, len(transformed), 100):
        batch = transformed[i:i+100]

        client.graph.store_memory_async(
            dataset_id="target-dataset",
            context=batch
        )

Next Steps