Documentation Index
Fetch the complete documentation index at: https://docs.kubiya.ai/llms.txt
Use this file to discover all available pages before exploring further.
The Context Graph Service provides access to the knowledge graph containing entities and their relationships from various data sources. Query AWS, Azure, GitHub, custom integrations, and any other data source to understand topology, find dependencies, and enable intelligent agent operations.
Overview
The Context Graph is a graph database that stores:
Nodes: Entities from any source (cloud resources, databases, custom data, CSV imports, etc.)
Relationships: Connections between entities (CONTAINS, USES, MEMBER_OF, custom relationships, etc.)
Properties: Entity metadata and attributes from the source system
Quick Start
from kubiya import ControlPlaneClient

# Create the Control Plane client; replace the placeholder with a real API key.
cp_client = ControlPlaneClient(api_key="your-api-key")

# Get graph statistics
stats = cp_client.graph.get_stats()
print(f"Nodes: {stats['node_count']}")
print(f"Relationships: {stats['relationship_count']}")

# List entities from AWS integration
nodes = cp_client.graph.list_nodes(integration="aws", limit=10)
for node in nodes['nodes']:
    print(f"Entity: {node['id']}")

# Search for production entities
production_entities = cp_client.graph.search_nodes_by_text({
    "property_name": "resourcegroup",
    "search_text": "PRODUCTION",
})
print(f"Found {production_entities['count']} production entities")
Graph Statistics
Get overview statistics about the graph:
# Fetch graph-wide statistics and print the headline counts.
stats = cp_client.graph.get_stats()
print(f"Total Nodes: {stats['node_count']}")
print(f"Total Relationships: {stats['relationship_count']}")
# 'labels' and 'relationship_types' are collections; their sizes are the type counts.
print(f"Node Types: {len(stats['labels'])}")
print(f"Relationship Types: {len(stats['relationship_types'])}")

# Example output:
# Total Nodes: 3306
# Total Relationships: 4937
# Node Types: 100
# Relationship Types: 57
Filter by Integration
# AWS statistics
aws_stats = cp_client.graph.get_stats( integration = "aws" )
# Azure statistics
azure_stats = cp_client.graph.get_stats( integration = "azure" )
List Nodes
Basic Listing
# List all nodes (paginated)
nodes = cp_client.graph.list_nodes(skip=0, limit=100)
print(f"Total count: {nodes['count']}")
print(f"Returned: {len(nodes['nodes'])}")

for node in nodes['nodes']:
    print(f"ID: {node['id']}")
    print(f"Labels: {node['labels']}")
    # Only the 'name' value is printed here, so label it as such.
    print(f"Name: {node.get('name', 'N/A')}")
Filter by Integration
# List only AWS integration nodes
aws_nodes = cp_client.graph.list_nodes(
integration = "aws" ,
skip = 0 ,
limit = 50
)
# List only Azure integration nodes
azure_nodes = cp_client.graph.list_nodes(
integration = "azure" ,
skip = 0 ,
limit = 50
)
# List CSV integration nodes
csv_nodes = cp_client.graph.list_nodes(
integration = "csv" ,
skip = 0 ,
limit = 50
)
# List custom integration nodes
custom_nodes = cp_client.graph.list_nodes(
integration = "custom" ,
skip = 0 ,
limit = 50
)
def fetch_all_nodes(cp_client, integration=None, page_size=100):
    """Fetch all nodes by paging through ``graph.list_nodes``.

    Args:
        cp_client: Client exposing ``graph.list_nodes(integration, skip, limit)``.
        integration: Optional integration name to filter by (e.g. ``"aws"``);
            passed through unchanged, including ``None``.
        page_size: Number of nodes requested per call. Defaults to 100.

    Returns:
        A list containing the ``'nodes'`` entries of every page, in order.

    Raises:
        ValueError: If ``page_size`` is less than 1 (the loop could never end).
    """
    if page_size < 1:
        raise ValueError("page_size must be a positive integer")

    all_nodes = []
    skip = 0
    while True:
        result = cp_client.graph.list_nodes(
            integration=integration,
            skip=skip,
            limit=page_size,
        )
        page = result['nodes']
        all_nodes.extend(page)
        # A short (or empty) page means there is nothing further to request.
        if len(page) < page_size:
            break
        skip += page_size
    return all_nodes
