Workflows Service API Reference

Complete reference documentation for all methods and exceptions in the Kubiya Workflows service.

Classes

WorkflowService

Main service class for executing and managing workflows.
class WorkflowService(BaseService):
    """Service for managing workflows"""

Methods

execute(workflow_definition: Union[Dict[str, Any], str], parameters: Optional[Dict[str, Any]] = None, stream: bool = True, runner: Optional[str] = None) -> Union[Dict[str, Any], Generator[str, None, None]]
Execute a workflow with optional parameter injection and streaming support. Parameters:
  • workflow_definition (Union[Dict[str, Any], str]): Workflow definition as dictionary or JSON string
  • parameters (Optional[Dict[str, Any]]): Parameters to inject into the workflow (default: None)
  • stream (bool): Whether to stream the execution in real-time (default: True)
  • runner (Optional[str]): Specific runner to use for execution (default: None, uses default runner)
Returns:
  • For streaming (stream=True): Generator[str, None, None] - Generator yielding execution events
  • For non-streaming (stream=False): Dict[str, Any] - Final response containing all events
Response Structure (Non-streaming):
{
    "events": [
        # List of execution events
        {"type": "step_start", "step_name": "greeting", "timestamp": "..."},
        {"type": "step_complete", "step_name": "greeting", "result": "..."},
        # ... more events
    ]
}
Streaming Events: Each streaming event can be:
  • String: Raw log output
  • Dictionary: Structured event data with type, timestamp, and context
Raises:
  • WorkflowExecutionError: For execution-specific errors with detailed context
Example:
# Streaming execution
workflow_def = {
    "name": "example-workflow",
    "steps": [
        {
            "name": "example-step",
            "command": "echo date",
            "executor": {
                "type": "command",
                "config": {}
            }
        }
    ]
}

parameters = {"name": "World"}

try:
    # Stream execution events
    for event in client.workflows.execute(
        workflow_definition=workflow_def,
        parameters=parameters,
        stream=True,
        runner="custom-runner"
    ):
        print(f"Event: {event}")
        
        # Handle structured events
        if isinstance(event, dict):
            event_type = event.get("type")
            
            if event_type == "step_start":
                print(f"🏁 Starting step: {event.get('step_name')}")
            elif event_type == "step_complete":
                print(f"✅ Completed step: {event.get('step_name')}")
            elif event_type == "error":
                print(f"❌ Error: {event.get('message')}")
                break
                
except WorkflowExecutionError as e:
    print(f"Execution failed: {e}")
    
    # Access error context
    print(f"Workflow ID: {e.details.get('workflow_id')}")
    print(f"Execution ID: {e.details.get('execution_id')}")
    
    if e.details.get("step"):
        print(f"Failed at step: {e.details['step']}")

# Non-streaming execution
try:
    result = client.workflows.execute(
        workflow_definition=workflow_def,
        parameters=parameters,
        stream=False
    )
    
    print(f"Execution completed with {len(result['events'])} events")
    
    # Process events
    for event in result["events"]:
        print(f"Event: {event}")
        
except WorkflowExecutionError as e:
    print(f"Execution failed: {e}")

# JSON string workflow definition
json_workflow = '''
{
    "name": "json-workflow",
    "description": "Workflow from JSON string",
    "steps": [
        {
            "name": "process",
            "action": "transform",
            "parameters": {
                "input": "{{ input_data }}",
                "operation": "{{ transform_type }}"
            }
        }
    ]
}
'''

try:
    for event in client.workflows.execute(
        workflow_definition=json_workflow,
        parameters={
            "input_data": "sample data",
            "transform_type": "uppercase"
        }
    ):
        print(f"Processing: {event}")
        
except WorkflowExecutionError as e:
    if "Invalid workflow JSON" in str(e):
        print("JSON parsing failed - check syntax")
    else:
        print(f"Execution error: {e}")
list(filter: str = "all", limit: int = 10, offset: int = 0) -> Dict[str, Any]
List workflow executions with filtering and pagination support. Parameters:
  • filter (str): Filter criteria for executions (default: “all”)
    • "all": All executions regardless of status
    • "running": Currently executing workflows
    • "completed": Successfully completed workflows
    • "failed": Failed workflow executions
  • limit (int): Maximum number of results to return (default: 10)
  • offset (int): Number of results to skip for pagination (default: 0)
