Runners Service Overview

The Kubiya Runners service provides an interface for managing and monitoring Kubiya runners through the platform. It lets you list runners together with their health status and generate Kubernetes manifests for deployment, with comprehensive error handling and real-time health monitoring built in.

Features

  • Runner Management: List all available runners with detailed information
  • Health Monitoring: Real-time health status checks for runners and their components
  • Kubernetes Integration: Generate deployment manifests for runners
  • Concurrent Health Checks: Efficient batch health status fetching
  • Comprehensive Error Handling: Detailed error reporting with runner-specific context

Core Components

RunnerService

The main service class provides two core operations:
  • list(): Retrieve all runners with their health status
  • manifest(): Generate Kubernetes manifests for runner deployment

Quick Start

Basic Usage

from kubiya_workflow_sdk import KubiyaClient
from kubiya_workflow_sdk.kubiya_services.exceptions import RunnerError, RunnerNotFoundError

# Create the SDK client
client = KubiyaClient(
    api_key="your-api-key",
    base_url="https://api.kubiya.ai"
)

try:
    # A single call returns every runner together with its health entries
    runners = client.runners.list()

    print(f"Found {len(runners)} runners:")
    for runner in runners:
        # Identity line: name, version and namespace, each with a fallback
        identity = (
            f"{runner.get('name', 'Unknown')} "
            f"(v{runner.get('version', 'Unknown')}) "
            f"in {runner.get('namespace', 'default')}"
        )

        # Overall status; 'unknown' when the runner reports none
        overall = runner.get('runner_health', {}).get('status', 'unknown')
        print(f"  - {identity}: {overall}")

        # Per-component status, printed in a fixed order
        for label, key in (
            ("Tool Manager", 'tool_manager_health'),
            ("Agent Manager", 'agent_manager_health'),
        ):
            print(f"    {label}: {runner.get(key, {}).get('status', 'unknown')}")

except RunnerError as e:
    print(f"Failed to list runners: {e}")

Generate Runner Manifest

try:
    # Ask the service for the Kubernetes manifest of one named runner
    runner_name = "my-production-runner"
    manifest_info = client.runners.manifest(runner_name)

    if not manifest_info.get('url'):
        # The call succeeded but carried no usable URL
        print("Manifest generation failed: no URL returned")
    else:
        manifest_url = manifest_info['url']
        print(f"Kubernetes manifest for '{runner_name}' available at: {manifest_url}")

        # The manifest can then be downloaded and applied, e.g.:
        # kubectl apply -f {manifest_url}

except RunnerNotFoundError as e:
    print(f"Runner '{runner_name}' not found: {e}")
except RunnerError as e:
    print(f"Failed to generate manifest: {e}")

Health Status Monitoring

# Monitor runner health status
def monitor_runner_health():
    """Print a health summary of all runners and return the two counts.

    A runner counts as healthy when the status of its ``runner_health``
    entry is, case-insensitively, one of 'healthy', 'ok' or 'running'.

    Returns:
        A ``(healthy_count, unhealthy_count)`` tuple; ``(0, 0)`` when
        listing the runners raises ``RunnerError``.
    """
    ok_states = ['healthy', 'ok', 'running']

    try:
        ok_names = []
        failing = []

        for entry in client.runners.list():
            label = entry.get('name', 'Unknown')
            health = entry.get('runner_health', {})
            state = health.get('status', 'unknown')

            if state.lower() not in ok_states:
                # Keep the status and error text so they can be reported below
                failing.append({
                    'name': label,
                    'status': state,
                    'error': health.get('error', 'No error details')
                })
                continue

            ok_names.append(label)

        print(f"✅ Healthy runners: {len(ok_names)}")
        for label in ok_names:
            print(f"  - {label}")

        if failing:
            print(f"❌ Unhealthy runners: {len(failing)}")
            for record in failing:
                print(f"  - {record['name']}: {record['status']}")
                if record['error']:
                    print(f"    Error: {record['error']}")

        return len(ok_names), len(failing)

    except RunnerError as e:
        print(f"Health monitoring failed: {e}")
        return 0, 0