# Fetch all AWS nodes
aws_nodes = fetch_all_nodes(cp_client, integration="aws")
print(f"Total AWS nodes: {len(aws_nodes)}")
Get Specific Node
Retrieve details for a specific node by ID:
# Get node by ID
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:0"
node = cp_client.graph.get_node(node_id)
print(f"Node ID: {node['id']}")
print(f"Labels: {node['labels']}")
# Prints the whole node mapping as returned by the service.
print(f"Properties: {node}")

# With integration filter
node = cp_client.graph.get_node(
    node_id,
    integration="aws",
)
Search Nodes
Structured Search
Search nodes using structured filters:
# Search with filters: match on node label and on a property value.
search_data = {
    "filters": {
        "labels": ["EC2Instance"],
        "properties": {
            "state": "running",
        },
    },
}
results = cp_client.graph.search_nodes(
    search_data=search_data,
    limit=50,
)
print(f"Found {len(results['nodes'])} running EC2 instances")
Text Search
Search nodes by property values:
# Search by resource group
production_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "resourcegroup",
    "search_text": "PRODUCTION",
})
print(f"Found {production_nodes['count']} production resources")
for node in production_nodes['nodes']:
    print(f"- {node.get('name')}: {node.get('resourcegroup')}")

# Search by name
db_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "name",
    "search_text": "database",
})

# Search by tags
tagged_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "tags",
    "search_text": "production",
})
def search_all_text(cp_client, property_name, search_text, integration=None,
                    page_size=100):
    """Collect every node matching a text search, paging through the results.

    Args:
        cp_client: Client exposing
            ``graph.search_nodes_by_text(text_query, integration, skip, limit)``.
        property_name: Name of the node property to search in.
        search_text: Text to search for in that property.
        integration: Optional integration name to filter by; passed through
            unchanged, including ``None``.
        page_size: Number of nodes requested per call. Defaults to 100.

    Returns:
        A list containing the ``'nodes'`` entries of every page, in order.

    Raises:
        ValueError: If ``page_size`` is less than 1 (the loop could never end).
    """
    if page_size < 1:
        raise ValueError("page_size must be a positive integer")

    # The query payload is identical for every page, so build it once.
    text_query = {
        "property_name": property_name,
        "search_text": search_text,
    }
    all_nodes = []
    skip = 0
    while True:
        result = cp_client.graph.search_nodes_by_text(
            text_query=text_query,
            integration=integration,
            skip=skip,
            limit=page_size,
        )
        page = result['nodes']
        all_nodes.extend(page)
        # A short (or empty) page means there is nothing further to request.
        if len(page) < page_size:
            break
        skip += page_size
    return all_nodes
# Find all production resources
prod_resources = search_all_text(
cp_client,
property_name = "environment" ,
search_text = "production"
)
Node Labels (Types)
List all node types in the graph:
# List every node label (type) known to the graph.
labels = cp_client.graph.list_labels()
print(f"Total node types: {labels['count']}")
print("Available node types:")
for label in labels['labels']:
    print(f"- {label}")

# Example output:
# - EC2Instance
# - S3Bucket
# - RDSInstance
# - AzureDisk
# - AzureStorageAccount
# - KMSKey
# - EKSCluster
Filter Labels by Integration
# AWS labels only
aws_labels = cp_client.graph.list_labels( integration = "aws" )
# Azure labels only
azure_labels = cp_client.graph.list_labels( integration = "azure" )
Common Node Types
AWS Integration:
EC2Instance - EC2 virtual machines
S3Bucket - S3 storage buckets
RDSInstance - RDS databases
EKSCluster - Kubernetes clusters
KMSKey - Encryption keys
AWSRole - IAM roles
AWSUser - IAM users
AWSPolicy - IAM policies
AWSVpc - Virtual private clouds
EC2SecurityGroup - Security groups
LoadBalancerV2 - Application/Network load balancers
Azure Integration:
AzureDisk - Managed disks
AzureStorageAccount - Storage accounts
AzureStorageBlobContainer - Blob containers
AzureResourceGroup - Resource groups
AzureSubscription - Azure subscriptions
Custom Integrations:
Custom node types defined by your data source
Types are determined by the data structure and labels in your integration
Relationships
Get Node Relationships
# Get all relationships for a node
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:0"
relationships = cp_client.graph.get_relationships(node_id)
print(f"Found {len(relationships['relationships'])} relationships")

# Each relationship record exposes its type and both endpoints.
for rel in relationships['relationships']:
    print(f"Type: {rel['type']}")
    print(f"From: {rel['from']}")
    print(f"To: {rel['to']}")
