Monitor and manage Temporal workers and task queues in the Control Plane
The Workers Service provides access to Temporal worker management, allowing you to monitor worker health, manage registrations, and control worker lifecycle operations.
Workers in Kubiya are Temporal workers that execute agent tasks and workflows. The Workers Service enables you to:
Monitor Workers: List and inspect all registered workers in your organization
Worker Registration: Register new workers with the Control Plane
Health Management: Send heartbeats and monitor worker health
Lifecycle Control: Start and disconnect workers programmatically
Workers are typically managed automatically by the Kubiya platform, but the Workers Service provides programmatic access for advanced monitoring and management scenarios.
from kubiya import ControlPlaneClient

# Create a Control Plane client authenticated with your API key
client = ControlPlaneClient(api_key="your-api-key")

# Print the runner name and current status of every registered worker
workers = client.workers.list()
for entry in workers:
    print(f"Worker: {entry['runner_name']} - Status: {entry['status']}")

# Look up a single worker by its runner name and report its health
worker_info = client.workers.get(runner_name="production-worker-01")
print(f"Worker health: {worker_info['health_status']}")
Worker registration is typically handled automatically by the Kubiya worker deployment process. Manual registration is only needed for custom worker implementations.
Monitor all workers and report any that are unhealthy or whose last heartbeat is stale:
Copy
Ask AI
from datetime import datetime, timedelta, timezone

from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError


def _parse_heartbeat(timestamp: str) -> datetime:
    """Parse an ISO-8601 heartbeat timestamp into a timezone-aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` only accepts ``Z`` from Python 3.11 onward.
    Timestamps without an offset are treated as UTC (assumption, so confirm
    against the API) so they can be compared with an aware "now".
    """
    parsed = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def monitor_worker_health(
    client: ControlPlaneClient,
    stale_after: timedelta = timedelta(minutes=5),
):
    """Monitor worker health and identify unhealthy workers.

    A worker is "unhealthy" when its ``health_status`` is not ``'healthy'``.
    It is "stale" when its last heartbeat is older than ``stale_after`` or
    when no heartbeat is recorded at all. A worker can be both.

    Args:
        client: Control Plane client used to list workers.
        stale_after: Heartbeat age beyond which a worker counts as stale.
            Defaults to 5 minutes.

    Returns:
        A dict with ``total_workers``, ``healthy_workers`` (total minus
        unhealthy), ``unhealthy_workers`` and ``stale_workers`` (lists of
        worker dicts), or ``None`` if listing workers raised ``WorkerError``.
    """
    try:
        workers = client.workers.list()
    except WorkerError as e:
        print(f"Failed to monitor workers: {e}")
        return None

    # Must be timezone-aware: subtracting an offset-aware heartbeat from a
    # naive datetime.utcnow() raises TypeError (and utcnow() is deprecated).
    now = datetime.now(timezone.utc)

    unhealthy_workers = []
    stale_workers = []
    stale_ages = []  # parallel to stale_workers; None = no heartbeat recorded

    for worker in workers:
        # Check health status
        if worker['health_status'] != 'healthy':
            unhealthy_workers.append(worker)

        # Check for stale heartbeats
        heartbeat = worker.get('last_heartbeat')
        if not heartbeat:
            stale_workers.append(worker)
            stale_ages.append(None)
            continue
        age = now - _parse_heartbeat(heartbeat)
        if age > stale_after:
            stale_workers.append(worker)
            stale_ages.append(age)

    # Report findings
    if unhealthy_workers:
        print(f"⚠️ {len(unhealthy_workers)} unhealthy workers found:")
        for worker in unhealthy_workers:
            print(f" - {worker['runner_name']}: {worker['health_status']}")

    if stale_workers:
        print(f"🔴 {len(stale_workers)} workers with stale heartbeats:")
        for worker, age in zip(stale_workers, stale_ages):
            if age is None:
                print(f" - {worker['runner_name']}: no heartbeat recorded")
                continue
            # total_seconds() includes whole days; timedelta.seconds wraps at 24h.
            minutes_since = int(age.total_seconds() // 60)
            print(f" - {worker['runner_name']}: {minutes_since} minutes since last heartbeat")

    if not unhealthy_workers and not stale_workers:
        print(f"✅ All {len(workers)} workers are healthy")

    return {
        "total_workers": len(workers),
        "healthy_workers": len(workers) - len(unhealthy_workers),
        "unhealthy_workers": unhealthy_workers,
        "stale_workers": stale_workers,
    }


# Usage
client = ControlPlaneClient(api_key="your-api-key")
health_report = monitor_worker_health(client)
import time

from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import WorkerError


def graceful_worker_shutdown(
    client: ControlPlaneClient,
    worker_id: str,
    max_wait_seconds: int = 300,
    poll_interval_seconds: float = 5,
) -> bool:
    """Gracefully shut down a worker, waiting for tasks to complete.

    Looks the worker up by ``worker_id`` and disconnects it. If the
    disconnect response reports pending tasks, the worker's status is polled
    until it becomes ``'disconnected'`` or ``max_wait_seconds`` elapses.

    Args:
        client: Control Plane client.
        worker_id: ID of the worker to shut down.
        max_wait_seconds: Maximum time to wait for pending tasks to drain.
        poll_interval_seconds: Delay between status checks (default 5s).

    Returns:
        True if the worker disconnected (immediately or within the wait
        window). False if it was not found, the wait timed out, or a
        ``WorkerError`` occurred.
    """
    try:
        # Get current worker status
        workers = client.workers.list()
        worker = next((w for w in workers if w['worker_id'] == worker_id), None)
        if worker is None:
            print(f"Worker {worker_id} not found")
            return False

        print(f"Initiating graceful shutdown for {worker['runner_name']}")

        # Disconnect worker (stops accepting new tasks)
        disconnect_response = client.workers.disconnect(worker_id=worker_id)
        pending_tasks = disconnect_response.get('pending_tasks', 0)
        if pending_tasks == 0:
            print("✅ Worker disconnected successfully (no pending tasks)")
            return True

        # Wait for pending tasks to complete. Use a monotonic clock so that
        # wall-clock adjustments cannot shorten or extend the wait.
        print(f"Waiting for {pending_tasks} pending tasks to complete...")
        deadline = time.monotonic() + max_wait_seconds
        while True:
            current_worker = client.workers.get(runner_name=worker['runner_name'])
            if current_worker['status'] == 'disconnected':
                print("✅ Worker shut down gracefully")
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(poll_interval_seconds, remaining))

        print(f"⚠️ Timeout after {max_wait_seconds}s - worker may still have pending tasks")
        return False

    except WorkerError as e:
        print(f"Failed to shut down worker: {e}")
        return False


# Usage
client = ControlPlaneClient(api_key="your-api-key")
success = graceful_worker_shutdown(
    client,
    worker_id="worker-abc123",
    max_wait_seconds=300,
)
import time

import schedule
from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")


def check_worker_health():
    """Print an alert when any worker reports a health_status other than 'healthy'."""
    workers = client.workers.list()
    unhealthy = [w for w in workers if w['health_status'] != 'healthy']
    if unhealthy:
        # Alert or take action
        print(f"Alert: {len(unhealthy)} unhealthy workers")


# Schedule health checks every 5 minutes
schedule.every(5).minutes.do(check_worker_health)

# `schedule` only registers jobs; they execute when run_pending() is called,
# so a loop must keep running for the health check to actually fire.
while True:
    schedule.run_pending()
    time.sleep(1)