> ## Documentation Index
> Fetch the complete documentation index at: https://docs.kubiya.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Context Graph Service

> Query and explore your knowledge graph with nodes and relationships from various data sources

The Context Graph Service provides access to the knowledge graph containing entities and their relationships from various data sources. Query AWS, Azure, Github, Custom integrations, and any other data source to understand topology, find dependencies, and enable intelligent agent operations.

## Overview

The Context Graph is a graph database that stores:

* **Nodes**: Entities from any source (cloud resources, databases, custom data, CSV imports, etc.)
* **Relationships**: Connections between entities (CONTAINS, USES, MEMBER\_OF, custom relationships, etc.)
* **Properties**: Entity metadata and attributes from the source system

## Quick Start

```python theme={null}
from kubiya import ControlPlaneClient

cp_client = ControlPlaneClient(api_key="your-api-key")

# Get graph statistics
stats = cp_client.graph.get_stats()
print(f"Nodes: {stats['node_count']}")
print(f"Relationships: {stats['relationship_count']}")

# List entities from AWS integration
nodes = cp_client.graph.list_nodes(integration="aws", limit=10)
for node in nodes['nodes']:
    print(f"Entity: {node['id']}")

# Search for production entities
production_entities = cp_client.graph.search_nodes_by_text({
    "property_name": "resourcegroup",
    "search_text": "PRODUCTION"
})
print(f"Found {production_entities['count']} production entities")
```

## Graph Statistics

Get overview statistics about the graph:

```python theme={null}
stats = cp_client.graph.get_stats()

print(f"Total Nodes: {stats['node_count']}")
print(f"Total Relationships: {stats['relationship_count']}")
print(f"Node Types: {len(stats['labels'])}")
print(f"Relationship Types: {len(stats['relationship_types'])}")

# Example output:
# Total Nodes: 3306
# Total Relationships: 4937
# Node Types: 100
# Relationship Types: 57
```

### Filter by Integration

```python theme={null}
# AWS statistics
aws_stats = cp_client.graph.get_stats(integration="aws")

# Azure statistics
azure_stats = cp_client.graph.get_stats(integration="azure")
```

## List Nodes

### Basic Listing

```python theme={null}
# List all nodes (paginated)
nodes = cp_client.graph.list_nodes(skip=0, limit=100)

print(f"Total count: {nodes['count']}")
print(f"Returned: {len(nodes['nodes'])}")

for node in nodes['nodes']:
    print(f"ID: {node['id']}")
    print(f"Labels: {node['labels']}")
    print(f"Properties: {node.get('name', 'N/A')}")
```

### Filter by Integration

```python theme={null}
# List only AWS integration nodes
aws_nodes = cp_client.graph.list_nodes(
    integration="aws",
    skip=0,
    limit=50
)

# List only Azure integration nodes
azure_nodes = cp_client.graph.list_nodes(
    integration="azure",
    skip=0,
    limit=50
)

# List CSV integration nodes
csv_nodes = cp_client.graph.list_nodes(
    integration="csv",
    skip=0,
    limit=50
)

# List custom integration nodes
custom_nodes = cp_client.graph.list_nodes(
    integration="custom",
    skip=0,
    limit=50
)
```

### Pagination Example

```python theme={null}
def fetch_all_nodes(cp_client, integration=None):
    """Fetch all nodes with pagination"""
    all_nodes = []
    skip = 0
    limit = 100

    while True:
        result = cp_client.graph.list_nodes(
            integration=integration,
            skip=skip,
            limit=limit
        )

        all_nodes.extend(result['nodes'])

        if len(result['nodes']) < limit:
            break

        skip += limit

    return all_nodes

# Fetch all AWS nodes
aws_nodes = fetch_all_nodes(cp_client, integration="aws")
print(f"Total AWS nodes: {len(aws_nodes)}")
```

## Get Specific Node

Retrieve details for a specific node by ID:

