Graph Ingestion enables you to populate the context graph with custom data from your infrastructure, applications, and integrations. Import nodes and relationships in single or batch operations.
Overview
The Ingestion Service provides APIs to load data into the context graph:
- Ingest Nodes: Add nodes individually or in batches (up to 1,000 per batch)
- Ingest Relationships: Create connections between nodes
- Duplicate Handling: Control how duplicates are managed (error, skip, update, merge)
- Transactional Mode: All-or-nothing batch operations
- Dataset Association: Organize ingested data into logical datasets
Use batch operations when importing large amounts of data: a batch avoids one API round trip per item, so batch ingestion is significantly faster than individual imports.
Quick Start
from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# Ingest a single node
result = client.ingestion.ingest_node(
    id="server-01",
    labels=["Server", "Production"],
    properties={
        "name": "prod-web-01",
        "ip": "10.0.1.10",
        "region": "us-east-1"
    },
    duplicate_handling="skip"
)

# Ingest nodes in batch
batch_result = client.ingestion.ingest_nodes_batch(
    nodes=[
        {
            "id": "server-02",
            "labels": ["Server"],
            "properties": {"name": "prod-web-02"}
        },
        {
            "id": "server-03",
            "labels": ["Server"],
            "properties": {"name": "prod-web-03"}
        }
    ]
)

print(f"Batch: {batch_result['summary']['success']} succeeded")
Core Concepts
Nodes
Nodes represent entities in the graph:
- ID: Unique identifier (string)
- Labels: Categories/types (e.g., ["Server", "AWS"])
- Properties: Key-value attributes
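For example, a node is just a plain dict with these three parts (values are illustrative):

# An illustrative node: ID, labels, and arbitrary key-value properties
node = {
    "id": "server-01",                      # unique string identifier
    "labels": ["Server", "AWS"],            # categories/types
    "properties": {"name": "prod-web-01"}   # key-value attributes
}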
Relationships
Relationships connect nodes:
- Source ID: Starting node
- Target ID: Ending node
- Relationship Type: Connection type (e.g., "CONNECTS_TO")
- Properties: Optional relationship metadata
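A relationship takes the same dict form (values are illustrative):

# An illustrative relationship: endpoints, type, and optional metadata
relationship = {
    "source_id": "server-01",            # starting node
    "target_id": "db-prod-01",           # ending node
    "relationship_type": "CONNECTS_TO",  # connection type
    "properties": {"port": 5432}         # optional metadata
}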
Duplicate Handling Strategies
- error: Fail if the node already exists (default)
- skip: Ignore the node if it exists and continue
- update: Replace the existing node
- merge: Merge new properties into the existing node
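A minimal sketch of how the strategies combine in practice, assuming the client from the Quick Start: fail fast on the first load, then merge enrichment data into nodes that already exist.

# First load: surface accidental re-imports as errors (the default)
client.ingestion.ingest_node(
    id="server-01",
    labels=["Server"],
    properties={"name": "prod-web-01"},
    duplicate_handling="error"
)

# Later enrichment: merge new properties into the existing node;
# "update" would instead replace the node's properties wholesale
client.ingestion.ingest_node(
    id="server-01",
    labels=["Server"],
    properties={"owner": "platform-team"},
    duplicate_handling="merge"
)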
Transactional Mode
- true: Roll back all operations if any one fails
- false: Process operations individually; some may succeed while others fail
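One hedged pattern, assuming a failed transactional batch raises GraphError (verify this against your client version): attempt an all-or-nothing import first, then fall back to per-item processing to salvage the valid entries.

from kubiya.resources.exceptions import GraphError

try:
    # All-or-nothing: any failure rolls the whole batch back
    result = client.ingestion.ingest_nodes_batch(nodes=nodes, transactional=True)
except GraphError:
    # Fall back to per-item processing so valid nodes still land
    result = client.ingestion.ingest_nodes_batch(nodes=nodes, transactional=False)
    print(f"Partial import: {result['summary']['failed']} of {result['summary']['total']} failed")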
Basic Usage
Ingest Single Node
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Ingest node with duplicate handling
result = client.ingestion.ingest_node(
    id="db-prod-01",
    labels=["Database", "PostgreSQL", "Production"],
    properties={
        "name": "production-db",
        "version": "14.5",
        "size": "db.r5.xlarge",
        "region": "us-east-1",
        "multi_az": True
    },
    duplicate_handling="skip"
)

print(f"Success: {result['success']}")
print(f"Node ID: {result['node_id']}")
print(f"Created: {result['created']}")
Example Response
{
  "success": true,
  "node_id": "db-prod-01",
  "created": true,
  "message": "Node ingested successfully"
}
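The created flag distinguishes a newly created node from one handled by the duplicate strategy; for example:

result = client.ingestion.ingest_node(
    id="db-prod-01",
    labels=["Database"],
    properties={"name": "production-db"},
    duplicate_handling="skip"
)

if result["created"]:
    print("New node created")
else:
    print("Node already existed; handled per duplicate_handling")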
Ingest Node with Dataset
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Associate with dataset for organization
result = client.ingestion.ingest_node(
    id="app-service-01",
    labels=["Service", "Application"],
    properties={"name": "auth-service", "port": 8080},
    dataset_id="production-infrastructure",
    duplicate_handling="update"
)
Ingest Nodes in Batch
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of nodes
nodes = [
    {
        "id": "server-001",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-01",
            "instance_type": "t3.medium",
            "region": "us-east-1"
        }
    },
    {
        "id": "server-002",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-02",
            "instance_type": "t3.medium",
            "region": "us-west-2"
        }
    },
    {
        "id": "lb-001",
        "labels": ["LoadBalancer", "AWS", "ALB"],
        "properties": {
            "name": "prod-alb",
            "type": "application",
            "scheme": "internet-facing"
        }
    }
]

# Batch ingest with options
result = client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="skip",
    transactional=False  # Continue even if some fail
)

# Review results
print(f"Total: {result['summary']['total']}")
print(f"Success: {result['summary']['success']}")
print(f"Failed: {result['summary']['failed']}")
print(f"Skipped: {result['summary']['skipped']}")

if result.get('errors'):
    print("\nErrors:")
    for error in result['errors']:
        print(f"  - {error}")
Example Response
{
  "summary": {
    "total": 3,
    "success": 3,
    "failed": 0,
    "skipped": 0
  },
  "results": [
    {
      "node_id": "server-001",
      "success": true,
      "created": true
    },
    {
      "node_id": "server-002",
      "success": true,
      "created": true
    },
    {
      "node_id": "lb-001",
      "success": true,
      "created": true
    }
  ],
  "errors": []
}
Ingest Single Relationship
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Create relationship between nodes
result = client.ingestion.ingest_relationship(
    source_id="server-001",
    target_id="db-prod-01",
    relationship_type="CONNECTS_TO",
    properties={
        "port": 5432,
        "protocol": "postgres",
        "connection_pool": 20
    }
)

print(f"Success: {result['success']}")
print(f"Relationship ID: {result['relationship_id']}")
Example Response
{
  "success": true,
  "relationship_id": "rel-abc123",
  "created": true
}
Ingest Relationships in Batch
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of relationships
relationships = [
    {
        "source_id": "lb-001",
        "target_id": "server-001",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50}
    },
    {
        "source_id": "lb-001",
        "target_id": "server-002",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50}
    },
    {
        "source_id": "server-001",
        "target_id": "db-prod-01",
        "relationship_type": "CONNECTS_TO",
        "properties": {"port": 5432}
    }
]

# Batch ingest relationships
result = client.ingestion.ingest_relationships_batch(
    relationships=relationships,
    skip_missing_nodes=False,  # Fail if nodes don't exist
    transactional=True  # All or nothing
)

print(f"Success: {result['summary']['success']}/{result['summary']['total']}")
Practical Examples
1. Import AWS Infrastructure
Import AWS resources into the graph:
from kubiya import ControlPlaneClient
import boto3

def import_aws_infrastructure(client: ControlPlaneClient, region: str = "us-east-1"):
    """Import AWS EC2 instances and relationships."""
    # Initialize AWS client
    ec2 = boto3.client('ec2', region_name=region)

    # Fetch instances
    response = ec2.describe_instances()

    nodes = []
    relationships = []

    for reservation in response['Reservations']:
        for instance in reservation['Instances']:
            # Create node for instance
            nodes.append({
                "id": instance['InstanceId'],
                "labels": ["EC2Instance", "AWS", "Compute"],
                "properties": {
                    "name": next((tag['Value'] for tag in instance.get('Tags', []) if tag['Key'] == 'Name'), instance['InstanceId']),
                    "instance_type": instance['InstanceType'],
                    "state": instance['State']['Name'],
                    "region": region,
                    "availability_zone": instance['Placement']['AvailabilityZone'],
                    "private_ip": instance.get('PrivateIpAddress'),
                    "public_ip": instance.get('PublicIpAddress')
                }
            })

            # Create relationship to the instance's own VPC
            vpc_id = instance.get('VpcId')
            if vpc_id:
                relationships.append({
                    "source_id": instance['InstanceId'],
                    "target_id": vpc_id,
                    "relationship_type": "RUNS_IN",
                    "properties": {}
                })

    # Ingest nodes in batch
    print(f"Ingesting {len(nodes)} EC2 instances...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="aws-infrastructure",
        duplicate_handling="update"
    )
    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

    # Ingest relationships
    if relationships:
        print(f"Creating {len(relationships)} relationships...")
        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True
        )
        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_aws_infrastructure(client, region="us-east-1")
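Note that the RUNS_IN relationships above point at VPC nodes this function never creates, so skip_missing_nodes=True silently drops them. A hedged addition, assuming the same describe_instances response shape, that ingests stub VPC nodes first (ingest_vpc_nodes is illustrative):

def ingest_vpc_nodes(client, response, region):
    """Create stub VPC nodes so RUNS_IN relationships resolve."""
    vpc_ids = {
        instance.get('VpcId')
        for reservation in response['Reservations']
        for instance in reservation['Instances']
        if instance.get('VpcId')
    }
    return client.ingestion.ingest_nodes_batch(
        nodes=[
            {"id": vpc_id, "labels": ["VPC", "AWS"], "properties": {"region": region}}
            for vpc_id in sorted(vpc_ids)
        ],
        dataset_id="aws-infrastructure",
        duplicate_handling="skip"
    )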
2. Import Kubernetes Resources
Import Kubernetes cluster data:
from kubiya import ControlPlaneClient
from kubernetes import client as k8s_client, config

def import_kubernetes_cluster(cp_client: ControlPlaneClient, cluster_name: str):
    """Import Kubernetes deployments and services."""
    # Load kubeconfig
    config.load_kube_config()
    v1 = k8s_client.CoreV1Api()
    apps_v1 = k8s_client.AppsV1Api()

    nodes = []
    relationships = []

    # Import deployments
    deployments = apps_v1.list_deployment_for_all_namespaces()
    for dep in deployments.items:
        deployment_id = f"{dep.metadata.namespace}/{dep.metadata.name}"
        nodes.append({
            "id": deployment_id,
            "labels": ["Deployment", "Kubernetes"],
            "properties": {
                "name": dep.metadata.name,
                "namespace": dep.metadata.namespace,
                "replicas": dep.spec.replicas,
                "cluster": cluster_name,
                "image": dep.spec.template.spec.containers[0].image if dep.spec.template.spec.containers else None
            }
        })

    # Import services
    services = v1.list_service_for_all_namespaces()
    for svc in services.items:
        service_id = f"{svc.metadata.namespace}/{svc.metadata.name}"
        nodes.append({
            "id": service_id,
            "labels": ["Service", "Kubernetes"],
            "properties": {
                "name": svc.metadata.name,
                "namespace": svc.metadata.namespace,
                "type": svc.spec.type,
                "cluster": cluster_name,
                "ports": [p.port for p in svc.spec.ports] if svc.spec.ports else []
            }
        })

        # Link services to deployments whose pod labels match the selector
        if svc.spec.selector:
            for dep in deployments.items:
                if dep.metadata.namespace != svc.metadata.namespace:
                    continue
                pod_labels = dep.spec.template.metadata.labels or {}
                if all(pod_labels.get(k) == v for k, v in svc.spec.selector.items()):
                    deployment_id = f"{dep.metadata.namespace}/{dep.metadata.name}"
                    relationships.append({
                        "source_id": service_id,
                        "target_id": deployment_id,
                        "relationship_type": "ROUTES_TO",
                        "properties": {"selector": svc.spec.selector}
                    })

    # Batch ingest
    print(f"Ingesting {len(nodes)} Kubernetes resources...")
    node_result = cp_client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=f"k8s-{cluster_name}",
        duplicate_handling="update"
    )
    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        rel_result = cp_client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True
        )
        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_kubernetes_cluster(client, cluster_name="prod-cluster")
3. Import Service Dependencies
Import application service dependencies:
from kubiya import ControlPlaneClient
import yaml

def import_service_dependencies(client: ControlPlaneClient, dependency_file: str):
    """Import service dependencies from a YAML file."""
    # Load dependency configuration
    with open(dependency_file, 'r') as f:
        config = yaml.safe_load(f)

    nodes = []
    relationships = []

    # Create nodes for services
    for service in config['services']:
        nodes.append({
            "id": service['name'],
            "labels": ["Service", "Application"],
            "properties": {
                "name": service['name'],
                "type": service.get('type', 'unknown'),
                "owner": service.get('owner'),
                "repository": service.get('repository'),
                "environment": service.get('environment', 'production')
            }
        })

        # Create dependency relationships
        for dep in service.get('dependencies', []):
            relationships.append({
                "source_id": service['name'],
                "target_id": dep['name'],
                "relationship_type": "DEPENDS_ON",
                "properties": {
                    "type": dep.get('type', 'api'),
                    "required": dep.get('required', True)
                }
            })

    # Batch ingest
    print(f"Ingesting {len(nodes)} services...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="service-catalog",
        duplicate_handling="merge"
    )
    print(f"✅ Services: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        print(f"Creating {len(relationships)} dependencies...")
        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships
        )
        print(f"✅ Dependencies: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_service_dependencies(client, "services.yaml")
services.yaml:
services:
  - name: auth-service
    type: api
    owner: platform-team
    repository: github.com/company/auth-service
    dependencies:
      - name: user-database
        type: database
        required: true
      - name: redis-cache
        type: cache
        required: false
  - name: api-gateway
    type: gateway
    owner: platform-team
    dependencies:
      - name: auth-service
        type: api
        required: true
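A dependency may legitimately point at a node ingested elsewhere, but a typo fails only at import time; a small pre-flight check against the schema above can surface problems earlier (validate_services is an illustrative sketch, not part of the SDK):

def validate_services(config: dict) -> list:
    """Return a list of potential problems in a services config."""
    problems = []
    names = {s.get('name') for s in config.get('services', [])}
    for service in config.get('services', []):
        if not service.get('name'):
            problems.append("service entry missing 'name'")
        for dep in service.get('dependencies', []):
            if not dep.get('name'):
                problems.append(f"{service.get('name')}: dependency missing 'name'")
            elif dep['name'] not in names:
                # May be fine if the node already exists in the graph
                problems.append(f"{service.get('name')}: dependency {dep['name']!r} not defined in this file")
    return problems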
4. Sync from External System
Continuously sync data from an external system:
from kubiya import ControlPlaneClient
import time

class GraphSyncer:
    """Sync external data to the context graph."""

    def __init__(self, client: ControlPlaneClient, dataset_id: str):
        self.client = client
        self.dataset_id = dataset_id

    def sync_resources(self, resources: list):
        """Sync a batch of resources."""
        nodes = []
        for resource in resources:
            nodes.append({
                "id": resource['id'],
                "labels": resource['labels'],
                "properties": resource['properties']
            })

        # Use update strategy to keep data fresh
        result = self.client.ingestion.ingest_nodes_batch(
            nodes=nodes,
            dataset_id=self.dataset_id,
            duplicate_handling="update",
            transactional=False
        )
        return result

    def continuous_sync(self, fetch_func, interval_seconds: int = 300):
        """Continuously sync data at intervals."""
        print(f"Starting continuous sync (interval: {interval_seconds}s)")
        while True:
            try:
                # Fetch latest data
                resources = fetch_func()

                # Sync to graph
                result = self.sync_resources(resources)
                print(f"✅ Synced {result['summary']['success']}/{result['summary']['total']} resources")

                # Wait before next sync
                time.sleep(interval_seconds)
            except Exception as e:
                print(f"❌ Sync failed: {e}")
                time.sleep(60)  # Wait 1 minute on error

# Usage
client = ControlPlaneClient(api_key="your-api-key")
syncer = GraphSyncer(client, dataset_id="external-system")

def fetch_external_resources():
    """Fetch resources from external system."""
    # Your external API call here
    return []

# Start continuous sync
# syncer.continuous_sync(fetch_external_resources, interval_seconds=300)
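As a concrete starting point, fetch_external_resources might wrap a REST inventory API; the endpoint and payload shape below are placeholders, not a real service:

import requests

def fetch_external_resources():
    """Hypothetical fetcher; swap in your system's URL and field mapping."""
    resp = requests.get("https://inventory.example.com/api/resources", timeout=30)
    resp.raise_for_status()
    return [
        {
            "id": item["id"],
            "labels": ["Resource", "External"],
            "properties": {"name": item.get("name")}
        }
        for item in resp.json()
    ]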
Error Handling
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

# Handle validation errors
try:
    result = client.ingestion.ingest_node(
        id="",       # Empty ID - invalid
        labels=[],   # Empty labels - invalid
        properties={}
    )
except GraphError as e:
    print(f"Validation failed: {e}")

# Handle batch errors
result = client.ingestion.ingest_nodes_batch(
    nodes=[...],
    transactional=False
)

if result['summary']['failed'] > 0:
    print("Some imports failed:")
    for error in result.get('errors', []):
        print(f"  - {error}")

# Handle missing nodes in relationships
try:
    result = client.ingestion.ingest_relationship(
        source_id="non-existent-node",
        target_id="another-missing-node",
        relationship_type="CONNECTS_TO"
    )
except GraphError as e:
    if "not found" in str(e).lower():
        print("Source or target node doesn't exist")
Best Practices
1. Use Batch Operations
# ❌ BAD - Individual imports in a loop
for node in nodes:
    client.ingestion.ingest_node(
        id=node['id'],
        labels=node['labels'],
        properties=node['properties']
    )

# ✅ GOOD - Batch import
client.ingestion.ingest_nodes_batch(nodes=nodes)
2. Choose Appropriate Duplicate Handling
# Initial import - fail on duplicates
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="error"
)

# Sync/update - skip or update
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="update"  # or "skip"
)
3. Use Datasets for Organization
# Organize by source/purpose
client.ingestion.ingest_nodes_batch(
    nodes=aws_nodes,
    dataset_id="aws-infrastructure"
)

client.ingestion.ingest_nodes_batch(
    nodes=k8s_nodes,
    dataset_id="kubernetes-cluster"
)
4. Handle Large Imports
def chunked_import(client, nodes, chunk_size=1000):
    """Import large datasets in chunks of up to the 1,000-node batch limit."""
    for i in range(0, len(nodes), chunk_size):
        chunk = nodes[i:i + chunk_size]
        result = client.ingestion.ingest_nodes_batch(
            nodes=chunk,
            duplicate_handling="skip",
            transactional=False
        )
        print(f"Chunk {i // chunk_size + 1}: {result['summary']['success']} succeeded")
API Reference
Node Ingestion Methods
| Method | Description | Parameters | Returns |
|---|---|---|---|
| ingest_node() | Ingest a single node | id, labels, properties, dataset_id, duplicate_handling | Dict |
| ingest_nodes_batch() | Ingest multiple nodes | nodes, dataset_id, duplicate_handling, transactional | Dict with summary |
Relationship Ingestion Methods
| Method | Description | Parameters | Returns |
|---|---|---|---|
| ingest_relationship() | Ingest a single relationship | source_id, target_id, relationship_type, properties, dataset_id | Dict |
| ingest_relationships_batch() | Ingest multiple relationships | relationships, dataset_id, skip_missing_nodes, transactional | Dict with summary |
Batch Response Structure
{
  "summary": {
    "total": int,
    "success": int,
    "failed": int,
    "skipped": int
  },
  "results": List[Dict],
  "errors": List[Dict]
}
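A short sketch of consuming this structure, assuming results entries carry node_id and success as in the example responses above:

result = client.ingestion.ingest_nodes_batch(nodes=nodes, transactional=False)

summary = result["summary"]
print(f"{summary['success']}/{summary['total']} succeeded "
      f"({summary['failed']} failed, {summary['skipped']} skipped)")

# Collect the IDs that failed, e.g. for a targeted retry
failed_ids = [r["node_id"] for r in result["results"] if not r["success"]]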