Complete guide to the Kubiya Runners service for managing and monitoring Kubiya runners
list(): Retrieve all runners with their health status
manifest(): Generate Kubernetes manifests for runner deployment
from kubiya_workflow_sdk import KubiyaClient
from kubiya_workflow_sdk.kubiya_services.exceptions import RunnerError, RunnerNotFoundError
# Initialize client
# NOTE: "your-api-key" is a placeholder value; substitute a real API key.
# The resulting `client` object is reused by every example that follows.
client = KubiyaClient(
api_key="your-api-key",
base_url="https://api.kubiya.ai"
)
# List every runner together with its overall and per-component health.
try:
    runners = client.runners.list()
    print(f"Found {len(runners)} runners:")
    for runner in runners:
        # Identity fields, with placeholders for anything the entry omits
        name, version, namespace = (
            runner.get('name', 'Unknown'),
            runner.get('version', 'Unknown'),
            runner.get('namespace', 'default'),
        )

        # Overall runner health
        runner_health = runner.get('runner_health', {})
        health_status = runner_health.get('status', 'unknown')
        print(f" - {name} (v{version}) in {namespace}: {health_status}")

        # Per-component health
        tool_manager, agent_manager = (
            runner.get('tool_manager_health', {}),
            runner.get('agent_manager_health', {}),
        )
        print(
            f" Tool Manager: {tool_manager.get('status', 'unknown')}",
            f" Agent Manager: {agent_manager.get('status', 'unknown')}",
            sep="\n",
        )
except RunnerError as e:
    print(f"Failed to list runners: {e}")
# Generate the Kubernetes manifest for one specific runner.
runner_name = "my-production-runner"
try:
    manifest_info = client.runners.manifest(runner_name)
    if not manifest_info.get('url'):
        print("Manifest generation failed: no URL returned")
    else:
        manifest_url = manifest_info['url']
        print(f"Kubernetes manifest for '{runner_name}' available at: {manifest_url}")
        # The manifest can then be downloaded and applied, e.g.:
        #   kubectl apply -f {manifest_url}
except RunnerNotFoundError as e:
    print(f"Runner '{runner_name}' not found: {e}")
except RunnerError as e:
    print(f"Failed to generate manifest: {e}")
# Monitor runner health status
def monitor_runner_health():
    """Print a healthy/unhealthy breakdown of all runners and return the counts.

    A runner counts as healthy when the ``status`` of its ``runner_health``
    entry is, case-insensitively, one of ``healthy``, ``ok`` or ``running``.

    Returns:
        A ``(healthy, unhealthy)`` tuple of counts, or ``(0, 0)`` when a
        ``RunnerError`` is raised while listing runners.
    """
    ok_states = ('healthy', 'ok', 'running')
    try:
        healthy_names = []
        failing = []
        for entry in client.runners.list():
            label = entry.get('name', 'Unknown')
            health = entry.get('runner_health', {})
            state = health.get('status', 'unknown')
            if state.lower() not in ok_states:
                failing.append({
                    'name': label,
                    'status': state,
                    'error': health.get('error', 'No error details'),
                })
                continue
            healthy_names.append(label)

        print(f"✅ Healthy runners: {len(healthy_names)}")
        for label in healthy_names:
            print(f" - {label}")

        # The unhealthy section is only printed when there is something to show
        if failing:
            print(f"❌ Unhealthy runners: {len(failing)}")
            for item in failing:
                print(f" - {item['name']}: {item['status']}")
                if item['error']:
                    print(f" Error: {item['error']}")

        return len(healthy_names), len(failing)
    except RunnerError as e:
        print(f"Health monitoring failed: {e}")
        return 0, 0
# Run monitoring
# The call returns a (healthy, unhealthy) pair of counts; both are 0 when the
# runner listing failed with RunnerError.
healthy_count, unhealthy_count = monitor_runner_health()
# Handle a generic runner failure and surface any extra detail it carries.
try:
    runners = client.runners.list()
except RunnerError as e:
    print(f"Runner operation failed: {e}")
    # `details` is optional on the exception, so only print it when it is
    # present and non-empty
    if getattr(e, 'details', None):
        print(f"Error details: {e.details}")
# Handle a missing runner by showing which runners do exist.
try:
    manifest = client.runners.manifest("non-existent-runner")
except RunnerNotFoundError as not_found:
    print(f"Runner not found: {not_found}")
    print("Available runners:")
    # Fallback: list what is available; the listing itself may also fail
    try:
        runners = client.runners.list()
        for runner in runners:
            print(f" - {runner.get('name', 'Unknown')}")
    except RunnerError:
        print(" Could not fetch runner list")
# RunnerHealthError is not among the names imported at the top of this guide.
# Without this import the `except` clause below raises NameError as soon as
# any exception reaches it.
# NOTE(review): assumed to live in the same module as RunnerError - confirm
# against the SDK.
from kubiya_workflow_sdk.kubiya_services.exceptions import RunnerHealthError

