Skip to main content
The Workers Service provides access to Temporal worker management, allowing you to monitor worker health, manage registrations, and control worker lifecycle operations.

Overview

Workers in Kubiya are Temporal workers that execute agent tasks and workflows. The Workers Service enables you to:
  • Monitor Workers: List and inspect all registered workers in your organization
  • Worker Registration: Register new workers with the Control Plane
  • Health Management: Send heartbeats and monitor worker health
  • Lifecycle Control: Start and disconnect workers programmatically
Workers are typically managed automatically by the Kubiya platform, but the Workers Service provides programmatic access for advanced monitoring and management scenarios.

Quick Start

from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# List all workers
workers = client.workers.list()
for worker in workers:
    print(f"Worker: {worker['runner_name']} - Status: {worker['status']}")

# Get specific worker details
worker_info = client.workers.get(runner_name="production-worker-01")
print(f"Worker health: {worker_info['health_status']}")

Core Concepts

Workers vs Task Queues

In Temporal terminology, workers poll task queues for work. In Kubiya:
  • Worker: A physical process that executes agent tasks
  • Task Queue: A logical queue that workers poll for tasks
  • Runner: The deployment unit that hosts workers

Worker Lifecycle

  1. Registration: Worker registers with Control Plane on startup
  2. Heartbeats: Worker sends periodic heartbeats to indicate health
  3. Active: Worker is polling for and executing tasks
  4. Disconnected: Worker gracefully shuts down or loses connection

Basic Usage

List All Workers

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# List all workers
workers = client.workers.list()

print(f"Total workers: {len(workers)}")
for worker in workers:
    print(f"Runner: {worker['runner_name']}")
    print(f"Status: {worker['status']}")
    print(f"Task Queue: {worker['task_queue']}")
    print(f"Last Heartbeat: {worker['last_heartbeat']}")
    print("---")
[
  {
    "runner_name": "production-worker-01",
    "task_queue": "kubiya-task-queue",
    "status": "active",
    "health_status": "healthy",
    "last_heartbeat": "2025-12-11T10:30:00Z",
    "registered_at": "2025-12-11T08:00:00Z",
    "worker_id": "worker-abc123",
    "capabilities": ["agno-runtime", "claude-code"]
  },
  {
    "runner_name": "staging-worker-02",
    "task_queue": "kubiya-task-queue-staging",
    "status": "active",
    "health_status": "healthy",
    "last_heartbeat": "2025-12-11T10:29:45Z",
    "registered_at": "2025-12-11T09:15:00Z",
    "worker_id": "worker-def456",
    "capabilities": ["agno-runtime"]
  }
]

Get Worker Details

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Get specific worker by runner name
worker = client.workers.get(runner_name="production-worker-01")

print(f"Worker ID: {worker['worker_id']}")
print(f"Status: {worker['status']}")
print(f"Health: {worker['health_status']}")
print(f"Task Queue: {worker['task_queue']}")
print(f"Capabilities: {', '.join(worker['capabilities'])}")
print(f"Last Heartbeat: {worker['last_heartbeat']}")

Worker Management

Register a Worker

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Register new worker
registration_data = {
    "runner_name": "new-worker-03",
    "task_queue": "kubiya-task-queue",
    "capabilities": ["agno-runtime", "claude-code"],
    "metadata": {
        "environment": "production",
        "region": "us-east-1",
        "version": "1.0.0"
    }
}

response = client.workers.register(registration_data)
print(f"Worker registered: {response['worker_id']}")
print(f"Registration status: {response['status']}")
Worker registration is typically handled automatically by the Kubiya worker deployment process. Manual registration is only needed for custom worker implementations.

Send Worker Heartbeat

from kubiya import ControlPlaneClient
import time

client = ControlPlaneClient(api_key="your-api-key")

# Send detailed heartbeat
heartbeat_data = {
    "worker_id": "worker-abc123",
    "status": "active",
    "health_metrics": {
        "cpu_usage": 45.2,
        "memory_usage": 68.5,
        "task_count": 3,
        "queue_depth": 12
    },
    "timestamp": int(time.time())
}

response = client.workers.heartbeat(heartbeat_data)
print(f"Heartbeat acknowledged: {response['acknowledged']}")

