The Kubiya Workflows service provides a powerful interface for executing and managing workflows through the Kubiya platform.
It enables you to execute workflow definitions with real-time streaming support, parameter injection, and comprehensive error handling.
# Execute workflow without streaming to get final result
try:
    # Only the call that can raise WorkflowExecutionError sits in the try body.
    result = client.workflows.execute(
        workflow_definition=workflow_definition,
        parameters=parameters,
        stream=False,
    )
except WorkflowExecutionError as e:
    print(f"Workflow execution failed: {e}")
else:
    print(f"Workflow completed: {result}")

    # Access execution events (falls back to an empty list when the
    # result has no "events" key)
    events = result.get("events", [])
    for event in events:
        print(f"Event: {event}")
# Execute workflow with specific runner
try:
    # With stream=True the return value is iterated event by event. The loop
    # stays inside the try block so that a WorkflowExecutionError raised while
    # consuming the stream is handled as well as one raised by the call itself.
    for event in client.workflows.execute(
        workflow_definition=workflow_definition,
        parameters=parameters,
        runner="custom-runner-id",
        stream=True,
    ):
        print(f"Event: {event}")
except WorkflowExecutionError as e:
    print(f"Execution failed on custom runner: {e}")
# Inspect the details attached to a failed execution
try:
    result = client.workflows.execute(workflow_definition, parameters)
except WorkflowExecutionError as e:
    print(f"Execution failed: {e}")
    print(f"Workflow ID: {e.details.get('workflow_id')}")
    print(f"Execution ID: {e.details.get('execution_id')}")

    # Check if failure occurred at specific step
    failed_step = e.details.get("step")
    if failed_step:
        print(f"Failed at step: {failed_step}")
import json
from collections.abc import Mapping


# Validate workflow structure before execution
def validate_workflow(workflow_def):
    """Validate basic workflow structure.

    Args:
        workflow_def: The workflow definition, either as a mapping or as a
            JSON string that decodes to one.

    Returns:
        The workflow definition as a mapping (the decoded object when a
        JSON string was passed in).

    Raises:
        ValueError: If the string is not valid JSON, the definition is not
            a mapping, a required field ("name", "steps") is missing, or
            "steps" is not a list.
    """
    required_fields = ["name", "steps"]

    if isinstance(workflow_def, str):
        try:
            workflow_def = json.loads(workflow_def)
        except json.JSONDecodeError as exc:
            # Chain the decoder error so the parse position is not lost.
            raise ValueError("Invalid JSON in workflow definition") from exc

    # Reject non-mapping input up front; otherwise the membership test and
    # the ["steps"] lookup below would surface as a TypeError rather than
    # the ValueError callers are told to catch.
    if not isinstance(workflow_def, Mapping):
        raise ValueError("Workflow definition must be a mapping")

    for field in required_fields:
        if field not in workflow_def:
            raise ValueError(f"Missing required field: {field}")

    if not isinstance(workflow_def["steps"], list):
        raise ValueError("Steps must be a list")

    return workflow_def


# Use validation before execution
try:
    validated_workflow = validate_workflow(workflow_definition)
    result = client.workflows.execute(
        workflow_definition=validated_workflow,
        parameters=parameters,
    )
except ValueError as e:
    print(f"Validation failed: {e}")
except WorkflowExecutionError as e:
    print(f"Execution failed: {e}")