Graph Ingestion enables you to populate the context graph with custom data from your infrastructure, applications, and integrations. Import nodes and relationships in single or batch operations.
Overview
The Ingestion Service provides APIs to load data into the context graph:
Ingest Nodes: Add individual or batches of nodes (up to 1000 per batch)
Ingest Relationships: Create connections between nodes
Duplicate Handling: Control how duplicates are managed (error, skip, update, merge)
Transactional Mode: All-or-nothing batch operations
Dataset Association: Organize ingested data into logical datasets
Use batch operations for importing large amounts of data efficiently. Batch ingestion is significantly faster than individual imports.
Quick Start
from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# Ingest a single node; "skip" leaves any existing node with this ID untouched.
result = client.ingestion.ingest_node(
    id="server-01",
    labels=["Server", "Production"],
    properties={
        "name": "prod-web-01",
        "ip": "10.0.1.10",
        "region": "us-east-1",
    },
    duplicate_handling="skip",
)

# Ingest nodes in batch (up to 1000 nodes per call).
batch_result = client.ingestion.ingest_nodes_batch(
    nodes=[
        {
            "id": "server-02",
            "labels": ["Server"],
            "properties": {"name": "prod-web-02"},
        },
        {
            "id": "server-03",
            "labels": ["Server"],
            "properties": {"name": "prod-web-03"},
        },
    ]
)

print(f"Batch: {batch_result['summary']['success']} succeeded")
Core Concepts
Nodes
Nodes represent entities in the graph:
ID: Unique identifier (string)
Labels: Categories/types (e.g., ["Server", "AWS"])
Properties: Key-value attributes
Relationships
Relationships connect nodes:
Source ID: Starting node
Target ID: Ending node
Relationship Type: Connection type (e.g., "CONNECTS_TO")
Properties: Optional relationship metadata
Duplicate Handling Strategies
error: Fail if the node exists (default)
skip: Ignore if it exists, continue
update: Replace the existing node
merge: Merge properties with the existing node
Transactional Mode
true: Roll back all operations if any one fails
false: Process individually; some may succeed
Basic Usage
Ingest Single Node
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Ingest node with duplicate handling ("skip" keeps any existing node).
result = client.ingestion.ingest_node(
    id="db-prod-01",
    labels=["Database", "PostgreSQL", "Production"],
    properties={
        "name": "production-db",
        "version": "14.5",
        "size": "db.r5.xlarge",
        "region": "us-east-1",
        "multi_az": True,
    },
    duplicate_handling="skip",
)

print(f"Success: {result['success']}")
print(f"Node ID: {result['node_id']}")
print(f"Created: {result['created']}")
{
"success" : true ,
"node_id" : "db-prod-01" ,
"created" : true ,
"message" : "Node ingested successfully"
}
Ingest Node with Dataset
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Associate the node with a dataset for organization;
# "update" replaces an existing node with the same ID.
result = client.ingestion.ingest_node(
    id="app-service-01",
    labels=["Service", "Application"],
    properties={"name": "auth-service", "port": 8080},
    dataset_id="production-infrastructure",
    duplicate_handling="update",
)
Ingest Nodes in Batch
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of nodes
nodes = [
    {
        "id": "server-001",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-01",
            "instance_type": "t3.medium",
            "region": "us-east-1",
        },
    },
    {
        "id": "server-002",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-02",
            "instance_type": "t3.medium",
            "region": "us-west-2",
        },
    },
    {
        "id": "lb-001",
        "labels": ["LoadBalancer", "AWS", "ALB"],
        "properties": {
            "name": "prod-alb",
            "type": "application",
            "scheme": "internet-facing",
        },
    },
]

# Batch ingest with options
result = client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="skip",
    transactional=False,  # Continue even if some fail
)

# Review results
print(f"Total: {result['summary']['total']}")
print(f"Success: {result['summary']['success']}")
print(f"Failed: {result['summary']['failed']}")
print(f"Skipped: {result['summary']['skipped']}")

if result.get('errors'):
    print("\nErrors:")
    for error in result['errors']:
        print(f"  - {error}")
{
"summary" : {
"total" : 3 ,
"success" : 3 ,
"failed" : 0 ,
"skipped" : 0
},
"results" : [
{
"node_id" : "server-001" ,
"success" : true ,
"created" : true
},
{
"node_id" : "server-002" ,
"success" : true ,
"created" : true
},
{
"node_id" : "lb-001" ,
"success" : true ,
"created" : true
}
],
"errors" : []
}
Ingest Single Relationship
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Create relationship between nodes
result = client.ingestion.ingest_relationship(
    source_id="server-001",
    target_id="db-prod-01",
    relationship_type="CONNECTS_TO",
    properties={
        "port": 5432,
        "protocol": "postgres",
        "connection_pool": 20,
    },
)

