Kubiya’s runtime-agnostic architecture allows you to extend the platform with custom runtimes using any Python-based AI framework. Whether you want to integrate LangChain, CrewAI, AutoGen, or build a completely custom solution, this guide shows you how.
Developer-Focused Documentation: This page contains technical implementation details for building custom runtimes. For user-facing runtime selection and configuration, see the Runtime Comparison page.

Why Build a Custom Runtime?

Custom runtimes enable you to:
  1. Integrate specialized frameworks: Use LangChain chains, CrewAI crews, AutoGen agents, or any Python AI framework
  2. Implement custom orchestration: Build complex multi-agent systems or specialized workflow patterns
  3. Optimize for specific use cases: Create runtimes tailored to your organization’s needs
  4. Leverage existing tooling: Integrate with internal tools, APIs, and infrastructure
  5. Control execution logic: Full control over model interactions, tool calling, and conversation management
Common use cases:
  • Multi-agent collaboration systems (CrewAI, AutoGen)
  • Specialized RAG implementations (LangChain with custom retrieval)
  • Domain-specific orchestration (healthcare, finance, legal)
  • Integration with proprietary AI systems
  • Custom prompt engineering and model routing

Runtime Architecture

All runtimes in Kubiya inherit from the BaseRuntime abstract class and register via the RuntimeRegistry.
Key components:
  • BaseRuntime: Abstract base class defining the runtime interface
  • RuntimeRegistry: Decorator-based registration system
  • RuntimeExecutionContext: Input context with agent config, prompt, skills, etc.
  • RuntimeExecutionResult: Standardized output with response, usage, metadata
  • RuntimeCapabilities: Feature flags (streaming, tools, MCP, etc.)
  • RuntimeType: Enum for runtime identification
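To see how these pieces fit together, here is a minimal sketch of a worker resolving and running a runtime. The run_agent_turn helper is illustrative only; the exact wiring inside the Kubiya worker differs, and the control_plane_client and cancellation_manager objects are assumed to be supplied by the worker process:
from control_plane_api.worker.runtimes.base import (
    RuntimeRegistry,
    RuntimeType,
    RuntimeExecutionContext,
)

async def run_agent_turn(control_plane_client, cancellation_manager, prompt: str):
    # Look up a registered runtime class and instantiate it
    runtime = RuntimeRegistry.create(
        RuntimeType.DEFAULT,
        control_plane_client,
        cancellation_manager,
    )

    # Build the input context the runtime receives
    context = RuntimeExecutionContext(
        execution_id="exec-123",
        agent_id="agent-456",
        organization_id="org-789",
        prompt=prompt,
        system_prompt="You are a helpful assistant.",
        model_id="gpt-4",
    )

    # execute() wraps the runtime's _execute_impl() with hooks and error handling
    result = await runtime.execute(context)
    return result.response, result.usage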

Building a Custom Runtime

1. Understand the Base Class

Review the BaseRuntime interface in /control_plane_api/worker/runtimes/base.py. Key abstract methods you must implement:
from typing import AsyncIterator, Callable, Optional

from control_plane_api.worker.runtimes.base import (
    BaseRuntime,
    RuntimeType,
    RuntimeCapabilities,
    RuntimeExecutionContext,
    RuntimeExecutionResult,
    RuntimeRegistry,
)

class MyCustomRuntime(BaseRuntime):
    """Your custom runtime implementation."""

    async def _execute_impl(
        self, context: RuntimeExecutionContext
    ) -> RuntimeExecutionResult:
        """
        Core execution logic (non-streaming).

        Args:
            context: Contains agent_id, prompt, skills, model_config, etc.

        Returns:
            RuntimeExecutionResult with response, usage, success flag
        """
        pass  # Implement your logic

    async def _stream_execute_impl(
        self,
        context: RuntimeExecutionContext,
        event_callback: Optional[Callable] = None,
    ) -> AsyncIterator[RuntimeExecutionResult]:
        """
        Core streaming execution logic.

        Yields:
            RuntimeExecutionResult chunks for real-time feedback
        """
        pass  # Implement your streaming logic

    def get_runtime_type(self) -> RuntimeType:
        """Return your runtime type identifier."""
        return RuntimeType.CUSTOM  # Or add to enum

    def get_capabilities(self) -> RuntimeCapabilities:
        """Declare what your runtime supports."""
        return RuntimeCapabilities(
            streaming=True,
            tools=True,
            mcp=True,
            hooks=True,
            cancellation=True,
            conversation_history=True,
            custom_tools=True,
        )
What the base class provides for free:
  • Lifecycle management (execute() and stream_execute() orchestration)
  • Automatic hook calling (before_execute, after_execute, on_error)
  • Control Plane integration (metadata caching)
  • Cancellation registration
  • Configuration validation
  • Error handling framework
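A simplified sketch of that orchestration (not the actual base-class source; cancellation registration and Control Plane caching are omitted):
async def execute(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult:
    """Simplified view of what BaseRuntime.execute() handles for you."""
    self._validate_config(context)                    # configuration validation
    await self.before_execute(context)                # lifecycle hook
    try:
        result = await self._execute_impl(context)    # your implementation
        await self.after_execute(context, result)     # lifecycle hook
        return result
    except Exception as error:
        return await self.on_error(context, error)    # error handling framework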
2. Create Your Runtime Class

File structure for a custom runtime:
control_plane_api/worker/runtimes/
├── my_custom/
│   ├── __init__.py
│   ├── runtime.py          # Main runtime class
│   ├── config.py           # Configuration builder
│   ├── hooks.py            # Tool execution hooks (optional)
│   └── README.md           # Documentation
└── base.py                 # BaseRuntime class
Basic runtime skeleton:
# control_plane_api/worker/runtimes/my_custom/runtime.py

import structlog
from typing import AsyncIterator, Optional, Callable, Dict, Any
from control_plane_api.worker.runtimes.base import (
    BaseRuntime,
    RuntimeType,
    RuntimeCapabilities,
    RuntimeExecutionContext,
    RuntimeExecutionResult,
    RuntimeRegistry,
)

logger = structlog.get_logger(__name__)


@RuntimeRegistry.register(RuntimeType.CUSTOM)
class MyCustomRuntime(BaseRuntime):
    """
    Custom runtime using [Your Framework].

    Features:
    - [List your key features]
    - [Tool integration approach]
    - [Model support]
    """

    def __init__(self, control_plane_client: Any, cancellation_manager: Any, **kwargs):
        """Initialize the custom runtime."""
        super().__init__(control_plane_client, cancellation_manager, **kwargs)

        # Initialize your framework
        self.my_framework_client = self._init_framework()

        logger.info("custom_runtime_initialized", runtime_type="my_custom")

    def _init_framework(self):
        """Initialize your AI framework (LangChain, CrewAI, etc.)."""
        # Your initialization logic
        pass

    async def _execute_impl(
        self, context: RuntimeExecutionContext
    ) -> RuntimeExecutionResult:
        """Execute using your custom framework."""
        try:
            # 1. Extract configuration
            prompt = context.prompt
            system_prompt = context.system_prompt
            model_id = context.model_id
            skills = context.skills

            # 2. Build your framework's agent/chain
            agent = self._build_agent(context)

            # 3. Execute
            response = await agent.run(prompt)

            # 4. Extract usage metrics
            usage = self._extract_usage(response)

            # 5. Return standardized result
            return RuntimeExecutionResult(
                response=response.get("output", ""),
                usage=usage,
                success=True,
                finish_reason="stop",
                model=model_id,
            )

        except Exception as e:
            logger.error("execution_failed", error=str(e))
            return RuntimeExecutionResult(
                response="",
                usage={},
                success=False,
                error=str(e),
                finish_reason="error",
            )

    async def _stream_execute_impl(
        self,
        context: RuntimeExecutionContext,
        event_callback: Optional[Callable] = None,
    ) -> AsyncIterator[RuntimeExecutionResult]:
        """Stream execution with real-time feedback."""
        # Implement streaming if your framework supports it
        agent = self._build_agent(context)

        accumulated_response = ""

        async for chunk in agent.stream(context.prompt):
            accumulated_response += chunk

            # Yield partial result
            yield RuntimeExecutionResult(
                response=accumulated_response,
                usage={},  # Update with partial usage if available
                success=True,
                finish_reason=None,  # None until completion
            )

        # Final chunk with usage
        yield RuntimeExecutionResult(
            response=accumulated_response,
            usage=self._extract_usage(agent),
            success=True,
            finish_reason="stop",
        )

    def get_runtime_type(self) -> RuntimeType:
        """Return runtime type."""
        return RuntimeType.CUSTOM  # Or add new type to enum

    def get_capabilities(self) -> RuntimeCapabilities:
        """Declare runtime capabilities."""
        return RuntimeCapabilities(
            streaming=True,  # Set based on your implementation
            tools=True,
            mcp=False,  # Set based on MCP support
            hooks=True,
            cancellation=True,
            conversation_history=True,
            custom_tools=False,
        )

    def _build_agent(self, context: RuntimeExecutionContext):
        """Build your framework's agent/chain."""
        # Implement based on your framework
        pass

    def _extract_usage(self, response) -> Dict[str, Any]:
        """Extract token usage from framework response."""
        # Standardized format:
        return {
            "input_tokens": response.get("prompt_tokens", 0),
            "output_tokens": response.get("completion_tokens", 0),
            "total_tokens": response.get("total_tokens", 0),
        }
3. Implement Tool Integration

Tools are the capabilities your agent can use (Skills, MCP servers, custom functions).

Option 1: Convert Kubiya Skills to your framework's tool format
def _build_tools_from_skills(self, skills: List[Any]) -> List[Any]:
    """
    Convert Kubiya Skills to your framework's tool format.

    Args:
        skills: List of resolved Kubiya Skills

    Returns:
        List of tools in your framework's format
    """
    framework_tools = []

    for skill in skills:
        # Get Agno Toolkit from skill
        if hasattr(skill, "get_tools"):
            toolkit = skill.get_tools()

            # Convert each tool
            for tool in toolkit.tools:
                framework_tool = self._convert_tool(tool)
                framework_tools.append(framework_tool)

    return framework_tools

def _convert_tool(self, agno_tool):
    """Convert Agno tool to your framework's format."""
    # Example for LangChain:
    from langchain.tools import StructuredTool

    return StructuredTool.from_function(
        func=agno_tool.entrypoint,
        name=agno_tool.name,
        description=agno_tool.description,
        args_schema=agno_tool.parameters,  # Pydantic model
    )
Option 2: Native MCP Server integration
async def _build_mcp_tools(self, mcp_servers: Dict) -> List[Any]:
    """
    Connect to MCP servers and convert tools.

    Args:
        mcp_servers: MCP server configurations from context

    Returns:
        List of tools from MCP servers
    """
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    tools = []

    for server_name, server_config in mcp_servers.items():
        async with stdio_client(
            StdioServerParameters(
                command=server_config["command"],
                args=server_config.get("args", []),
                env=server_config.get("env"),
            )
        ) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                # List tools from MCP server
                mcp_tools = await session.list_tools()

                # Convert to your framework's format
                for mcp_tool in mcp_tools.tools:
                    framework_tool = self._convert_mcp_tool(mcp_tool)
                    tools.append(framework_tool)

    return tools
Option 3: Hybrid approach
async def _build_all_tools(self, context: RuntimeExecutionContext):
    """Combine Skills and MCP tools."""
    all_tools = []

    # Add Skills
    if context.skills:
        skill_tools = self._build_tools_from_skills(context.skills)
        all_tools.extend(skill_tools)

    # Add MCP servers
    if context.mcp_servers:
        mcp_tools = await self._build_mcp_tools(context.mcp_servers)
        all_tools.extend(mcp_tools)

    return all_tools
4. Add Lifecycle Hooks

Lifecycle hooks enable monitoring, logging, and integration points:
async def before_execute(self, context: RuntimeExecutionContext) -> None:
    """
    Hook called before execution starts.

    Use for:
    - Validation
    - Resource setup
    - Logging
    - Metrics initialization
    """
    self.logger.info(
        "execution_starting",
        execution_id=context.execution_id[:8],
        agent_id=context.agent_id,
        model=context.model_id,
    )

    # Validate custom requirements
    if not context.model_id:
        raise ValueError("model_id is required for custom runtime")

    # Setup resources (e.g., database connections)
    self._setup_resources(context)

async def after_execute(
    self, context: RuntimeExecutionContext, result: RuntimeExecutionResult
) -> None:
    """
    Hook called after successful execution.

    Use for:
    - Cleanup
    - Metrics logging
    - Analytics submission
    - Webhooks
    """
    self.logger.info(
        "execution_completed",
        execution_id=context.execution_id[:8],
        success=result.success,
        input_tokens=result.usage.get("input_tokens", 0),
        output_tokens=result.usage.get("output_tokens", 0),
    )

    # Cleanup resources
    self._cleanup_resources()

    # Submit custom metrics
    await self._submit_metrics(context, result)

async def on_error(
    self, context: RuntimeExecutionContext, error: Exception
) -> RuntimeExecutionResult:
    """
    Hook called when execution fails.

    Use for:
    - Error logging
    - Alerting
    - Fallback logic
    - Custom error handling
    """
    self.logger.error(
        "execution_error",
        execution_id=context.execution_id[:8],
        error_type=type(error).__name__,
        error_message=str(error),
    )

    # Optional: Implement fallback logic
    # (RateLimitError stands in for your framework's rate-limit exception)
    if isinstance(error, RateLimitError):
        return await self._handle_rate_limit(context)

    # Return error result
    return RuntimeExecutionResult(
        response=f"Execution failed: {str(error)}",
        usage={},
        success=False,
        error=str(error),
        finish_reason="error",
    )
5. Register Your Runtime

Use the @RuntimeRegistry.register() decorator to make your runtime discoverable:
from control_plane_api.worker.runtimes.base import RuntimeType, RuntimeRegistry

# Option 1: Use existing RuntimeType
@RuntimeRegistry.register(RuntimeType.CUSTOM)
class MyCustomRuntime(BaseRuntime):
    pass

# Option 2: Add new RuntimeType to enum (recommended)
# In base.py, add:
# class RuntimeType(str, Enum):
#     DEFAULT = "default"
#     CLAUDE_CODE = "claude_code"
#     LANGCHAIN = "langchain"      # Add this
#     CREWAI = "crewai"            # Add this

@RuntimeRegistry.register(RuntimeType.LANGCHAIN)
class LangChainRuntime(BaseRuntime):
    pass
Runtime discovery:
# List all registered runtimes
available_runtimes = RuntimeRegistry.list_available()
# Output: [RuntimeType.DEFAULT, RuntimeType.CLAUDE_CODE, RuntimeType.LANGCHAIN]

# Get runtime info
runtime_info = RuntimeRegistry.get_runtime_info_all()
# Output: {
#   "default": {...},
#   "claude_code": {...},
#   "langchain": {...}
# }
6. Test Your Runtime

Unit tests for your runtime implementation:
# tests/worker/runtimes/test_my_custom_runtime.py

import pytest
from unittest.mock import AsyncMock, MagicMock
from control_plane_api.worker.runtimes.my_custom.runtime import MyCustomRuntime
from control_plane_api.worker.runtimes.base import RuntimeExecutionContext, RuntimeType

@pytest.fixture
def runtime():
    """Create runtime instance with mocked dependencies."""
    control_plane = MagicMock()
    cancellation_manager = MagicMock()
    return MyCustomRuntime(control_plane, cancellation_manager)

@pytest.fixture
def context():
    """Create execution context for testing."""
    return RuntimeExecutionContext(
        execution_id="test-execution-123",
        agent_id="test-agent-456",
        organization_id="test-org-789",
        prompt="What is the capital of France?",
        system_prompt="You are a helpful assistant.",
        model_id="gpt-4",
    )

@pytest.mark.asyncio
async def test_execute_success(runtime, context):
    """Test successful execution."""
    result = await runtime.execute(context)

    assert result.success is True
    assert result.response  # Has response
    assert result.usage["total_tokens"] > 0
    assert result.finish_reason == "stop"

@pytest.mark.asyncio
async def test_execute_with_tools(runtime, context):
    """Test execution with tool calling."""
    # Add mock skills to context
    context.skills = [MagicMock()]

    result = await runtime.execute(context)

    assert result.success is True
    assert result.tool_execution_messages  # Tools were called

@pytest.mark.asyncio
async def test_stream_execute(runtime, context):
    """Test streaming execution."""
    chunks = []

    async for chunk in runtime.stream_execute(context):
        chunks.append(chunk)

    assert len(chunks) > 0
    assert chunks[-1].finish_reason == "stop"
    assert chunks[-1].usage["total_tokens"] > 0

@pytest.mark.asyncio
async def test_execute_error_handling(runtime, context):
    """Test error handling."""
    # Force an error
    context.model_id = None  # Invalid config

    result = await runtime.execute(context)

    assert result.success is False
    assert result.error is not None
    assert result.finish_reason == "error"

def test_capabilities(runtime):
    """Test capability declaration."""
    caps = runtime.get_capabilities()

    assert caps.streaming is True
    assert caps.tools is True
    assert caps.conversation_history is True

def test_runtime_type(runtime):
    """Test runtime type identification."""
    runtime_type = runtime.get_runtime_type()

    assert runtime_type == RuntimeType.CUSTOM
Integration tests with real executions:
@pytest.mark.integration
@pytest.mark.asyncio
async def test_full_execution_flow(control_plane_client, cancellation_manager):
    """Test complete execution flow with real dependencies."""
    runtime = MyCustomRuntime(control_plane_client, cancellation_manager)

    context = RuntimeExecutionContext(
        execution_id="integration-test-123",
        agent_id="agent-456",
        organization_id="org-789",
        prompt="Calculate 15 * 23",
        model_id="gpt-4",
    )

    result = await runtime.execute(context)

    assert result.success is True
    assert "345" in result.response  # Correct answer

Real-World Examples

Example 1: LangChain Runtime

Key implementation patterns for LangChain integration:
# Requires the langchain and langchain-openai packages
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import StructuredTool
from langchain_openai import ChatOpenAI

@RuntimeRegistry.register(RuntimeType.LANGCHAIN)
class LangChainRuntime(BaseRuntime):
    """LangChain runtime for chains, agents, and tools."""

    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult:
        """Execute using LangChain agent."""
        # 1. Create LangChain LLM
        llm = ChatOpenAI(model=context.model_id)

        # 2. Convert Kubiya Skills to LangChain tools
        tools = [
            StructuredTool.from_function(
                func=tool.entrypoint,
                name=tool.name,
                description=tool.description,
            )
            for skill in context.skills
            for tool in skill.get_tools().tools
        ]

        # 3. Build agent with a prompt template (prompt_template: a ChatPromptTemplate
        #    built from context.system_prompt; construction omitted here)
        agent = create_openai_functions_agent(llm, tools, prompt_template)
        agent_executor = AgentExecutor(agent=agent, tools=tools)

        # 4. Execute and return result
        response = await agent_executor.ainvoke({"input": context.prompt})

        return RuntimeExecutionResult(
            response=response["output"],
            usage=self._extract_usage(response),
            success=True,
        )

    def get_capabilities(self) -> RuntimeCapabilities:
        return RuntimeCapabilities(
            streaming=True,  # Via LangChain callbacks
            tools=True,
            conversation_history=True,
        )
Key concepts:
  • Convert Kubiya Skills to LangChain StructuredTool format
  • Use LangChain’s AgentExecutor for tool orchestration
  • Leverage LangChain callbacks for streaming
  • Map conversation history to LangChain message format (see the sketch below)
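For the last point, a minimal sketch that maps Kubiya's conversation history dicts onto LangChain message objects (assuming the langchain_core message classes; adjust for your LangChain version):
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

def _build_langchain_history(self, context: RuntimeExecutionContext):
    """Convert Kubiya conversation history into LangChain messages."""
    messages = []
    if context.system_prompt:
        messages.append(SystemMessage(content=context.system_prompt))

    for msg in context.conversation_history or []:
        if msg["role"] == "user":
            messages.append(HumanMessage(content=msg["content"]))
        elif msg["role"] == "assistant":
            messages.append(AIMessage(content=msg["content"]))

    return messages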

Example 2: CrewAI Runtime

Key implementation patterns for multi-agent CrewAI:
# Requires the crewai and langchain-openai packages
from crewai import Agent, Crew, Process, Task
from langchain_openai import ChatOpenAI

@RuntimeRegistry.register(RuntimeType.CREWAI)
class CrewAIRuntime(BaseRuntime):
    """CrewAI runtime for multi-agent collaboration."""

    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult:
        """Execute using CrewAI crew of specialized agents."""
        # 1. Define specialized agents
        researcher = Agent(
            role="Research Analyst",
            goal="Research and analyze information",
            llm=ChatOpenAI(model=context.model_id),
        )

        writer = Agent(
            role="Content Writer",
            goal="Write clear content",
            llm=ChatOpenAI(model=context.model_id),
        )

        # 2. Create task
        task = Task(
            description=context.prompt,
            agent=researcher,
        )

        # 3. Create crew and execute
        crew = Crew(
            agents=[researcher, writer],
            tasks=[task],
            process=Process.sequential,
        )

        result = crew.kickoff()

        return RuntimeExecutionResult(
            response=str(result),
            usage={"total_tokens": 0},  # CrewAI doesn't expose usage
            success=True,
        )

    def get_capabilities(self) -> RuntimeCapabilities:
        return RuntimeCapabilities(
            streaming=False,  # CrewAI doesn't stream natively
            tools=True,
            conversation_history=True,
        )
Key concepts:
  • Define multiple specialized agents with roles
  • Use CrewAI’s Process for orchestration (sequential, hierarchical; see the sketch after this list)
  • Agents collaborate on tasks automatically
  • Note: CrewAI doesn’t provide token usage metrics
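As a sketch of hierarchical orchestration, a manager model can delegate work to the specialized agents (this assumes CrewAI's Process.hierarchical and manager_llm options; verify against your CrewAI version):
def _build_hierarchical_crew(self, agents: list, tasks: list, model_id: str) -> Crew:
    """Let a manager model coordinate the specialized agents."""
    return Crew(
        agents=agents,
        tasks=tasks,
        process=Process.hierarchical,            # manager delegates tasks to agents
        manager_llm=ChatOpenAI(model=model_id),  # model that performs the delegation
    )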

Configuration Schema

Define runtime-specific configuration via runtime_config in agent configuration:
# In your runtime implementation:

def _validate_config(self, context: RuntimeExecutionContext) -> None:
    """Validate custom runtime configuration."""
    super()._validate_config(context)  # Call base validation

    runtime_config = context.runtime_config or {}

    # Validate required fields
    if "required_field" not in runtime_config:
        raise ValueError("required_field is missing from runtime_config")

    # Validate field types
    if not isinstance(runtime_config.get("max_iterations"), int):
        raise ValueError("max_iterations must be an integer")

# Configuration example via CLI:
# kubiya agent create my-agent \
#   --runtime langchain \
#   --runtime-config '{"max_iterations": 15, "agent_type": "openai-functions"}'

# Configuration example via API:
# {
#   "name": "my-agent",
#   "runtime": "langchain",
#   "runtime_config": {
#     "max_iterations": 15,
#     "agent_type": "openai-functions",
#     "memory_type": "conversation_buffer",
#     "verbose": true
#   }
# }

Best Practices

Always populate usage in RuntimeExecutionResult for analytics:
return RuntimeExecutionResult(
    response=response_text,
    usage={
        "input_tokens": prompt_tokens,
        "output_tokens": completion_tokens,
        "total_tokens": total_tokens,
        # Optional but recommended:
        "cache_read_tokens": cached_tokens,
        "model_provider": "openai",  # or "anthropic", "google", etc.
    },
    success=True,
)
If your framework doesn’t expose usage, estimate or use callbacks.
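For example, a rough estimation sketch using the tiktoken library (an assumption here, suited to OpenAI-style models; swap in your provider's tokenizer):
import tiktoken

def _estimate_usage(self, model_id: str, prompt: str, response_text: str) -> dict:
    """Approximate token usage when the framework reports none."""
    try:
        encoding = tiktoken.encoding_for_model(model_id)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")  # generic fallback

    input_tokens = len(encoding.encode(prompt))
    output_tokens = len(encoding.encode(response_text))
    return {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
    }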
Use lifecycle hooks for graceful degradation:
async def on_error(self, context, error):
    """Handle errors gracefully."""
    self.logger.error("execution_error", error=str(error))

    # Specific error handling
    if isinstance(error, ModelNotFoundError):
        return RuntimeExecutionResult(
            response="Model not available. Please check configuration.",
            usage={},
            success=False,
            error="Model not found",
        )

    # Generic fallback
    return await super().on_error(context, error)
Convert Kubiya conversation history to your framework’s format:
def _build_messages(self, conversation_history: List[Dict]):
    """Convert to framework-specific format."""
    messages = []

    for msg in conversation_history:
        role = msg["role"]
        content = msg["content"]

        # Convert to your format
        if role == "user":
            messages.append(YourFrameworkUserMessage(content))
        elif role == "assistant":
            messages.append(YourFrameworkAssistantMessage(content))

    return messages
Use the Control Plane client for metadata, caching, and state:
# In your runtime:
async def before_execute(self, context):
    # Cache execution metadata
    self.control_plane.cache_metadata(
        context.execution_id,
        "AGENT",
    )

    # Get cached data if needed
    cached_data = await self.control_plane.get_cached_data(context.agent_id)
Provide comprehensive documentation for users:
# LangChain Runtime

## Overview
[Description of what your runtime does]

## Supported Features
- Streaming: Yes
- Tools: Yes
- MCP: No
- Conversation history: Yes

## Configuration
[Explain runtime_config options]

## Usage Examples
[CLI and API examples]

## Limitations
[Known limitations or constraints]

## Troubleshooting
[Common issues and solutions]
Write comprehensive tests:
# Unit tests
- test_execute_success
- test_execute_with_tools
- test_execute_error_handling
- test_stream_execute
- test_conversation_history
- test_capabilities
- test_configuration_validation

# Integration tests
- test_full_execution_flow
- test_tool_calling_integration
- test_control_plane_integration

# Performance tests
- test_execution_latency
- test_streaming_throughput
- test_concurrent_executions

API Reference

BaseRuntime Class

Abstract base class for all runtimes:
class BaseRuntime(ABC):
    """
    Abstract base class for agent runtimes.

    Attributes:
        control_plane: Control Plane client
        cancellation_manager: Cancellation manager
        logger: Structured logger
        config: Additional configuration
    """

    # Abstract methods (must implement)
    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult
    async def _stream_execute_impl(self, context: RuntimeExecutionContext, event_callback) -> AsyncIterator[RuntimeExecutionResult]
    def get_runtime_type(self) -> RuntimeType
    def get_capabilities(self) -> RuntimeCapabilities

    # Public interface (don't override)
    async def execute(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult
    async def stream_execute(self, context: RuntimeExecutionContext, event_callback) -> AsyncIterator[RuntimeExecutionResult]
    async def cancel(self, execution_id: str) -> bool
    async def get_usage(self, execution_id: str) -> Dict[str, Any]

    # Lifecycle hooks (override as needed)
    async def before_execute(self, context: RuntimeExecutionContext) -> None
    async def after_execute(self, context: RuntimeExecutionContext, result: RuntimeExecutionResult) -> None
    async def on_error(self, context: RuntimeExecutionContext, error: Exception) -> RuntimeExecutionResult

    # Helper methods (override as needed)
    async def _cancel_impl(self, execution_id: str) -> bool
    async def _get_usage_impl(self, execution_id: str) -> Dict[str, Any]
    def _validate_config(self, context: RuntimeExecutionContext) -> None

RuntimeExecutionContext

Input context passed to runtime:
@dataclass
class RuntimeExecutionContext:
    execution_id: str              # Unique execution ID
    agent_id: str                  # Agent identifier
    organization_id: str           # Organization context
    prompt: str                    # User's input message
    system_prompt: Optional[str]   # System instructions
    conversation_history: List[Dict]  # Previous messages
    model_id: Optional[str]        # Model identifier
    model_config: Optional[Dict]   # Model configuration
    agent_config: Optional[Dict]   # Agent configuration
    skills: List[Any]              # Resolved skills
    mcp_servers: Optional[Dict]    # MCP server configs
    user_metadata: Optional[Dict]  # User metadata
    runtime_config: Optional[Dict] # Runtime-specific config
    # Enforcement context fields
    user_email: Optional[str]
    user_id: Optional[str]
    user_roles: List[str]
    team_id: Optional[str]
    team_name: Optional[str]
    environment: str = "production"

RuntimeExecutionResult

Output structure from runtime:
@dataclass
class RuntimeExecutionResult:
    response: str                  # Agent's response
    usage: Dict[str, Any]          # Token usage metrics
    success: bool                  # Execution success flag
    finish_reason: Optional[str]   # "stop", "length", "tool_use", "error"
    run_id: Optional[str]          # Run identifier
    model: Optional[str]           # Model used
    tool_execution_messages: Optional[List[Dict]]  # Tool execution details
    tool_messages: Optional[List[Dict]]            # Detailed tool messages
    error: Optional[str]           # Error message if failed
    metadata: Dict[str, Any]       # Additional metadata

RuntimeCapabilities

Runtime capability flags:
@dataclass
class RuntimeCapabilities:
    streaming: bool = False              # Supports streaming
    tools: bool = False                  # Supports tool calling
    mcp: bool = False                    # Supports MCP servers
    hooks: bool = False                  # Supports lifecycle hooks
    cancellation: bool = False           # Supports cancellation
    conversation_history: bool = False   # Supports history
    custom_tools: bool = False           # Supports custom tools

RuntimeRegistry

Runtime registration and discovery:
class RuntimeRegistry:
    """Registry for runtime discovery."""

    @classmethod
    def register(cls, runtime_type: RuntimeType) -> Callable
        """Decorator to register a runtime."""

    @classmethod
    def get(cls, runtime_type: RuntimeType) -> Type[BaseRuntime]
        """Get runtime class by type."""

    @classmethod
    def create(cls, runtime_type: RuntimeType, control_plane_client, cancellation_manager, **kwargs) -> BaseRuntime
        """Create runtime instance."""

    @classmethod
    def list_available(cls) -> List[RuntimeType]
        """List all registered runtimes."""

    @classmethod
    def get_runtime_info_all(cls) -> Dict[str, Dict[str, Any]]
        """Get info about all runtimes."""

Deployment

Steps to deploy your custom runtime:
  1. Implement your runtime class following this guide
  2. Add to RuntimeType enum in base.py (if adding new type)
  3. Register with decorator: @RuntimeRegistry.register(RuntimeType.YOUR_RUNTIME)
  4. Write tests (unit + integration)
  5. Document your runtime (README.md in runtime directory)
  6. Update agent configuration to use your runtime:
# Via CLI
kubiya agent create my-agent \
  --runtime your_custom_runtime \
  --model gpt-4

# Via API
curl -X POST https://control-plane.kubiya.ai/api/v1/agents \
  -H "Authorization: Bearer $KUBIYA_API_KEY" \
  -d '{
    "name": "my-agent",
    "runtime": "your_custom_runtime",
    "model_id": "gpt-4"
  }'
  7. Monitor execution via Kubiya dashboard and logs

Next Steps