# Run monitoring
healthy_count, unhealthy_count = monitor_runner_health()

Error Handling

The Runners service provides specialized exceptions for different failure scenarios:

RunnerError

Base exception for all runner-related operations:
try:
    runners = client.runners.list()
except RunnerError as e:
    print(f"Runner operation failed: {e}")

    # Show the extra details only when the exception carries a non-empty value
    details = getattr(e, 'details', None)
    if details:
        print(f"Error details: {details}")

RunnerNotFoundError

Raised when a specific runner cannot be found:
try:
    manifest = client.runners.manifest("non-existent-runner")
except RunnerNotFoundError as e:
    print(f"Runner not found: {e}")
    print("Available runners:")

    # Fall back to showing the names of the runners that do exist
    try:
        runners = client.runners.list()
        for available_name in (r.get('name', 'Unknown') for r in runners):
            print(f"  - {available_name}")
    except RunnerError:
        print("  Could not fetch runner list")

RunnerHealthError

Raised when health status checks fail:
# NOTE(review): RunnerHealthError is not among the names imported in the
# Basic Usage snippet (only RunnerError and RunnerNotFoundError are).
# Presumably it lives in the same exceptions module — confirm and import it
# before running this example.
try:
    runners = client.runners.list()
    # Health checks are performed automatically during list()
except RunnerHealthError as e:
    # The listing failed because of the health check, so report it and warn
    # that health information may be stale.
    print(f"Health check failed: {e}")
    print("Some runners may not have current health status")

Best Practices

1. Handle Network Timeouts Gracefully

import time