try:
    # Health checks are performed automatically during list()
    runners = client.runners.list()
except RunnerHealthError as e:
    print(f"Health check failed: {e}")
    print("Some runners may not have current health status")
import time
def list_runners_with_retry(max_retries=3, delay=1):
    """List runners, retrying with exponential backoff on ``RunnerError``.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as 1
            so the listing is always attempted at least once (previously such
            values skipped the loop and silently returned ``None``).
        delay: Seconds to sleep after the first failed attempt; doubled after
            every subsequent failure.

    Returns:
        The result of ``client.runners.list()`` from the first attempt that
        succeeds.

    Raises:
        RunnerError: Re-raised from the final attempt when every attempt failed.
    """
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        try:
            return client.runners.list()
        except RunnerError:
            if attempt == attempts:
                # Out of attempts: a bare raise propagates the original
                # exception with its traceback untouched
                raise
            print(f"Attempt {attempt} failed, retrying in {delay}s...")
            time.sleep(delay)
            delay *= 2  # Exponential backoff
# Fetch the runner list using the helper's default retry settings
# (max_retries=3, delay=1).
runners = list_runners_with_retry()
def get_healthy_runners():
    """Return only the runners that are healthy and safe for task execution.

    A runner qualifies when its ``runner_health``, ``tool_manager_health`` and
    ``agent_manager_health`` entries all report a ``status`` that is,
    case-insensitively, one of ``healthy``, ``ok`` or ``running``.

    Returns:
        The list of qualifying runner entries, or ``[]`` when a
        ``RunnerError`` is raised while listing runners.
    """
    ok_states = ('healthy', 'ok', 'running')

    def reports_ok(candidate, section, fallback):
        # `fallback` is the status assumed when the section has no 'status' key
        return candidate.get(section, {}).get('status', fallback).lower() in ok_states

    try:
        # Component health is only consulted once the runner itself is healthy
        return [
            candidate
            for candidate in client.runners.list()
            if reports_ok(candidate, 'runner_health', 'unknown')
            and reports_ok(candidate, 'tool_manager_health', '')
            and reports_ok(candidate, 'agent_manager_health', '')
        ]
    except RunnerError as e:
        print(f"Failed to get healthy runners: {e}")
        return []
# Use only healthy runners for critical operations
healthy_runners = get_healthy_runners()
if not healthy_runners:
    print("No healthy runners available")
else:
    print(f"Found {len(healthy_runners)} healthy runners")
def check_runner_versions():
    """Group runner names by reported version and print the distribution.

    Also prints a warning when any runner reports the version string ``v1``.

    Returns:
        A dict mapping each version string to the list of runner names that
        report it, or ``{}`` when a ``RunnerError`` is raised while listing
        runners.
    """
    try:
        by_version = {}
        for entry in client.runners.list():
            by_version.setdefault(entry.get('version', 'unknown'), []).append(
                entry.get('name', 'Unknown')
            )

        print("Runner version distribution:")
        for version, names in by_version.items():
            print(f" {version}: {len(names)} runners")
            for name in names:
                print(f" - {name}")

        # Check for outdated versions
        legacy = by_version.get('v1')
        if legacy is not None:
            print(f"⚠️ Warning: {len(legacy)} runners on legacy v1")

        return by_version
    except RunnerError as e:
        print(f"Version check failed: {e}")
        return {}
# Maps each version string to the names of the runners reporting it
# ({} if the runner listing failed).
version_info = check_runner_versions()
def generate_and_validate_manifest(runner_name):
    """Generate a manifest for ``runner_name`` after confirming the runner exists.

    Args:
        runner_name: Name of the runner to generate a manifest for.

    Returns:
        The manifest URL on success, or ``None`` when the runner is unknown,
        no URL is returned, or the SDK raises ``RunnerError``. A warning is
        printed (but the URL is still returned) when it does not start with
        ``http://`` or ``https://``.
    """
    try:
        # Confirm the runner is known before asking for its manifest
        known_names = [item.get('name') for item in client.runners.list()]
        if runner_name not in known_names:
            raise RunnerNotFoundError(f"Runner '{runner_name}' not found")

        info = client.runners.manifest(runner_name)
        if not info.get('url'):
            raise RunnerError("Manifest generation failed: no URL returned")
        url = info['url']
        print(f"✅ Manifest generated successfully: {url}")

        # Basic sanity check on the URL scheme; advisory only
        looks_like_url = url.startswith(('http://', 'https://'))
        if not looks_like_url:
            print("⚠️ Warning: Manifest URL may not be valid")
        return url
    except (RunnerError, RunnerNotFoundError) as e:
        print(f"❌ Manifest generation failed: {e}")
        return None
# Generate manifests for all healthy runners
healthy_runners = get_healthy_runners()
for runner in healthy_runners:
    runner_name = runner.get('name')
    if not runner_name:
        # Nothing to generate for an entry without a usable name
        continue
    generate_and_validate_manifest(runner_name)