Filter by Direction
# Only incoming relationships
incoming = cp_client.graph.get_relationships(
node_id = node_id,
direction = "incoming"
)
# Only outgoing relationships
outgoing = cp_client.graph.get_relationships(
node_id = node_id,
direction = "outgoing"
)
# Both directions (default)
both = cp_client.graph.get_relationships(
node_id = node_id,
direction = "both"
)
Filter by Relationship Type
# Only CONTAINS relationships
contains_rels = cp_client.graph.get_relationships(
node_id = node_id,
relationship_type = "CONTAINS"
)
# Only MEMBER_OF_AWS_VPC relationships
member_rels = cp_client.graph.get_relationships(
node_id = node_id,
relationship_type = "MEMBER_OF_AWS_VPC"
)
List All Relationship Types
# List every relationship type known to the graph.
rel_types = cp_client.graph.list_relationship_types()
print(f"Total relationship types: {rel_types['count']}")
for rel_type in rel_types['relationship_types']:
    print(f"- {rel_type}")

# Example output:
# - CONTAINS
# - USES
# - MEMBER_OF_AWS_VPC
# - ATTACHED_TO
# - ASSOCIATED_WITH
# - POLICY
# - TAGGED
Common Relationship Types
CONTAINS - Container/containment relationships
USES - Usage dependencies
MEMBER_OF_AWS_VPC - VPC membership
ATTACHED_TO - Attachment relationships (volumes, NICs)
ASSOCIATED_WITH - General associations
POLICY - Policy attachments
TAGGED - Tag relationships
TRUSTS_AWS_PRINCIPAL - IAM trust relationships
ALLOWS_TRAFFIC_FROM - Security group rules
ROUTES_TO_GATEWAY - Routing relationships
Subgraph Queries
Get a subgraph (portion of the graph) starting from a specific node:
# Get subgraph with depth 3
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:100"
subgraph = cp_client.graph.get_subgraph(
    node_id=node_id,
    depth=3,
)
print(f"Subgraph contains {len(subgraph['nodes'])} nodes")
print(f"Subgraph contains {len(subgraph['relationships'])} relationships")

# Visualize subgraph: nodes first, then the edges between them.
for node in subgraph['nodes']:
    print(f"Node: {node['id']} - {node['labels']}")
for rel in subgraph['relationships']:
    print(f"Relationship: {rel['from']} - {rel['type']} -> {rel['to']}")
Use Cases for Subgraphs
# Find all resources connected to a VPC
vpc_id = "vpc-123456"
vpc_subgraph = cp_client.graph.get_subgraph(
    node_id=vpc_id,
    depth=2,
)
print(f"VPC contains {len(vpc_subgraph['nodes'])} connected resources")

# Find all resources in a resource group
rg_id = "rg-production"
rg_subgraph = cp_client.graph.get_subgraph(
    node_id=rg_id,
    depth=1,
)
print(f"Resource group contains {len(rg_subgraph['nodes'])} resources")

# Analyze security group dependencies
sg_id = "sg-123456"
sg_subgraph = cp_client.graph.get_subgraph(
    node_id=sg_id,
    depth=2,
    integration="aws",
)
Custom Cypher Queries
Execute custom Cypher queries for advanced graph operations:
# Count all nodes
query = {
    "query": "MATCH (n) RETURN count(n) as count"
}
result = cp_client.graph.execute_query(query)
print(f"Total nodes: {result['results'][0]['count']}")

# Find specific node types
query = {
    "query": """
        MATCH (n:EC2Instance)
        WHERE n.state = 'running'
        RETURN n.id, n.name, n.instanceType
        LIMIT 10
    """
}
result = cp_client.graph.execute_query(query)
# Result rows are keyed by the expressions in the RETURN clause.
for row in result['results']:
    print(f"Instance: {row['n.name']}, Type: {row['n.instanceType']}")

# Find relationships
query = {
    "query": """
        MATCH (s:S3Bucket)-[r:CONTAINS]->(o)
        RETURN s.name, type(r), o.name
        LIMIT 20
    """
}
result = cp_client.graph.execute_query(query)
for row in result['results']:
    print(f"{row['s.name']} - {row['type(r)']} -> {row['o.name']}")