```python theme={null}
# Get node by ID
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:0"
node = cp_client.graph.get_node(node_id)

print(f"Node ID: {node['id']}")
print(f"Labels: {node['labels']}")
print(f"Properties: {node}")

# With integration filter
node = cp_client.graph.get_node(
    node_id,
    integration="aws"
)
```

## Search Nodes

### Structured Search

Search nodes using structured filters:

```python theme={null}
# Search with filters
search_data = {
    "filters": {
        "labels": ["EC2Instance"],
        "properties": {
            "state": "running"
        }
    }
}

results = cp_client.graph.search_nodes(
    search_data=search_data,
    limit=50
)

print(f"Found {len(results['nodes'])} running EC2 instances")
```

### Text Search

Search nodes by property values:

```python theme={null}
# Search by resource group
production_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "resourcegroup",
    "search_text": "PRODUCTION"
})

print(f"Found {production_nodes['count']} production resources")
for node in production_nodes['nodes']:
    print(f"- {node.get('name')}: {node.get('resourcegroup')}")

# Search by name
db_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "name",
    "search_text": "database"
})

# Search by tags
tagged_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "tags",
    "search_text": "production"
})
```

### Advanced Search with Pagination

```python theme={null}
def search_all_text(cp_client, property_name, search_text, integration=None):
    """Search all nodes matching text criteria"""
    all_nodes = []
    skip = 0
    limit = 100

    while True:
        result = cp_client.graph.search_nodes_by_text(
            text_query={
                "property_name": property_name,
                "search_text": search_text
            },
            integration=integration,
            skip=skip,
            limit=limit
        )

        all_nodes.extend(result['nodes'])

        if len(result['nodes']) < limit:
            break

        skip += limit

    return all_nodes

# Find all production resources
prod_resources = search_all_text(
    cp_client,
    property_name="environment",
    search_text="production"
)
```

## Node Labels (Types)

List all node types in the graph:

```python theme={null}
labels = cp_client.graph.list_labels()

print(f"Total node types: {labels['count']}")
print("Available node types:")

for label in labels['labels']:
    print(f"- {label}")

# Example output:
# - EC2Instance
# - S3Bucket
# - RDSInstance
# - AzureDisk
# - AzureStorageAccount
# - KMSKey
# - EKSCluster
```

### Filter Labels by Integration

```python theme={null}
# AWS labels only
aws_labels = cp_client.graph.list_labels(integration="aws")

# Azure labels only
azure_labels = cp_client.graph.list_labels(integration="azure")
```

### Common Node Types

**AWS Integration:**

* `EC2Instance` - EC2 virtual machines
* `S3Bucket` - S3 storage buckets
* `RDSInstance` - RDS databases
* `EKSCluster` - Kubernetes clusters
* `KMSKey` - Encryption keys
* `AWSRole` - IAM roles
* `AWSUser` - IAM users
* `AWSPolicy` - IAM policies
* `AWSVpc` - Virtual private clouds
* `EC2SecurityGroup` - Security groups
* `LoadBalancerV2` - Application/Network load balancers

**Azure Integration:**

* `AzureDisk` - Managed disks
* `AzureStorageAccount` - Storage accounts
* `AzureStorageBlobContainer` - Blob containers
* `AzureResourceGroup` - Resource groups
* `AzureSubscription` - Azure subscriptions

**Custom Integrations:**

* Custom node types defined by your data source
* Types are determined by the data structure and labels in your integration

## Relationships

### Get Node Relationships

```python theme={null}
# Get all relationships for a node
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:0"
relationships = cp_client.graph.get_relationships(node_id)

print(f"Found {len(relationships['relationships'])} relationships")

for rel in relationships['relationships']:
    print(f"Type: {rel['type']}")
    print(f"From: {rel['from']}")
    print(f"To: {rel['to']}")
```

### Filter by Direction

