# Custom Runtimes

> Complete developer guide for building custom runtimes with frameworks like LangChain, CrewAI, or AutoGen. Extend Kubiya's runtime-agnostic architecture with your own execution engine.

Kubiya's **runtime-agnostic architecture** allows you to extend the platform with custom runtimes using any Python-based AI framework. Whether you want to integrate LangChain, CrewAI, AutoGen, or build a completely custom solution, this guide shows you how.

<Note>
  **Developer-Focused Documentation**

  This page contains technical implementation details for building custom runtimes. For user-facing runtime selection and configuration, see the [Runtime Comparison](/core-concepts/runtimes/comparison) page.
</Note>

***

## Why Build a Custom Runtime?

Custom runtimes enable you to:

1. **Integrate specialized frameworks**: Use LangChain chains, CrewAI crews, AutoGen agents, or any Python AI framework
2. **Implement custom orchestration**: Build complex multi-agent systems or specialized workflow patterns
3. **Optimize for specific use cases**: Create runtimes tailored to your organization's needs
4. **Leverage existing tooling**: Integrate with internal tools, APIs, and infrastructure
5. **Control execution logic**: Full control over model interactions, tool calling, and conversation management

**Common use cases:**

* Multi-agent collaboration systems (CrewAI, AutoGen)
* Specialized RAG implementations (LangChain with custom retrieval)
* Domain-specific orchestration (healthcare, finance, legal)
* Integration with proprietary AI systems
* Custom prompt engineering and model routing

***

## Runtime Architecture

All runtimes in Kubiya inherit from the `BaseRuntime` abstract class and register via the `RuntimeRegistry`:

```mermaid theme={null}
classDiagram
    class BaseRuntime {
        <<abstract>>
        +execute()
        +stream_execute()
        +get_capabilities()
        +before_execute()
        +after_execute()
        +on_error()
    }

    class AgnoRuntime {
        +Multi-model support
        +LiteLLM integration
        +Python tools
    }

    class ClaudeCodeRuntime {
        +Code-specialized
        +Extended history
        +Session resumption
    }

    class CustomRuntime {
        +Your implementation
        +Custom framework
        +Specialized logic
    }

    class LangChainRuntime {
        +Chains & agents
        +LangChain tools
        +Memory systems
    }

    class CrewAIRuntime {
        +Multi-agent crews
        +Role-based tasks
        +Collaboration
    }

    BaseRuntime <|-- AgnoRuntime
    BaseRuntime <|-- ClaudeCodeRuntime
    BaseRuntime <|-- CustomRuntime
    CustomRuntime <|-- LangChainRuntime
    CustomRuntime <|-- CrewAIRuntime
```

**Key components:**

| Component                   | Purpose                                               |
| --------------------------- | ----------------------------------------------------- |
| **BaseRuntime**             | Abstract base class defining the runtime interface    |
| **RuntimeRegistry**         | Decorator-based registration system                   |
| **RuntimeExecutionContext** | Input context with agent config, prompt, skills, etc. |
| **RuntimeExecutionResult**  | Standardized output with response, usage, metadata    |
| **RuntimeCapabilities**     | Feature flags (streaming, tools, MCP, etc.)           |
| **RuntimeType**             | Enum for runtime identification                       |
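To make the decorator-based registration in the table above concrete, here is a minimal sketch of how such a registry could work. `SketchRegistry`, `EchoRuntime`, and the `"custom"` enum value are hypothetical stand-ins for illustration; the real `RuntimeRegistry` in `base.py` may differ in behavior and method names.

```python
from enum import Enum
from typing import Callable, Dict, List, Type


class RuntimeType(str, Enum):
    """Stand-in enum for this sketch; the real one lives in base.py."""

    DEFAULT = "default"
    CUSTOM = "custom"  # Assumed value, not confirmed by the source


class SketchRegistry:
    """Hypothetical, simplified stand-in for RuntimeRegistry."""

    _runtimes: Dict[RuntimeType, Type] = {}

    @classmethod
    def register(cls, runtime_type: RuntimeType) -> Callable[[Type], Type]:
        """Return a class decorator that records the class under runtime_type."""

        def decorator(runtime_cls: Type) -> Type:
            cls._runtimes[runtime_type] = runtime_cls
            return runtime_cls  # Return the class unchanged so it stays usable

        return decorator

    @classmethod
    def get(cls, runtime_type: RuntimeType) -> Type:
        """Look up a registered runtime class, failing loudly if it is missing."""
        try:
            return cls._runtimes[runtime_type]
        except KeyError:
            raise ValueError(
                f"No runtime registered for {runtime_type.value!r}"
            ) from None

    @classmethod
    def list_available(cls) -> List[RuntimeType]:
        """Return the runtime types registered so far, in registration order."""
        return list(cls._runtimes)


@SketchRegistry.register(RuntimeType.CUSTOM)
class EchoRuntime:
    """Toy runtime used only to demonstrate registration."""


registered = SketchRegistry.list_available()
```

Because the decorator runs at import time, a runtime only becomes discoverable once the module that defines it has been imported.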

***

## Building a Custom Runtime

<Steps>
  <Step title="Understand the Base Class">
    **Review the `BaseRuntime` interface** in `/control_plane_api/worker/runtimes/base.py`

    Key abstract methods you must implement:

    ```python theme={null}
    from typing import AsyncIterator, Callable, Optional

    from control_plane_api.worker.runtimes.base import (
        BaseRuntime,
        RuntimeType,
        RuntimeCapabilities,
        RuntimeExecutionContext,
        RuntimeExecutionResult,
        RuntimeRegistry,
    )

    class MyCustomRuntime(BaseRuntime):
        """Your custom runtime implementation."""

        async def _execute_impl(
            self, context: RuntimeExecutionContext
        ) -> RuntimeExecutionResult:
            """
            Core execution logic (non-streaming).

            Args:
                context: Contains agent_id, prompt, skills, model_config, etc.

            Returns:
                RuntimeExecutionResult with response, usage, success flag
            """
            pass  # Implement your logic

        async def _stream_execute_impl(
            self,
            context: RuntimeExecutionContext,
            event_callback: Optional[Callable] = None,
        ) -> AsyncIterator[RuntimeExecutionResult]:
            """
            Core streaming execution logic.

            Yields:
                RuntimeExecutionResult chunks for real-time feedback
            """
            pass  # Implement your streaming logic

        def get_runtime_type(self) -> RuntimeType:
            """Return your runtime type identifier."""
            return RuntimeType.CUSTOM  # Or add to enum

        def get_capabilities(self) -> RuntimeCapabilities:
            """Declare what your runtime supports."""
            return RuntimeCapabilities(
                streaming=True,
                tools=True,
                mcp=True,
                hooks=True,
                cancellation=True,
                conversation_history=True,
                custom_tools=True,
            )
    ```

    **What the base class provides for free:**

    * Lifecycle management (`execute()` and `stream_execute()` orchestration)
    * Automatic hook calling (`before_execute`, `after_execute`, `on_error`)
    * Control Plane integration (metadata caching)
    * Cancellation registration
    * Configuration validation
    * Error handling framework
  </Step>

  <Step title="Create Your Runtime Class">
    **File structure** for a custom runtime:

    ```
    control_plane_api/worker/runtimes/
    ├── my_custom/
    │   ├── __init__.py
    │   ├── runtime.py          # Main runtime class
    │   ├── config.py           # Configuration builder
    │   ├── hooks.py            # Tool execution hooks (optional)
    │   └── README.md           # Documentation
    └── base.py                 # BaseRuntime class
    ```

    **Basic runtime skeleton:**

    ```python theme={null}
    # control_plane_api/worker/runtimes/my_custom/runtime.py

    import structlog
    from typing import AsyncIterator, Optional, Callable, Dict, Any
    from control_plane_api.worker.runtimes.base import (
        BaseRuntime,
        RuntimeType,
        RuntimeCapabilities,
        RuntimeExecutionContext,
        RuntimeExecutionResult,
        RuntimeRegistry,
    )

    logger = structlog.get_logger(__name__)


    @RuntimeRegistry.register(RuntimeType.CUSTOM)
    class MyCustomRuntime(BaseRuntime):
        """
        Custom runtime using [Your Framework].

        Features:
        - [List your key features]
        - [Tool integration approach]
        - [Model support]
        """

        def __init__(self, control_plane_client: Any, cancellation_manager: Any, **kwargs):
            """Initialize the custom runtime."""
            super().__init__(control_plane_client, cancellation_manager, **kwargs)

            # Initialize your framework
            self.my_framework_client = self._init_framework()

            logger.info("custom_runtime_initialized", runtime_type="my_custom")

        def _init_framework(self):
            """Initialize your AI framework (LangChain, CrewAI, etc.)."""
            # Your initialization logic
            pass

        async def _execute_impl(
            self, context: RuntimeExecutionContext
        ) -> RuntimeExecutionResult:
            """Execute using your custom framework."""
            try:
                # 1. Extract configuration
                prompt = context.prompt
                system_prompt = context.system_prompt
                model_id = context.model_id
                skills = context.skills

                # 2. Build your framework's agent/chain
                agent = self._build_agent(context)

                # 3. Execute
                response = await agent.run(prompt)

                # 4. Extract usage metrics
                usage = self._extract_usage(response)

                # 5. Return standardized result
                return RuntimeExecutionResult(
                    response=response.get("output", ""),
                    usage=usage,
                    success=True,
                    finish_reason="stop",
                    model=model_id,
                )

            except Exception as e:
                logger.error("execution_failed", error=str(e))
                return RuntimeExecutionResult(
                    response="",
                    usage={},
                    success=False,
                    error=str(e),
                    finish_reason="error",
                )

        async def _stream_execute_impl(
            self,
            context: RuntimeExecutionContext,
            event_callback: Optional[Callable] = None,
        ) -> AsyncIterator[RuntimeExecutionResult]:
            """Stream execution with real-time feedback."""
            # Implement streaming if your framework supports it
            agent = self._build_agent(context)

            accumulated_response = ""

            async for chunk in agent.stream(context.prompt):
                accumulated_response += chunk

                # Yield partial result
                yield RuntimeExecutionResult(
                    response=accumulated_response,
                    usage={},  # Update with partial usage if available
                    success=True,
                    finish_reason=None,  # None until completion
                )

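            # Final chunk with usage. _extract_usage expects a dict-like
            # response, so pass whatever your framework reports once
            # streaming completes; `agent` is only a placeholder here.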
            yield RuntimeExecutionResult(
                response=accumulated_response,
                usage=self._extract_usage(agent),
                success=True,
                finish_reason="stop",
            )

        def get_runtime_type(self) -> RuntimeType:
            """Return runtime type."""
            return RuntimeType.CUSTOM  # Or add new type to enum

        def get_capabilities(self) -> RuntimeCapabilities:
            """Declare runtime capabilities."""
            return RuntimeCapabilities(
                streaming=True,  # Set based on your implementation
                tools=True,
                mcp=False,  # Set based on MCP support
                hooks=True,
                cancellation=True,
                conversation_history=True,
                custom_tools=False,
            )

        def _build_agent(self, context: RuntimeExecutionContext):
            """Build your framework's agent/chain."""
            # Implement based on your framework
            pass

        def _extract_usage(self, response) -> Dict[str, Any]:
            """Extract token usage from framework response."""
            # Standardized format:
            return {
                "input_tokens": response.get("prompt_tokens", 0),
                "output_tokens": response.get("completion_tokens", 0),
                "total_tokens": response.get("total_tokens", 0),
            }
    ```
  </Step>

  <Step title="Implement Tool Integration">
    **Tools are the capabilities your agent can use** (Skills, MCP servers, custom functions).

    **Option 1: Convert Kubiya Skills to your framework's tool format**

    ```python theme={null}
    def _build_tools_from_skills(self, skills: List[Any]) -> List[Any]:
        """
        Convert Kubiya Skills to your framework's tool format.

        Args:
            skills: List of resolved Kubiya Skills

        Returns:
            List of tools in your framework's format
        """
        framework_tools = []

        for skill in skills:
            # Get Agno Toolkit from skill
            if hasattr(skill, "get_tools"):
                toolkit = skill.get_tools()

                # Convert each tool
                for tool in toolkit.tools:
                    framework_tool = self._convert_tool(tool)
                    framework_tools.append(framework_tool)

        return framework_tools

    def _convert_tool(self, agno_tool):
        """Convert Agno tool to your framework's format."""
        # Example for LangChain:
        from langchain.tools import StructuredTool

        return StructuredTool.from_function(
            func=agno_tool.entrypoint,
            name=agno_tool.name,
            description=agno_tool.description,
            args_schema=agno_tool.parameters,  # Pydantic model
        )
    ```

    **Option 2: Native MCP Server integration**

    ```python theme={null}
    async def _build_mcp_tools(self, mcp_servers: Dict) -> List[Any]:
        """
        Connect to MCP servers and convert tools.

        Args:
            mcp_servers: MCP server configurations from context

        Returns:
            List of tools from MCP servers
        """
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client

        tools = []

        for server_name, server_config in mcp_servers.items():
            async with stdio_client(
                StdioServerParameters(
                    command=server_config["command"],
                    args=server_config.get("args", []),
                    env=server_config.get("env"),
                )
            ) as (read, write):
                async with ClientSession(read, write) as session:
                    await session.initialize()

                    # List tools from MCP server
                    mcp_tools = await session.list_tools()

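                    # Convert to your framework's format. The session closes
                    # when this block exits, so converted tools must open
                    # their own connection (or you must keep the session
                    # alive) to be callable later.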
                    for mcp_tool in mcp_tools.tools:
                        framework_tool = self._convert_mcp_tool(mcp_tool)
                        tools.append(framework_tool)

        return tools
    ```

    **Option 3: Hybrid approach**

    ```python theme={null}
    async def _build_all_tools(self, context: RuntimeExecutionContext):
        """Combine Skills and MCP tools. Async because MCP discovery is awaited."""
        all_tools = []

        # Add Skills
        if context.skills:
            skill_tools = self._build_tools_from_skills(context.skills)
            all_tools.extend(skill_tools)

        # Add MCP servers
        if context.mcp_servers:
            mcp_tools = await self._build_mcp_tools(context.mcp_servers)
            all_tools.extend(mcp_tools)

        return all_tools
    ```
  </Step>

  <Step title="Add Lifecycle Hooks">
    **Lifecycle hooks** enable monitoring, logging, and integration points.

    ```python theme={null}
    async def before_execute(self, context: RuntimeExecutionContext) -> None:
        """
        Hook called before execution starts.

        Use for:
        - Validation
        - Resource setup
        - Logging
        - Metrics initialization
        """
        self.logger.info(
            "execution_starting",
            execution_id=context.execution_id[:8],
            agent_id=context.agent_id,
            model=context.model_id,
        )

        # Validate custom requirements
        if not context.model_id:
            raise ValueError("model_id is required for custom runtime")

        # Setup resources (e.g., database connections)
        self._setup_resources(context)

    async def after_execute(
        self, context: RuntimeExecutionContext, result: RuntimeExecutionResult
    ) -> None:
        """
        Hook called after successful execution.

        Use for:
        - Cleanup
        - Metrics logging
        - Analytics submission
        - Webhooks
        """
        self.logger.info(
            "execution_completed",
            execution_id=context.execution_id[:8],
            success=result.success,
            input_tokens=result.usage.get("input_tokens", 0),
            output_tokens=result.usage.get("output_tokens", 0),
        )

        # Cleanup resources
        self._cleanup_resources()

        # Submit custom metrics
        await self._submit_metrics(context, result)

    async def on_error(
        self, context: RuntimeExecutionContext, error: Exception
    ) -> RuntimeExecutionResult:
        """
        Hook called when execution fails.

        Use for:
        - Error logging
        - Alerting
        - Fallback logic
        - Custom error handling
        """
        self.logger.error(
            "execution_error",
            execution_id=context.execution_id[:8],
            error_type=type(error).__name__,
            error_message=str(error),
        )

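        # Optional: Implement fallback logic. RateLimitError is a placeholder
        # for your provider's rate-limit exception; import it accordingly.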
        if isinstance(error, RateLimitError):
            return await self._handle_rate_limit(context)

        # Return error result
        return RuntimeExecutionResult(
            response=f"Execution failed: {str(error)}",
            usage={},
            success=False,
            error=str(error),
            finish_reason="error",
        )
    ```
  </Step>

  <Step title="Register Your Runtime">
    **Use the `@RuntimeRegistry.register()` decorator** to make your runtime discoverable:

    ```python theme={null}
    from control_plane_api.worker.runtimes.base import (
        BaseRuntime,
        RuntimeRegistry,
        RuntimeType,
    )

    # Option 1: Use existing RuntimeType
    @RuntimeRegistry.register(RuntimeType.CUSTOM)
    class MyCustomRuntime(BaseRuntime):
        pass

    # Option 2: Add new RuntimeType to enum (recommended)
    # In base.py, add:
    # class RuntimeType(str, Enum):
    #     DEFAULT = "default"
    #     CLAUDE_CODE = "claude_code"
    #     LANGCHAIN = "langchain"      # Add this
    #     CREWAI = "crewai"            # Add this

    @RuntimeRegistry.register(RuntimeType.LANGCHAIN)
    class LangChainRuntime(BaseRuntime):
        pass
    ```

    **Runtime discovery:**

    ```python theme={null}
    # List all registered runtimes
    available_runtimes = RuntimeRegistry.list_available()
    # Output: [RuntimeType.DEFAULT, RuntimeType.CLAUDE_CODE, RuntimeType.LANGCHAIN]

    # Get runtime info
    runtime_info = RuntimeRegistry.get_runtime_info_all()
    # Output: {
    #   "default": {...},
    #   "claude_code": {...},
    #   "langchain": {...}
    # }
    ```
  </Step>

  <Step title="Test Your Runtime">
    **Unit tests** for your runtime implementation:

    ```python theme={null}
    # tests/worker/runtimes/test_my_custom_runtime.py

    import pytest
    from unittest.mock import AsyncMock, MagicMock
    from control_plane_api.worker.runtimes.my_custom.runtime import MyCustomRuntime
    from control_plane_api.worker.runtimes.base import (
        RuntimeExecutionContext,
        RuntimeType,
    )

    @pytest.fixture
    def runtime():
        """Create runtime instance with mocked dependencies."""
        control_plane = MagicMock()
        cancellation_manager = MagicMock()
        return MyCustomRuntime(control_plane, cancellation_manager)

    @pytest.fixture
    def context():
        """Create execution context for testing."""
        return RuntimeExecutionContext(
            execution_id="test-execution-123",
            agent_id="test-agent-456",
            organization_id="test-org-789",
            prompt="What is the capital of France?",
            system_prompt="You are a helpful assistant.",
            model_id="gpt-4",
        )

    @pytest.mark.asyncio
    async def test_execute_success(runtime, context):
        """Test successful execution."""
        result = await runtime.execute(context)

        assert result.success is True
        assert result.response  # Has response
        assert result.usage["total_tokens"] > 0
        assert result.finish_reason == "stop"

    @pytest.mark.asyncio
    async def test_execute_with_tools(runtime, context):
        """Test execution with tool calling."""
        # Add mock skills to context
        context.skills = [MagicMock()]

        result = await runtime.execute(context)

        assert result.success is True
        assert result.tool_execution_messages  # Tools were called

    @pytest.mark.asyncio
    async def test_stream_execute(runtime, context):
        """Test streaming execution."""
        chunks = []

        async for chunk in runtime.stream_execute(context):
            chunks.append(chunk)

        assert len(chunks) > 0
        assert chunks[-1].finish_reason == "stop"
        assert chunks[-1].usage["total_tokens"] > 0

    @pytest.mark.asyncio
    async def test_execute_error_handling(runtime, context):
        """Test error handling."""
        # Force an error
        context.model_id = None  # Invalid config

        result = await runtime.execute(context)

        assert result.success is False
        assert result.error is not None
        assert result.finish_reason == "error"

    def test_capabilities(runtime):
        """Test capability declaration."""
        caps = runtime.get_capabilities()

        assert caps.streaming is True
        assert caps.tools is True
        assert caps.conversation_history is True

    def test_runtime_type(runtime):
        """Test runtime type identification."""
        runtime_type = runtime.get_runtime_type()

        assert runtime_type == RuntimeType.CUSTOM
    ```

    **Integration tests** with real executions:

    ```python theme={null}
    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_full_execution_flow(control_plane_client, cancellation_manager):
        """Test complete execution flow with real dependencies."""
        runtime = MyCustomRuntime(control_plane_client, cancellation_manager)

        context = RuntimeExecutionContext(
            execution_id="integration-test-123",
            agent_id="agent-456",
            organization_id="org-789",
            prompt="Calculate 15 * 23",
            model_id="gpt-4",
        )

        result = await runtime.execute(context)

        assert result.success is True
        assert "345" in result.response  # Correct answer
    ```
  </Step>
</Steps>
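The lifecycle that Steps 1 and 4 describe (the public `execute()` wrapping `_execute_impl()` with hooks) can be pictured as a template method. The sketch below is an assumption about the general shape, not the actual `BaseRuntime` code: `SketchBaseRuntime`, `UpperRuntime`, and the simplified `Result` dataclass are hypothetical. It routes failures from `before_execute` to `on_error`, matching what the error-handling test in Step 6 expects.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Result:
    """Stand-in for RuntimeExecutionResult with only the fields used here."""

    response: str
    usage: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error: Optional[str] = None


class SketchBaseRuntime:
    """Hypothetical outline of the orchestration a base class could provide."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # Records hook order for the demo

    async def execute(self, prompt: str) -> Result:
        """Public entry point: run hooks around the subclass implementation."""
        try:
            await self.before_execute(prompt)
            result = await self._execute_impl(prompt)
            await self.after_execute(prompt, result)
            return result
        except Exception as exc:
            # Any failure, including one raised by a hook, becomes a result
            return await self.on_error(prompt, exc)

    async def before_execute(self, prompt: str) -> None:
        self.calls.append("before_execute")
        if not prompt:
            raise ValueError("prompt is empty")

    async def after_execute(self, prompt: str, result: Result) -> None:
        self.calls.append("after_execute")

    async def on_error(self, prompt: str, error: Exception) -> Result:
        self.calls.append("on_error")
        return Result(response="", success=False, error=str(error))

    async def _execute_impl(self, prompt: str) -> Result:
        raise NotImplementedError


class UpperRuntime(SketchBaseRuntime):
    """Toy subclass: only the core logic is implemented."""

    async def _execute_impl(self, prompt: str) -> Result:
        self.calls.append("_execute_impl")
        return Result(response=prompt.upper())


ok_runtime = UpperRuntime()
ok = asyncio.run(ok_runtime.execute("hello"))

bad_runtime = UpperRuntime()
bad = asyncio.run(bad_runtime.execute(""))
```

The subclass never calls the hooks itself, which is why the API reference marks `execute()` and `stream_execute()` as methods you should not override.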

***

## Real-World Examples

### Example 1: LangChain Runtime

**Key implementation patterns for LangChain integration:**

```python theme={null}
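# Import paths reflect LangChain 0.1-0.3; adjust for your installed version.
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import StructuredTool
from langchain_openai import ChatOpenAI


@RuntimeRegistry.register(RuntimeType.LANGCHAIN)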
class LangChainRuntime(BaseRuntime):
    """LangChain runtime for chains, agents, and tools."""

    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult:
        """Execute using LangChain agent."""
        # 1. Create LangChain LLM
        llm = ChatOpenAI(model=context.model_id)

        # 2. Convert Kubiya Skills to LangChain tools
        tools = [
            StructuredTool.from_function(
                func=tool.entrypoint,
                name=tool.name,
                description=tool.description,
            )
            for skill in context.skills
            for tool in skill.get_tools().tools
        ]

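        # 3. Build agent with prompt template. prompt_template is a
        # ChatPromptTemplate you define; it must include an
        # `agent_scratchpad` placeholder.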
        agent = create_openai_functions_agent(llm, tools, prompt_template)
        agent_executor = AgentExecutor(agent=agent, tools=tools)

        # 4. Execute and return result
        response = await agent_executor.ainvoke({"input": context.prompt})

        return RuntimeExecutionResult(
            response=response["output"],
            usage=self._extract_usage(response),
            success=True,
        )

    def get_capabilities(self) -> RuntimeCapabilities:
        return RuntimeCapabilities(
            streaming=True,  # Via LangChain callbacks
            tools=True,
            conversation_history=True,
        )
```

**Key concepts:**

* Convert Kubiya Skills to LangChain `StructuredTool` format
* Use LangChain's `AgentExecutor` for tool orchestration
* Leverage LangChain callbacks for streaming
* Map conversation history to LangChain message format
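When a framework delivers tokens through callbacks, as the streaming bullet above suggests, `_stream_execute_impl` still has to yield results from an async generator. One way to bridge the two is an `asyncio.Queue`. This is a sketch using only the standard library: `fake_framework_run` is a hypothetical stand-in for a callback-style framework call, not a LangChain API.

```python
import asyncio
from typing import AsyncIterator, Callable, List


async def fake_framework_run(prompt: str, on_token: Callable[[str], None]) -> None:
    """Hypothetical callback-style framework: pushes tokens via on_token."""
    for token in prompt.split():
        await asyncio.sleep(0)  # Yield control, as a real network call would
        on_token(token + " ")


async def stream_tokens(prompt: str) -> AsyncIterator[str]:
    """Bridge callback-delivered tokens into an async iterator."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # Sentinel marking the end of the stream

    async def producer() -> None:
        try:
            await fake_framework_run(prompt, queue.put_nowait)
        finally:
            queue.put_nowait(done)  # Always unblock the consumer

    task = asyncio.create_task(producer())
    try:
        while True:
            item = await queue.get()
            if item is done:
                break
            yield item
    finally:
        await task  # Surface producer exceptions and avoid a dangling task


async def collect(prompt: str) -> List[str]:
    """Accumulate tokens the same way the streaming skeleton does."""
    accumulated = ""
    snapshots = []
    async for token in stream_tokens(prompt):
        accumulated += token
        snapshots.append(accumulated)
    return snapshots


snapshots = asyncio.run(collect("stream this text"))
```

In a runtime, each snapshot would be wrapped in a `RuntimeExecutionResult` with `finish_reason=None`, followed by a final chunk with `finish_reason="stop"`.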

### Example 2: CrewAI Runtime

**Key implementation patterns for multi-agent CrewAI:**

```python theme={null}
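import asyncio

from crewai import Agent, Crew, Process, Task
from langchain_openai import ChatOpenAI


@RuntimeRegistry.register(RuntimeType.CREWAI)
class CrewAIRuntime(BaseRuntime):
    """CrewAI runtime for multi-agent collaboration."""

    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult:
        """Execute using CrewAI crew of specialized agents."""
        # 1. Define specialized agents (recent CrewAI versions require
        # role, goal, and backstory)
        researcher = Agent(
            role="Research Analyst",
            goal="Research and analyze information",
            backstory="An analyst who gathers and verifies facts.",
            llm=ChatOpenAI(model=context.model_id),
        )

        writer = Agent(
            role="Content Writer",
            goal="Write clear content",
            backstory="A writer who turns research into readable prose.",
            llm=ChatOpenAI(model=context.model_id),
        )

        # 2. Create task (recent CrewAI versions require expected_output)
        task = Task(
            description=context.prompt,
            expected_output="A clear, complete answer to the request",
            agent=researcher,
        )

        # 3. Create crew and execute
        crew = Crew(
            agents=[researcher, writer],
            tasks=[task],
            process=Process.sequential,
        )

        # kickoff() is synchronous; run it in a worker thread so it
        # does not block the event loop
        result = await asyncio.to_thread(crew.kickoff)

        return RuntimeExecutionResult(
            response=str(result),
            usage={"total_tokens": 0},  # Placeholder; populate if your CrewAI version reports usage
            success=True,
        )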

    def get_capabilities(self) -> RuntimeCapabilities:
        return RuntimeCapabilities(
            streaming=False,  # CrewAI doesn't stream natively
            tools=True,
            conversation_history=True,
        )
```
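A runtime that declares `streaming=False`, like the one above, still has to implement the abstract `_stream_execute_impl`. One option is to yield a single final chunk. The sketch below assumes that approach: `blocking_kickoff` is a hypothetical stand-in for a synchronous framework call, and `Result` is a simplified stand-in for `RuntimeExecutionResult`.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Optional


@dataclass
class Result:
    """Stand-in for RuntimeExecutionResult with only the fields used here."""

    response: str
    usage: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    finish_reason: Optional[str] = None


def blocking_kickoff(prompt: str) -> str:
    """Hypothetical stand-in for a synchronous framework call."""
    time.sleep(0.01)  # Simulate blocking work
    return f"report for: {prompt}"


async def execute(prompt: str) -> Result:
    """Run the blocking call in a worker thread to keep the event loop free."""
    text = await asyncio.to_thread(blocking_kickoff, prompt)
    return Result(response=text, finish_reason="stop")


async def stream_execute(prompt: str) -> AsyncIterator[Result]:
    """Satisfy a streaming interface by yielding exactly one final chunk."""
    yield await execute(prompt)


async def main() -> List[Result]:
    return [chunk async for chunk in stream_execute("market trends")]


chunks = asyncio.run(main())
```

Callers that consume the stream see one chunk with `finish_reason="stop"`, so they need no special handling for non-streaming runtimes.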

**Key concepts:**

* Define multiple specialized agents with roles
* Use CrewAI's `Process` for orchestration (sequential, hierarchical)
* Agents collaborate on tasks automatically
* Note: this example reports zero token usage; populate `usage` if your CrewAI version exposes usage metrics

***

## Configuration Schema

**Define runtime-specific configuration** via `runtime_config` in agent configuration:

```python theme={null}
# In your runtime implementation:

def _validate_config(self, context: RuntimeExecutionContext) -> None:
    """Validate custom runtime configuration."""
    super()._validate_config(context)  # Call base validation

    runtime_config = context.runtime_config or {}

    # Validate required fields
    if "required_field" not in runtime_config:
        raise ValueError("required_field is missing from runtime_config")

    # Validate field types
    if not isinstance(runtime_config.get("max_iterations"), int):
        raise ValueError("max_iterations must be an integer")

# Configuration example via CLI:
# kubiya agent create my-agent \
#   --runtime langchain \
#   --runtime-config '{"max_iterations": 15, "agent_type": "openai-functions"}'

# Configuration example via API:
# {
#   "name": "my-agent",
#   "runtime": "langchain",
#   "runtime_config": {
#     "max_iterations": 15,
#     "agent_type": "openai-functions",
#     "memory_type": "conversation_buffer",
#     "verbose": true
#   }
# }
```
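The validation above can also be driven by a small schema that fills in defaults. This is a sketch under assumptions: the keys are taken from the configuration examples, but which ones are required and their default values are hypothetical, and `resolve_runtime_config` is not part of the Kubiya API.

```python
from typing import Any, Dict, Optional

_REQUIRED = object()  # Sentinel: the key has no default and must be supplied

# Hypothetical schema: key -> (expected type, default or _REQUIRED)
CONFIG_SCHEMA = {
    "agent_type": (str, _REQUIRED),
    "max_iterations": (int, 10),
    "verbose": (bool, False),
}


def resolve_runtime_config(runtime_config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Validate known keys, apply defaults, and pass unknown keys through."""
    resolved = dict(runtime_config or {})

    for key, (expected_type, default) in CONFIG_SCHEMA.items():
        if key not in resolved:
            if default is _REQUIRED:
                raise ValueError(f"{key} is missing from runtime_config")
            resolved[key] = default
            continue

        value = resolved[key]
        # bool is a subclass of int, so reject it explicitly for int fields
        is_bool_for_int = expected_type is int and isinstance(value, bool)
        if is_bool_for_int or not isinstance(value, expected_type):
            raise ValueError(f"{key} must be of type {expected_type.__name__}")

    return resolved


config = resolve_runtime_config(
    {"agent_type": "openai-functions", "max_iterations": 15, "memory_type": "conversation_buffer"}
)
```

Resolving defaults in one place means `_execute_impl` can read `max_iterations` without repeating fallback logic.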

***

## Best Practices

<AccordionGroup>
  <Accordion title="Standardize usage metrics" icon="chart-bar">
    **Always populate `usage` in RuntimeExecutionResult** for analytics:

    ```python theme={null}
    return RuntimeExecutionResult(
        response=response_text,
        usage={
            "input_tokens": prompt_tokens,
            "output_tokens": completion_tokens,
            "total_tokens": total_tokens,
            # Optional but recommended:
            "cache_read_tokens": cached_tokens,
            "model_provider": "openai",  # or "anthropic", "google", etc.
        },
        success=True,
    )
    ```

    If your framework doesn't expose usage, estimate or use callbacks.
  </Accordion>

  <Accordion title="Implement proper error handling" icon="triangle-exclamation">
    **Use lifecycle hooks for graceful degradation:**

    ```python theme={null}
    async def on_error(self, context, error):
        """Handle errors gracefully."""
        self.logger.error("execution_error", error=str(error))

        # Specific error handling
        if isinstance(error, ModelNotFoundError):
            return RuntimeExecutionResult(
                response="Model not available. Please check configuration.",
                usage={},
                success=False,
                error="Model not found",
            )

        # Generic fallback
        return await super().on_error(context, error)
    ```
  </Accordion>

  <Accordion title="Support conversation history" icon="clock-rotate-left">
    **Convert Kubiya conversation history to your framework's format:**

    ```python theme={null}
    def _build_messages(self, conversation_history: List[Dict]):
        """Convert to framework-specific format."""
        messages = []

        for msg in conversation_history:
            role = msg["role"]
            content = msg["content"]

            # Convert to your format
            if role == "user":
                messages.append(YourFrameworkUserMessage(content))
            elif role == "assistant":
                messages.append(YourFrameworkAssistantMessage(content))

        return messages
    ```
  </Accordion>

  <Accordion title="Leverage Control Plane integration" icon="network-wired">
    **Use the Control Plane client for metadata, caching, and state:**

    ```python theme={null}
    # In your runtime:
    async def before_execute(self, context):
        # Cache execution metadata
        self.control_plane.cache_metadata(
            context.execution_id,
            "AGENT",
        )

        # Get cached data if needed
        cached_data = await self.control_plane.get_cached_data(context.agent_id)
    ```
  </Accordion>

  <Accordion title="Document your runtime" icon="book">
    **Provide comprehensive documentation for users:**

    ```markdown theme={null}
    # LangChain Runtime

    ## Overview
    [Description of what your runtime does]

    ## Supported Features
    - Streaming: Yes
    - Tools: Yes
    - MCP: No
    - Conversation history: Yes

    ## Configuration
    [Explain runtime_config options]

    ## Usage Examples
    [CLI and API examples]

    ## Limitations
    [Known limitations or constraints]

    ## Troubleshooting
    [Common issues and solutions]
    ```
  </Accordion>

  <Accordion title="Test thoroughly" icon="vial">
    **Write comprehensive tests:**

    ```text theme={null}
    # Unit tests
    - test_execute_success
    - test_execute_with_tools
    - test_execute_error_handling
    - test_stream_execute
    - test_conversation_history
    - test_capabilities
    - test_configuration_validation

    # Integration tests
    - test_full_execution_flow
    - test_tool_calling_integration
    - test_control_plane_integration

    # Performance tests
    - test_execution_latency
    - test_streaming_throughput
    - test_concurrent_executions
    ```
  </Accordion>
</AccordionGroup>
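For the "Standardize usage metrics" practice, frameworks often report token counts under different key names. A small normalizer can map them onto the `input_tokens` / `output_tokens` / `total_tokens` shape used throughout this guide. The alias table below is an assumption based on the `prompt_tokens` and `completion_tokens` keys from the skeleton's `_extract_usage`; extend it for your framework.

```python
from typing import Any, Dict, Mapping, Optional

# Hypothetical alias table: standardized key -> names to try, in order
_ALIASES = {
    "input_tokens": ("input_tokens", "prompt_tokens"),
    "output_tokens": ("output_tokens", "completion_tokens"),
}


def normalize_usage(raw: Optional[Mapping[str, Any]]) -> Dict[str, int]:
    """Map framework-specific usage keys onto the standardized format."""
    raw = raw or {}
    usage: Dict[str, int] = {}

    for target, candidates in _ALIASES.items():
        # Take the first alias that is present and not None, else 0
        usage[target] = next(
            (int(raw[name]) for name in candidates if raw.get(name) is not None),
            0,
        )

    # Prefer the reported total; otherwise derive it from the two parts
    reported_total = raw.get("total_tokens")
    usage["total_tokens"] = (
        int(reported_total)
        if reported_total is not None
        else usage["input_tokens"] + usage["output_tokens"]
    )
    return usage


openai_style = normalize_usage({"prompt_tokens": 12, "completion_tokens": 30})
missing = normalize_usage(None)
```

Returning zeros for missing data keeps `result.usage.get(...)` calls in hooks safe, at the cost of hiding that no usage was reported.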

***

## API Reference

### BaseRuntime Class

**Abstract base class for all runtimes**

```python theme={null}
class BaseRuntime(ABC):
    """
    Abstract base class for agent runtimes.

    Attributes:
        control_plane: Control Plane client
        cancellation_manager: Cancellation manager
        logger: Structured logger
        config: Additional configuration
    """

    # Abstract methods (must implement)
    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult: ...
    async def _stream_execute_impl(self, context: RuntimeExecutionContext, event_callback: Optional[Callable] = None) -> AsyncIterator[RuntimeExecutionResult]: ...
    def get_runtime_type(self) -> RuntimeType: ...
    def get_capabilities(self) -> RuntimeCapabilities: ...

    # Public interface (don't override)
    async def execute(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult: ...
    async def stream_execute(self, context: RuntimeExecutionContext, event_callback: Optional[Callable] = None) -> AsyncIterator[RuntimeExecutionResult]: ...
    async def cancel(self, execution_id: str) -> bool: ...
    async def get_usage(self, execution_id: str) -> Dict[str, Any]: ...

    # Lifecycle hooks (override as needed)
    async def before_execute(self, context: RuntimeExecutionContext) -> None: ...
    async def after_execute(self, context: RuntimeExecutionContext, result: RuntimeExecutionResult) -> None: ...
    async def on_error(self, context: RuntimeExecutionContext, error: Exception) -> RuntimeExecutionResult: ...

    # Helper methods (override as needed)
    async def _cancel_impl(self, execution_id: str) -> bool: ...
    async def _get_usage_impl(self, execution_id: str) -> Dict[str, Any]: ...
    def _validate_config(self, context: RuntimeExecutionContext) -> None: ...
```
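
The split between the public `execute` and the abstract `_execute_impl` suggests a template-method design in which the base class wraps your implementation with the lifecycle hooks. The sketch below illustrates that idea under an assumed call order (`before_execute`, then `_execute_impl`, then `after_execute`, with `on_error` on failure); the actual ordering inside `BaseRuntime` may differ. `SketchRuntime` and `UpperRuntime` are hypothetical, and plain strings stand in for the context and result types.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import List


class SketchRuntime(ABC):
    """Hypothetical stand-in for BaseRuntime; strings replace context/result."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records the hook order for illustration

    async def execute(self, prompt: str) -> str:
        # Public entry point: wraps the implementation with the hooks.
        try:
            await self.before_execute(prompt)
            result = await self._execute_impl(prompt)
            await self.after_execute(prompt, result)
            return result
        except Exception as exc:
            return await self.on_error(prompt, exc)

    @abstractmethod
    async def _execute_impl(self, prompt: str) -> str:
        ...

    async def before_execute(self, prompt: str) -> None:
        self.calls.append("before_execute")

    async def after_execute(self, prompt: str, result: str) -> None:
        self.calls.append("after_execute")

    async def on_error(self, prompt: str, error: Exception) -> str:
        self.calls.append("on_error")
        return f"error: {error}"


class UpperRuntime(SketchRuntime):
    """Only the implementation method is overridden."""

    async def _execute_impl(self, prompt: str) -> str:
        self.calls.append("_execute_impl")
        if not prompt:
            raise ValueError("empty prompt")
        return prompt.upper()


runtime = UpperRuntime()
print(asyncio.run(runtime.execute("hello")))  # HELLO
print(runtime.calls)  # ['before_execute', '_execute_impl', 'after_execute']
```

Keeping the subclass limited to `_execute_impl` means error handling and hook invocation stay in one place, which is presumably why the public methods are marked "don't override".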

### RuntimeExecutionContext

**Input context passed to runtime:**

```python theme={null}
@dataclass
class RuntimeExecutionContext:
    execution_id: str              # Unique execution ID
    agent_id: str                  # Agent identifier
    organization_id: str           # Organization context
    prompt: str                    # User's input message
    system_prompt: Optional[str]   # System instructions
    conversation_history: List[Dict]  # Previous messages
    model_id: Optional[str]        # Model identifier
    model_config: Optional[Dict]   # Model configuration
    agent_config: Optional[Dict]   # Agent configuration
    skills: List[Any]              # Resolved skills
    mcp_servers: Optional[Dict]    # MCP server configs
    user_metadata: Optional[Dict]  # User metadata
    runtime_config: Optional[Dict] # Runtime-specific config
    # Enforcement context fields
    user_email: Optional[str]
    user_id: Optional[str]
    user_roles: List[str]
    team_id: Optional[str]
    team_name: Optional[str]
    environment: str = "production"
```
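
Most frameworks expect a single message list, so a runtime usually has to combine `system_prompt`, `conversation_history`, and `prompt` before calling the model. The sketch below shows one way to do that. It assumes each history entry is a dict with `role` and `content` keys, which this reference does not confirm; `MiniContext` and `build_messages` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MiniContext:
    """Hypothetical subset of RuntimeExecutionContext."""
    prompt: str
    system_prompt: Optional[str] = None
    conversation_history: List[Dict] = field(default_factory=list)


def build_messages(context: MiniContext) -> List[Dict[str, str]]:
    """Flatten the system prompt, prior turns, and the new prompt into one list."""
    messages: List[Dict[str, str]] = []
    if context.system_prompt:
        messages.append({"role": "system", "content": context.system_prompt})
    for entry in context.conversation_history:
        # Assumed shape: {"role": ..., "content": ...}; skip anything else.
        if "role" in entry and "content" in entry:
            messages.append({"role": entry["role"], "content": entry["content"]})
    messages.append({"role": "user", "content": context.prompt})
    return messages


context = MiniContext(
    prompt="Restart the staging deployment",
    system_prompt="You are a DevOps assistant.",
    conversation_history=[{"role": "user", "content": "Hi"}],
)
for message in build_messages(context):
    print(message["role"], "->", message["content"])
```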

### RuntimeExecutionResult

**Output structure from runtime:**

```python theme={null}
@dataclass
class RuntimeExecutionResult:
    response: str                  # Agent's response
    usage: Dict[str, Any]          # Token usage metrics
    success: bool                  # Execution success flag
    finish_reason: Optional[str]   # "stop", "length", "tool_use", "error"
    run_id: Optional[str]          # Run identifier
    model: Optional[str]           # Model used
    tool_execution_messages: Optional[List[Dict]]  # Tool execution details
    tool_messages: Optional[List[Dict]]            # Detailed tool messages
    error: Optional[str]           # Error message if failed
    metadata: Dict[str, Any]       # Additional metadata
```

### RuntimeCapabilities

**Runtime capability flags:**

```python theme={null}
@dataclass
class RuntimeCapabilities:
    streaming: bool = False              # Supports streaming
    tools: bool = False                  # Supports tool calling
    mcp: bool = False                    # Supports MCP servers
    hooks: bool = False                  # Supports lifecycle hooks
    cancellation: bool = False           # Supports cancellation
    conversation_history: bool = False   # Supports history
    custom_tools: bool = False           # Supports custom tools
```
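
Capability flags let a caller check what a runtime supports before dispatching work to it. As a rough illustration, the helper below reports which required capabilities a runtime lacks. The dataclass is copied from the reference above so the example runs standalone; `missing_capabilities` is a hypothetical helper, not part of the Kubiya API.

```python
from dataclasses import dataclass, fields
from typing import Iterable, List


@dataclass
class RuntimeCapabilities:
    # Copied from the reference above so the example is self-contained.
    streaming: bool = False
    tools: bool = False
    mcp: bool = False
    hooks: bool = False
    cancellation: bool = False
    conversation_history: bool = False
    custom_tools: bool = False


def missing_capabilities(caps: RuntimeCapabilities, required: Iterable[str]) -> List[str]:
    """Return the required capability names that are not enabled on caps."""
    known = {f.name for f in fields(caps)}
    missing: List[str] = []
    for name in required:
        if name not in known:
            # Fail loudly on typos instead of silently treating them as missing.
            raise ValueError(f"unknown capability: {name}")
        if not getattr(caps, name):
            missing.append(name)
    return missing


caps = RuntimeCapabilities(streaming=True, tools=True)
print(missing_capabilities(caps, ["streaming", "mcp"]))  # ['mcp']
```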

### RuntimeRegistry

**Runtime registration and discovery:**

```python theme={null}
class RuntimeRegistry:
    """Registry for runtime discovery."""

    @classmethod
    def register(cls, runtime_type: RuntimeType) -> Callable:
        """Decorator to register a runtime."""

    @classmethod
    def get(cls, runtime_type: RuntimeType) -> Type[BaseRuntime]:
        """Get runtime class by type."""

    @classmethod
    def create(cls, runtime_type: RuntimeType, control_plane_client, cancellation_manager, **kwargs) -> BaseRuntime:
        """Create runtime instance."""

    @classmethod
    def list_available(cls) -> List[RuntimeType]:
        """List all registered runtimes."""

    @classmethod
    def get_runtime_info_all(cls) -> Dict[str, Dict[str, Any]]:
        """Get info about all runtimes."""
```
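
To make the decorator-based registration concrete, here is a minimal sketch of how such a registry can work. `MiniRegistry`, `MiniRuntimeType`, and `EchoRuntime` are hypothetical stand-ins; the real `RuntimeRegistry` internals are not shown in this reference and may differ.

```python
from enum import Enum
from typing import Callable, Dict, List, Type


class MiniRuntimeType(str, Enum):
    """Hypothetical stand-in for RuntimeType."""
    ECHO = "echo"


class MiniRegistry:
    """Hypothetical stand-in for RuntimeRegistry: maps a type to a class."""

    _runtimes: Dict[MiniRuntimeType, Type] = {}

    @classmethod
    def register(cls, runtime_type: MiniRuntimeType) -> Callable[[Type], Type]:
        def decorator(runtime_cls: Type) -> Type:
            cls._runtimes[runtime_type] = runtime_cls
            return runtime_cls  # return the class unchanged
        return decorator

    @classmethod
    def get(cls, runtime_type: MiniRuntimeType) -> Type:
        if runtime_type not in cls._runtimes:
            raise ValueError(f"runtime not registered: {runtime_type.value}")
        return cls._runtimes[runtime_type]

    @classmethod
    def create(cls, runtime_type: MiniRuntimeType, **kwargs) -> object:
        # Look up the class, then instantiate it with the caller's arguments.
        return cls.get(runtime_type)(**kwargs)

    @classmethod
    def list_available(cls) -> List[MiniRuntimeType]:
        return list(cls._runtimes)


@MiniRegistry.register(MiniRuntimeType.ECHO)
class EchoRuntime:
    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix


runtime = MiniRegistry.create(MiniRuntimeType.ECHO, prefix="> ")
print(type(runtime).__name__)
```

A class decorator only runs when its module is imported, so with a registry of this kind the module that defines the runtime has to be imported somewhere before the registry can find it.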

***

## Deployment

**Steps to deploy your custom runtime:**

1. **Implement your runtime class** following this guide
2. **Add a member to the `RuntimeType` enum** in `base.py` (if you are adding a new type)
3. **Register with decorator**: `@RuntimeRegistry.register(RuntimeType.YOUR_RUNTIME)`
4. **Write tests** (unit + integration)
5. **Document** your runtime (`README.md` in the runtime directory)
6. **Update agent configuration** to use your runtime:

```bash theme={null}
# Via CLI
kubiya agent create my-agent \
  --runtime your_custom_runtime \
  --model gpt-4

# Via API
curl -X POST https://control-plane.kubiya.ai/api/v1/agents \
  -H "Authorization: Bearer $KUBIYA_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-agent",
    "runtime": "your_custom_runtime",
    "model_id": "gpt-4"
  }'
```

7. **Monitor execution** via the Kubiya dashboard and logs

***

## Next Steps

<CardGroup cols={2}>
  <Card title="Agno Runtime Source" icon="code" href="https://github.com/kubiya-ai/kubiya/tree/main/agent-control-plane/control_plane_api/worker/runtimes/agno">
    Reference implementation of the Agno runtime
  </Card>

  <Card title="Claude Code Runtime Source" icon="code" href="https://github.com/kubiya-ai/kubiya/tree/main/agent-control-plane/control_plane_api/worker/runtimes/claude_code">
    Reference implementation of the Claude Code runtime
  </Card>

  <Card title="Skills Documentation" icon="layer-group" href="/core-concepts/skills">
    Learn about Skills (tools) integration
  </Card>

  <Card title="Control Plane API" icon="network-wired" href="/core-concepts/control-plane/overview">
    Control Plane integration and runtime registry
  </Card>
</CardGroup>