Returns:
  • Dict[str, Any]: Dictionary containing workflow executions and metadata
Raises:
  • Generic exceptions for API errors
Example:
# List all recent executions
try:
    executions = client.workflows.list()
    
    print(f"Found {len(executions['workflows'])} executions")
    
    # Display execution summaries
    for execution in executions['workflows']:
        status = execution.get('status', 'unknown')
        name = execution.get('workflow_name', 'unnamed')
        duration = execution.get('duration', 0)
        
        print(f"📋 {name}: {status} ({duration}s)")
        
    # Check pagination
    pagination = executions.get('pagination', {})
    if pagination.get('has_more'):
        print(f"More results available (total: {pagination.get('total')})")
        
except Exception as e:
    print(f"Failed to list executions: {e}")

# Filter by status with pagination
try:
    # Get running executions
    running = client.workflows.list(
        filter="running",
        limit=20,
        offset=0
    )
    
    print(f"Running workflows: {len(running['workflows'])}")
    
    # Get recent failures with larger limit
    failed = client.workflows.list(
        filter="failed", 
        limit=50,
        offset=0
    )
    
    print(f"Failed workflows: {len(failed['workflows'])}")
    
    # Paginate through completed executions
    page_size = 25
    page = 0
    all_completed = []
    
    while True:
        completed = client.workflows.list(
            filter="completed",
            limit=page_size,
            offset=page * page_size
        )
        
        page_workflows = completed.get('workflows', [])
        if not page_workflows:
            break
            
        all_completed.extend(page_workflows)
        
        # Check if more pages available
        pagination = completed.get('pagination', {})
        if not pagination.get('has_more'):
            break
            
        page += 1
        
    print(f"Total completed workflows: {len(all_completed)}")
    
except Exception as e:
    print(f"Failed to filter executions: {e}")

# Analyze execution patterns
try:
    all_executions = client.workflows.list(
        filter="all",
        limit=100
    )
    
    workflows = all_executions.get('workflows', [])
    
    # Group by status
    status_counts = {}
    total_duration = 0
    
    for execution in workflows:
        status = execution.get('status', 'unknown')
        duration = execution.get('duration', 0)
        
        status_counts[status] = status_counts.get(status, 0) + 1
        total_duration += duration
    
    print("Execution Summary:")
    for status, count in status_counts.items():
        print(f"  {status}: {count}")
    
    if workflows:
        avg_duration = total_duration / len(workflows)
        print(f"Average duration: {avg_duration:.1f}s")
        
except Exception as e:
    print(f"Failed to analyze executions: {e}")

Exceptions

WorkflowError (Base Exception)

Base exception class for all workflow-related errors.
class WorkflowError(KubiyaSDKError):
    """Exception for workflow-related errors"""

Attributes

  • workflow_id (Optional[str]): ID of the workflow that caused the error
  • execution_id (Optional[str]): ID of the specific execution that failed
  • details (Dict[str, Any]): Additional error context and metadata

Error Details Structure

{
    "workflow_id": "workflow-uuid",
    "execution_id": "execution-uuid"
}

WorkflowExecutionError

Specialized exception for workflow execution failures.
class WorkflowExecutionError(WorkflowError):
    """Exception for workflow execution errors"""

Attributes

  • workflow_id (Optional[str]): ID of the workflow that failed
  • execution_id (Optional[str]): ID of the failed execution
  • step (Optional[str]): Name of the step where failure occurred
  • details (Dict[str, Any]): Complete error context

Error Details Structure

{
    "workflow_id": "workflow-uuid",
    "execution_id": "execution-uuid", 
    "step": "step-name",
    # Additional context depending on error type
}

Common Error Scenarios

JSON Parsing Errors:
try:
    invalid_json = '{"name": "test", invalid json}'
    
    result = client.workflows.execute(
        workflow_definition=invalid_json
    )
    
except WorkflowExecutionError as e:
    if "Invalid workflow JSON" in str(e):
        print("Workflow definition contains invalid JSON syntax")
        print(f"Error details: {e}")
Step Execution Errors:
try:
    result = client.workflows.execute(
        workflow_definition=workflow_def,
        parameters=params
    )
    