```python theme={null}
# Only incoming relationships
incoming = cp_client.graph.get_relationships(
    node_id=node_id,
    direction="incoming"
)

# Only outgoing relationships
outgoing = cp_client.graph.get_relationships(
    node_id=node_id,
    direction="outgoing"
)

# Both directions (default)
both = cp_client.graph.get_relationships(
    node_id=node_id,
    direction="both"
)
```

### Filter by Relationship Type

```python theme={null}
# Only CONTAINS relationships
contains_rels = cp_client.graph.get_relationships(
    node_id=node_id,
    relationship_type="CONTAINS"
)

# Only MEMBER_OF relationships
member_rels = cp_client.graph.get_relationships(
    node_id=node_id,
    relationship_type="MEMBER_OF_AWS_VPC"
)
```

### List All Relationship Types

```python theme={null}
rel_types = cp_client.graph.list_relationship_types()

print(f"Total relationship types: {rel_types['count']}")
for rel_type in rel_types['relationship_types']:
    print(f"- {rel_type}")

# Example output:
# - CONTAINS
# - USES
# - MEMBER_OF_AWS_VPC
# - ATTACHED_TO
# - ASSOCIATED_WITH
# - POLICY
# - TAGGED
```

### Common Relationship Types

* `CONTAINS` - Container/containment relationships
* `USES` - Usage dependencies
* `MEMBER_OF_AWS_VPC` - VPC membership
* `ATTACHED_TO` - Attachment relationships (volumes, NICs)
* `ASSOCIATED_WITH` - General associations
* `POLICY` - Policy attachments
* `TAGGED` - Tag relationships
* `TRUSTS_AWS_PRINCIPAL` - IAM trust relationships
* `ALLOWS_TRAFFIC_FROM` - Security group rules
* `ROUTES_TO_GATEWAY` - Routing relationships

## Subgraph Queries

Get a subgraph (portion of the graph) starting from a specific node:

```python theme={null}
# Get subgraph with depth 3
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:100"
subgraph = cp_client.graph.get_subgraph(
    node_id=node_id,
    depth=3
)

print(f"Subgraph contains {len(subgraph['nodes'])} nodes")
print(f"Subgraph contains {len(subgraph['relationships'])} relationships")

# Visualize subgraph
for node in subgraph['nodes']:
    print(f"Node: {node['id']} - {node['labels']}")

for rel in subgraph['relationships']:
    print(f"Relationship: {rel['from']} -{rel['type']}-> {rel['to']}")
```

### Use Cases for Subgraphs

```python theme={null}
# Find all resources connected to a VPC
vpc_id = "vpc-123456"
vpc_subgraph = cp_client.graph.get_subgraph(
    node_id=vpc_id,
    depth=2
)
print(f"VPC contains {len(vpc_subgraph['nodes'])} connected resources")

# Find all resources in a resource group
rg_id = "rg-production"
rg_subgraph = cp_client.graph.get_subgraph(
    node_id=rg_id,
    depth=1
)
print(f"Resource group contains {len(rg_subgraph['nodes'])} resources")

# Analyze security group dependencies
sg_id = "sg-123456"
sg_subgraph = cp_client.graph.get_subgraph(
    node_id=sg_id,
    depth=2,
    integration="aws"
)
```

## Custom Cypher Queries

Execute custom Cypher queries for advanced graph operations:

```python theme={null}
# Count all nodes
query = {
    "query": "MATCH (n) RETURN count(n) as count"
}
result = cp_client.graph.execute_query(query)
print(f"Total nodes: {result['results'][0]['count']}")

# Find specific node types
query = {
    "query": """
        MATCH (n:EC2Instance)
        WHERE n.state = 'running'
        RETURN n.id, n.name, n.instanceType
        LIMIT 10
    """
}
result = cp_client.graph.execute_query(query)
for row in result['results']:
    print(f"Instance: {row['n.name']}, Type: {row['n.instanceType']}")

# Find relationships
query = {
    "query": """
        MATCH (s:S3Bucket)-[r:CONTAINS]->(o)
        RETURN s.name, type(r), o.name
        LIMIT 20
    """
}
result = cp_client.graph.execute_query(query)
for row in result['results']:
    print(f"{row['s.name']} -{row['type(r)']}-> {row['o.name']}")
```