Advanced Cypher Examples
# Find all EC2 instances in a specific VPC
query = {
"query" : """
MATCH (vpc:AWSVpc {id: $vpc_id} )-[:CONTAINS*]->(ec2:EC2Instance)
RETURN ec2.id, ec2.name, ec2.state
""" ,
"parameters" : {
"vpc_id" : "vpc-123456"
}
}
result = cp_client.graph.execute_query(query)
# Find security group rules
query = {
"query" : """
MATCH (sg:EC2SecurityGroup)-[r:ALLOWS_TRAFFIC_FROM]->(source)
WHERE sg.name CONTAINS 'production'
RETURN sg.name, r.port, r.protocol, source.name
"""
}
result = cp_client.graph.execute_query(query)
# Find orphaned resources
query = {
"query" : """
MATCH (n)
WHERE NOT (n)-[]-()
RETURN labels(n), n.id, n.name
LIMIT 50
"""
}
result = cp_client.graph.execute_query(query)
Integration Management
List available integrations in the graph:
# List the integrations that have contributed data to the graph.
integrations = cp_client.graph.list_integrations()
print(f"Total integrations: {integrations['count']}")
for integration in integrations['integrations']:
    print(f"- {integration}")

# Example output:
# - Aws
# - Azure
# - Csv
# - Custom
Health Check
Check the health of the Context Graph service:
# Query the service health endpoint and report its status field.
health = cp_client.graph.health()
print(f"Graph service status: {health['status']}")

# Example output:
# Graph service status: healthy
Practical Use Cases
1. Find All Production Entities
def find_production_entities(cp_client, search_text="production",
                             property_names=("resourcegroup", "tags", "name")):
    """Find entities whose resource group, tags, or name match a search text.

    One text search is issued per property; results are merged and
    de-duplicated by node ``'id'``. Only the page returned by each single
    ``search_nodes_by_text`` call is used (no ``skip``/``limit`` is sent).

    Args:
        cp_client: Client exposing ``graph.search_nodes_by_text(text_query)``.
        search_text: Text to look for. Defaults to ``"production"``.
        property_names: Properties to search, in order. Defaults to
            resource group, tags, then name.

    Returns:
        A list of unique nodes. When the same ID is returned by several
        searches, the node from the latest search is kept, at the position
        where that ID first appeared.
    """
    unique_nodes = {}
    for property_name in property_names:
        result = cp_client.graph.search_nodes_by_text({
            "property_name": property_name,
            "search_text": search_text,
        })
        # Deduplicate by ID; later matches replace earlier ones.
        for node in result['nodes']:
            unique_nodes[node['id']] = node
    return list(unique_nodes.values())
# Run the search and report how many unique entities matched.
production_entities = find_production_entities(cp_client)
print(f"Found {len(production_entities)} production entities")
2. Analyze Entity Dependencies
def analyze_dependencies(cp_client, entity_id):
    """Split an entity's relationships into incoming and outgoing lists.

    Prints the entity ID and both counts, then returns the categorized
    relationships.

    Args:
        cp_client: Client exposing ``graph.get_node`` and
            ``graph.get_relationships``.
        entity_id: ID of the entity to analyze.

    Returns:
        A dict with keys ``'incoming'`` and ``'outgoing'``, each a list of
        relationship records. A relationship is outgoing when its ``'from'``
        equals ``entity_id``; every other relationship is counted as incoming.
    """
    # Get the entity
    entity = cp_client.graph.get_node(entity_id)

    # Get all relationships
    relationships = cp_client.graph.get_relationships(
        entity_id,
        direction="both",
    )

    # Categorize relationships
    dependencies = {
        'incoming': [],
        'outgoing': [],
    }
    for rel in relationships['relationships']:
        if rel['from'] == entity_id:
            dependencies['outgoing'].append(rel)
        else:
            dependencies['incoming'].append(rel)

    print(f"Entity: {entity['id']}")
    print(f"Incoming dependencies: {len(dependencies['incoming'])}")
    print(f"Outgoing dependencies: {len(dependencies['outgoing'])}")
    return dependencies
deps = analyze_dependencies(cp_client, "vpc-123456" )
3. Find Unused Entities
def find_unused_entities(cp_client, node_type):
    """Find entities of one node type that have no relationships.

    Args:
        cp_client: Client exposing ``graph.execute_query(query)``.
        node_type: Node label to inspect (e.g. ``"S3Bucket"``). It is
            interpolated into the Cypher text, so it must be a plain
            identifier.

    Returns:
        A list of dicts with keys ``'id'``, ``'name'`` (``'N/A'`` when the
        row has no ``n.name``), and ``'labels'``.

    Raises:
        ValueError: If ``node_type`` is not a plain identifier string.
    """
    # The label is spliced into the query text rather than sent as a query
    # parameter, so reject anything that could alter the query structure.
    if not (isinstance(node_type, str) and node_type.isidentifier()):
        raise ValueError(f"Invalid node type: {node_type!r}")

    query = {
        "query": f"""
            MATCH (n:{node_type})
            WHERE NOT (n)-[]-()
            RETURN n.id, n.name, labels(n)
        """
    }
    result = cp_client.graph.execute_query(query)
    return [
        {
            'id': row['n.id'],
            'name': row.get('n.name', 'N/A'),
            'labels': row['labels(n)'],
        }
        for row in result['results']
    ]