def list_runners_with_retry(max_retries=3, delay=1):
    """List runners, retrying with exponential backoff when the call fails.

    Args:
        max_retries: Total number of attempts to make; must be at least 1.
        delay: Seconds to wait before the second attempt. The wait doubles
            after every failed attempt.

    Returns:
        Whatever ``client.runners.list()`` returns on the first attempt
        that succeeds.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Without this check
            the loop would never run and the function would silently
            return ``None``.
        RunnerError: Re-raised from the final attempt once every attempt
            has failed.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return client.runners.list()
        except RunnerError:
            if attempt == max_retries - 1:
                # Out of attempts: a bare raise propagates the original
                # exception with its traceback untouched.
                raise
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)
            delay *= 2  # Exponential backoff

runners = list_runners_with_retry()

2. Filter Runners by Health Status

def get_healthy_runners():
    """Return only the runners whose own status and both components are healthy.

    "Healthy" means the status is, case-insensitively, one of 'healthy',
    'ok' or 'running'. The tool manager and agent manager entries are
    inspected only for runners whose overall status already qualifies.

    Returns:
        A list of the qualifying runner entries; an empty list when listing
        the runners raises ``RunnerError``.
    """
    ok_states = ['healthy', 'ok', 'running']

    def _fully_healthy(candidate):
        # Overall status first; a missing status reads as 'unknown'.
        overall = candidate.get('runner_health', {}).get('status', 'unknown').lower()
        if overall not in ok_states:
            return False

        # Then both components; a missing component status reads as ''.
        tools = candidate.get('tool_manager_health', {})
        agents = candidate.get('agent_manager_health', {})
        return (tools.get('status', '').lower() in ok_states
                and agents.get('status', '').lower() in ok_states)

    try:
        return [entry for entry in client.runners.list() if _fully_healthy(entry)]
    except RunnerError as e:
        print(f"Failed to get healthy runners: {e}")
        return []

# Restrict critical operations to runners that passed every health check
healthy_runners = get_healthy_runners()
if not healthy_runners:
    print("No healthy runners available")
else:
    print(f"Found {len(healthy_runners)} healthy runners")

3. Monitor Version Compatibility

def check_runner_versions():
    """Group runner names by version, print the distribution, and return it.

    Runners without a version are grouped under 'unknown'; runners without
    a name are listed as 'Unknown'. A warning is printed when any runner
    reports the version 'v1'.

    Returns:
        A dict mapping each version to the list of runner names on it; an
        empty dict when listing the runners raises ``RunnerError``.
    """
    try:
        by_version = {}
        for entry in client.runners.list():
            bucket = by_version.setdefault(entry.get('version', 'unknown'), [])
            bucket.append(entry.get('name', 'Unknown'))

        print("Runner version distribution:")
        for version, names in by_version.items():
            print(f"  {version}: {len(names)} runners")
            for name in names:
                print(f"    - {name}")

        # Flag runners still on the legacy version
        legacy = by_version.get('v1')
        if legacy is not None:
            print(f"⚠️  Warning: {len(legacy)} runners on legacy v1")

        return by_version

    except RunnerError as e:
        print(f"Version check failed: {e}")
        return {}

version_info = check_runner_versions()

4. Validate Manifest Generation

def generate_and_validate_manifest(runner_name):
    """Generate the manifest for ``runner_name`` and return its URL.

    The runner must appear in the current runner list, and the manifest
    response must carry a non-empty 'url'. A warning is printed when the
    URL does not start with 'http://' or 'https://'.

    Args:
        runner_name: Name of the runner to generate a manifest for.

    Returns:
        The manifest URL, or ``None`` when the runner is unknown or the
        generation fails (the failure is printed, not raised).
    """
    try:
        # Confirm the runner exists before requesting a manifest
        known_names = [entry.get('name') for entry in client.runners.list()]
        if runner_name not in known_names:
            raise RunnerNotFoundError(f"Runner '{runner_name}' not found")

        manifest_url = client.runners.manifest(runner_name).get('url')
        if not manifest_url:
            raise RunnerError("Manifest generation failed: no URL returned")

        print(f"✅ Manifest generated successfully: {manifest_url}")

        # Sanity check on the URL scheme only; the URL is returned either way
        has_http_scheme = manifest_url.startswith(('http://', 'https://'))
        if not has_http_scheme:
            print("⚠️  Warning: Manifest URL may not be valid")

        return manifest_url

    except (RunnerError, RunnerNotFoundError) as e:
        print(f"❌ Manifest generation failed: {e}")
        return None

# Produce a manifest for every healthy runner that has a name
healthy_runners = get_healthy_runners()
for runner in healthy_runners:
    runner_name = runner.get('name')
    if not runner_name:
        continue
    generate_and_validate_manifest(runner_name)

Integration Examples

The Runners service integrates seamlessly with other Kubiya services and workflows:

Workflow Integration

# Pick a healthy runner to execute a workflow on
def execute_workflow_on_healthy_runner(workflow_config):
    """Select the first healthy runner for a workflow and return its name.

    Args:
        workflow_config: Workflow to execute. It is only referenced by the
            commented-out execution call below.

    Returns:
        The 'name' of the first runner returned by ``get_healthy_runners()``.

    Raises:
        RunnerError: If no healthy runner is available.
    """
    candidates = get_healthy_runners()
    if not candidates:
        raise RunnerError("No healthy runners available for workflow execution")

    # The first healthy runner wins
    runner_name = candidates[0].get('name')
    print(f"Executing workflow on runner: {runner_name}")

    # Execute workflow using the workflow service
    # result = client.workflows.execute(workflow_config, runner=runner_name)

    return runner_name

Infrastructure Monitoring

# Monitor infrastructure health across runners
def infrastructure_health_report():
    """Aggregate health, version and namespace statistics over all runners.

    A status is healthy when it is, case-insensitively, one of 'healthy',
    'ok' or 'running'; a missing status counts as unhealthy.

    Returns:
        A dict with the keys 'total_runners', 'healthy_runners',
        'unhealthy_runners', 'version_distribution',
        'namespace_distribution' and 'component_health', or ``None`` when
        listing the runners raises ``RunnerError``.
    """
    ok_states = ['healthy', 'ok', 'running']

    def _verdict(health):
        # Classify a single health entry as 'healthy' or 'unhealthy'.
        if health.get('status', '').lower() in ok_states:
            return 'healthy'
        return 'unhealthy'

    try:
        runners = client.runners.list()
        total = len(runners)

        versions = {}
        namespaces = {}
        runner_tally = {'healthy': 0, 'unhealthy': 0}
        components = {
            'tool_manager': {'healthy': 0, 'unhealthy': 0},
            'agent_manager': {'healthy': 0, 'unhealthy': 0}
        }

        for entry in runners:
            # Count runners per version and per namespace
            version = entry.get('version', 'unknown')
            versions[version] = versions.get(version, 0) + 1

            namespace = entry.get('namespace', 'default')
            namespaces[namespace] = namespaces.get(namespace, 0) + 1

            # Overall runner health
            runner_tally[_verdict(entry.get('runner_health', {}))] += 1

            # Component health, read from '<component>_health'
            for component in ('tool_manager', 'agent_manager'):
                component_state = _verdict(entry.get(f'{component}_health', {}))
                components[component][component_state] += 1

        return {
            'total_runners': total,
            'healthy_runners': runner_tally['healthy'],
            'unhealthy_runners': runner_tally['unhealthy'],
            'version_distribution': versions,
            'namespace_distribution': namespaces,
            'component_health': components
        }

    except RunnerError as e:
        print(f"Health report generation failed: {e}")
        return None

# Build the report and print its top-level figures
report = infrastructure_health_report()
if report:
    print("Infrastructure Health Report:")
    for label, key in (
        ("Total Runners", 'total_runners'),
        ("Healthy", 'healthy_runners'),
        ("Unhealthy", 'unhealthy_runners'),
        ("Version Distribution", 'version_distribution'),
        ("Namespace Distribution", 'namespace_distribution'),
    ):
        print(f"  {label}: {report[key]}")

Advanced Usage

Custom Health Checks

The service performs automatic health checks, but you can implement custom validation:
def custom_runner_validation(runner):
    """Validate one runner entry and describe every problem found.

    Checks that the runner has a name and a version, and that the runner
    itself, its tool manager and its agent manager each report a status
    that is, case-insensitively, one of 'healthy', 'ok' or 'running'.

    Malformed health data is reported as an issue instead of raising: a
    health entry that is not a dict, or a status that is missing or
    ``None``, is treated as 'unknown'.

    Args:
        runner: Runner entry as a dict, e.g. one item of
            ``client.runners.list()``.

    Returns:
        A list of human-readable issue strings; empty when the runner
        passes every check.
    """
    healthy_states = ('healthy', 'ok', 'running')
    issues = []

    def _check_health(key, label):
        # Read the status defensively so that broken entries get flagged
        # rather than crashing the validator.
        health = runner.get(key, {})
        status = health.get('status', 'unknown') if isinstance(health, dict) else 'unknown'
        if status is None:
            status = 'unknown'
        # str() keeps non-string statuses (e.g. numbers) from raising on .lower()
        if str(status).lower() not in healthy_states:
            issues.append(f"{label}: {status}")

    # Check basic runner info
    if not runner.get('name'):
        issues.append("Missing runner name")

    if not runner.get('version'):
        issues.append("Missing version information")

    # Check overall health, then each component, in a fixed order
    _check_health('runner_health', "Unhealthy status")
    _check_health('tool_manager_health', "Tool manager unhealthy")
    _check_health('agent_manager_health', "Agent manager unhealthy")

    return issues

# Run the custom validation against every runner and report the outcome
runners = client.runners.list()
for runner in runners:
    runner_name = runner.get('name', 'Unknown')
    issues = custom_runner_validation(runner)

    if not issues:
        print(f"✅ {runner_name} passed validation")
        continue

    print(f"❌ {runner_name} has issues:")
    for issue in issues:
        print(f"  - {issue}")

Next Steps