### Advanced Cypher Examples

```python theme={null}
# Find all EC2 instances in a specific VPC
query = {
    "query": """
        MATCH (vpc:AWSVpc {id: $vpc_id})-[:CONTAINS*]->(ec2:EC2Instance)
        RETURN ec2.id, ec2.name, ec2.state
    """,
    "parameters": {
        "vpc_id": "vpc-123456"
    }
}
result = cp_client.graph.execute_query(query)

# Find security group rules
query = {
    "query": """
        MATCH (sg:EC2SecurityGroup)-[r:ALLOWS_TRAFFIC_FROM]->(source)
        WHERE sg.name CONTAINS 'production'
        RETURN sg.name, r.port, r.protocol, source.name
    """
}
result = cp_client.graph.execute_query(query)

# Find orphaned resources
query = {
    "query": """
        MATCH (n)
        WHERE NOT (n)-[]-()
        RETURN labels(n), n.id, n.name
        LIMIT 50
    """
}
result = cp_client.graph.execute_query(query)
```

## Integration Management

List available integrations in the graph:

```python theme={null}
integrations = cp_client.graph.list_integrations()

print(f"Total integrations: {integrations['count']}")
for integration in integrations['integrations']:
    print(f"- {integration}")

# Example output:
# - Aws
# - Azure
# - Csv
# - Custom
```

## Health Check

Check the health of the Context Graph service:

```python theme={null}
health = cp_client.graph.health()
print(f"Graph service status: {health['status']}")
# Output: healthy
```

## Practical Use Cases

### 1. Find All Production Entities

```python theme={null}
def find_production_entities(cp_client):
    """Find all entities tagged or named with 'production'"""
    # Search by resource group
    rg_results = cp_client.graph.search_nodes_by_text({
        "property_name": "resourcegroup",
        "search_text": "production"
    })

    # Search by tags
    tag_results = cp_client.graph.search_nodes_by_text({
        "property_name": "tags",
        "search_text": "production"
    })

    # Search by name
    name_results = cp_client.graph.search_nodes_by_text({
        "property_name": "name",
        "search_text": "production"
    })

    # Combine results
    all_nodes = (
        rg_results['nodes'] +
        tag_results['nodes'] +
        name_results['nodes']
    )

    # Deduplicate by ID
    unique_nodes = {node['id']: node for node in all_nodes}
    return list(unique_nodes.values())

production_entities = find_production_entities(cp_client)
print(f"Found {len(production_entities)} production entities")
```

### 2. Analyze Entity Dependencies

```python theme={null}
def analyze_dependencies(cp_client, entity_id):
    """Analyze dependencies for an entity"""
    # Get the entity
    entity = cp_client.graph.get_node(entity_id)

    # Get all relationships
    relationships = cp_client.graph.get_relationships(
        entity_id,
        direction="both"
    )

    # Categorize relationships
    dependencies = {
        'incoming': [],
        'outgoing': []
    }

    for rel in relationships['relationships']:
        if rel['from'] == entity_id:
            dependencies['outgoing'].append(rel)
        else:
            dependencies['incoming'].append(rel)

    print(f"Entity: {entity['id']}")
    print(f"Incoming dependencies: {len(dependencies['incoming'])}")
    print(f"Outgoing dependencies: {len(dependencies['outgoing'])}")

    return dependencies

deps = analyze_dependencies(cp_client, "vpc-123456")
```

### 3. Find Unused Entities