print(f"Success: {result['success']}")
print(f"Relationship ID: {result['relationship_id']}")
{
"success" : true ,
"relationship_id" : "rel-abc123" ,
"created" : true
}
Ingest Relationships in Batch
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of relationships
relationships = [
    {
        "source_id": "lb-001",
        "target_id": "server-001",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50},
    },
    {
        "source_id": "lb-001",
        "target_id": "server-002",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50},
    },
    {
        "source_id": "server-001",
        "target_id": "db-prod-01",
        "relationship_type": "CONNECTS_TO",
        "properties": {"port": 5432},
    },
]

# Batch ingest relationships
result = client.ingestion.ingest_relationships_batch(
    relationships=relationships,
    skip_missing_nodes=False,  # Fail if nodes don't exist
    transactional=True,        # All or nothing
)

print(f"Success: {result['summary']['success']}/{result['summary']['total']}")
Practical Examples
1. Import AWS Infrastructure
Import AWS resources into the graph:
from kubiya import ControlPlaneClient
import boto3


def import_aws_infrastructure(client: "ControlPlaneClient", region: str = "us-east-1"):
    """Import AWS EC2 instances and their VPC relationships into the graph.

    Args:
        client: Authenticated control-plane client used for ingestion.
        region: AWS region to scan for EC2 instances.

    Returns:
        The batch-ingestion result dict for the node import.
    """
    # Initialize AWS client
    ec2 = boto3.client('ec2', region_name=region)

    # NOTE(review): describe_instances() is paginated; for large fleets a
    # paginator would be needed — confirm expected fleet size.
    response = ec2.describe_instances()

    nodes = []
    relationships = []

    for reservation in response['Reservations']:
        for instance in reservation['Instances']:
            # Create node for the instance; name falls back to the instance ID
            # when no "Name" tag is present.
            nodes.append({
                "id": instance['InstanceId'],
                "labels": ["EC2Instance", "AWS", "Compute"],
                "properties": {
                    "name": next(
                        (tag['Value'] for tag in instance.get('Tags', []) if tag['Key'] == 'Name'),
                        instance['InstanceId'],
                    ),
                    "instance_type": instance['InstanceType'],
                    "state": instance['State']['Name'],
                    "region": region,
                    "availability_zone": instance['Placement']['AvailabilityZone'],
                    "private_ip": instance.get('PrivateIpAddress'),
                    "public_ip": instance.get('PublicIpAddress'),
                },
            })

            # Use this instance's own VPC, not the first instance's in the
            # reservation — instances are related to their actual VPC.
            vpc_id = instance.get('VpcId')
            if vpc_id:
                relationships.append({
                    "source_id": instance['InstanceId'],
                    "target_id": vpc_id,
                    "relationship_type": "RUNS_IN",
                    "properties": {},
                })

    # Ingest nodes in batch
    print(f"Ingesting {len(nodes)} EC2 instances...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="aws-infrastructure",
        duplicate_handling="update",
    )
    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

    # Ingest relationships; VPC nodes may not exist yet, so skip missing.
    if relationships:
        print(f"Creating {len(relationships)} relationships...")
        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True,
        )
        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result


# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_aws_infrastructure(client, region="us-east-1")
2. Import Kubernetes Resources
Import Kubernetes cluster data:
from kubiya import ControlPlaneClient
from kubernetes import client as k8s_client, config