# Find unused S3 buckets
unused_buckets = find_unused_entities(cp_client, "S3Bucket")
print(f"Unused S3 buckets: {len(unused_buckets)}")
4. Security Audit
def audit_security_groups(cp_client, cidr="0.0.0.0/0"):
    """Audit security groups for rules that allow traffic from a given CIDR.

    Args:
        cp_client: Client exposing ``graph.execute_query(query)``.
        cidr: CIDR block to look for on ``ALLOWS_TRAFFIC_FROM`` rules.
            Defaults to ``"0.0.0.0/0"`` (open to any IPv4 address).

    Returns:
        A list of dicts with keys ``'name'``, ``'id'``, ``'port'``, and
        ``'protocol'``, one per matching rule.
    """
    query = {
        "query": """
            MATCH (sg:EC2SecurityGroup)-[r:ALLOWS_TRAFFIC_FROM]->(source)
            WHERE r.cidr = $cidr
            RETURN sg.name, sg.id, r.port, r.protocol
        """,
        # Send the CIDR as a query parameter instead of embedding it in the text.
        "parameters": {
            "cidr": cidr,
        },
    }
    result = cp_client.graph.execute_query(query)
    return [
        {
            'name': row['sg.name'],
            'id': row['sg.id'],
            'port': row['r.port'],
            'protocol': row['r.protocol'],
        }
        for row in result['results']
    ]
# Run the audit and report how many open rules were found.
risky = audit_security_groups(cp_client)
print(f"Found {len(risky)} security groups with open access")
Error Handling
from kubiya.resources.exceptions import GraphError

# Wrap only the graph call so unrelated errors are not caught here.
try:
    nodes = cp_client.graph.list_nodes(limit=100)
except GraphError as e:
    print(f"Graph operation failed: {e}")
    # Handle error (retry, log, alert, etc.)
Best Practices
1. Use Pagination
# Always paginate for large result sets
skip = 0
limit = 100
while True:
    nodes = cp_client.graph.list_nodes(skip=skip, limit=limit)
    # An empty page means every node has been seen.
    if not nodes['nodes']:
        break
    # process_nodes is not defined in this guide; substitute your own handler.
    process_nodes(nodes['nodes'])
    skip += limit
2. Filter by Integration
# Filter early to reduce data transfer
aws_nodes = cp_client.graph.list_nodes( integration = "aws" , limit = 50 )
azure_nodes = cp_client.graph.list_nodes( integration = "azure" , limit = 50 )
3. Use Specific Queries
# Instead of fetching all nodes and filtering client-side
# Use text search or Cypher queries
# Bad (fetches all nodes)
all_nodes = cp_client.graph.list_nodes( limit = 10000 )
prod_nodes = [n for n in all_nodes[ 'nodes' ] if 'prod' in n.get( 'name' , '' )]
# Good (server-side filtering)
prod_nodes = cp_client.graph.search_nodes_by_text({
"property_name" : "name" ,
"search_text" : "prod"
})
API Reference
Methods
| Method | Description |
| --- | --- |
| health() | Check graph service health |
| list_nodes(integration, skip, limit) | List all nodes with pagination |
| get_node(node_id, integration) | Get specific node by ID |
| search_nodes(search_data, integration, skip, limit) | Structured node search |
| search_nodes_by_text(text_query, integration, skip, limit) | Text-based node search |
| get_relationships(node_id, direction, relationship_type, integration, skip, limit) | Get node relationships |
| get_subgraph(node_id, depth, integration) | Get subgraph from node |
| list_labels(integration, skip, limit) | List all node types |
| list_relationship_types(integration, skip, limit) | List all relationship types |
| get_stats(integration, skip, limit) | Get graph statistics |
| execute_query(query) | Execute custom Cypher query |
| list_integrations(skip, limit) | List available integrations |
Next Steps
Control Plane Overview → Learn about Control Plane services
API Reference → Complete SDK API reference