Cognitive Memory enables agents to remember context across conversations and sessions. Store operational knowledge, incident history, and domain expertise that can be recalled later using natural language queries.

Overview

Cognitive Memory provides a persistent memory system for storing and recalling context:
  • Store Context: Save text content with metadata for later retrieval
  • Recall Memories: Find relevant memories using natural language queries
  • Relevance Scoring: Get memories ranked by relevance to your query
  • Dataset Organization: Organize memories into logical datasets
  • Async Operations: Non-blocking storage for large batches
Memories are stored in cognitive datasets and become searchable via semantic search and intelligent search features.

Quick Start

from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# Store a memory
memory = client.graph.store_memory(
    dataset_id="prod-knowledge",
    context="Production deployment completed successfully at 2pm EST",
    metadata={"environment": "production", "team": "platform"}
)

print(f"Stored memory: {memory['memory_id']}")

# Recall memories
memories = client.graph.recall_memory(
    query="recent production deployments",
    limit=5
)

for mem in memories:
    print(f"[{mem['relevance_score']:.2f}] {mem['content']}")

Core Concepts

Memory Storage

Memories are text-based context stored with:
  • Content: The actual text to remember
  • Metadata: Optional structured data (tags, timestamps, etc.)
  • Dataset: Logical grouping for organization
  • Embeddings: Automatically generated for semantic search

Memory Recall

Recall uses semantic search to find relevant memories:
  • Query: Natural language question or keywords
  • Relevance Scoring: Memories ranked by semantic similarity
  • Filtering: Optional filtering by memory ID or metadata
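Since recall_memory() accepts only a query, an optional memory_id, and a limit, metadata filtering can be applied client-side on the returned list. A minimal sketch (this helper is illustrative, not an SDK method):

```python
def recall_filtered(client, query, metadata_key, metadata_value, limit=20):
    """Recall memories, keeping only those whose metadata matches.

    Over-fetches with `limit`, then filters on the client side because
    recall_memory() itself has no metadata parameter.
    """
    memories = client.graph.recall_memory(query=query, limit=limit)
    return [
        m for m in memories
        if m.get("metadata", {}).get(metadata_key) == metadata_value
    ]
```

Because filtering happens after retrieval, request a larger `limit` than you need so relevant matches survive the filter.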

Datasets

Memories are organized into datasets:
  • Scope: Organization, team, or user-level
  • Permissions: Control who can read/write
  • Lifecycle: Datasets can be created, listed, and deleted
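Dataset management calls (create, list, delete) are not covered in this guide. One illustrative convention (a hypothetical helper, not an SDK API) is to encode the scope directly into the dataset ID so memories stay grouped by owner:

```python
def dataset_id_for(scope: str, name: str) -> str:
    """Build a scoped dataset ID, e.g. 'team-incident-history'.

    Purely a naming convention: lowercases and hyphenates so IDs
    stay consistent across store_memory() calls.
    """
    return f"{scope}-{name}".lower().replace(" ", "-")
```

For example, `dataset_id_for("team", "Incident History")` yields `"team-incident-history"`, which can then be passed as `dataset_id` to store_memory().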

Basic Usage

Store Memory (Blocking)

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Store operational knowledge
memory = client.graph.store_memory(
    dataset_id="ops-knowledge",
    context="""
    Incident Resolution: Database slow query issue in production.

    Symptoms: API response times increased from 200ms to 3000ms.
    Root Cause: Missing index on users table for email lookups.
    Resolution: Added index `idx_users_email` which reduced query time to 50ms.
    Impact: 45 minutes of degraded performance.
    """,
    metadata={
        "incident_id": "INC-2024-001",
        "severity": "high",
        "resolved_by": "ops-team",
        "timestamp": "2024-12-10T14:30:00Z"
    }
)

print(f"Memory ID: {memory['memory_id']}")
print(f"Dataset: {memory['dataset_id']}")
print(f"Status: {memory['status']}")
Example response:
{
  "memory_id": "mem-abc123def456",
  "dataset_id": "ops-knowledge",
  "status": "completed",
  "metadata": {
    "incident_id": "INC-2024-001",
    "severity": "high",
    "resolved_by": "ops-team",
    "timestamp": "2024-12-10T14:30:00Z"
  }
}

Store Memory (Async)

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Store large context asynchronously
job = client.graph.store_memory_async(
    dataset_id="logs-dataset",
    context="Large batch of application logs...",  # Could be MB of data
    metadata={"source": "app-logs", "date": "2024-12-10"}
)

print(f"Job ID: {job['job_id']}")
print(f"Status: {job['status']}")  # "processing"

# Continue with other operations while storage completes
Use store_memory_async() for large content or when you don’t need to wait for completion. The memory will be available for recall once processing completes.
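store_memory_async() returns a job_id, but no job-status call is shown in this guide. One hedged way to confirm the memory has landed is to poll recall_memory() until the content becomes searchable (an illustrative sketch, assuming eventual availability after processing):

```python
import time

def wait_until_recallable(client, query, attempts=5, delay=2.0):
    """Poll recall_memory() until it returns a result, or give up.

    A stand-in for job-status checking: retries the recall up to
    `attempts` times, sleeping `delay` seconds between tries.
    """
    for _ in range(attempts):
        memories = client.graph.recall_memory(query=query, limit=1)
        if memories:
            return memories[0]
        time.sleep(delay)
    return None  # not yet recallable within the polling window
```

This trades precision for simplicity; if the SDK exposes a proper job-status endpoint in your version, prefer that over polling.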

Recall Memories

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Search for relevant memories
memories = client.graph.recall_memory(
    query="database performance issues",
    limit=5
)

print(f"Found {len(memories)} relevant memories:\n")
for memory in memories:
    print(f"Relevance: {memory['relevance_score']:.2f}")
    print(f"Content: {memory['content'][:100]}...")
    print(f"Created: {memory['created_at']}")
    if memory.get('metadata'):
        print(f"Metadata: {memory['metadata']}")
    print("---\n")
Example response:
[
  {
    "memory_id": "mem-abc123",
    "content": "Incident Resolution: Database slow query issue...",
    "relevance_score": 0.89,
    "metadata": {
      "incident_id": "INC-2024-001",
      "severity": "high"
    },
    "created_at": "2024-12-10T14:30:00Z"
  },
  {
    "memory_id": "mem-def456",
    "content": "Performance optimization: Added connection pooling...",
    "relevance_score": 0.76,
    "metadata": {
      "type": "optimization",
      "impact": "positive"
    },
    "created_at": "2024-12-08T09:15:00Z"
  }
]

Recall Specific Memory

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Retrieve specific memory by ID
memories = client.graph.recall_memory(
    query="",  # Empty query when using memory_id
    memory_id="mem-abc123def456"
)

if memories:
    memory = memories[0]
    print(f"Memory Content: {memory['content']}")

Retrieve Memories

To retrieve memories, use recall_memory() with a query:
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Recall memories using a query (returns most relevant matches)
memories = client.graph.recall_memory(
    query="incident OR deployment OR error",  # Broad query to retrieve various memories
    limit=20
)

print(f"Found {len(memories)} memories\n")
for memory in memories:
    print(f"Memory ID: {memory['memory_id']}")
    print(f"Relevance: {memory['relevance_score']:.2f}")
    print(f"Created: {memory['created_at']}")
    print(f"Preview: {memory['content'][:80]}...")
    print("---")
The SDK does not provide a list_memories() method. Use recall_memory() with appropriate queries to retrieve memories.

Practical Examples

1. Store Incident History

Build a searchable incident knowledge base:
from kubiya import ControlPlaneClient
from datetime import datetime

def store_incident(
    client: ControlPlaneClient,
    incident_data: dict
):
    """Store incident details as a memory."""

    context = f"""
    Incident: {incident_data['title']}

    Description: {incident_data['description']}

    Timeline:
    - Detected: {incident_data['detected_at']}
    - Resolved: {incident_data['resolved_at']}
    - Duration: {incident_data['duration']}

    Root Cause: {incident_data['root_cause']}

    Resolution Steps:
    {chr(10).join(f"  {i+1}. {step}" for i, step in enumerate(incident_data['resolution_steps']))}

    Preventive Measures:
    {chr(10).join(f"  - {measure}" for measure in incident_data['preventive_measures'])}

    Impact: {incident_data['impact']}
    """

    memory = client.graph.store_memory(
        dataset_id="incident-history",
        context=context,
        metadata={
            "incident_id": incident_data['id'],
            "severity": incident_data['severity'],
            "affected_services": incident_data['affected_services'],
            "resolved_by": incident_data['resolved_by'],
            "timestamp": datetime.utcnow().isoformat()
        }
    )

    print(f"✅ Stored incident {incident_data['id']} as memory {memory['memory_id']}")
    return memory

# Usage
client = ControlPlaneClient(api_key="your-api-key")

incident = {
    "id": "INC-2024-042",
    "title": "API Gateway Rate Limiting Issue",
    "description": "API gateway rejecting legitimate requests due to misconfigured rate limits",
    "detected_at": "2024-12-10T10:15:00Z",
    "resolved_at": "2024-12-10T11:30:00Z",
    "duration": "1 hour 15 minutes",
    "root_cause": "Rate limit configured as 100 req/sec instead of 1000 req/sec",
    "resolution_steps": [
        "Identified rate limit configuration issue in Terraform",
        "Updated rate limit to 1000 req/sec",
        "Applied configuration via terraform apply",
        "Verified traffic resumed normally"
    ],
    "preventive_measures": [
        "Add rate limit validation in CI/CD pipeline",
        "Set up alerts for rejected requests > 5%"
    ],
    "impact": "5% of API requests rejected for 75 minutes",
    "severity": "high",
    "affected_services": ["api-gateway", "auth-service"],
    "resolved_by": "ops-team"
}

stored = store_incident(client, incident)

2. Recall Similar Incidents

Find similar historical incidents for current issues:
from kubiya import ControlPlaneClient

def find_similar_incidents(
    client: ControlPlaneClient,
    current_issue: str,
    limit: int = 3
):
    """Find similar historical incidents."""

    memories = client.graph.recall_memory(
        query=current_issue,
        limit=limit
    )

    if not memories:
        print("No similar incidents found in history")
        return []

    print(f"=== Similar Historical Incidents ===\n")
    print(f"Current Issue: {current_issue}\n")

    for i, memory in enumerate(memories, 1):
        print(f"{i}. Relevance Score: {memory['relevance_score']:.2f}")

        # Extract key information from metadata
        metadata = memory.get('metadata', {})
        print(f"   Incident ID: {metadata.get('incident_id', 'N/A')}")
        print(f"   Severity: {metadata.get('severity', 'N/A')}")
        print(f"   Date: {memory['created_at']}")

        # Show excerpt
        lines = memory['content'].split('\n')
        root_cause = next((line for line in lines if 'Root Cause:' in line), '')

        if root_cause:
            print(f"   {root_cause.strip()}")
        print()

    return memories

# Usage
client = ControlPlaneClient(api_key="your-api-key")

# Find similar incidents for current problem
similar = find_similar_incidents(
    client,
    "API returning 500 errors under high load",
    limit=5
)

3. Store Deployment Context

Remember successful deployments and rollback procedures:
from kubiya import ControlPlaneClient

def store_deployment_context(
    client: ControlPlaneClient,
    deployment_data: dict
):
    """Store deployment as memory for future reference."""

    context = f"""
    Deployment: {deployment_data['service']} v{deployment_data['version']}

    Environment: {deployment_data['environment']}
    Deployed By: {deployment_data['deployed_by']}
    Deployment Time: {deployment_data['deployed_at']}

    Changes:
    {chr(10).join(f"  - {change}" for change in deployment_data['changes'])}

    Health Checks:
    {chr(10).join(f"  ✓ {check}" for check in deployment_data['health_checks'])}

    Rollback Command:
    {deployment_data['rollback_command']}

    Notes: {deployment_data.get('notes', 'None')}
    """

    memory = client.graph.store_memory(
        dataset_id="deployment-history",
        context=context,
        metadata={
            "service": deployment_data['service'],
            "version": deployment_data['version'],
            "environment": deployment_data['environment'],
            "status": deployment_data['status'],
            "deployed_by": deployment_data['deployed_by']
        }
    )

    return memory

# Usage
client = ControlPlaneClient(api_key="your-api-key")

deployment = {
    "service": "auth-service",
    "version": "2.5.0",
    "environment": "production",
    "deployed_by": "deploy-bot",
    "deployed_at": "2024-12-10T15:00:00Z",
    "changes": [
        "Added OAuth2 support",
        "Improved session management",
        "Fixed password reset bug"
    ],
    "health_checks": [
        "All pods healthy",
        "API endpoints responding",
        "Database connections stable"
    ],
    "rollback_command": "kubectl rollout undo deployment/auth-service -n production",
    "status": "successful",
    "notes": "Smooth deployment, no issues observed"
}

memory = store_deployment_context(client, deployment)
print(f"Deployment stored: {memory['memory_id']}")

4. Build Team Knowledge Base

Create searchable team knowledge:
from kubiya import ControlPlaneClient

def add_to_knowledge_base(
    client: ControlPlaneClient,
    title: str,
    content: str,
    category: str,
    tags: list
):
    """Add entry to team knowledge base."""

    formatted_content = f"""
    {title}
    {'=' * len(title)}

    {content}

    Category: {category}
    Tags: {', '.join(tags)}
    """

    memory = client.graph.store_memory(
        dataset_id="team-knowledge",
        context=formatted_content,
        metadata={
            "title": title,
            "category": category,
            "tags": tags
        }
    )

    print(f"✅ Added to knowledge base: {title}")
    return memory

# Usage
client = ControlPlaneClient(api_key="your-api-key")

# Add runbook
add_to_knowledge_base(
    client,
    title="How to Scale RDS Instance",
    content="""
    1. Create RDS snapshot for safety
    2. Modify instance class in AWS console or via CLI
    3. Apply changes during maintenance window
    4. Monitor performance after scaling
    5. Verify application connectivity

    Estimated Downtime: 5-10 minutes
    Cost Impact: Calculate based on new instance size
    """,
    category="runbook",
    tags=["rds", "scaling", "aws"]
)

# Add best practice
add_to_knowledge_base(
    client,
    title="Database Connection Pool Sizing",
    content="""
    Recommended formula: connections = ((core_count * 2) + effective_spindle_count)

    For most web applications:
    - Minimum pool size: 5
    - Maximum pool size: 20
    - Connection timeout: 30 seconds
    - Idle timeout: 10 minutes

    Monitor connection usage and adjust based on load patterns.
    """,
    category="best-practice",
    tags=["database", "performance", "configuration"]
)

5. Automated Memory Collection

Automatically store important events as memories:
from kubiya import ControlPlaneClient

class MemoryCollector:
    """Collect and store operational events as memories."""

    def __init__(self, client: ControlPlaneClient, dataset_id: str):
        self.client = client
        self.dataset_id = dataset_id

    def record_event(self, event_type: str, details: dict):
        """Record an event as a memory (async)."""
        context = f"{event_type}: {details.get('message', '')}"

        job = self.client.graph.store_memory_async(
            dataset_id=self.dataset_id,
            context=context,
            metadata={
                "event_type": event_type,
                **details
            }
        )

        print(f"📝 Recording {event_type} (job: {job['job_id']})")
        return job

    def recall_recent_events(self, event_type: str, limit: int = 10):
        """Recall recent events of a specific type."""
        query = f"{event_type} events"

        memories = self.client.graph.recall_memory(
            query=query,
            limit=limit
        )

        return [m for m in memories if m.get('metadata', {}).get('event_type') == event_type]

# Usage
client = ControlPlaneClient(api_key="your-api-key")
collector = MemoryCollector(client, "operational-events")

# Record various events
collector.record_event("deployment", {
    "message": "Deployed auth-service v2.5.0 to production",
    "service": "auth-service",
    "version": "2.5.0",
    "environment": "production"
})

collector.record_event("alert", {
    "message": "High CPU usage on web-server-03",
    "severity": "warning",
    "host": "web-server-03",
    "metric": "cpu",
    "value": "85%"
})

# Recall deployment events
deployments = collector.recall_recent_events("deployment", limit=5)
print(f"Recent deployments: {len(deployments)}")

Error Handling

from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

# Handle storage errors
try:
    memory = client.graph.store_memory(
        dataset_id="my-dataset",
        context="Important information"
    )
except GraphError as e:
    if "dataset not found" in str(e).lower():
        print("Dataset doesn't exist - create it first")
    else:
        print(f"Failed to store memory: {e}")

# Handle recall errors
try:
    memories = client.graph.recall_memory(query="test")
except GraphError as e:
    print(f"Failed to recall memories: {e}")

# Handle empty results
memories = client.graph.recall_memory(
    query="very specific query with no matches",
    limit=10
)

if not memories:
    print("No relevant memories found")
elif memories[0]['relevance_score'] < 0.5:
    print("Low relevance - consider rephrasing query")
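The relevance check above can be folded into a small helper (hypothetical, not an SDK method) that enforces a minimum score on every recall:

```python
def recall_with_threshold(client, query, min_score=0.5, limit=10):
    """Recall memories, dropping results below a relevance threshold.

    Keeps only results whose relevance_score meets `min_score`, so
    callers never act on weakly related matches.
    """
    memories = client.graph.recall_memory(query=query, limit=limit)
    return [m for m in memories if m["relevance_score"] >= min_score]
```

An empty return then means either no matches or only low-relevance ones; in both cases, rephrasing the query is the next step.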

Best Practices

1. Use Descriptive Context

# ❌ BAD - Too brief
client.graph.store_memory(
    dataset_id="ops",
    context="Fixed bug"
)

# ✅ GOOD - Detailed and searchable
client.graph.store_memory(
    dataset_id="ops",
    context="""
    Bug Fix: Authentication service returning 401 for valid tokens

    Issue: JWT validation failing due to clock skew between services
    Solution: Increased token validation tolerance to 30 seconds
    Impact: Resolved intermittent auth failures for 2% of requests
    """
)

2. Add Rich Metadata

memory = client.graph.store_memory(
    dataset_id="incidents",
    context="Incident details...",
    metadata={
        "incident_id": "INC-123",
        "severity": "high",
        "service": "api-gateway",
        "resolved_by": "ops-team",
        "duration_minutes": 45,
        "timestamp": "2024-12-10T10:00:00Z"
    }
)

3. Organize with Datasets

# Separate datasets for different purposes
client.graph.store_memory(dataset_id="incident-history", context="...")
client.graph.store_memory(dataset_id="deployment-logs", context="...")
client.graph.store_memory(dataset_id="team-knowledge", context="...")
client.graph.store_memory(dataset_id="customer-feedback", context="...")

4. Use Async for Large Batches

# Store large batches asynchronously
for log_entry in large_log_batch:
    client.graph.store_memory_async(
        dataset_id="logs",
        context=log_entry['message'],
        metadata=log_entry['metadata']
    )
# Continue with other work - storage happens in background

API Reference

Store Memory Methods

Method                Description                  Blocking  Parameters                     Returns
store_memory()        Store memory (synchronous)   Yes       dataset_id, context, metadata  Dict with memory_id
store_memory_async()  Store memory (asynchronous)  No        dataset_id, context, metadata  Dict with job_id

Recall Memory Methods

Method           Description                   Parameters               Returns
recall_memory()  Search for relevant memories  query, memory_id, limit  List[Dict]

Memory Object Structure

{
    "memory_id": str,
    "content": str,
    "relevance_score": float,  # Only in recall_memory results
    "metadata": Dict,
    "created_at": str
}
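For editor support, the structure above can be expressed as a TypedDict. This is illustrative only (not a type shipped by the SDK); field names mirror the dictionary keys shown:

```python
from typing import Any, Dict, TypedDict

class Memory(TypedDict, total=False):
    """Illustrative shape of a memory dict returned by the SDK.

    total=False because relevance_score appears only in
    recall_memory() results, not in store_memory() responses.
    """
    memory_id: str
    content: str
    relevance_score: float
    metadata: Dict[str, Any]
    created_at: str
```

Annotating helper functions with `List[Memory]` gives autocomplete on the keys without changing runtime behavior, since TypedDict values are plain dicts.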

Next Steps