The Compose API is the main entry point for AI-powered workflow generation. It handles everything from understanding your request to generating and optionally executing workflows.
Overview
The compose() method provides a unified interface for:
Natural language to workflow transformation
Automatic validation and refinement
Optional execution with streaming
Multiple output formats (JSON, YAML, streaming)
HTTP Endpoint
POST /compose
Content-Type: application/json
Authorization: Bearer your-api-key
Request Body
task (string, required)
The task description in natural language. Be specific and include requirements. Examples:
"Create a workflow to backup PostgreSQL databases daily"
"Deploy a containerized app to Kubernetes with health checks"
"Set up a CI/CD pipeline for a Python project with tests"
context (object, optional)
Additional context to guide workflow generation:
Specific runner to use for execution
List of tools/commands available
Security or operational constraints
Target environment details
parameters (object, optional)
Execution parameters (only used in "act" mode).
mode (string)
Operation mode:
"plan": Generate workflow only
"act": Generate and execute workflow
stream (boolean)
Enable streaming response. When true, returns Server-Sent Events.
stream_format (string or null)
Streaming format:
"sse": Server-Sent Events format
"vercel": Vercel AI SDK format
null: Raw ADK events
session_id (string, optional)
Session ID for conversation continuity. Automatically generated if not provided.
user_id (string, optional)
User ID for namespacing and tracking. Defaults to "default_user".
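An illustrative plan-mode request body combining these fields (values are examples only):
{
  "task": "Create a workflow to backup PostgreSQL databases daily",
  "mode": "plan",
  "context": {
    "preferred_runner": "kubiya-hosted",
    "environment": "production"
  },
  "stream": false
}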
Plan Mode Response
{
  "workflow": {
    "name": "backup-databases",
    "description": "Automated database backup workflow",
    "runner": "kubiya-hosted",
    "steps": [
      {
        "name": "backup_postgres",
        "tool": "pg_dump",
        "parameters": {
          "database": "production",
          "output": "/backups/db-backup.sql"
        }
      }
    ]
  },
  "metadata": {
    "generated_at": "2024-01-15T10:30:00Z",
    "model": "gpt-4o",
    "tokens_used": 1250
  }
}
Act Mode Response
In act mode the API does not return a single JSON document; it streams generation and execution events as the workflow runs. See Event Types below for the event shapes.
Usage Examples
cURL Example
curl -X POST http://localhost:8001/compose \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-api-key" \
-d '{
"task": "Deploy my Node.js app to Kubernetes staging",
"mode": "plan",
"context": {
"environment": "staging",
"preferred_runner": "k8s-staging"
}
}'
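Python SDK
The same request through the SDK's provider interface (a minimal sketch based on the compose() calls shown elsewhere on this page; assumes a dict result matching the plan-mode response above):
adk = get_provider("adk")
result = await adk.compose(
    task="Deploy my Node.js app to Kubernetes staging",
    mode="plan",
    context={
        "environment": "staging",
        "preferred_runner": "k8s-staging"
    },
    stream=False
)
print(result["workflow"]["name"])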
Streaming Examples
Streaming Generation
# Stream the generation process
import json

async for event in adk.compose(
    task="Deploy application to Kubernetes",
    mode="plan",
    stream=True
):
    # Handle SSE events ("data: {...}" lines)
    if event.startswith("data: "):
        data = json.loads(event[6:])
        print(f"{data['type']}: {data.get('content', '')}")
Event Types
When streaming is enabled, various event types are emitted:
Generation Events
// Text generation progress
{"type": "text", "content": "Analyzing requirements..."}

// Tool calls (loading context)
{"type": "tool_call", "name": "get_runners", "arguments": {}}

// Tool results
{"type": "tool_result", "name": "get_runners", "result": [...]}

// Workflow ready
{"type": "workflow", "data": {...}}
Error Handling
The compose API handles various error scenarios:
try:
    result = await adk.compose(task="...")
except ProviderError as e:
    # Handle generation failures
    print(f"Generation failed: {e}")

# Validation errors are handled automatically: the AI attempts to fix
# them. You can cap the number of refinement attempts:
config = ADKConfig(max_loop_iterations=2)
adk = get_provider("adk", config=config)

async for event in adk.compose(task="...", mode="act"):
    if event.get("type") == "error":
        # Handle execution errors
        logger.error(f"Execution error: {event['message']}")
        # Decide whether to continue or abort

import asyncio

try:
    async with asyncio.timeout(300):  # 5-minute timeout (Python 3.11+)
        result = await adk.compose(task="...")
except asyncio.TimeoutError:
    print("Generation timed out")
Advanced Configuration
Custom Models
from kubiya_workflow_sdk.providers.adk import ADKConfig

config = ADKConfig(
    model_overrides={
        "workflow_generator": "together_ai/Qwen/QwQ-32B-Preview",
        "refinement": "together_ai/deepseek-ai/DeepSeek-V3"
    }
)
adk = get_provider("adk", config=config)
Other generation settings can be tuned the same way:
config = ADKConfig(
    max_loop_iterations=5,    # More refinement attempts
    timeout=600,              # 10-minute timeout
    enable_caching=True,      # Cache context loading
    stream_buffer_size=2048   # Larger streaming buffer
)
Custom Filters
# Filter streaming events
async for event in adk.compose(
    task="...",
    stream=True,
    stream_filter={
        "include_tool_calls": False,  # Skip tool events
        "include_thoughts": True,     # Include reasoning
        "min_importance": "medium"    # Filter by importance
    }
):
    process_filtered_event(event)  # Your event handler
Integration Examples
FastAPI Endpoint
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ComposeRequest(BaseModel):
    task: str
    mode: str = "plan"
    context: dict = {}
    parameters: dict = {}

@app.post("/api/compose")
async def compose_workflow(request: ComposeRequest):
    try:
        adk = get_provider("adk")
        if request.mode == "plan":
            # Non-streaming for plan mode
            result = await adk.compose(
                task=request.task,
                context=request.context,
                mode="plan",
                stream=False
            )
            return result
        else:
            # For act mode, use the WebSocket endpoint instead
            raise HTTPException(
                status_code=400,
                detail="Use WebSocket endpoint for act mode"
            )
    except HTTPException:
        # Re-raise intentional HTTP errors (otherwise the 400 above
        # would be swallowed and reported as a 500)
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
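To exercise the endpoint locally (a sketch; httpx and port 8000 are assumptions, any HTTP client works):
import asyncio
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://localhost:8000/api/compose",  # Assumed local address
            json={"task": "Backup PostgreSQL databases daily", "mode": "plan"},
        )
        resp.raise_for_status()
        # Plan mode returns the workflow document shown above
        print(resp.json()["workflow"]["name"])

asyncio.run(main())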
WebSocket Streaming
from fastapi import WebSocket

@app.websocket("/ws/compose")
async def compose_stream(websocket: WebSocket):
    await websocket.accept()
    try:
        # Receive request
        data = await websocket.receive_json()
        adk = get_provider("adk")
        # Stream responses
        async for event in adk.compose(
            task=data["task"],
            mode=data.get("mode", "plan"),
            context=data.get("context", {}),
            parameters=data.get("parameters", {}),
            stream=True,
            stream_format="vercel"
        ):
            await websocket.send_text(event)
    except Exception as e:
        await websocket.send_json({
            "type": "error",
            "message": str(e)
        })
    finally:
        await websocket.close()
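A matching client sketch (the websockets package is an assumption; any WebSocket client works):
import asyncio
import json
import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/compose") as ws:
        await ws.send(json.dumps({"task": "Deploy application", "mode": "plan"}))
        async for message in ws:
            print(message)  # Vercel AI SDK formatted events

asyncio.run(main())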
Best Practices
Be Specific: Provide detailed task descriptions for better results.
Use Context: Include relevant context about your environment.
Handle Errors: Always implement proper error handling.
Monitor Usage: Track token usage and generation times.
The sketch below ties these together.
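A sketch combining these practices (assumes the plan-mode result is the JSON document shown above):
try:
    result = await adk.compose(
        # Be specific: a detailed task, not "backup stuff"
        task="Create a workflow to backup all PostgreSQL databases to S3 daily at 2 AM",
        # Use context: describe the target environment
        context={
            "preferred_runner": "kubernetes-runner",
            "environment": "production"
        },
        mode="plan"
    )
    # Monitor usage: metadata mirrors the plan-mode response
    print(result["metadata"]["tokens_used"])
except ProviderError as e:
    # Handle errors explicitly
    print(f"Generation failed: {e}")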
Common Patterns
Retry with Refinement
import asyncio

async def compose_with_retry(task: str, max_attempts: int = 3):
    errors: list[str] = []  # Collected across attempts to guide refinement
    for attempt in range(max_attempts):
        try:
            return await adk.compose(
                task=task,
                context={
                    "attempt": attempt + 1,
                    "previous_errors": errors
                }
            )
        except ProviderError as e:
            errors.append(str(e))
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
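Usage (hypothetical task):
result = await compose_with_retry("Deploy my Node.js app to Kubernetes staging")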
Progress Tracking
class ProgressTracker:
    def __init__(self):
        self.stages = {
            "context_loading": False,
            "generation": False,
            "validation": False,
            "execution": False
        }

    async def track_compose(self, adk, task, mode):
        async for event in adk.compose(task=task, mode=mode, stream=True):
            # Update progress based on event content
            if "Loading context" in str(event):
                self.stages["context_loading"] = True
            elif "Generating workflow" in str(event):
                self.stages["generation"] = True
            # ... etc.
            yield event
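Driving the tracker (a usage sketch):
tracker = ProgressTracker()
async for event in tracker.track_compose(adk, task="Backup databases daily", mode="plan"):
    pass  # Forward events to your UI as needed
print(tracker.stages)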
Troubleshooting
Ensure your task description is clear:
# Too vague
task = "backup stuff"

# Better
task = "Create a workflow to backup all PostgreSQL databases to S3 daily at 2 AM"
Increase the timeout for complex workflows:
config = ADKConfig(timeout=900)  # 15 minutes
adk = get_provider("adk", config=config)
Ensure context is properly formatted:
# Wrong: context must be a dict, not a string
context = "use kubernetes runner"

# Correct
context = {
    "preferred_runner": "kubernetes-runner",
    "namespace": "production"
}