def import_kubernetes_cluster(cp_client: "ControlPlaneClient", cluster_name: str):
    """Import Kubernetes deployments and services into the context graph.

    Args:
        cp_client: Authenticated control-plane client used for ingestion.
        cluster_name: Logical cluster name recorded on every imported node.

    Returns:
        The batch-ingestion result dict for the node import.
    """
    # Load kubeconfig from the default location.
    config.load_kube_config()
    v1 = k8s_client.CoreV1Api()
    apps_v1 = k8s_client.AppsV1Api()

    nodes = []
    relationships = []

    # Import deployments; IDs are "<namespace>/<name>" for uniqueness.
    deployments = apps_v1.list_deployment_for_all_namespaces()
    for dep in deployments.items:
        deployment_id = f"{dep.metadata.namespace}/{dep.metadata.name}"
        nodes.append({
            "id": deployment_id,
            "labels": ["Deployment", "Kubernetes"],
            "properties": {
                "name": dep.metadata.name,
                "namespace": dep.metadata.namespace,
                "replicas": dep.spec.replicas,
                "cluster": cluster_name,
                # Only the first container's image is recorded.
                "image": dep.spec.template.spec.containers[0].image if dep.spec.template.spec.containers else None,
            },
        })

    # Import services
    services = v1.list_service_for_all_namespaces()
    for svc in services.items:
        service_id = f"{svc.metadata.namespace}/{svc.metadata.name}"
        nodes.append({
            "id": service_id,
            "labels": ["Service", "Kubernetes"],
            "properties": {
                "name": svc.metadata.name,
                "namespace": svc.metadata.namespace,
                "type": svc.spec.type,
                "cluster": cluster_name,
                "ports": [p.port for p in svc.spec.ports] if svc.spec.ports else [],
            },
        })

        # Relate a service only to deployments it actually selects.
        if svc.spec.selector:
            for dep in deployments.items:
                if dep.metadata.namespace != svc.metadata.namespace:
                    continue
                pod_labels = dep.spec.template.metadata.labels or {}
                # A service routes to a deployment only when every selector
                # label matches the deployment's pod-template labels.
                if all(pod_labels.get(k) == v for k, v in svc.spec.selector.items()):
                    deployment_id = f"{dep.metadata.namespace}/{dep.metadata.name}"
                    relationships.append({
                        "source_id": service_id,
                        "target_id": deployment_id,
                        "relationship_type": "ROUTES_TO",
                        "properties": {"selector": svc.spec.selector},
                    })

    # Batch ingest
    print(f"Ingesting {len(nodes)} Kubernetes resources...")
    node_result = cp_client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=f"k8s-{cluster_name}",
        duplicate_handling="update",
    )
    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        rel_result = cp_client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True,
        )
        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result


# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_kubernetes_cluster(client, cluster_name="prod-cluster")
3. Import Service Dependencies
Import application service dependencies:
from kubiya import ControlPlaneClient
import yaml


def import_service_dependencies(client: "ControlPlaneClient", dependency_file: str):
    """Import application service dependencies from a YAML file.

    Args:
        client: Authenticated control-plane client used for ingestion.
        dependency_file: Path to a YAML file with a top-level ``services`` list.

    Returns:
        The batch-ingestion result dict for the service nodes.
    """
    # Load dependency configuration
    with open(dependency_file, 'r') as f:
        config = yaml.safe_load(f)

    nodes = []
    relationships = []

    for service in config['services']:
        # One node per service; "merge" below preserves properties that other
        # importers may have set on the same node.
        nodes.append({
            "id": service['name'],
            "labels": ["Service", "Application"],
            "properties": {
                "name": service['name'],
                "type": service.get('type', 'unknown'),
                "owner": service.get('owner'),
                "repository": service.get('repository'),
                "environment": service.get('environment', 'production'),
            },
        })

        # One DEPENDS_ON edge per declared dependency.
        for dep in service.get('dependencies', []):
            relationships.append({
                "source_id": service['name'],
                "target_id": dep['name'],
                "relationship_type": "DEPENDS_ON",
                "properties": {
                    "type": dep.get('type', 'api'),
                    "required": dep.get('required', True),
                },
            })

    # Batch ingest
    print(f"Ingesting {len(nodes)} services...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="service-catalog",
        duplicate_handling="merge",
    )
    print(f"✅ Services: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        print(f"Creating {len(relationships)} dependencies...")
        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships
        )
        print(f"✅ Dependencies: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result


# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_service_dependencies(client, "services.yaml")
Example services.yaml:
services :
- name : auth-service
type : api
owner : platform-team
repository : github.com/company/auth-service
dependencies :
- name : user-database
type : database
required : true
- name : redis-cache
type : cache
required : false
- name : api-gateway
type : gateway
owner : platform-team
dependencies :
- name : auth-service
type : api
required : true
4. Sync from External System
Continuously sync data from external system:
from kubiya import ControlPlaneClient
import time
class GraphSyncer:
    """Sync external data to the context graph."""

    def __init__(self, client: "ControlPlaneClient", dataset_id: str):
        """Store the ingestion client and the dataset synced nodes join.

        Args:
            client: Control-plane client exposing ``ingestion``.
            dataset_id: Dataset every synced node is associated with.
        """
        self.client = client
        self.dataset_id = dataset_id

    def sync_resources(self, resources: list):
        """Sync a batch of resources.

        Each resource dict must provide ``id``, ``labels`` and ``properties``.

        Returns:
            The batch-ingestion result dict.
        """
        nodes = [
            {
                "id": resource['id'],
                "labels": resource['labels'],
                "properties": resource['properties'],
            }
            for resource in resources
        ]

        # Use update strategy to keep data fresh; non-transactional so one
        # bad resource does not block the rest of the batch.
        result = self.client.ingestion.ingest_nodes_batch(
            nodes=nodes,
            dataset_id=self.dataset_id,
            duplicate_handling="update",
            transactional=False,
        )
        return result

    def continuous_sync(self, fetch_func, interval_seconds: int = 300):
        """Continuously sync data at fixed intervals (blocks forever).

        Args:
            fetch_func: Zero-argument callable returning the resource list.
            interval_seconds: Delay between successful sync cycles.
        """
        print(f"Starting continuous sync (interval: {interval_seconds}s)")
        while True:
            try:
                # Fetch latest data and push it to the graph.
                resources = fetch_func()
                result = self.sync_resources(resources)
                print(f"✅ Synced {result['summary']['success']}/{result['summary']['total']} resources")
                # Wait before next sync
                time.sleep(interval_seconds)
            except Exception as e:
                # Best-effort loop: report the failure and retry shortly.
                print(f"❌ Sync failed: {e}")
                time.sleep(60)  # Wait 1 minute on error
# Usage
client = ControlPlaneClient(api_key="your-api-key")
syncer = GraphSyncer(client, dataset_id="external-system")


def fetch_external_resources():
    """Fetch resources from the external system."""
    # Your external API call here
    return []


# Start continuous sync
# syncer.continuous_sync(fetch_external_resources, interval_seconds=300)
Error Handling
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

# Handle validation errors
try:
    result = client.ingestion.ingest_node(
        id="",          # Empty ID - invalid
        labels=[],      # Empty labels - invalid
        properties={},
    )
except GraphError as e:
    print(f"Validation failed: {e}")

# Handle batch errors
result = client.ingestion.ingest_nodes_batch(
    nodes=[...],  # placeholder: your node dicts here
    transactional=False,
)
if result['summary']['failed'] > 0:
    print("Some imports failed:")
    for error in result.get('errors', []):
        print(f"  - {error}")

# Handle missing nodes in relationships
try:
    result = client.ingestion.ingest_relationship(
        source_id="non-existent-node",
        target_id="another-missing-node",
        relationship_type="CONNECTS_TO",
    )
except GraphError as e:
    if "not found" in str(e).lower():
        print("Source or target node doesn't exist")
Best Practices
1. Use Batch Operations
# ❌ BAD - one API call per node inside a loop
for item in nodes:
    client.ingestion.ingest_node(
        id=item['id'],
        labels=item['labels'],
        properties=item['properties'],
    )

# ✅ GOOD - a single batch call
client.ingestion.ingest_nodes_batch(nodes=nodes)
2. Choose Appropriate Duplicate Handling
# Initial import - fail on duplicates
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="error",
)

