
# Graph Ingestion

> Import nodes and relationships into the context graph programmatically

<Info>
  Graph Ingestion lets you populate the context graph with custom data from your infrastructure, applications, and integrations. You can import nodes and relationships one at a time or in batches.
</Info>

## Overview

The Ingestion Service provides APIs to load data into the context graph:

* **Ingest Nodes**: Add individual or batches of nodes (up to 1000 per batch)
* **Ingest Relationships**: Create connections between nodes
* **Duplicate Handling**: Control how duplicates are managed (error, skip, update, merge)
* **Transactional Mode**: All-or-nothing batch operations
* **Dataset Association**: Organize ingested data into logical datasets

<Note>
  Use batch operations when importing large amounts of data. A batch sends many items (up to 1000 nodes) in one call, which avoids the per-call overhead of importing items one at a time.
</Note>

## Quick Start

```python theme={null}
from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# Ingest a single node
result = client.ingestion.ingest_node(
    id="server-01",
    labels=["Server", "Production"],
    properties={
        "name": "prod-web-01",
        "ip": "10.0.1.10",
        "region": "us-east-1"
    },
    duplicate_handling="skip"
)

# Ingest nodes in batch
batch_result = client.ingestion.ingest_nodes_batch(
    nodes=[
        {
            "id": "server-02",
            "labels": ["Server"],
            "properties": {"name": "prod-web-02"}
        },
        {
            "id": "server-03",
            "labels": ["Server"],
            "properties": {"name": "prod-web-03"}
        }
    ]
)

print(f"Batch: {batch_result['summary']['success']} succeeded")
```

## Core Concepts

### Nodes

Nodes represent entities in the graph:

* **ID**: Unique identifier (string)
* **Labels**: Categories/types (e.g., `["Server", "AWS"]`)
* **Properties**: Key-value attributes
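
The Error Handling section treats an empty ID and an empty label list as invalid. A minimal client-side pre-check could look like the sketch below. It assumes only those rules (the service may enforce more), and `validate_node` is a hypothetical helper, not part of the SDK.

```python
from typing import Any


def validate_node(node: dict[str, Any]) -> list[str]:
    """Return a list of problems found in a node dict (empty list means OK)."""
    problems = []

    node_id = node.get("id")
    if not isinstance(node_id, str) or not node_id.strip():
        problems.append("id must be a non-empty string")

    labels = node.get("labels")
    if not isinstance(labels, list) or not labels:
        problems.append("labels must be a non-empty list")
    elif not all(isinstance(label, str) and label for label in labels):
        problems.append("every label must be a non-empty string")

    # Assumption: properties may be omitted, but must be a dict when present.
    if not isinstance(node.get("properties", {}), dict):
        problems.append("properties must be a dict of key-value attributes")

    return problems


good = {"id": "server-01", "labels": ["Server", "AWS"], "properties": {"region": "us-east-1"}}
bad = {"id": "", "labels": [], "properties": {}}

print(validate_node(good))  # no problems reported
print(validate_node(bad))   # reports the empty id and the empty label list
```

Running a check like this before a batch call lets you drop or fix malformed nodes locally instead of discovering them in the batch `errors` list.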

### Relationships

Relationships connect nodes:

* **Source ID**: Starting node
* **Target ID**: Ending node
* **Relationship Type**: Connection type (e.g., `"CONNECTS_TO"`)
* **Properties**: Optional relationship metadata
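
Because a relationship refers to its endpoints by node ID, it can help to check locally that both endpoints exist before ingesting. The sketch below is a hypothetical helper (not an SDK function) that compares relationships against the nodes you are about to ingest, plus any IDs you already know are in the graph.

```python
def find_dangling_relationships(nodes, relationships, known_ids=()):
    """Return relationships whose source or target is not among the known node IDs."""
    ids = {node["id"] for node in nodes} | set(known_ids)
    return [
        rel for rel in relationships
        if rel["source_id"] not in ids or rel["target_id"] not in ids
    ]


nodes = [
    {"id": "lb-001", "labels": ["LoadBalancer"]},
    {"id": "server-001", "labels": ["Server"]},
]
relationships = [
    {"source_id": "lb-001", "target_id": "server-001", "relationship_type": "ROUTES_TO"},
    {"source_id": "server-001", "target_id": "db-prod-01", "relationship_type": "CONNECTS_TO"},
]

dangling = find_dangling_relationships(nodes, relationships)
print([rel["target_id"] for rel in dangling])  # db-prod-01 is not in the node list
```

Passing `known_ids=["db-prod-01"]` would clear the result, which is useful when some endpoints were ingested in an earlier run.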

### Duplicate Handling Strategies

* **`error`**: Fail if the node already exists (default)
* **`skip`**: Leave the existing node unchanged and continue
* **`update`**: Replace the existing node with the incoming one
* **`merge`**: Merge the incoming properties into the existing node
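
To make the difference between the four strategies concrete, here is a small in-memory model. It is only a sketch of the semantics described above, not the service's implementation. In particular, it assumes that on `merge` the incoming property values win on key conflicts, which this page does not state.

```python
import copy


def apply_node(store, node, duplicate_handling="error"):
    """Apply one node to an in-memory dict store and report what happened."""
    node_id = node["id"]
    if node_id not in store:
        store[node_id] = copy.deepcopy(node)
        return "created"

    if duplicate_handling == "error":
        raise ValueError(f"node {node_id!r} already exists")
    if duplicate_handling == "skip":
        return "skipped"
    if duplicate_handling == "update":
        store[node_id] = copy.deepcopy(node)  # replace the whole node
        return "updated"
    if duplicate_handling == "merge":
        # Assumption: incoming values overwrite existing ones on key conflicts.
        incoming = copy.deepcopy(node.get("properties", {}))
        store[node_id].setdefault("properties", {}).update(incoming)
        return "merged"
    raise ValueError(f"unknown strategy: {duplicate_handling}")


store = {}
apply_node(store, {
    "id": "server-01",
    "labels": ["Server"],
    "properties": {"name": "web", "region": "us-east-1"},
})
incoming = {"id": "server-01", "labels": ["Server"], "properties": {"name": "web-v2"}}

# skip keeps both original properties; merge changes name but keeps region;
# update leaves only the incoming properties.
for strategy in ("skip", "merge", "update"):
    outcome = apply_node(store, incoming, strategy)
    print(strategy, outcome, store["server-01"]["properties"])
```

The practical difference between `update` and `merge` is what happens to properties that are missing from the incoming node: in this model `update` drops them and `merge` keeps them.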

### Transactional Mode

* **`true`**: All-or-nothing; if any operation in the batch fails, the whole batch is rolled back
* **`false`**: Each item is processed independently, so some may succeed while others fail
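
The two modes can be illustrated with a toy in-memory model. This is a sketch of the idea only: the function below is hypothetical, it treats an already-present ID as the failure case, and how the real service reports a rolled-back batch in its summary is an assumption here.

```python
def ingest_batch(store, nodes, transactional=False):
    """Model a batch insert into a dict; an already-present ID counts as a failure."""
    snapshot = dict(store)  # shallow copy is enough because the batch only adds keys
    summary = {"total": len(nodes), "success": 0, "failed": 0, "skipped": 0}

    for node in nodes:
        if node["id"] in store:
            if transactional:
                # Roll back: restore the snapshot and report the whole batch as failed.
                store.clear()
                store.update(snapshot)
                summary.update(success=0, failed=len(nodes))
                return summary
            summary["failed"] += 1
            continue
        store[node["id"]] = node
        summary["success"] += 1

    return summary


batch = [{"id": "server-01"}, {"id": "server-02"}, {"id": "server-03"}]

# Non-transactional: server-02 fails, the other two are still stored.
independent = {"server-02": {"id": "server-02"}}
print(ingest_batch(independent, batch, transactional=False), sorted(independent))

# Transactional: the failure on server-02 also undoes server-01.
atomic = {"server-02": {"id": "server-02"}}
print(ingest_batch(atomic, batch, transactional=True), sorted(atomic))
```

Transactional mode suits imports where a partial graph would be misleading (for example, relationships that only make sense together); non-transactional mode suits periodic syncs where partial progress is acceptable.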

## Basic Usage

### Ingest Single Node

```python theme={null}
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Ingest node with duplicate handling
result = client.ingestion.ingest_node(
    id="db-prod-01",
    labels=["Database", "PostgreSQL", "Production"],
    properties={
        "name": "production-db",
        "version": "14.5",
        "size": "db.r5.xlarge",
        "region": "us-east-1",
        "multi_az": True
    },
    duplicate_handling="skip"
)

print(f"Success: {result['success']}")
print(f"Node ID: {result['node_id']}")
print(f"Created: {result['created']}")
```

<Accordion title="Example Response">
  ```json theme={null}
  {
    "success": true,
    "node_id": "db-prod-01",
    "created": true,
    "message": "Node ingested successfully"
  }
  ```
</Accordion>

### Ingest Node with Dataset

```python theme={null}
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Associate with dataset for organization
result = client.ingestion.ingest_node(
    id="app-service-01",
    labels=["Service", "Application"],
    properties={"name": "auth-service", "port": 8080},
    dataset_id="production-infrastructure",
    duplicate_handling="update"
)
```

### Ingest Nodes in Batch

```python theme={null}
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of nodes
nodes = [
    {
        "id": "server-001",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-01",
            "instance_type": "t3.medium",
            "region": "us-east-1"
        }
    },
    {
        "id": "server-002",
        "labels": ["Server", "AWS", "EC2"],
        "properties": {
            "name": "web-server-02",
            "instance_type": "t3.medium",
            "region": "us-west-2"
        }
    },
    {
        "id": "lb-001",
        "labels": ["LoadBalancer", "AWS", "ALB"],
        "properties": {
            "name": "prod-alb",
            "type": "application",
            "scheme": "internet-facing"
        }
    }
]

# Batch ingest with options
result = client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="skip",
    transactional=False  # Continue even if some fail
)

# Review results
print(f"Total: {result['summary']['total']}")
print(f"Success: {result['summary']['success']}")
print(f"Failed: {result['summary']['failed']}")
print(f"Skipped: {result['summary']['skipped']}")

if result.get('errors'):
    print("\nErrors:")
    for error in result['errors']:
        print(f"  - {error}")
```

<Accordion title="Example Response">
  ```json theme={null}
  {
    "summary": {
      "total": 3,
      "success": 3,
      "failed": 0,
      "skipped": 0
    },
    "results": [
      {
        "node_id": "server-001",
        "success": true,
        "created": true
      },
      {
        "node_id": "server-002",
        "success": true,
        "created": true
      },
      {
        "node_id": "lb-001",
        "success": true,
        "created": true
      }
    ],
    "errors": []
  }
  ```
</Accordion>

### Ingest Single Relationship

```python theme={null}
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Create relationship between nodes
result = client.ingestion.ingest_relationship(
    source_id="server-001",
    target_id="db-prod-01",
    relationship_type="CONNECTS_TO",
    properties={
        "port": 5432,
        "protocol": "postgres",
        "connection_pool": 20
    }
)

print(f"Success: {result['success']}")
print(f"Relationship ID: {result['relationship_id']}")
```

<Accordion title="Example Response">
  ```json theme={null}
  {
    "success": true,
    "relationship_id": "rel-abc123",
    "created": true
  }
  ```
</Accordion>

### Ingest Relationships in Batch

```python theme={null}
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Prepare batch of relationships
relationships = [
    {
        "source_id": "lb-001",
        "target_id": "server-001",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50}
    },
    {
        "source_id": "lb-001",
        "target_id": "server-002",
        "relationship_type": "ROUTES_TO",
        "properties": {"weight": 50}
    },
    {
        "source_id": "server-001",
        "target_id": "db-prod-01",
        "relationship_type": "CONNECTS_TO",
        "properties": {"port": 5432}
    }
]

# Batch ingest relationships
result = client.ingestion.ingest_relationships_batch(
    relationships=relationships,
    skip_missing_nodes=False,  # Fail if nodes don't exist
    transactional=True  # All or nothing
)

print(f"Success: {result['summary']['success']}/{result['summary']['total']}")
```

## Practical Examples

### 1. Import AWS Infrastructure

Import EC2 instances and their VPC relationships into the graph:

```python theme={null}
from kubiya import ControlPlaneClient
import boto3

def import_aws_infrastructure(client: ControlPlaneClient, region: str = "us-east-1"):
    """Import AWS EC2 instances and relationships."""

    # Initialize AWS client
    ec2 = boto3.client('ec2', region_name=region)

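    # Fetch instances page by page (describe_instances is paginated)
    paginator = ec2.get_paginator('describe_instances')
    reservations = [
        reservation
        for page in paginator.paginate()
        for reservation in page['Reservations']
    ]

    nodes = []
    relationships = []

    for reservation in reservations:
        for instance in reservation['Instances']:
            # Read VpcId per instance; it may be missing on some instances
            vpc_id = instance.get('VpcId')

            # Create node for instance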
            nodes.append({
                "id": instance['InstanceId'],
                "labels": ["EC2Instance", "AWS", "Compute"],
                "properties": {
                    "name": next((tag['Value'] for tag in instance.get('Tags', []) if tag['Key'] == 'Name'), instance['InstanceId']),
                    "instance_type": instance['InstanceType'],
                    "state": instance['State']['Name'],
                    "region": region,
                    "availability_zone": instance['Placement']['AvailabilityZone'],
                    "private_ip": instance.get('PrivateIpAddress'),
                    "public_ip": instance.get('PublicIpAddress')
                }
            })

            # Create relationship to VPC
            if vpc_id:
                relationships.append({
                    "source_id": instance['InstanceId'],
                    "target_id": vpc_id,
                    "relationship_type": "RUNS_IN",
                    "properties": {}
                })

    # Ingest nodes in batch
    print(f"Ingesting {len(nodes)} EC2 instances...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="aws-infrastructure",
        duplicate_handling="update"
    )

    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

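    # Ingest relationships. This function does not create VPC nodes, so with
    # skip_missing_nodes=True any relationship to a VPC not already in the graph is skipped.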
    if relationships:
        print(f"Creating {len(relationships)} relationships...")
        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True
        )

        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_aws_infrastructure(client, region="us-east-1")
```

### 2. Import Kubernetes Resources

Import Kubernetes Deployments and Services from a cluster:

```python theme={null}
from kubiya import ControlPlaneClient
from kubernetes import client as k8s_client, config

def import_kubernetes_cluster(cp_client: ControlPlaneClient, cluster_name: str):
    """Import Kubernetes deployments and services."""

    # Load kubeconfig
    config.load_kube_config()
    v1 = k8s_client.CoreV1Api()
    apps_v1 = k8s_client.AppsV1Api()

    nodes = []
    relationships = []

    # Import deployments
    deployments = apps_v1.list_deployment_for_all_namespaces()

    for dep in deployments.items:
        # Prefix IDs with the resource kind so a Deployment and a Service
        # that share a namespace and name do not collide
        deployment_id = f"deployment/{dep.metadata.namespace}/{dep.metadata.name}"

        nodes.append({
            "id": deployment_id,
            "labels": ["Deployment", "Kubernetes"],
            "properties": {
                "name": dep.metadata.name,
                "namespace": dep.metadata.namespace,
                "replicas": dep.spec.replicas,
                "cluster": cluster_name,
                "image": dep.spec.template.spec.containers[0].image if dep.spec.template.spec.containers else None
            }
        })

    # Import services
    services = v1.list_service_for_all_namespaces()

    for svc in services.items:
        service_id = f"service/{svc.metadata.namespace}/{svc.metadata.name}"

        nodes.append({
            "id": service_id,
            "labels": ["Service", "Kubernetes"],
            "properties": {
                "name": svc.metadata.name,
                "namespace": svc.metadata.namespace,
                "type": svc.spec.type,
                "cluster": cluster_name,
                "ports": [p.port for p in svc.spec.ports] if svc.spec.ports else []
            }
        })

        # Link a service only to deployments in its namespace whose
        # pod template labels match the service selector
        if svc.spec.selector:
            for dep in deployments.items:
                if dep.metadata.namespace != svc.metadata.namespace:
                    continue

                template_meta = dep.spec.template.metadata
                pod_labels = (template_meta.labels if template_meta else None) or {}

                if all(pod_labels.get(k) == v for k, v in svc.spec.selector.items()):
                    deployment_id = f"deployment/{dep.metadata.namespace}/{dep.metadata.name}"

                    relationships.append({
                        "source_id": service_id,
                        "target_id": deployment_id,
                        "relationship_type": "ROUTES_TO",
                        "properties": {"selector": svc.spec.selector}
                    })

    # Batch ingest
    print(f"Ingesting {len(nodes)} Kubernetes resources...")
    node_result = cp_client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=f"k8s-{cluster_name}",
        duplicate_handling="update"
    )

    print(f"✅ Nodes: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        rel_result = cp_client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            skip_missing_nodes=True
        )

        print(f"✅ Relationships: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_kubernetes_cluster(client, cluster_name="prod-cluster")
```

### 3. Import Service Dependencies

Import application service dependencies:

```python theme={null}
from kubiya import ControlPlaneClient
import yaml

def import_service_dependencies(client: ControlPlaneClient, dependency_file: str):
    """Import service dependencies from YAML file."""

    # Load dependency configuration
    with open(dependency_file, 'r') as f:
        config = yaml.safe_load(f)

    nodes = []
    relationships = []

    # Create nodes for services
    for service in config['services']:
        nodes.append({
            "id": service['name'],
            "labels": ["Service", "Application"],
            "properties": {
                "name": service['name'],
                "type": service.get('type', 'unknown'),
                "owner": service.get('owner'),
                "repository": service.get('repository'),
                "environment": service.get('environment', 'production')
            }
        })

        # Create dependency relationships
        for dep in service.get('dependencies', []):
            relationships.append({
                "source_id": service['name'],
                "target_id": dep['name'],
                "relationship_type": "DEPENDS_ON",
                "properties": {
                    "type": dep.get('type', 'api'),
                    "required": dep.get('required', True)
                }
            })

    # Batch ingest
    print(f"Ingesting {len(nodes)} services...")
    node_result = client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id="service-catalog",
        duplicate_handling="merge"
    )

    print(f"✅ Services: {node_result['summary']['success']}/{node_result['summary']['total']}")

    if relationships:
        print(f"Creating {len(relationships)} dependencies...")
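        rel_result = client.ingestion.ingest_relationships_batch(
            relationships=relationships,
            # Dependencies such as databases or caches may not be declared as
            # services, so skip relationships whose target node does not exist
            skip_missing_nodes=True
        )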

        print(f"✅ Dependencies: {rel_result['summary']['success']}/{rel_result['summary']['total']}")

    return node_result

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = import_service_dependencies(client, "services.yaml")
```

Example `services.yaml`:

```yaml theme={null}
services:
  - name: auth-service
    type: api
    owner: platform-team
    repository: github.com/company/auth-service
    dependencies:
      - name: user-database
        type: database
        required: true
      - name: redis-cache
        type: cache
        required: false

  - name: api-gateway
    type: gateway
    owner: platform-team
    dependencies:
      - name: auth-service
        type: api
        required: true
```

### 4. Sync from External System

Continuously sync data from an external system:

```python theme={null}
from kubiya import ControlPlaneClient
import time

class GraphSyncer:
    """Sync external data to context graph."""

    def __init__(self, client: ControlPlaneClient, dataset_id: str):
        self.client = client
        self.dataset_id = dataset_id

    def sync_resources(self, resources: list):
        """Sync a batch of resources."""

        nodes = []

        for resource in resources:
            nodes.append({
                "id": resource['id'],
                "labels": resource['labels'],
                "properties": resource['properties']
            })

        # Use update strategy to keep data fresh
        result = self.client.ingestion.ingest_nodes_batch(
            nodes=nodes,
            dataset_id=self.dataset_id,
            duplicate_handling="update",
            transactional=False
        )

        return result

    def continuous_sync(self, fetch_func, interval_seconds: int = 300):
        """Continuously sync data at intervals."""

        print(f"Starting continuous sync (interval: {interval_seconds}s)")

        while True:
            try:
                # Fetch latest data
                resources = fetch_func()

                # Sync to graph
                result = self.sync_resources(resources)

                print(f"✅ Synced {result['summary']['success']}/{result['summary']['total']} resources")

                # Wait before next sync
                time.sleep(interval_seconds)

            except Exception as e:
                print(f"❌ Sync failed: {e}")
                time.sleep(60)  # Wait 1 minute on error

# Usage
client = ControlPlaneClient(api_key="your-api-key")
syncer = GraphSyncer(client, dataset_id="external-system")

def fetch_external_resources():
    """Fetch resources from external system."""
    # Your external API call here
    return []

# Start continuous sync
# syncer.continuous_sync(fetch_external_resources, interval_seconds=300)
```

## Error Handling

```python theme={null}
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

# Handle validation errors
try:
    result = client.ingestion.ingest_node(
        id="",  # Empty ID - invalid
        labels=[],  # Empty labels - invalid
        properties={}
    )
except GraphError as e:
    print(f"Validation failed: {e}")

# Handle batch errors
result = client.ingestion.ingest_nodes_batch(
    nodes=[...],
    transactional=False
)

if result['summary']['failed'] > 0:
    print("Some imports failed:")
    for error in result.get('errors', []):
        print(f"  - {error}")

# Handle missing nodes in relationships
try:
    result = client.ingestion.ingest_relationship(
        source_id="non-existent-node",
        target_id="another-missing-node",
        relationship_type="CONNECTS_TO"
    )
except GraphError as e:
    if "not found" in str(e).lower():
        print("Source or target node doesn't exist")
```
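
In non-transactional mode a batch call can return normally even though some items failed, so pipelines often want to turn that into an explicit error. The sketch below shows one way to do it using only the `summary` and `errors` fields of the batch response. `BatchIngestionError` and `raise_on_failures` are hypothetical names, not SDK classes, and the shape of each error entry is an assumption.

```python
class BatchIngestionError(Exception):
    """Raised when a batch result reports failed items (hypothetical, not an SDK class)."""


def raise_on_failures(result, max_failed=0):
    """Return the summary, or raise if more than max_failed items failed."""
    summary = result.get("summary", {})
    failed = summary.get("failed", 0)
    if failed > max_failed:
        details = "; ".join(str(err) for err in result.get("errors", []))
        raise BatchIngestionError(
            f"{failed}/{summary.get('total', 0)} items failed: {details}"
        )
    return summary


sample = {
    "summary": {"total": 3, "success": 2, "failed": 1, "skipped": 0},
    "results": [],
    # Hypothetical error entry; the real shape of each error dict may differ.
    "errors": [{"node_id": "server-002", "error": "invalid labels"}],
}

try:
    raise_on_failures(sample)
except BatchIngestionError as exc:
    print(f"Import incomplete: {exc}")
```

The `max_failed` threshold lets a periodic sync tolerate a few bad records while still failing loudly when most of a batch is rejected.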

## Best Practices

### 1. Use Batch Operations

```python theme={null}
# ❌ BAD - Individual imports in loop
for node in nodes:
    client.ingestion.ingest_node(
        id=node['id'],
        labels=node['labels'],
        properties=node['properties']
    )

# ✅ GOOD - Batch import
client.ingestion.ingest_nodes_batch(nodes=nodes)
```

### 2. Choose Appropriate Duplicate Handling

```python theme={null}
# Initial import - fail on duplicates
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="error"
)

# Sync/Update - skip or update
client.ingestion.ingest_nodes_batch(
    nodes=nodes,
    duplicate_handling="update"  # or "skip"
)
```

### 3. Use Datasets for Organization

```python theme={null}
# Organize by source/purpose
client.ingestion.ingest_nodes_batch(
    nodes=aws_nodes,
    dataset_id="aws-infrastructure"
)

client.ingestion.ingest_nodes_batch(
    nodes=k8s_nodes,
    dataset_id="kubernetes-cluster"
)
```

### 4. Handle Large Imports

```python theme={null}
def chunked_import(client, nodes, chunk_size=1000):
    """Import large datasets in chunks."""

    for i in range(0, len(nodes), chunk_size):
        chunk = nodes[i:i + chunk_size]

        result = client.ingestion.ingest_nodes_batch(
            nodes=chunk,
            duplicate_handling="skip",
            transactional=False
        )

        print(f"Chunk {i // chunk_size + 1}: {result['summary']['success']} succeeded")
```
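
When an import spans many chunks, a single combined summary is usually more useful than per-chunk prints. The following sketch adds up the `summary` counters across chunks. It takes the ingest call as a parameter so it can be run here with a stand-in function; `chunks`, `chunked_import_with_totals`, and `fake_ingest` are illustrative names, not SDK functions.

```python
from collections import Counter


def chunks(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def chunked_import_with_totals(ingest_chunk, nodes, chunk_size=1000):
    """Call ingest_chunk on each chunk and sum the summary counters."""
    totals = Counter(total=0, success=0, failed=0, skipped=0)
    for chunk in chunks(nodes, chunk_size):
        result = ingest_chunk(chunk)
        totals.update(result["summary"])  # adds each counter to the running total
    return dict(totals)


def fake_ingest(chunk):
    """Stand-in for the real call; reports every node in the chunk as a success."""
    return {"summary": {"total": len(chunk), "success": len(chunk), "failed": 0, "skipped": 0}}


nodes = [{"id": f"node-{i}", "labels": ["Demo"], "properties": {}} for i in range(2500)]
print(chunked_import_with_totals(fake_ingest, nodes))

# With a real client, the callable could wrap the batch method, for example:
# lambda chunk: client.ingestion.ingest_nodes_batch(
#     nodes=chunk, duplicate_handling="skip", transactional=False)
```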

## API Reference

### Node Ingestion Methods

| Method                 | Description           | Parameters                                                       | Returns             |
| ---------------------- | --------------------- | ---------------------------------------------------------------- | ------------------- |
| `ingest_node()`        | Ingest single node    | `id`, `labels`, `properties`, `dataset_id`, `duplicate_handling` | `Dict`              |
| `ingest_nodes_batch()` | Ingest multiple nodes | `nodes`, `dataset_id`, `duplicate_handling`, `transactional`     | `Dict` with summary |

### Relationship Ingestion Methods

| Method                         | Description                   | Parameters                                                                | Returns             |
| ------------------------------ | ----------------------------- | ------------------------------------------------------------------------- | ------------------- |
| `ingest_relationship()`        | Ingest single relationship    | `source_id`, `target_id`, `relationship_type`, `properties`, `dataset_id` | `Dict`              |
| `ingest_relationships_batch()` | Ingest multiple relationships | `relationships`, `dataset_id`, `skip_missing_nodes`, `transactional`      | `Dict` with summary |

### Batch Response Structure

```python theme={null}
{
    "summary": {
        "total": int,
        "success": int,
        "failed": int,
        "skipped": int
    },
    "results": List[Dict],
    "errors": List[Dict]
}
```
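
If you use a type checker, the structure above could be written as `TypedDict` classes. This is a sketch: the class names are illustrative and not exported by the SDK, and the entries of `results` and `errors` are kept as generic dicts because their fields are not specified here.

```python
from typing import Any, TypedDict


class BatchSummary(TypedDict):
    total: int
    success: int
    failed: int
    skipped: int


class BatchResult(TypedDict):
    summary: BatchSummary
    results: list[dict[str, Any]]
    errors: list[dict[str, Any]]


def success_rate(result: BatchResult) -> float:
    """Fraction of items that succeeded; 0.0 for an empty batch."""
    total = result["summary"]["total"]
    return result["summary"]["success"] / total if total else 0.0


sample: BatchResult = {
    "summary": {"total": 4, "success": 3, "failed": 1, "skipped": 0},
    "results": [],
    "errors": [],
}

print(success_rate(sample))  # 0.75
```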

## Next Steps

<CardGroup cols={2}>
  <Card title="Datasets" icon="database" href="/sdk/context-graph-datasets">
    Manage cognitive datasets
  </Card>

  <Card title="Context Graph" icon="project-diagram" href="/sdk/context-graph">
    Query and explore the graph
  </Card>

  <Card title="Intelligent Search" icon="brain" href="/sdk/context-graph-intelligent-search">
    AI-powered graph search
  </Card>

  <Card title="Best Practices" icon="star" href="/sdk/best-practices">
    SDK best practices guide
  </Card>
</CardGroup>