Send Simple Heartbeat

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Send simple heartbeat (just worker_id)
response = client.workers.heartbeat_simple(worker_id="worker-abc123")
print(f"Heartbeat status: {response['status']}")
print(f"Next heartbeat expected in: {response['interval']} seconds")
Use heartbeat_simple() for lightweight health checks. Use heartbeat() when you need to send detailed health metrics and status information.

Start a Worker

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Start worker with configuration
start_data = {
    "max_concurrent_tasks": 10,
    "poll_interval_ms": 100,
    "capabilities": ["agno-runtime"]
}

response = client.workers.start(
    worker_id="worker-abc123",
    start_data=start_data
)
print(f"Worker started: {response['status']}")

Disconnect a Worker

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Gracefully disconnect worker
response = client.workers.disconnect(worker_id="worker-abc123")
print(f"Worker disconnected: {response['status']}")
print(f"Pending tasks: {response.get('pending_tasks', 0)}")
Disconnecting a worker will prevent it from accepting new tasks. Ensure any in-flight tasks are completed before disconnecting to avoid task failures.

Practical Examples

1. Worker Health Monitor

Monitor all workers and alert on unhealthy workers:
from kubiya import ControlPlaneClient
from datetime import datetime, timedelta
from kubiya.resources.exceptions import WorkerError

def monitor_worker_health(client: ControlPlaneClient):
    """Monitor worker health and identify unhealthy workers."""
    try:
        workers = client.workers.list()

        unhealthy_workers = []
        stale_workers = []
        now = datetime.utcnow()

        for worker in workers:
            # Check health status
            if worker['health_status'] != 'healthy':
                unhealthy_workers.append(worker)

            # Check for stale heartbeats (>5 minutes)
            last_heartbeat = datetime.fromisoformat(worker['last_heartbeat'].replace('Z', '+00:00'))
            if now - last_heartbeat > timedelta(minutes=5):
                stale_workers.append(worker)

        # Report findings
        if unhealthy_workers:
            print(f"⚠️  {len(unhealthy_workers)} unhealthy workers found:")
            for worker in unhealthy_workers:
                print(f"  - {worker['runner_name']}: {worker['health_status']}")

        if stale_workers:
            print(f"🔴 {len(stale_workers)} workers with stale heartbeats:")
            for worker in stale_workers:
                minutes_since = (now - datetime.fromisoformat(worker['last_heartbeat'].replace('Z', '+00:00'))).seconds // 60
                print(f"  - {worker['runner_name']}: {minutes_since} minutes since last heartbeat")

        if not unhealthy_workers and not stale_workers:
            print(f"✅ All {len(workers)} workers are healthy")

        return {
            "total_workers": len(workers),
            "healthy_workers": len(workers) - len(unhealthy_workers),
            "unhealthy_workers": unhealthy_workers,
            "stale_workers": stale_workers
        }

    except WorkerError as e:
        print(f"Failed to monitor workers: {e}")
        return None

# Usage
client = ControlPlaneClient(api_key="your-api-key")
health_report = monitor_worker_health(client)

2. Worker Capacity Planner

Analyze worker capacity and utilization:
from kubiya import ControlPlaneClient
from typing import Dict, Any

def analyze_worker_capacity(client: ControlPlaneClient) -> Dict[str, Any]:
    """Analyze worker capacity and utilization."""
    workers = client.workers.list()

    total_workers = len(workers)
    active_workers = sum(1 for w in workers if w['status'] == 'active')

    # Group by task queue
    queue_distribution = {}
    for worker in workers:
        queue = worker['task_queue']
        if queue not in queue_distribution:
            queue_distribution[queue] = 0
        queue_distribution[queue] += 1

    # Group by capabilities
    capability_distribution = {}
    for worker in workers:
        for cap in worker.get('capabilities', []):
            if cap not in capability_distribution:
                capability_distribution[cap] = 0
            capability_distribution[cap] += 1

    print("Worker Capacity Analysis")
    print("=" * 50)
    print(f"Total Workers: {total_workers}")
    print(f"Active Workers: {active_workers}")
    print(f"Utilization: {(active_workers / total_workers * 100):.1f}%")
    print("\nTask Queue Distribution:")
    for queue, count in queue_distribution.items():
        print(f"  {queue}: {count} workers")
    print("\nCapability Distribution:")
    for cap, count in capability_distribution.items():
        print(f"  {cap}: {count} workers")

    return {
        "total_workers": total_workers,
        "active_workers": active_workers,
        "utilization": active_workers / total_workers,
        "queue_distribution": queue_distribution,
        "capability_distribution": capability_distribution
    }

