Graph Ingestion
Graph Ingestion enables you to populate the context graph with custom data from your infrastructure, applications, and integrations. Import nodes and relationships in single or batch operations.

Overview

The Ingestion Service provides APIs to load data into the context graph:
  • Ingest Nodes: Add individual or batches of nodes (up to 1000 per batch)
  • Ingest Relationships: Create connections between nodes
  • Duplicate Handling: Control how duplicates are managed (error, skip, update, merge)
  • Transactional Mode: All-or-nothing batch operations
  • Dataset Association: Organize ingested data into logical datasets
Use batch operations for importing large amounts of data efficiently. Batch ingestion is significantly faster than individual imports.

Quick Start

from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# Ingest a single node
result = client.ingestion.ingest_node(
    id="server-01",
    labels=["Server", "Production"],
    properties={
        "name": "prod-web-01",
        "ip": "10.0.1.10",
        "region": "us-east-1"
    },
    duplicate_handling="skip"
)

# Ingest nodes in batch
batch_result = client.ingestion.ingest_nodes_batch(
    nodes=[
        {
            "id": "server-02",
            "labels": ["Server"],
            "properties": {"name": "prod-web-02"}
        },
        {
            "id": "server-03",
            "labels": ["Server"],
            "properties": {"name": "prod-web-03"}
        }
    ]
)

print(f"Batch: {batch_result['summary']['success']} succeeded")

Core Concepts

Nodes

Nodes represent entities in the graph:
  • ID: Unique identifier (string)
  • Labels: Categories/types (e.g., ["Server", "AWS"])
  • Properties: Key-value attributes

Relationships

Relationships connect nodes:
  • Source ID: Starting node
  • Target ID: Ending node
  • Relationship Type: Connection type (e.g., "CONNECTS_TO")
  • Properties: Optional relationship metadata

Duplicate Handling Strategies

  • error: Fail if node exists (default)
  • skip: Ignore if exists, continue
  • update: Replace existing node
  • merge: Merge properties with existing

Transactional Mode

  • true: Roll back all if any operation fails
  • false: Process individually, some may succeed

Basic Usage

Ingest Single Node

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Ingest node with duplicate handling
result = client.ingestion.ingest_node(
    id="db-prod-01",
    labels=["Database", "PostgreSQL", "Production"],
    properties={
        "name": "production-db",
        "version": "14.5",
        "size": "db.r5.xlarge",
        "region": "us-east-1",
        "multi_az": True
    },
    duplicate_handling="skip"
)

print(f"Success: {result['success']}")
print(f"Node ID: {result['node_id']}")
print(f"Created: {result['created']}")
{
  "success": true,
  "node_id": "db-prod-01",
  "created": true,
  "message": "Node ingested successfully"
}

Ingest Node with Dataset

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Associate with dataset for organization
result = client.ingestion.ingest_node(
    id="app-service-01",
    labels=["Service", "Application"],
    properties={"name": "auth-service", "port": 8080},
    dataset_id="production-infrastructure",
    duplicate_handling="update"
)

Ingest Nodes in Batch

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of nodes
nodes = [
    {
        "id": "server-001",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-01",
            "instance_type": "t3.medium",
            "region": "us-east-1"
        }
    },
    {
        "id": "server-002",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-02",
            "instance_type": "t3.medium",
            "region": "us-west-2"
        }
    },
    {
        "id": "lb-001",
        "labels": ["LoadBalancer", "AWS", "ALB"],
        "properties": {
            "name": "prod-alb",
            "type": "application",
            "scheme": "internet-facing"
        }
    }
]

# Batch ingest with options
result = client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="skip",
    transactional=False  # Continue even if some fail
)

# Review results
print(f"Total: {result['summary']['total']}")
print(f"Success: {result['summary']['success']}")
print(f"Failed: {result['summary']['failed']}")
print(f"Skipped: {result['summary']['skipped']}")

if result.get('errors'):
    print("\nErrors:")
    for error in result['errors']:
        print(f"  - {error}")
{
  "summary": {
    "total": 3,
    "success": 3,
    "failed": 0,
    "skipped": 0
  },
  "results": [
    {
      "node_id": "server-001",
      "success": true,
      "created": true
    },
    {
      "node_id": "server-002",
      "success": true,
      "created": true
    },
    {
      "node_id": "lb-001",
      "success": true,
      "created": true
    }
  ],
  "errors": []
}

Ingest Single Relationship

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Create relationship between nodes
result = client.ingestion.ingest_relationship(
    source_id="server-001",
    target_id="db-prod-01",
    relationship_type="CONNECTS_TO",
    properties={
        "port": 5432,
        "protocol": "postgres",
        "connection_pool": 20
    }
)

print(f"Success: {result['success']}")
print(f"Relationship ID: {result['relationship_id']}")
{
  "success": true,
  "relationship_id": "rel-abc123",
  "created": true
}