# Sync/Update - skip or update existing nodes
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="update",  # or "skip"
)
3. Use Datasets for Organization
# Organize by source/purpose
client.ingestion.ingest_nodes_batch(
    nodes=aws_nodes,
    dataset_id="aws-infrastructure",
)

client.ingestion.ingest_nodes_batch(
    nodes=k8s_nodes,
    dataset_id="kubernetes-cluster",
)
4. Handle Large Imports
def chunked_import(client, nodes, chunk_size=1000):
    """Import large datasets in chunks of at most ``chunk_size`` nodes.

    The batch API caps a single call at 1000 nodes, so larger imports are
    split into sequential batch calls.

    Args:
        client: Control-plane client exposing ``ingestion.ingest_nodes_batch``.
        nodes: Full list of node dicts to import.
        chunk_size: Maximum nodes per batch call (API limit is 1000).
    """
    for start in range(0, len(nodes), chunk_size):
        chunk = nodes[start:start + chunk_size]
        # Non-transactional + "skip" so a failing chunk does not abort the run.
        result = client.ingestion.ingest_nodes_batch(
            nodes=chunk,
            duplicate_handling="skip",
            transactional=False,
        )
        print(f"Chunk {start // chunk_size + 1}: {result['summary']['success']} succeeded")
API Reference
Node Ingestion Methods
Method Description Parameters Returns ingest_node()Ingest single node id, labels, properties, dataset_id, duplicate_handlingDictingest_nodes_batch()Ingest multiple nodes nodes, dataset_id, duplicate_handling, transactionalDict with summary
Relationship Ingestion Methods
Method Description Parameters Returns ingest_relationship()Ingest single relationship source_id, target_id, relationship_type, properties, dataset_idDictingest_relationships_batch()Ingest multiple relationships relationships, dataset_id, skip_missing_nodes, transactionalDict with summary
Batch Response Structure
{
"summary" : {
"total" : int ,
"success" : int ,
"failed" : int ,
"skipped" : int
},
"results" : List[Dict],
"errors" : List[Dict]
}
Next Steps
Datasets Manage cognitive datasets
Context Graph Query and explore the graph
Intelligent Search AI-powered graph search
Best Practices SDK best practices guide