ADK Multi-Agent Architecture

Serverless AI Agents at Your Service

🚀 Serverless Agents with Function Calling

ADK exposes Kubiya’s serverless AI agents that leverage state-of-the-art techniques:

  • Function Calling: Agents use simple function calls to execute workflow tools
  • Serverless Execution: Agents spin up on demand; no persistent processes
  • Tool Integration: Seamlessly call workflow steps as functions
  • Best-in-Class Models: Leverages top AI models for reasoning
  • Deterministic Outputs: Structured responses that fit into workflows

The Power of Inline Agents

Unlike traditional agent frameworks that require complex setups, Kubiya’s inline agents are:

  1. Just workflow steps - No separate agent infrastructure
  2. Function-based - Agents call tools through simple function interfaces
  3. Context-aware - Access workflow state and previous outputs
  4. Serverless - No always-on processes consuming resources
  5. Integrated - Part of your workflow DAG, not external
# Serverless agent as a workflow step
.step("intelligent-analysis")
.inline_agent(
    message="Analyze system health and recommend actions",
    agent_name="sre-assistant",
    ai_instructions="You are an SRE expert. Use the provided tools to gather metrics.",
    
    # Agent uses function calling to execute tools
    tools=[
        {
            "name": "get_cpu_usage",
            "type": "function",
            "description": "Get current CPU usage across pods",
            "implementation": "kubectl top pods | awk '{print $2}'"
        },
        {
            "name": "check_error_logs", 
            "type": "function",
            "description": "Count errors in application logs",
            "implementation": "kubectl logs -l app=myapp | grep ERROR | wc -l"
        }
    ],
    
    runners=["kubiya-hosted"]  # Serverless execution
)
.output("SRE_ANALYSIS")

How ADK Agents Work

The ADK provider orchestrates multiple specialized agents to transform natural language into production-ready workflows.
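End to end, a single compose call kicks off the whole loop. A minimal sketch is shown below; the provider lookup helper is an assumption about your SDK version, while the compose call mirrors the streaming example later on this page:

# Minimal sketch; get_provider is assumed to be available in your SDK version
from kubiya_workflow_sdk.providers import get_provider

adk = get_provider("adk")

async for event in adk.compose(
    task="Back up all production databases and notify Slack on completion",
    stream=True,
):
    print(event)  # orchestrator, generator, compiler, and refinement events stream through here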

Agent Roles

1. Loop Orchestrator

The Loop Orchestrator is the conductor of the entire process:

  • Purpose: Coordinates all other agents
  • Responsibilities:
    • Manages the generation lifecycle
    • Handles error recovery and retries
    • Enforces iteration limits
    • Maintains conversation state
# Orchestrator configuration
config = ADKConfig(
    max_loop_iterations=3,  # Maximum refinement attempts
    enable_streaming=True,  # Stream agent outputs
    timeout=300            # Overall timeout
)

2. Context Loader Agent

Loads platform-specific context to inform workflow generation:

  • Fetches: Runners, integrations, secrets, organization settings
  • Outputs: Structured context for the generator
  • Uses: Kubiya API tools

The Context Loader ensures generated workflows use only available resources and integrations.

Loaded Context Includes:

  • Available runners and their capabilities
  • Installed integrations (Slack, AWS, etc.)
  • Accessible secrets (names only, not values)
  • Organization policies and constraints
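Conceptually, the loaded context might look like the following; the field names here are illustrative, not the provider's exact schema:

# Illustrative context shape (field names are hypothetical)
platform_context = {
    "runners": [{"name": "kubiya-hosted", "capabilities": ["docker", "python"]}],
    "integrations": ["slack", "aws"],
    "secrets": ["DB_PASSWORD", "SLACK_WEBHOOK"],  # names only, never values
    "policies": {"max_parallel_steps": 10},
}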

3. Workflow Generator Agent

The creative powerhouse that generates workflow code:

  • Model: DeepSeek V3 (default) or configurable
  • Input: Task description + platform context
  • Output: Python SDK code using the DSL
from kubiya_workflow_sdk.dsl import Workflow, Step, parallel

workflow = Workflow(name="backup-databases")
workflow.description = "Automated database backup workflow"
workflow.runner = "kubiya-hosted"

# Parallel backup of multiple databases
backup_steps = []
for db in ["users", "orders", "inventory"]:
    step = Step(
        name=f"backup_{db}",
        tool="pg_dump",
        parameters={
            "database": db,
            "output": f"/backups/{db}_{{date}}.sql"
        }
    )
    backup_steps.append(step)

workflow.add_step(parallel(*backup_steps))

# Compress backups
workflow.add_step(Step(
    name="compress",
    tool="tar",
    parameters={
        "input": "/backups/*.sql",
        "output": "/backups/backup_{{date}}.tar.gz"
    }
))

4. Compiler Agent

Validates and compiles the generated code:

  • Validates: Syntax, imports, workflow structure
  • Compiles: Python code to workflow JSON
  • Checks: Dependencies, circular references
  • Reports: Errors and warnings

Common Validation Checks:

  • ✓ Valid Python syntax
  • ✓ Correct SDK imports and usage
  • ✓ Runner exists and is accessible
  • ✓ Required integrations are available
  • ✓ Step dependencies are valid
  • ✓ No circular dependencies
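As a rough illustration, a compilation result could bundle the compiled workflow JSON with a validation report; the shape below is a sketch, not the compiler's exact output:

# Illustrative compiler result (hypothetical shape)
compile_result = {
    "valid": False,
    "workflow": None,
    "errors": ["Step 'compress' depends on unknown step 'backup_all'"],
    "warnings": ["Runner 'kubiya-hosted' selected by default"],
}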

5. Refinement Agent

Fixes errors through intelligent iteration:

  • Input: Original code + compilation errors
  • Process: Analyzes errors and generates fixes
  • Smart: Learns from previous attempts

The refinement agent will attempt up to max_loop_iterations fixes before giving up.

Refinement Strategies:

  • Fix syntax errors
  • Resolve missing imports
  • Correct API usage
  • Add error handling
  • Optimize performance
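Put together, the generate-compile-refine cycle can be pictured as the loop below. This is a conceptual sketch only: the three callables stand in for the real agent calls, which are LLM-backed and managed by the Loop Orchestrator:

# Conceptual refinement loop (callables are placeholders, not SDK APIs)
def generate_with_refinement(generate, compile_code, refine, task, max_loop_iterations=3):
    code = generate(task)
    for _ in range(max_loop_iterations):
        result = compile_code(code)  # expected to return {"valid": ..., "workflow": ..., "errors": ...}
        if result["valid"]:
            return result["workflow"]
        code = refine(code, result["errors"])  # fix the reported errors and try again
    raise RuntimeError("Refinement limit reached without producing a valid workflow")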

6. Output Agent

Formats the final result:

  • Formats: JSON, YAML, or streaming events
  • Adds: Metadata, usage instructions
  • Handles: Both plan and act modes
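The final payload might resemble the shape below; field names are illustrative:

# Illustrative final output (hypothetical field names)
final_result = {
    "format": "json",                 # or "yaml", or a stream of events
    "mode": "plan",                   # "plan" returns the workflow; "act" also executes it
    "workflow": {"name": "backup-databases", "steps": ["..."]},
    "metadata": {"provider": "adk", "iterations": 1},
    "instructions": "Review the plan, then re-run in act mode to execute it",
}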

Agent Communication

Agents communicate through structured messages:

# Example agent message
message = {
    "role": "workflow_generator",
    "content": {
        "generated_code": "...",
        "confidence": 0.95,
        "alternatives": []
    },
    "metadata": {
        "model": "deepseek-v3",
        "tokens_used": 1234,
        "generation_time": 2.5
    }
}

Customizing Agent Behavior

Model Selection

Different models for different agents:

config = ADKConfig(
    model_overrides={
        "orchestrator": "together_ai/deepseek-ai/DeepSeek-V3",
        "workflow_generator": "together_ai/Qwen/QwQ-32B-Preview",
        "compiler": "together_ai/google/gemma-2b-it",
        "refinement": "together_ai/deepseek-ai/DeepSeek-V3"
    }
)

Custom Tools

Add custom tools to agents:

from kubiya_workflow_sdk.providers.adk.tools import Tool

# Custom validation tool
@Tool(name="custom_validator")
def validate_security_policies(workflow_code: str) -> dict:
    """Validate workflow against security policies."""
    # Custom validation logic
    return {"valid": True, "warnings": []}

# Add to compiler agent
compiler_agent.add_tool(validate_security_policies)

Agent Hooks

Intercept agent communications:

def on_agent_message(agent_name: str, message: dict):
    """Log or modify agent messages."""
    print(f"[{agent_name}] {message.get('content', {}).get('status')}")
    
    # Modify message if needed
    if agent_name == "workflow_generator":
        message["metadata"]["custom_flag"] = True
    
    return message

adk.set_message_hook(on_agent_message)

Performance Optimization

Monitoring Agents

Logging

Enable detailed agent logging:

import logging

# Enable ADK agent logging
logging.getLogger("kubiya_workflow_sdk.providers.adk.agents").setLevel(logging.DEBUG)

# Or specific agents
logging.getLogger("kubiya_workflow_sdk.providers.adk.agents.generator").setLevel(logging.DEBUG)

Metrics

Track agent performance:

# Get agent metrics
metrics = adk.get_agent_metrics()

print(f"Total requests: {metrics['total_requests']}")
print(f"Average generation time: {metrics['avg_generation_time']}s")
print(f"Refinement rate: {metrics['refinement_rate']}%")
print(f"Success rate: {metrics['success_rate']}%")

Streaming Agent Events

Monitor agents in real-time:

async for event in adk.compose(task="...", stream=True, include_agent_events=True):
    if event["type"] == "agent_event":
        agent = event["agent"]
        status = event["status"]
        print(f"[{agent}] {status}")

Best Practices

  • Model Selection: Use powerful models for generation and fast models for validation.
  • Context Optimization: Provide only the necessary context to reduce token usage.
  • Error Handling: Always set reasonable iteration limits and timeouts.
  • Monitoring: Track agent metrics to optimize performance.

Troubleshooting

Next Steps