Ingest Relationships in Batch

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of relationships
relationships = [
    {
        "source_id": "lb-001",
        "target_id": "server-001",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50}
    },
    {
        "source_id": "lb-001",
        "target_id": "server-002",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50}
    },
    {
        "source_id": "server-001",
        "target_id": "db-prod-01",
        "relationship_type": "CONNECTS_TO",
        "properties": {"port": 5432}
    }
]

# Batch ingest relationships
result = client.ingestion.ingest_relationships_batch(
    relationships=relationships,
    skip_missing_nodes=False,  # Fail if nodes don't exist
    transactional=True  # All or nothing
)

print(f"Success: {result['summary']['success']}/{result['summary']['total']}")

Practical Examples

1. Import AWS Infrastructure

Import AWS resources into the graph:
from kubiya import ControlPlaneClient
import boto3

def import_aws_infrastructure(client: ControlPlaneClient, region: str = "us-east-1"):
    """Import AWS EC2 instances and their VPC relationships into the graph.

    Creates one node per EC2 instance and a RUNS_IN relationship from each
    instance to its VPC, then ingests both via batch operations.

    Args:
        client: Control plane client used for ingestion.
        region: AWS region to scan (default "us-east-1").

    Returns:
        The batch result dict from the node ingestion.
    """

    # Initialize AWS client
    ec2 = boto3.client('ec2', region_name=region)

    # Fetch instances (single page; use the describe_instances paginator
    # for very large accounts)
    response = ec2.describe_instances()

    nodes = []
    relationships = []

    for reservation in response['Reservations']:
        for instance in reservation['Instances']:
            # Create node for instance
            nodes.append({
                "id": instance['InstanceId'],
                "labels": ["EC2Instance", "AWS", "Compute"],
                "properties": {
                    # Prefer the Name tag; fall back to the instance id
                    "name": next((tag['Value'] for tag in instance.get('Tags', []) if tag['Key'] == 'Name'), instance['InstanceId']),
                    "instance_type": instance['InstanceType'],
                    "state": instance['State']['Name'],
                    "region": region,
                    "availability_zone": instance['Placement']['AvailabilityZone'],
                    "private_ip": instance.get('PrivateIpAddress'),
                    "public_ip": instance.get('PublicIpAddress')
                }
            })

            # Read the VPC from each instance individually: instances in a
            # reservation are not guaranteed to share one VPC, and indexing
            # Instances[0] raised IndexError for reservations with no
            # instances.
            vpc_id = instance.get('VpcId')
            if vpc_id:
                relationships.append({
                    "source_id": instance['InstanceId'],
                    "target_id": vpc_id,
                    "relationship_type": "RUNS_IN",
                    "properties": {}
                })

    # Ingest nodes in batch
    print(f"Ingesting {len(nodes)} EC2 instances...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="aws-infrastructure",
        duplicate_handling="update"
    )

    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

    # Ingest relationships; skip_missing_nodes guards against VPC nodes
    # that have not been imported yet
    if relationships:
        print(f"Creating {len(relationships)} relationships...")
        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True
        )

        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_aws_infrastructure(client, region="us-east-1")

2. Import Kubernetes Resources

Import Kubernetes cluster data:
from kubiya import ControlPlaneClient
from kubernetes import client as k8s_client, config

def import_kubernetes_cluster(cp_client: ControlPlaneClient, cluster_name: str):
    """Import Kubernetes deployments and services into the graph.

    Creates nodes for every deployment and service across all namespaces,
    plus ROUTES_TO relationships from each service to the deployments whose
    pod-template labels match its selector.

    Args:
        cp_client: Control plane client used for ingestion.
        cluster_name: Logical cluster name stored on every node.

    Returns:
        The batch result dict from the node ingestion.
    """

    # Load kubeconfig
    config.load_kube_config()
    v1 = k8s_client.CoreV1Api()
    apps_v1 = k8s_client.AppsV1Api()

    nodes = []
    relationships = []

    # Import deployments
    deployments = apps_v1.list_deployment_for_all_namespaces()

    for dep in deployments.items:
        deployment_id = f"{dep.metadata.namespace}/{dep.metadata.name}"
        containers = dep.spec.template.spec.containers

        nodes.append({
            "id": deployment_id,
            "labels": ["Deployment", "Kubernetes"],
            "properties": {
                "name": dep.metadata.name,
                "namespace": dep.metadata.namespace,
                "replicas": dep.spec.replicas,
                "cluster": cluster_name,
                # Record the first container image only
                "image": containers[0].image if containers else None
            }
        })

    # Import services
    services = v1.list_service_for_all_namespaces()

    for svc in services.items:
        service_id = f"{svc.metadata.namespace}/{svc.metadata.name}"

        nodes.append({
            "id": service_id,
            "labels": ["Service", "Kubernetes"],
            "properties": {
                "name": svc.metadata.name,
                "namespace": svc.metadata.namespace,
                "type": svc.spec.type,
                "cluster": cluster_name,
                "ports": [p.port for p in svc.spec.ports] if svc.spec.ports else []
            }
        })

        # Link the service to deployments whose pod-template labels actually
        # match the service selector. Sharing a namespace alone is not
        # enough — the previous version wired every deployment in the
        # namespace to every service.
        if svc.spec.selector:
            for dep in deployments.items:
                if dep.metadata.namespace != svc.metadata.namespace:
                    continue

                pod_labels = dep.spec.template.metadata.labels or {}
                if all(pod_labels.get(key) == value
                       for key, value in svc.spec.selector.items()):
                    relationships.append({
                        "source_id": service_id,
                        "target_id": f"{dep.metadata.namespace}/{dep.metadata.name}",
                        "relationship_type": "ROUTES_TO",
                        "properties": {"selector": svc.spec.selector}
                    })

    # Batch ingest
    print(f"Ingesting {len(nodes)} Kubernetes resources...")
    node_result = cp_client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=f"k8s-{cluster_name}",
        duplicate_handling="update"
    )

    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        rel_result = cp_client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True
        )

        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_kubernetes_cluster(client, cluster_name="prod-cluster")

