Monitor and manage Temporal workers and task queues in the Control Plane
The Workers Service provides access to Temporal worker management, allowing you to monitor worker health, manage registrations, and control worker lifecycle operations.
Use the Workers Service when you need deeper visibility into how your workers are behaving in different environments, want to proactively detect unhealthy or stale workers, or manage custom worker deployments where you control registration and lifecycle from your own automation or dashboards.
Workers in Kubiya are Temporal workers that execute agent tasks and workflows. The Workers Service enables you to:
Monitor Workers: List and inspect all registered workers in your organization
Worker Registration: Register new workers with the Control Plane
Health Management: Send heartbeats and monitor worker health
Lifecycle Control: Start and disconnect workers programmatically
Workers are typically managed automatically by the Kubiya platform, but the Workers Service provides programmatic access for advanced monitoring and management scenarios.
```python
from kubiya import ControlPlaneClient

# Initialize the client
client = ControlPlaneClient(api_key="your-api-key")

# List all workers
workers = client.workers.list()
for worker in workers:
    print(f"Worker: {worker['runner_name']} - Status: {worker['status']}")

# Get specific worker details
worker_info = client.workers.get(runner_name="production-worker-01")
print(f"Worker health: {worker_info['health_status']}")
```
Worker registration is typically handled automatically by the Kubiya worker deployment process. Manual registration is only needed for custom worker implementations.
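If you do run a custom worker, a minimal manual registration is sketched below. It uses the same `client.workers.register()` call and fields shown in the retry example later in this section; the specific values (runner name, task queue, capabilities) are illustrative and should match your own deployment.

```python
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Minimal manual registration for a custom worker implementation.
# The values below are illustrative placeholders, not required names.
registration_data = {
    "runner_name": "custom-worker-01",
    "task_queue": "kubiya-task-queue",
    "capabilities": ["agno-runtime"],
    "metadata": {"environment": "staging", "version": "1.0.0"},
}

response = client.workers.register(registration_data)
print(f"Registered worker: {response['worker_id']}")
```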
The following examples show how to use the Workers Service for real-world scenarios, such as monitoring worker health, managing registrations, and handling worker lifecycle operations. Each example includes a short explanation of when and why you might use it.
Use this pattern to build periodic health checks or dashboards that surface unhealthy or stale workers before they impact SLAs or user-facing workflows. Monitor all workers and alert on unhealthy workers:
```python
from kubiya import ControlPlaneClient
from datetime import datetime, timedelta, timezone
from kubiya.resources.exceptions import WorkerError


def monitor_worker_health(client: ControlPlaneClient):
    """Monitor worker health and identify unhealthy workers."""
    try:
        workers = client.workers.list()

        unhealthy_workers = []
        stale_workers = []
        # Use an aware datetime so it can be compared with the parsed heartbeat timestamps
        now = datetime.now(timezone.utc)

        for worker in workers:
            # Check health status
            if worker['health_status'] != 'healthy':
                unhealthy_workers.append(worker)

            # Check for stale heartbeats (>5 minutes)
            last_heartbeat = datetime.fromisoformat(worker['last_heartbeat'].replace('Z', '+00:00'))
            if now - last_heartbeat > timedelta(minutes=5):
                stale_workers.append(worker)

        # Report findings
        if unhealthy_workers:
            print(f"⚠️ {len(unhealthy_workers)} unhealthy workers found:")
            for worker in unhealthy_workers:
                print(f"  - {worker['runner_name']}: {worker['health_status']}")

        if stale_workers:
            print(f"🔴 {len(stale_workers)} workers with stale heartbeats:")
            for worker in stale_workers:
                last_heartbeat = datetime.fromisoformat(worker['last_heartbeat'].replace('Z', '+00:00'))
                minutes_since = int((now - last_heartbeat).total_seconds() // 60)
                print(f"  - {worker['runner_name']}: {minutes_since} minutes since last heartbeat")

        if not unhealthy_workers and not stale_workers:
            print(f"✅ All {len(workers)} workers are healthy")

        return {
            "total_workers": len(workers),
            "healthy_workers": len(workers) - len(unhealthy_workers),
            "unhealthy_workers": unhealthy_workers,
            "stale_workers": stale_workers
        }

    except WorkerError as e:
        print(f"Failed to monitor workers: {e}")
        return None


# Usage
client = ControlPlaneClient(api_key="your-api-key")
health_report = monitor_worker_health(client)
```
Use this analysis when you need to understand how workers are distributed across task queues and capabilities so you can plan capacity, rebalance load, or justify scaling decisions. Analyze worker capacity and utilization:
```python
from kubiya import ControlPlaneClient
from typing import Dict, Any


def analyze_worker_capacity(client: ControlPlaneClient) -> Dict[str, Any]:
    """Analyze worker capacity and utilization."""
    workers = client.workers.list()

    total_workers = len(workers)
    active_workers = sum(1 for w in workers if w['status'] == 'active')
    # Guard against an empty fleet to avoid division by zero
    utilization = active_workers / total_workers if total_workers else 0.0

    # Group by task queue
    queue_distribution = {}
    for worker in workers:
        queue = worker['task_queue']
        if queue not in queue_distribution:
            queue_distribution[queue] = 0
        queue_distribution[queue] += 1

    # Group by capabilities
    capability_distribution = {}
    for worker in workers:
        for cap in worker.get('capabilities', []):
            if cap not in capability_distribution:
                capability_distribution[cap] = 0
            capability_distribution[cap] += 1

    print("Worker Capacity Analysis")
    print("=" * 50)
    print(f"Total Workers: {total_workers}")
    print(f"Active Workers: {active_workers}")
    print(f"Utilization: {utilization * 100:.1f}%")

    print("\nTask Queue Distribution:")
    for queue, count in queue_distribution.items():
        print(f"  {queue}: {count} workers")

    print("\nCapability Distribution:")
    for cap, count in capability_distribution.items():
        print(f"  {cap}: {count} workers")

    return {
        "total_workers": total_workers,
        "active_workers": active_workers,
        "utilization": utilization,
        "queue_distribution": queue_distribution,
        "capability_distribution": capability_distribution
    }


# Usage
client = ControlPlaneClient(api_key="your-api-key")
capacity_report = analyze_worker_capacity(client)
```
Use this approach when you own the worker deployment pipeline and want resilient registration logic that automatically retries transient failures instead of failing a rollout. Register a worker with automatic retry on failure:
```python
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError
import time


def register_worker_with_retry(
    client: ControlPlaneClient,
    registration_data: dict,
    max_retries: int = 3
) -> dict:
    """Register worker with automatic retry on failure."""
    for attempt in range(max_retries):
        try:
            response = client.workers.register(registration_data)
            print(f"✅ Worker registered successfully: {response['worker_id']}")
            return response
        except WorkerError as e:
            if attempt == max_retries - 1:
                print(f"❌ Failed to register worker after {max_retries} attempts")
                raise

            wait_time = 2 ** attempt  # Exponential backoff
            print(f"⚠️ Registration attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)


# Usage
client = ControlPlaneClient(api_key="your-api-key")

registration_data = {
    "runner_name": "resilient-worker-04",
    "task_queue": "kubiya-task-queue",
    "capabilities": ["agno-runtime"],
    "metadata": {
        "environment": "production",
        "version": "1.0.0"
    }
}

worker = register_worker_with_retry(client, registration_data)
```
Use this strategy when you need to drain workers for maintenance or deployments without dropping tasks or interrupting long-running jobs. Implement graceful worker shutdown:
```python
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError
import time


def graceful_worker_shutdown(
    client: ControlPlaneClient,
    worker_id: str,
    max_wait_seconds: int = 300
) -> bool:
    """Gracefully shut down a worker, waiting for tasks to complete."""
    try:
        # Get current worker status
        workers = client.workers.list()
        worker = next((w for w in workers if w['worker_id'] == worker_id), None)

        if not worker:
            print(f"Worker {worker_id} not found")
            return False

        print(f"Initiating graceful shutdown for {worker['runner_name']}")

        # Disconnect worker (stops accepting new tasks)
        disconnect_response = client.workers.disconnect(worker_id=worker_id)
        pending_tasks = disconnect_response.get('pending_tasks', 0)

        if pending_tasks == 0:
            print("✅ Worker disconnected successfully (no pending tasks)")
            return True

        # Wait for pending tasks to complete
        print(f"Waiting for {pending_tasks} pending tasks to complete...")
        start_time = time.time()

        while time.time() - start_time < max_wait_seconds:
            # Check worker status
            current_worker = client.workers.get(runner_name=worker['runner_name'])

            if current_worker['status'] == 'disconnected':
                print("✅ Worker shut down gracefully")
                return True

            time.sleep(5)  # Check every 5 seconds

        print(f"⚠️ Timeout after {max_wait_seconds}s - worker may still have pending tasks")
        return False

    except WorkerError as e:
        print(f"Failed to shut down worker: {e}")
        return False


# Usage
client = ControlPlaneClient(api_key="your-api-key")
success = graceful_worker_shutdown(
    client,
    worker_id="worker-abc123",
    max_wait_seconds=300
)
```
Worker operations can fail for several reasons: a worker might not exist anymore, registration data can be invalid, or network and authentication issues can prevent the Control Plane from responding. The following patterns show how to handle worker-specific WorkerError exceptions alongside common SDK errors, so your monitoring and management scripts fail gracefully.
```python
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError
from kubiya.core.exceptions import (
    APIError as KubiyaAPIError,
    AuthenticationError as KubiyaAuthenticationError,
    TimeoutError as KubiyaTimeoutError
)

client = ControlPlaneClient(api_key="your-api-key")

# Handle worker-specific errors
try:
    workers = client.workers.list()
except WorkerError as e:
    print(f"Worker operation failed: {e}")
except KubiyaAuthenticationError as e:
    print(f"Authentication failed: {e}")
except KubiyaTimeoutError as e:
    print(f"Request timed out: {e}")
except KubiyaAPIError as e:
    print(f"API error: {e.status_code} - {e.message}")

# Handle worker not found
try:
    worker = client.workers.get(runner_name="non-existent-worker")
except WorkerError as e:
    if "not found" in str(e).lower():
        print("Worker not found")
    else:
        print(f"Error getting worker: {e}")

# Handle registration failures
try:
    response = client.workers.register({
        "runner_name": "",  # Invalid empty name
        "task_queue": "kubiya-task-queue"
    })
except WorkerError as e:
    print(f"Registration failed: {e}")
    # Handle validation errors appropriately
```
Run lightweight periodic health checks so you can catch unhealthy or stale workers early and react before they cause incidents.
```python
import time

import schedule

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")


def check_worker_health():
    workers = client.workers.list()
    unhealthy = [w for w in workers if w['health_status'] != 'healthy']

    if unhealthy:
        # Alert or take action
        print(f"Alert: {len(unhealthy)} unhealthy workers")


# Schedule health checks every 5 minutes
schedule.every(5).minutes.do(check_worker_health)

# The schedule library only runs jobs when run_pending() is called in a loop
while True:
    schedule.run_pending()
    time.sleep(1)
```
Implement dedicated heartbeat loops in your worker processes so the Control Plane always has an up-to-date view of their status, even when they are idle.
```python
import time

from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError


def worker_heartbeat_loop(client: ControlPlaneClient, worker_id: str, interval: int = 30):
    """Send periodic heartbeats from worker."""
    while True:
        try:
            client.workers.heartbeat_simple(worker_id=worker_id)
            time.sleep(interval)
        except WorkerError as e:
            print(f"Heartbeat failed: {e}")
            time.sleep(interval)  # Continue trying
```
Include rich metadata when registering workers so you can slice and filter your fleet by environment, region, version, or team when debugging or planning capacity.
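As a sketch of that practice, assuming the metadata field accepts arbitrary key/value pairs (the examples above only confirm environment and version) and that listed workers echo their registration metadata back:

```python
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# Registration payload with richer metadata.
# Keys such as "region" and "team" are illustrative additions; only
# "environment" and "version" appear in the examples above, so adjust
# these keys to whatever your own conventions require.
registration_data = {
    "runner_name": "prod-eu-worker-07",
    "task_queue": "kubiya-task-queue",
    "capabilities": ["agno-runtime"],
    "metadata": {
        "environment": "production",
        "region": "eu-west-1",
        "version": "1.2.3",
        "team": "platform-sre",
    },
}

worker = client.workers.register(registration_data)

# Later, slice the fleet by metadata when debugging or planning capacity
# (this assumes the list response includes the registered metadata).
workers = client.workers.list()
eu_workers = [w for w in workers if w.get('metadata', {}).get('region') == 'eu-west-1']
print(f"Workers in eu-west-1: {len(eu_workers)}")
```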