# Usage
client = ControlPlaneClient(api_key="your-api-key")
capacity_report = analyze_worker_capacity(client)

3. Worker Registration with Retry

Register a worker with automatic retry on failure:
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError
import time

def register_worker_with_retry(
    client: ControlPlaneClient,
    registration_data: dict,
    max_retries: int = 3
) -> dict:
    """Register worker with automatic retry on failure."""
    for attempt in range(max_retries):
        try:
            response = client.workers.register(registration_data)
            print(f"✅ Worker registered successfully: {response['worker_id']}")
            return response

        except WorkerError as e:
            if attempt == max_retries - 1:
                print(f"❌ Failed to register worker after {max_retries} attempts")
                raise

            wait_time = 2 ** attempt  # Exponential backoff
            print(f"⚠️  Registration attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

# Usage
client = ControlPlaneClient(api_key="your-api-key")

registration_data = {
    "runner_name": "resilient-worker-04",
    "task_queue": "kubiya-task-queue",
    "capabilities": ["agno-runtime"],
    "metadata": {
        "environment": "production",
        "version": "1.0.0"
    }
}

worker = register_worker_with_retry(client, registration_data)

4. Graceful Worker Shutdown

Implement graceful worker shutdown:
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError
import time

def graceful_worker_shutdown(
    client: ControlPlaneClient,
    worker_id: str,
    max_wait_seconds: int = 300
) -> bool:
    """Gracefully shut down a worker, waiting for tasks to complete."""
    try:
        # Get current worker status
        workers = client.workers.list()
        worker = next((w for w in workers if w['worker_id'] == worker_id), None)

        if not worker:
            print(f"Worker {worker_id} not found")
            return False

        print(f"Initiating graceful shutdown for {worker['runner_name']}")

        # Disconnect worker (stops accepting new tasks)
        disconnect_response = client.workers.disconnect(worker_id=worker_id)
        pending_tasks = disconnect_response.get('pending_tasks', 0)

        if pending_tasks == 0:
            print("✅ Worker disconnected successfully (no pending tasks)")
            return True

        # Wait for pending tasks to complete
        print(f"Waiting for {pending_tasks} pending tasks to complete...")
        start_time = time.time()

        while time.time() - start_time < max_wait_seconds:
            # Check worker status
            current_worker = client.workers.get(runner_name=worker['runner_name'])

            if current_worker['status'] == 'disconnected':
                print("✅ Worker shut down gracefully")
                return True

            time.sleep(5)  # Check every 5 seconds

        print(f"⚠️  Timeout after {max_wait_seconds}s - worker may still have pending tasks")
        return False

    except WorkerError as e:
        print(f"Failed to shut down worker: {e}")
        return False

# Usage
client = ControlPlaneClient(api_key="your-api-key")
success = graceful_worker_shutdown(
    client,
    worker_id="worker-abc123",
    max_wait_seconds=300
)

Error Handling

from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError
from kubiya.core.exceptions import (
    APIError as KubiyaAPIError,
    AuthenticationError as KubiyaAuthenticationError,
    TimeoutError as KubiyaTimeoutError
)

client = ControlPlaneClient(api_key="your-api-key")

# Handle worker-specific errors
try:
    workers = client.workers.list()
except WorkerError as e:
    print(f"Worker operation failed: {e}")
except KubiyaAuthenticationError as e:
    print(f"Authentication failed: {e}")
except KubiyaTimeoutError as e:
    print(f"Request timed out: {e}")
except KubiyaAPIError as e:
    print(f"API error: {e.status_code} - {e.message}")

# Handle worker not found
try:
    worker = client.workers.get(runner_name="non-existent-worker")
except WorkerError as e:
    if "not found" in str(e).lower():
        print("Worker not found")
    else:
        print(f"Error getting worker: {e}")

# Handle registration failures
try:
    response = client.workers.register({
        "runner_name": "",  # Invalid empty name
        "task_queue": "kubiya-task-queue"
    })
except WorkerError as e:
    print(f"Registration failed: {e}")
    # Handle validation errors appropriately

Best Practices

1. Monitor Worker Health Regularly

import schedule
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

def check_worker_health():
    workers = client.workers.list()
    unhealthy = [w for w in workers if w['health_status'] != 'healthy']

    if unhealthy:
        # Alert or take action
        print(f"Alert: {len(unhealthy)} unhealthy workers")

# Schedule health checks every 5 minutes
schedule.every(5).minutes.do(check_worker_health)

2. Implement Heartbeat Loops

import time
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError

def worker_heartbeat_loop(client: ControlPlaneClient, worker_id: str, interval: int = 30):
    """Send periodic heartbeats from worker."""
    while True:
        try:
            client.workers.heartbeat_simple(worker_id=worker_id)
            time.sleep(interval)
        except WorkerError as e:
            print(f"Heartbeat failed: {e}")
            time.sleep(interval)  # Continue trying

3. Handle Worker Disconnections Gracefully

from kubiya import ControlPlaneClient

def safe_disconnect(client: ControlPlaneClient, worker_id: str):
    """Safely disconnect worker with error handling."""
    try:
        response = client.workers.disconnect(worker_id=worker_id)

        if response.get('pending_tasks', 0) > 0:
            print(f"Warning: Worker has {response['pending_tasks']} pending tasks")

        return True
    except Exception as e:
        print(f"Failed to disconnect worker: {e}")
        return False

4. Use Metadata for Worker Tracking

from kubiya import ControlPlaneClient

# Include useful metadata during registration
registration_data = {
    "runner_name": "prod-worker-01",
    "task_queue": "kubiya-task-queue",
    "capabilities": ["agno-runtime"],
    "metadata": {
        "environment": "production",
        "region": "us-east-1",
        "version": "1.2.3",
        "deployment_id": "deploy-abc123",
        "team": "platform"
    }
}

client = ControlPlaneClient(api_key="your-api-key")
client.workers.register(registration_data)

API Reference

Methods

MethodDescriptionParametersReturns
list()List all registered workersNoneList[Dict[str, Any]]
get(runner_name)Get worker detailsrunner_name: strDict[str, Any]
register(registration_data)Register new workerregistration_data: Dict[str, Any]Dict[str, Any]
heartbeat(heartbeat_data)Send worker heartbeatheartbeat_data: Dict[str, Any]Dict[str, Any]
heartbeat_simple(worker_id)Send simple heartbeatworker_id: strDict[str, Any]
start(worker_id, start_data)Start a workerworker_id: str, start_data: Optional[Dict]Dict[str, Any]
disconnect(worker_id)Disconnect a workerworker_id: strDict[str, Any]

Worker Object Structure

{
    "worker_id": "worker-abc123",
    "runner_name": "production-worker-01",
    "task_queue": "kubiya-task-queue",
    "status": "active",  # active, idle, disconnected
    "health_status": "healthy",  # healthy, unhealthy, unknown
    "last_heartbeat": "2025-12-11T10:30:00Z",
    "registered_at": "2025-12-11T08:00:00Z",
    "capabilities": ["agno-runtime", "claude-code"],
    "metadata": {
        "environment": "production",
        "region": "us-east-1",
        "version": "1.0.0"
    }
}

Registration Data Structure

{
    "runner_name": "worker-name",
    "task_queue": "kubiya-task-queue",
    "capabilities": ["agno-runtime", "claude-code"],
    "metadata": {
        "environment": "production",
        "region": "us-east-1",
        "version": "1.0.0"
    }
}

Heartbeat Data Structure

{
    "worker_id": "worker-abc123",
    "status": "active",
    "health_metrics": {
        "cpu_usage": 45.2,
        "memory_usage": 68.5,
        "task_count": 3,
        "queue_depth": 12
    },
    "timestamp": 1702301400
}

Next Steps