3. Import Service Dependencies

Import application service dependencies:
from kubiya import ControlPlaneClient
import yaml

def import_service_dependencies(client: ControlPlaneClient, dependency_file: str):
    """Import service dependencies from a YAML file into the graph.

    Creates a node per declared service and a DEPENDS_ON relationship per
    dependency. Dependency targets that are not themselves declared as
    services (e.g. databases, caches) get placeholder nodes so the
    relationship batch does not fail on missing endpoints.

    Args:
        client: Control plane client used for ingestion.
        dependency_file: Path to the services YAML file.

    Returns:
        The batch result dict from the node ingestion.
    """

    # Load dependency configuration
    with open(dependency_file, 'r') as f:
        catalog = yaml.safe_load(f)

    nodes = []
    relationships = []
    known_ids = set()  # node ids already queued for ingestion

    # Create nodes for services
    for service in catalog['services']:
        known_ids.add(service['name'])
        nodes.append({
            "id": service['name'],
            "labels": ["Service", "Application"],
            "properties": {
                "name": service['name'],
                "type": service.get('type', 'unknown'),
                "owner": service.get('owner'),
                "repository": service.get('repository'),
                "environment": service.get('environment', 'production')
            }
        })

        # Create dependency relationships
        for dep in service.get('dependencies', []):
            relationships.append({
                "source_id": service['name'],
                "target_id": dep['name'],
                "relationship_type": "DEPENDS_ON",
                "properties": {
                    "type": dep.get('type', 'api'),
                    "required": dep.get('required', True)
                }
            })

    # Add placeholder nodes for dependency targets not declared as services;
    # without these, relationship ingestion fails because the target nodes
    # do not exist in the graph.
    for rel in relationships:
        target = rel['target_id']
        if target not in known_ids:
            known_ids.add(target)
            nodes.append({
                "id": target,
                "labels": ["Service", "Application"],
                "properties": {
                    "name": target,
                    "type": rel['properties'].get('type', 'unknown')
                }
            })

    # Batch ingest
    print(f"Ingesting {len(nodes)} services...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="service-catalog",
        duplicate_handling="merge"
    )

    print(f"✅ Services: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        print(f"Creating {len(relationships)} dependencies...")
        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships
        )

        print(f"✅ Dependencies: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_service_dependencies(client, "services.yaml")
Example services.yaml:
services:
  - name: auth-service
    type: api
    owner: platform-team
    repository: github.com/company/auth-service
    dependencies:
      - name: user-database
        type: database
        required: true
      - name: redis-cache
        type: cache
        required: false

  - name: api-gateway
    type: gateway
    owner: platform-team
    dependencies:
      - name: auth-service
        type: api
        required: true

4. Sync from External System

Continuously sync data from external system:
from kubiya import ControlPlaneClient
import time

class GraphSyncer:
    """Keep the context graph in sync with an external data source."""

    def __init__(self, client: ControlPlaneClient, dataset_id: str):
        # Client used for every ingestion call, and the dataset all
        # synced nodes are filed under.
        self.client = client
        self.dataset_id = dataset_id

    def sync_resources(self, resources: list):
        """Push one batch of resources into the graph.

        Each resource dict must carry 'id', 'labels' and 'properties'.
        Returns the batch-ingestion result dict.
        """

        payload = [
            {
                "id": item['id'],
                "labels": item['labels'],
                "properties": item['properties'],
            }
            for item in resources
        ]

        # "update" duplicate handling keeps previously synced nodes fresh;
        # non-transactional mode lets good nodes land even if some fail.
        return self.client.ingestion.ingest_nodes_batch(
            nodes=payload,
            dataset_id=self.dataset_id,
            duplicate_handling="update",
            transactional=False,
        )

    def continuous_sync(self, fetch_func, interval_seconds: int = 300):
        """Fetch and sync forever, sleeping between rounds.

        Any error pauses the loop for one minute before retrying.
        """

        print(f"Starting continuous sync (interval: {interval_seconds}s)")

        while True:
            try:
                outcome = self.sync_resources(fetch_func())
                summary = outcome['summary']
                print(f"✅ Synced {summary['success']}/{summary['total']} resources")
                time.sleep(interval_seconds)
            except Exception as exc:
                print(f"❌ Sync failed: {exc}")
                time.sleep(60)  # Wait 1 minute on error

# Usage
client = ControlPlaneClient(api_key="your-api-key")
syncer = GraphSyncer(client, dataset_id="external-system")

def fetch_external_resources():
    """Fetch resources from external system."""
    # Your external API call here
    return []

# Start continuous sync
# syncer.continuous_sync(fetch_external_resources, interval_seconds=300)

Error Handling

from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

# Handle validation errors
try:
    result = client.ingestion.ingest_node(
        id="",  # Empty ID - invalid
        labels=[],  # Empty labels - invalid
        properties={}
    )
except GraphError as e:
    print(f"Validation failed: {e}")

# Handle batch errors
result = client.ingestion.ingest_nodes_batch(
    nodes=[...],
    transactional=False
)

if result['summary']['failed'] > 0:
    print(f"Some imports failed:")
    for error in result.get('errors', []):
        print(f"  - {error}")

# Handle missing nodes in relationships
try:
    result = client.ingestion.ingest_relationship(
        source_id="non-existent-node",
        target_id="another-missing-node",
        relationship_type="CONNECTS_TO"
    )
except GraphError as e:
    if "not found" in str(e).lower():
        print("Source or target node doesn't exist")

Best Practices

1. Use Batch Operations

# ❌ BAD - Individual imports in loop
for node in nodes:
    client.ingestion.ingest_node(
        id=node['id'],
        labels=node['labels'],
        properties=node['properties']
    )

# ✅ GOOD - Batch import
client.ingestion.ingest_nodes_batch(nodes=nodes)

2. Choose Appropriate Duplicate Handling

# Initial import - fail on duplicates
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="error"
)