# Use healthy runners in workflow execution
def execute_workflow_on_healthy_runner(workflow_config):
    """Pick the first healthy runner as the target for a workflow.

    Args:
        workflow_config: Workflow definition. It is currently unused because
            the actual execution call below is left as a commented-out
            illustration.

    Returns:
        The name of the selected runner.

    Raises:
        RunnerError: When no healthy runner is available.
    """
    candidates = get_healthy_runners()
    if not candidates:
        raise RunnerError("No healthy runners available for workflow execution")

    # Selection policy: simply take the first healthy runner
    target_name = candidates[0].get('name')
    print(f"Executing workflow on runner: {target_name}")

    # Execute workflow using the workflow service
    # result = client.workflows.execute(workflow_config, runner=runner_name)
    return target_name
# Monitor infrastructure health across runners
def infrastructure_health_report():
    """Aggregate health, version and namespace statistics across all runners.

    Returns:
        A dict with the keys ``total_runners``, ``healthy_runners``,
        ``unhealthy_runners``, ``version_distribution``,
        ``namespace_distribution`` and ``component_health``, or ``None`` when
        a ``RunnerError`` is raised while listing runners.
    """
    ok_states = ('healthy', 'ok', 'running')

    def is_ok(entry, section):
        # A missing section or status counts as not healthy
        return entry.get(section, {}).get('status', '').lower() in ok_states

    def tally(counts, key):
        counts[key] = counts.get(key, 0) + 1

    try:
        fleet = client.runners.list()
        report = {
            'total_runners': len(fleet),
            'healthy_runners': 0,
            'unhealthy_runners': 0,
            'version_distribution': {},
            'namespace_distribution': {},
            'component_health': {
                'tool_manager': {'healthy': 0, 'unhealthy': 0},
                'agent_manager': {'healthy': 0, 'unhealthy': 0},
            },
        }
        components = report['component_health']

        for entry in fleet:
            # Version and namespace tracking
            tally(report['version_distribution'], entry.get('version', 'unknown'))
            tally(report['namespace_distribution'], entry.get('namespace', 'default'))

            # Overall health tracking
            overall = 'healthy_runners' if is_ok(entry, 'runner_health') else 'unhealthy_runners'
            report[overall] += 1

            # Component health tracking (tool manager first, then agent manager)
            for component in ('tool_manager', 'agent_manager'):
                bucket = 'healthy' if is_ok(entry, f'{component}_health') else 'unhealthy'
                components[component][bucket] += 1

        return report
    except RunnerError as e:
        print(f"Health report generation failed: {e}")
        return None
# Generate and display report
report = infrastructure_health_report()
if report:
    # One print call, one line per field
    print(
        "Infrastructure Health Report:",
        f" Total Runners: {report['total_runners']}",
        f" Healthy: {report['healthy_runners']}",
        f" Unhealthy: {report['unhealthy_runners']}",
        f" Version Distribution: {report['version_distribution']}",
        f" Namespace Distribution: {report['namespace_distribution']}",
        sep="\n",
    )
def custom_runner_validation(runner):
    """Validate one runner entry and describe every problem found.

    Checks that the runner has a name and a version, and that its overall
    health, tool manager and agent manager each report a status that is,
    case-insensitively, one of ``healthy``, ``ok`` or ``running``.

    A health section that is missing or ``None``, or whose ``status`` is
    missing or ``None``, is reported as ``unknown`` rather than raising, so
    malformed entries produce an issue instead of aborting validation.

    Args:
        runner: Mapping describing a runner, as yielded by the runner listing.

    Returns:
        A list of human-readable issue strings; empty when the runner passes.
    """
    healthy_states = ('healthy', 'ok', 'running')
    # (section key in the runner entry, prefix of the issue message)
    health_checks = (
        ('runner_health', "Unhealthy status"),
        ('tool_manager_health', "Tool manager unhealthy"),
        ('agent_manager_health', "Agent manager unhealthy"),
    )

    issues = []

    # Check basic runner info
    if not runner.get('name'):
        issues.append("Missing runner name")
    if not runner.get('version'):
        issues.append("Missing version information")

    # Check overall health first, then each component
    for section, label in health_checks:
        # `or {}` covers a section that is present but null
        health = runner.get(section) or {}
        status = health.get('status', 'unknown')
        if status is None:
            status = 'unknown'
        # str() keeps a non-string status from raising on .lower()
        if str(status).lower() not in healthy_states:
            issues.append(f"{label}: {status}")

    return issues
# Validate all runners
runners = client.runners.list()
for runner in runners:
    runner_name = runner.get('name', 'Unknown')
    issues = custom_runner_validation(runner)
    if not issues:
        print(f"✅ {runner_name} passed validation")
        continue
    # At least one problem was found: list each on its own line
    print(f"❌ {runner_name} has issues:")
    for issue in issues:
        print(f" - {issue}")
Was this page helpful?