Workflows Service Overview

The Kubiya Workflows service provides a powerful interface for executing and managing workflows through the Kubiya platform. It enables you to execute workflow definitions with real-time streaming support, parameter injection, and comprehensive error handling.

Features

  • Workflow Execution: Execute workflows with flexible parameter injection
  • Real-time Streaming: Stream workflow execution logs in real-time
  • Runner Flexibility: Support for multiple runners and automatic runner selection
  • Comprehensive Error Handling: Detailed error reporting with workflow-specific context
  • Execution Management: List and filter workflow executions with pagination

Core Components

WorkflowService

The main service class provides two core operations:
  • execute(): Execute workflow definitions with streaming support
  • list(): List and filter workflow executions with pagination

Quick Start

Basic Usage

from kubiya_workflow_sdk import KubiyaClient
from kubiya_workflow_sdk.kubiya_services.exceptions import WorkflowExecutionError

# Initialize client
client = KubiyaClient(
    api_key="your-api-key",
    base_url="https://api.kubiya.ai"
)

# Define workflow (named `workflow_definition` to match the execute() calls below)
workflow_definition = {
    "name": "example-workflow",
    "steps": [
        {
            "name": "example-step",
            # NOTE(review): this command does not reference the "name"
            # parameter declared below — confirm the intended command.
            "command": "echo date",
            "executor": {
                "type": "command",
                "config": {}
            }
        }
    ]
}

# Parameters injected into the workflow at execution time
parameters = {
    "name": "World"
}

try:
    # Execute workflow with real-time streaming of events
    for event in client.workflows.execute(
        workflow_definition=workflow_definition,
        parameters=parameters,
        stream=True
    ):
        print(f"Event: {event}")

except WorkflowExecutionError as e:
    print(f"Workflow execution failed: {e}")
    # The exception exposes structured failure context via `details`
    if e.details.get("step"):
        print(f"Failed at step: {e.details['step']}")

Non-Streaming Execution

# Run the workflow synchronously (stream=False) and inspect the final result
try:
    result = client.workflows.execute(
        workflow_definition=workflow_definition,
        parameters=parameters,
        stream=False
    )
except WorkflowExecutionError as e:
    print(f"Workflow execution failed: {e}")
else:
    print(f"Workflow completed: {result}")

    # Walk through any execution events recorded in the result
    for event in result.get("events", []):
        print(f"Event: {event}")

Using JSON Workflow Definitions

import json

# A workflow definition may also be supplied as a raw JSON string
workflow_json = '''
{
    "name": "data-processing-workflow",
    "description": "Process and transform data",
    "steps": [
        {
            "name": "fetch_data",
            "action": "http_request",
            "parameters": {
                "url": "{{ data_source_url }}",
                "method": "GET"
            }
        },
        {
            "name": "process_data",
            "action": "transform",
            "parameters": {
                "operation": "{{ transform_operation }}"
            }
        }
    ]
}
'''

# Values substituted for the {{ ... }} placeholders above
parameters = {
    "data_source_url": "https://api.example.com/data",
    "transform_operation": "normalize"
}

try:
    # Stream execution of the JSON-defined workflow
    event_stream = client.workflows.execute(
        workflow_definition=workflow_json,
        parameters=parameters,
        stream=True
    )
    for event in event_stream:
        print(f"Processing: {event}")

except WorkflowExecutionError as e:
    print(f"Execution failed: {e}")
    if "Invalid workflow JSON" in str(e):
        print("Check your JSON syntax")

Using Custom Runners

# Target a specific runner instead of letting the platform pick one
try:
    runner_events = client.workflows.execute(
        workflow_definition=workflow_definition,
        parameters=parameters,
        runner="custom-runner-id",
        stream=True
    )
    for event in runner_events:
        print(f"Event: {event}")

except WorkflowExecutionError as e:
    print(f"Execution failed on custom runner: {e}")

Workflow Execution Listing

# List recent workflow executions (paginated)
try:
    executions = client.workflows.list(
        filter="all",
        limit=20,
        offset=0
    )

    found = executions.get('workflows', [])
    print(f"Found {len(found)} executions")

    # Show identifying details for each execution
    for execution in found:
        print(f"ID: {execution.get('id')}")
        print(f"Status: {execution.get('status')}")
        print(f"Started: {execution.get('started_at')}")

except Exception as e:
    print(f"Failed to list executions: {e}")

# Query each status filter in turn and report the counts
try:
    for status in ("running", "completed", "failed"):
        filtered = client.workflows.list(
            filter=status,
            limit=10
        )
        count = len(filtered.get('workflows', []))
        print(f"{status.capitalize()} executions: {count}")

except Exception as e:
    print(f"Failed to filter executions: {e}")

Error Handling

The Workflows service provides specialized exceptions for different failure scenarios:

WorkflowExecutionError

Raised when workflow execution fails:
try:
    result = client.workflows.execute(workflow_definition, parameters)
except WorkflowExecutionError as e:
    # The exception exposes structured context via its `details` mapping
    print(f"Execution failed: {e}")
    print(f"Workflow ID: {e.details.get('workflow_id')}")
    print(f"Execution ID: {e.details.get('execution_id')}")

    # Report the failing step when the error is step-specific
    failed_step = e.details.get("step")
    if failed_step:
        print(f"Failed at step: {failed_step}")

JSON Validation Errors

import json

# Demonstrate how a syntactically invalid JSON definition is reported
invalid_json = '{"name": "test", "steps": [invalid json}'

try:
    result = client.workflows.execute(
        workflow_definition=invalid_json,
        parameters={}
    )