except WorkflowExecutionError as e:
    # Check which step failed
    failed_step = e.details.get("step")
    if failed_step:
        print(f"Workflow failed at step: {failed_step}")
        
        # Handle specific step failures
        if failed_step == "validation":
            print("Input validation failed - check parameters")
        elif failed_step == "api_call":
            print("External API call failed - check connectivity")
        elif failed_step == "data_transform":
            print("Data transformation failed - check input format")
    
    # Access workflow context
    workflow_id = e.details.get("workflow_id")
    execution_id = e.details.get("execution_id")
    
    if workflow_id and execution_id:
        print(f"Failed execution: {execution_id} of workflow: {workflow_id}")
Runner and Infrastructure Errors:
try:
    result = client.workflows.execute(
        workflow_definition=workflow_def,
        runner="custom-runner"
    )
    
except WorkflowExecutionError as e:
    error_message = str(e)
    
    # Check for runner-specific issues
    if "runner" in error_message.lower():
        print("Runner issue detected")
        
        if "not found" in error_message:
            print("Specified runner not available")
        elif "capacity" in error_message:
            print("Runner at capacity - try again later")
        elif "permission" in error_message:
            print("Insufficient permissions for runner")
    
    # Check for resource constraints
    if "timeout" in error_message.lower():
        print("Execution timed out - consider breaking into smaller steps")
    elif "memory" in error_message.lower():
        print("Memory limit exceeded - optimize workflow")
    elif "cpu" in error_message.lower():
        print("CPU limit exceeded - reduce computational complexity")
Parameter and Validation Errors:
try:
    result = client.workflows.execute(
        workflow_definition=workflow_def,
        parameters=invalid_params
    )
    
except WorkflowExecutionError as e:
    error_message = str(e)
    
    # Handle parameter-related errors
    if "parameter" in error_message.lower():
        print("Parameter validation failed")
        
        # Suggest parameter fixes
        print("Check required parameters are provided")
        print("Verify parameter types and formats")
        print("Ensure parameter names match workflow definition")
    
    # Handle workflow definition errors
    if "definition" in error_message.lower():
        print("Workflow definition validation failed")
        
        print("Verify workflow has required fields: name, steps")
        print("Check step definitions are properly formatted")
        print("Ensure action types are supported")

Example: Comprehensive Error Handling

def execute_workflow_with_error_handling(client, workflow_def, params=None):
    """Execute workflow with comprehensive error handling"""
    
    try:
        print("Starting workflow execution...")
        
        result = client.workflows.execute(
            workflow_definition=workflow_def,
            parameters=params or {},
            stream=True
        )
        
        event_count = 0
        for event in result:
            event_count += 1
            
            # Process events
            if isinstance(event, dict):
                event_type = event.get("type", "")
                
                if event_type == "error":
                    print(f"❌ Workflow error: {event.get('message')}")
                    return False
                elif event_type == "step_complete":
                    print(f"✅ Step completed: {event.get('step_name')}")
                elif event_type == "workflow_complete":
                    print(f"🎉 Workflow completed successfully")
                    return True
            else:
                print(f"📋 {event}")
        
        print(f"Processed {event_count} events")
        return True
        
    except WorkflowExecutionError as e:
        print(f"❌ Workflow execution failed: {e}")
        
        # Categorize and handle different error types
        error_message = str(e).lower()
        
        if "invalid workflow json" in error_message:
            print("🔧 Fix: Check JSON syntax in workflow definition")
            
        elif e.details.get("step"):
            failed_step = e.details["step"]
            print(f"🔧 Fix: Review step '{failed_step}' configuration")
            
        elif "runner" in error_message:
            print("🔧 Fix: Check runner availability and permissions")
            
        elif "timeout" in error_message:
            print("🔧 Fix: Consider breaking workflow into smaller steps")
            
        elif "parameter" in error_message:
            print("🔧 Fix: Verify all required parameters are provided")
            
        else:
            print("🔧 Fix: Check workflow definition and execution environment")
        
        # Log detailed error context
        if e.details:
            print(f"Error context: {e.details}")
            
        return False
        
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        print("🔧 Fix: Check network connectivity and API credentials")
        return False

# Usage
success = execute_workflow_with_error_handling(
    client=client,
    workflow_def=workflow_definition,
    params=parameters
)

if success:
    print("Workflow completed successfully!")
else:
    print("Workflow execution failed - check error messages above")
This API reference provides complete documentation for all public interfaces in the Workflows service. Use the examples and error handling patterns to build robust workflow execution and management systems.