```python theme={null}
def find_unused_entities(cp_client, node_type):
    """Find entities with no relationships"""
    query = {
        "query": f"""
            MATCH (n:{node_type})
            WHERE NOT (n)-[]-()
            RETURN n.id, n.name, labels(n)
        """
    }

    result = cp_client.graph.execute_query(query)

    unused = []
    for row in result['results']:
        unused.append({
            'id': row['n.id'],
            'name': row.get('n.name', 'N/A'),
            'labels': row['labels(n)']
        })

    return unused

# Find unused S3 buckets
unused_buckets = find_unused_entities(cp_client, "S3Bucket")
print(f"Unused S3 buckets: {len(unused_buckets)}")
```

### 4. Security Audit

```python theme={null}
def audit_security_groups(cp_client):
    """Audit security groups for overly permissive rules"""
    query = {
        "query": """
            MATCH (sg:EC2SecurityGroup)-[r:ALLOWS_TRAFFIC_FROM]->(source)
            WHERE r.cidr = '0.0.0.0/0'
            RETURN sg.name, sg.id, r.port, r.protocol
        """
    }

    result = cp_client.graph.execute_query(query)

    risky_groups = []
    for row in result['results']:
        risky_groups.append({
            'name': row['sg.name'],
            'id': row['sg.id'],
            'port': row['r.port'],
            'protocol': row['r.protocol']
        })

    return risky_groups

risky = audit_security_groups(cp_client)
print(f"Found {len(risky)} security groups with open access")
```

## Error Handling

```python theme={null}
from kubiya.resources.exceptions import GraphError

try:
    nodes = cp_client.graph.list_nodes(limit=100)
except GraphError as e:
    print(f"Graph operation failed: {e}")
    # Handle error (retry, log, alert, etc.)
```

## Best Practices

### 1. Use Pagination

```python theme={null}
# Always paginate for large result sets
skip = 0
limit = 100

while True:
    nodes = cp_client.graph.list_nodes(skip=skip, limit=limit)
    if not nodes['nodes']:
        break

    process_nodes(nodes['nodes'])
    skip += limit
```

### 2. Filter by Integration

```python theme={null}
# Filter early to reduce data transfer
aws_nodes = cp_client.graph.list_nodes(integration="aws", limit=50)
azure_nodes = cp_client.graph.list_nodes(integration="azure", limit=50)
```

### 3. Use Specific Queries

```python theme={null}
# Instead of fetching all nodes and filtering client-side
# Use text search or Cypher queries

# Bad (fetches all nodes)
all_nodes = cp_client.graph.list_nodes(limit=10000)
prod_nodes = [n for n in all_nodes['nodes'] if 'prod' in n.get('name', '')]

# Good (server-side filtering)
prod_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "name",
    "search_text": "prod"
})
```

## API Reference

### Methods

| Method                                                                               | Description                    |
| ------------------------------------------------------------------------------------ | ------------------------------ |
| `health()`                                                                           | Check graph service health     |
| `list_nodes(integration, skip, limit)`                                               | List all nodes with pagination |
| `get_node(node_id, integration)`                                                     | Get specific node by ID        |
| `search_nodes(search_data, integration, skip, limit)`                                | Structured node search         |
| `search_nodes_by_text(text_query, integration, skip, limit)`                         | Text-based node search         |
| `get_relationships(node_id, direction, relationship_type, integration, skip, limit)` | Get node relationships         |
| `get_subgraph(node_id, depth, integration)`                                          | Get subgraph from node         |
| `list_labels(integration, skip, limit)`                                              | List all node types            |
| `list_relationship_types(integration, skip, limit)`                                  | List all relationship types    |
| `get_stats(integration, skip, limit)`                                                | Get graph statistics           |
| `execute_query(query)`                                                               | Execute custom Cypher query    |
| `list_integrations(skip, limit)`                                                     | List available integrations    |

## Next Steps

<CardGroup cols={2}>
  <Card title="Control Plane Overview →" href="/sdk/control-plane-client-overview" icon="network-wired">
    Learn about Control Plane services
  </Card>

  <Card title="API Reference →" href="/sdk/api-reference" icon="book">
    Complete SDK API reference
  </Card>
</CardGroup>