# Sync/Update - skip or update
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="update"  # or "skip"
)

3. Use Datasets for Organization

# Organize by source/purpose
client.ingestion.ingest_nodes_batch(
    nodes=aws_nodes,
    dataset_id="aws-infrastructure"
)

client.ingestion.ingest_nodes_batch(
    nodes=k8s_nodes,
    dataset_id="kubernetes-cluster"
)

4. Handle Large Imports

def chunked_import(client, nodes, chunk_size=1000):
    """Import a large node list in batches of at most ``chunk_size``.

    The ingestion API caps batches at 1000 nodes, so larger datasets must
    be split. Chunks are ingested non-transactionally with duplicate
    skipping, so one bad node cannot abort a whole chunk.

    Args:
        client: Control plane client used for ingestion.
        nodes: Full list of node dicts to ingest.
        chunk_size: Maximum nodes per batch; must be positive.

    Returns:
        List of per-chunk batch result dicts, in ingestion order
        (previously the results were discarded).

    Raises:
        ValueError: If chunk_size is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    results = []
    for start in range(0, len(nodes), chunk_size):
        chunk = nodes[start:start + chunk_size]

        result = client.ingestion.ingest_nodes_batch(
            nodes=chunk,
            duplicate_handling="skip",
            transactional=False
        )
        results.append(result)

        print(f"Chunk {start // chunk_size + 1}: {result['summary']['success']} succeeded")

    return results

API Reference

Node Ingestion Methods

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| `ingest_node()` | Ingest single node | `id`, `labels`, `properties`, `dataset_id`, `duplicate_handling` | Dict |
| `ingest_nodes_batch()` | Ingest multiple nodes | `nodes`, `dataset_id`, `duplicate_handling`, `transactional` | Dict with summary |

Relationship Ingestion Methods

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| `ingest_relationship()` | Ingest single relationship | `source_id`, `target_id`, `relationship_type`, `properties`, `dataset_id` | Dict |
| `ingest_relationships_batch()` | Ingest multiple relationships | `relationships`, `dataset_id`, `skip_missing_nodes`, `transactional` | Dict with summary |

Batch Response Structure

{
    "summary": {
        "total": int,
        "success": int,
        "failed": int,
        "skipped": int
    },
    "results": List[Dict],
    "errors": List[Dict]
}

Next Steps