except WorkflowExecutionError as e:
    if "Invalid workflow JSON" in str(e):
        print("Workflow definition contains invalid JSON")
        print(f"Error: {e}")

Best Practices

1. Always Validate Workflow Definitions

# Validate workflow structure before execution
def validate_workflow(workflow_def):
    """Validate the basic structure of a workflow definition.

    Args:
        workflow_def: A workflow definition as a dict, or a JSON string
            that decodes to one.

    Returns:
        The validated workflow definition as a dict.

    Raises:
        ValueError: If the JSON is invalid, the decoded value is not an
            object, a required field is missing, or "steps" is not a list.
    """
    required_fields = ["name", "steps"]

    # Accept a JSON string and decode it up front
    if isinstance(workflow_def, str):
        try:
            workflow_def = json.loads(workflow_def)
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON in workflow definition")

    # A valid definition must be a JSON object (dict) — a decoded list or
    # scalar would otherwise slip past the membership checks below
    if not isinstance(workflow_def, dict):
        raise ValueError("Workflow definition must be a JSON object")

    for field in required_fields:
        if field not in workflow_def:
            raise ValueError(f"Missing required field: {field}")

    if not isinstance(workflow_def["steps"], list):
        raise ValueError("Steps must be a list")

    return workflow_def

# Validate first, then execute the validated definition
try:
    result = client.workflows.execute(
        workflow_definition=validate_workflow(workflow_definition),
        parameters=parameters
    )

except ValueError as e:
    print(f"Validation failed: {e}")
except WorkflowExecutionError as e:
    print(f"Execution failed: {e}")

2. Handle Streaming Gracefully

# Stream with proper error handling and processing
def process_workflow_stream(client, workflow_def, params):
    """Consume a workflow event stream, printing one line per event.

    Stops early when an "error" event is seen; reports the total number
    of events processed at the end.
    """
    try:
        total = 0

        stream = client.workflows.execute(
            workflow_definition=workflow_def,
            parameters=params,
            stream=True
        )
        for event in stream:
            total += 1

            # Non-dict events are echoed as-is
            if not isinstance(event, dict):
                print(f"📋 {event}")
                continue

            kind = event.get("type", "unknown")
            if kind == "step_start":
                print(f"🏁 Started step: {event.get('step_name')}")
            elif kind == "step_complete":
                print(f"✅ Completed step: {event.get('step_name')}")
            elif kind == "error":
                print(f"❌ Error: {event.get('message')}")
                break
            else:
                print(f"📋 {event}")

        print(f"Processed {total} events")

    except WorkflowExecutionError as e:
        print(f"Streaming failed: {e}")

        # Report step-level failure context when available
        if e.details.get("step"):
            print(f"Failed at step: {e.details['step']}")

# Invoke the stream processor with the client, workflow, and parameters defined above
process_workflow_stream(client, workflow_definition, parameters)

3. Use Meaningful Parameters

# Group related parameters under comments so the workflow inputs are self-documenting
execution_params = {
    # Environment configuration
    "environment": "production",
    "region": "us-east-1",

    # Application parameters
    "app_name": "my-service",
    "version": "1.2.0",

    # Resource configuration
    "cpu_limit": "500m",
    "memory_limit": "1Gi",

    # Feature flags
    "enable_logging": True,
    "debug_mode": False
}

# Run without streaming, passing the organized parameter set
result = client.workflows.execute(
    workflow_definition=workflow_definition,
    parameters=execution_params,
    stream=False
)

4. Monitor Execution Progress

# Track execution progress and metrics
import time
from datetime import datetime

def monitor_workflow_execution(client, workflow_def, params):
    """Execute a workflow and report progress and simple timing metrics.

    Streams the execution, counting step-related and error events, and
    prints a progress line every 10 events. Timing is reported both for
    successful completion and for failures.

    Args:
        client: A KubiyaClient instance.
        workflow_def: The workflow definition (dict or JSON string).
        params: Parameters to inject into the workflow.
    """
    start_time = datetime.now()

    try:
        print(f"Starting workflow execution at {start_time}")

        event_count = 0
        step_count = 0
        error_count = 0

        for event in client.workflows.execute(
            workflow_definition=workflow_def,
            parameters=params,
            stream=True
        ):
            event_count += 1

            if isinstance(event, dict):
                event_type = event.get("type", "")

                # NOTE: "step" matches both "step_start" and "step_complete"
                if "step" in event_type:
                    step_count += 1
                elif "error" in event_type:
                    error_count += 1

            # Log progress every 10 events. The original keyed this off
            # (step_count + error_count) % 10, which is 0 before any step
            # event arrives and so printed a progress line for every
            # unrelated event; counting all events fixes that.
            if event_count % 10 == 0:
                elapsed = datetime.now() - start_time
                print(f"Progress: {step_count} steps, {error_count} errors, {elapsed} elapsed")

        total_time = datetime.now() - start_time

        print(f"Execution completed in {total_time}")
        print(f"Total steps: {step_count}, Errors: {error_count}")

    except WorkflowExecutionError as e:
        total_time = datetime.now() - start_time

        print(f"Execution failed after {total_time}: {e}")

# Run the monitored execution with the client, workflow, and parameters defined above
monitor_workflow_execution(client, workflow_definition, parameters)

Integration Examples

The Workflows service integrates seamlessly with other Kubiya services and can be used to orchestrate complex automation scenarios.

Common Workflow Patterns

  • Data Processing: ETL workflows with multiple transformation steps
  • Infrastructure Management: Deployment and configuration workflows
  • Testing Automation: Automated testing pipelines with validation steps
  • Monitoring and Alerting: Event-driven workflows for incident response
  • Business Process Automation: Multi-step approval and notification workflows

Next Steps