Kubiya’s runtime-agnostic architecture allows you to extend the platform with custom runtimes using any Python-based AI framework. Whether you want to integrate LangChain, CrewAI, AutoGen, or build a completely custom solution, this guide shows you how.
Developer-Focused Documentation: This page contains technical implementation details for building custom runtimes. For user-facing runtime selection and configuration, see the Runtime Comparison page.

Why Build a Custom Runtime?

Custom runtimes enable you to:
  1. Integrate specialized frameworks: Use LangChain chains, CrewAI crews, AutoGen agents, or any Python AI framework
  2. Implement custom orchestration: Build complex multi-agent systems or specialized workflow patterns
  3. Optimize for specific use cases: Create runtimes tailored to your organization’s needs
  4. Leverage existing tooling: Integrate with internal tools, APIs, and infrastructure
  5. Control execution logic: Full control over model interactions, tool calling, and conversation management
Common use cases:
  • Multi-agent collaboration systems (CrewAI, AutoGen)
  • Specialized RAG implementations (LangChain with custom retrieval)
  • Domain-specific orchestration (healthcare, finance, legal)
  • Integration with proprietary AI systems
  • Custom prompt engineering and model routing

Runtime Architecture

All runtimes in Kubiya inherit from the BaseRuntime abstract class and register via the RuntimeRegistry.

Key components:

| Component | Purpose |
| --- | --- |
| BaseRuntime | Abstract base class defining the runtime interface |
| RuntimeRegistry | Decorator-based registration system |
| RuntimeExecutionContext | Input context with agent config, prompt, skills, etc. |
| RuntimeExecutionResult | Standardized output with response, usage, metadata |
| RuntimeCapabilities | Feature flags (streaming, tools, MCP, etc.) |
| RuntimeType | Enum for runtime identification |
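
To make the decorator-based registration concrete, here is a minimal, self-contained sketch of how such a registry can work. The classes below are stand-ins written for illustration only; they reuse the names from this guide but are not the actual Kubiya implementation, and the enum members shown are an assumed subset.

```python
from enum import Enum
from typing import Callable, Dict, List, Type


class RuntimeType(str, Enum):
    # Assumed subset of the real enum, for illustration only.
    DEFAULT = "default"
    CUSTOM = "custom"


class BaseRuntime:
    """Stand-in for the real abstract base class."""


class RuntimeRegistry:
    """Maps a RuntimeType to the class registered for it."""

    _runtimes: Dict[RuntimeType, Type[BaseRuntime]] = {}

    @classmethod
    def register(
        cls, runtime_type: RuntimeType
    ) -> Callable[[Type[BaseRuntime]], Type[BaseRuntime]]:
        def decorator(runtime_cls: Type[BaseRuntime]) -> Type[BaseRuntime]:
            # Record the class, then return it unchanged so it stays usable.
            cls._runtimes[runtime_type] = runtime_cls
            return runtime_cls

        return decorator

    @classmethod
    def get(cls, runtime_type: RuntimeType) -> Type[BaseRuntime]:
        return cls._runtimes[runtime_type]

    @classmethod
    def list_available(cls) -> List[RuntimeType]:
        return list(cls._runtimes)


@RuntimeRegistry.register(RuntimeType.CUSTOM)
class MyCustomRuntime(BaseRuntime):
    pass
```

Because registration happens when the decorated class is defined, the module containing the runtime has to be imported before the registry can find it.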

Building a Custom Runtime

Step 1: Understand the Base Class

Review the BaseRuntime interface in /control_plane_api/worker/runtimes/base.py. Key abstract methods you must implement:
from typing import AsyncIterator, Callable, Optional

from control_plane_api.worker.runtimes.base import (
    BaseRuntime,
    RuntimeType,
    RuntimeCapabilities,
    RuntimeExecutionContext,
    RuntimeExecutionResult,
    RuntimeRegistry,
)

class MyCustomRuntime(BaseRuntime):
    """Your custom runtime implementation."""

    async def _execute_impl(
        self, context: RuntimeExecutionContext
    ) -> RuntimeExecutionResult:
        """
        Core execution logic (non-streaming).

        Args:
            context: Contains agent_id, prompt, skills, model_config, etc.

        Returns:
            RuntimeExecutionResult with response, usage, success flag
        """
        pass  # Implement your logic

    async def _stream_execute_impl(
        self,
        context: RuntimeExecutionContext,
        event_callback: Optional[Callable] = None,
    ) -> AsyncIterator[RuntimeExecutionResult]:
        """
        Core streaming execution logic.

        Yields:
            RuntimeExecutionResult chunks for real-time feedback
        """
        pass  # Implement your streaming logic

    def get_runtime_type(self) -> RuntimeType:
        """Return your runtime type identifier."""
        return RuntimeType.CUSTOM  # Or add to enum

    def get_capabilities(self) -> RuntimeCapabilities:
        """Declare what your runtime supports."""
        return RuntimeCapabilities(
            streaming=True,
            tools=True,
            mcp=True,
            hooks=True,
            cancellation=True,
            conversation_history=True,
            custom_tools=True,
        )
What the base class provides for free:
  • Lifecycle management (execute() and stream_execute() orchestration)
  • Automatic hook calling (before_execute, after_execute, on_error)
  • Control Plane integration (metadata caching)
  • Cancellation registration
  • Configuration validation
  • Error handling framework
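
The lifecycle orchestration listed above follows the template-method pattern: the public execute() wraps your _execute_impl() with the hooks. The sketch below illustrates that ordering with a stand-in base class; the real BaseRuntime may differ in details such as cancellation and validation, and plain dicts are used here in place of the context and result types.

```python
import asyncio
from typing import Any, Dict, List


class SketchRuntime:
    """Stand-in base class; method names mirror this guide, bodies are assumptions."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records hook order for demonstration

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        try:
            await self.before_execute(context)
            result = await self._execute_impl(context)
            await self.after_execute(context, result)
            return result
        except Exception as error:
            # Any failure in a hook or in the implementation lands here.
            return await self.on_error(context, error)

    async def before_execute(self, context: Dict[str, Any]) -> None:
        self.calls.append("before_execute")

    async def after_execute(self, context: Dict[str, Any], result: Dict[str, Any]) -> None:
        self.calls.append("after_execute")

    async def on_error(self, context: Dict[str, Any], error: Exception) -> Dict[str, Any]:
        self.calls.append("on_error")
        return {"response": "", "success": False, "error": str(error)}

    async def _execute_impl(self, context: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError


class EchoRuntime(SketchRuntime):
    async def _execute_impl(self, context: Dict[str, Any]) -> Dict[str, Any]:
        self.calls.append("_execute_impl")
        return {"response": context["prompt"], "success": True}


class FailingRuntime(SketchRuntime):
    async def _execute_impl(self, context: Dict[str, Any]) -> Dict[str, Any]:
        raise RuntimeError("boom")
```

In this sketch a subclass only supplies _execute_impl(); the hook order and error conversion come from the base class.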

Step 2: Create Your Runtime Class

File structure for a custom runtime:
control_plane_api/worker/runtimes/
├── my_custom/
│   ├── __init__.py
│   ├── runtime.py          # Main runtime class
│   ├── config.py           # Configuration builder
│   ├── hooks.py            # Tool execution hooks (optional)
│   └── README.md           # Documentation
└── base.py                 # BaseRuntime class
Basic runtime skeleton:
# control_plane_api/worker/runtimes/my_custom/runtime.py

import structlog
from typing import AsyncIterator, Optional, Callable, Dict, Any
from control_plane_api.worker.runtimes.base import (
    BaseRuntime,
    RuntimeType,
    RuntimeCapabilities,
    RuntimeExecutionContext,
    RuntimeExecutionResult,
    RuntimeRegistry,
)

logger = structlog.get_logger(__name__)


@RuntimeRegistry.register(RuntimeType.CUSTOM)
class MyCustomRuntime(BaseRuntime):
    """
    Custom runtime using [Your Framework].

    Features:
    - [List your key features]
    - [Tool integration approach]
    - [Model support]
    """

    def __init__(self, control_plane_client: Any, cancellation_manager: Any, **kwargs):
        """Initialize the custom runtime."""
        super().__init__(control_plane_client, cancellation_manager, **kwargs)

        # Initialize your framework
        self.my_framework_client = self._init_framework()

        logger.info("custom_runtime_initialized", runtime_type="my_custom")

    def _init_framework(self):
        """Initialize your AI framework (LangChain, CrewAI, etc.)."""
        # Your initialization logic
        pass

    async def _execute_impl(
        self, context: RuntimeExecutionContext
    ) -> RuntimeExecutionResult:
        """Execute using your custom framework."""
        try:
            # 1. Extract configuration
            prompt = context.prompt
            system_prompt = context.system_prompt
            model_id = context.model_id
            skills = context.skills

            # 2. Build your framework's agent/chain
            agent = self._build_agent(context)

            # 3. Execute
            response = await agent.run(prompt)

            # 4. Extract usage metrics
            usage = self._extract_usage(response)

            # 5. Return standardized result
            return RuntimeExecutionResult(
                response=response.get("output", ""),
                usage=usage,
                success=True,
                finish_reason="stop",
                model=model_id,
            )

        except Exception as e:
            logger.error("execution_failed", error=str(e))
            return RuntimeExecutionResult(
                response="",
                usage={},
                success=False,
                error=str(e),
                finish_reason="error",
            )

    async def _stream_execute_impl(
        self,
        context: RuntimeExecutionContext,
        event_callback: Optional[Callable] = None,
    ) -> AsyncIterator[RuntimeExecutionResult]:
        """Stream execution with real-time feedback."""
        # Implement streaming if your framework supports it
        agent = self._build_agent(context)

        accumulated_response = ""

        async for chunk in agent.stream(context.prompt):
            accumulated_response += chunk

            # Yield partial result
            yield RuntimeExecutionResult(
                response=accumulated_response,
                usage={},  # Update with partial usage if available
                success=True,
                finish_reason=None,  # None until completion
            )

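        # Final chunk with usage. Note: _extract_usage() calls .get(), so this
        # assumes the agent exposes usage through a dict-like interface once
        # streaming has finished; adapt it to how your framework reports usage.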
        yield RuntimeExecutionResult(
            response=accumulated_response,
            usage=self._extract_usage(agent),
            success=True,
            finish_reason="stop",
        )

    def get_runtime_type(self) -> RuntimeType:
        """Return runtime type."""
        return RuntimeType.CUSTOM  # Or add new type to enum

    def get_capabilities(self) -> RuntimeCapabilities:
        """Declare runtime capabilities."""
        return RuntimeCapabilities(
            streaming=True,  # Set based on your implementation
            tools=True,
            mcp=False,  # Set based on MCP support
            hooks=True,
            cancellation=True,
            conversation_history=True,
            custom_tools=False,
        )

    def _build_agent(self, context: RuntimeExecutionContext):
        """Build your framework's agent/chain."""
        # Implement based on your framework
        pass

    def _extract_usage(self, response) -> Dict[str, Any]:
        """Extract token usage from framework response."""
        # Standardized format:
        return {
            "input_tokens": response.get("prompt_tokens", 0),
            "output_tokens": response.get("completion_tokens", 0),
            "total_tokens": response.get("total_tokens", 0),
        }
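
The streaming skeleton above yields the accumulated response on every chunk rather than only the new text. A consumer that wants deltas can diff successive chunks, as in this self-contained sketch. Chunk and fake_stream are stand-ins invented for the example, not Kubiya types.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional


@dataclass
class Chunk:
    """Stand-in for RuntimeExecutionResult with only the fields used here."""

    response: str
    finish_reason: Optional[str] = None


async def fake_stream() -> AsyncIterator[Chunk]:
    """Mimics the skeleton: accumulated text per chunk, then a final 'stop' chunk."""
    accumulated = ""
    for piece in ["Hel", "lo", "!"]:
        accumulated += piece
        yield Chunk(response=accumulated)
    yield Chunk(response=accumulated, finish_reason="stop")


async def collect_deltas(stream: AsyncIterator[Chunk]) -> List[str]:
    """Return only the newly added text from each accumulated chunk."""
    seen = ""
    deltas: List[str] = []
    async for chunk in stream:
        delta = chunk.response[len(seen):]
        if delta:  # the final chunk repeats the full text, so its delta is empty
            deltas.append(delta)
        seen = chunk.response
    return deltas
```

If your consumers expect deltas instead, yielding only the new text from the runtime is an equally valid design, as long as it is applied consistently.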

Step 3: Implement Tool Integration

Tools are the capabilities your agent can use (Skills, MCP servers, custom functions).
Option 1: Convert Kubiya Skills to your framework’s tool format
def _build_tools_from_skills(self, skills: List[Any]) -> List[Any]:
    """
    Convert Kubiya Skills to your framework's tool format.

    Args:
        skills: List of resolved Kubiya Skills

    Returns:
        List of tools in your framework's format
    """
    framework_tools = []

    for skill in skills:
        # Get Agno Toolkit from skill
        if hasattr(skill, "get_tools"):
            toolkit = skill.get_tools()

            # Convert each tool
            for tool in toolkit.tools:
                framework_tool = self._convert_tool(tool)
                framework_tools.append(framework_tool)

    return framework_tools

def _convert_tool(self, agno_tool):
    """Convert Agno tool to your framework's format."""
    # Example for LangChain:
    from langchain.tools import StructuredTool

    return StructuredTool.from_function(
        func=agno_tool.entrypoint,
        name=agno_tool.name,
        description=agno_tool.description,
        args_schema=agno_tool.parameters,  # Pydantic model
    )
Option 2: Native MCP Server integration
async def _build_mcp_tools(self, mcp_servers: Dict) -> List[Any]:
    """
    Connect to MCP servers and convert tools.

    Args:
        mcp_servers: MCP server configurations from context

    Returns:
        List of tools from MCP servers
    """
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    tools = []

    for server_name, server_config in mcp_servers.items():
        async with stdio_client(
            StdioServerParameters(
                command=server_config["command"],
                args=server_config.get("args", []),
                env=server_config.get("env"),
            )
        ) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                # List tools from MCP server
                mcp_tools = await session.list_tools()

                # Convert to your framework's format
                for mcp_tool in mcp_tools.tools:
                    framework_tool = self._convert_mcp_tool(mcp_tool)
                    tools.append(framework_tool)

    return tools
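
The snippet above calls a _convert_mcp_tool helper that this guide does not define. One possible shape for it is sketched below, mapping the name, description, and inputSchema fields of an MCP tool listing into a framework-neutral dict. FakeMcpTool is a stand-in for the SDK's tool object, and both the server-name prefix and the output keys are assumptions chosen for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class FakeMcpTool:
    """Stand-in carrying the fields an MCP tool listing provides."""

    name: str
    description: Optional[str] = None
    inputSchema: Dict[str, Any] = field(default_factory=dict)


def convert_mcp_tool(server_name: str, mcp_tool: Any) -> Dict[str, Any]:
    """Map an MCP tool description to a generic tool spec (hypothetical format)."""
    return {
        # Prefix with the server name so tools from different servers cannot collide.
        "name": f"{server_name}__{mcp_tool.name}",
        "description": mcp_tool.description or "",
        # Fall back to an empty object schema when the server supplies none.
        "parameters": mcp_tool.inputSchema or {"type": "object", "properties": {}},
    }
```

Keep in mind that the session in the snippet above closes when its async with block exits, so a converted tool that must call the server later needs either a fresh connection or a session kept open for the duration of the execution.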
Option 3: Hybrid approach
async def _build_all_tools(self, context: RuntimeExecutionContext) -> List[Any]:
    """Combine Skills and MCP tools (async because MCP discovery is awaited)."""
    all_tools = []

    # Add Skills
    if context.skills:
        skill_tools = self._build_tools_from_skills(context.skills)
        all_tools.extend(skill_tools)

    # Add MCP servers
    if context.mcp_servers:
        mcp_tools = await self._build_mcp_tools(context.mcp_servers)
        all_tools.extend(mcp_tools)

    return all_tools

Step 4: Add Lifecycle Hooks

Lifecycle hooks enable monitoring, logging, and integration points.
async def before_execute(self, context: RuntimeExecutionContext) -> None:
    """
    Hook called before execution starts.

    Use for:
    - Validation
    - Resource setup
    - Logging
    - Metrics initialization
    """
    self.logger.info(
        "execution_starting",
        execution_id=context.execution_id[:8],
        agent_id=context.agent_id,
        model=context.model_id,
    )

    # Validate custom requirements
    if not context.model_id:
        raise ValueError("model_id is required for custom runtime")

    # Setup resources (e.g., database connections)
    self._setup_resources(context)

async def after_execute(
    self, context: RuntimeExecutionContext, result: RuntimeExecutionResult
) -> None:
    """
    Hook called after successful execution.

    Use for:
    - Cleanup
    - Metrics logging
    - Analytics submission
    - Webhooks
    """
    self.logger.info(
        "execution_completed",
        execution_id=context.execution_id[:8],
        success=result.success,
        input_tokens=result.usage.get("input_tokens", 0),
        output_tokens=result.usage.get("output_tokens", 0),
    )

    # Cleanup resources
    self._cleanup_resources()

    # Submit custom metrics
    await self._submit_metrics(context, result)

async def on_error(
    self, context: RuntimeExecutionContext, error: Exception
) -> RuntimeExecutionResult:
    """
    Hook called when execution fails.

    Use for:
    - Error logging
    - Alerting
    - Fallback logic
    - Custom error handling
    """
    self.logger.error(
        "execution_error",
        execution_id=context.execution_id[:8],
        error_type=type(error).__name__,
        error_message=str(error),
    )

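    # Optional: Implement fallback logic. RateLimitError stands for your
    # provider's rate-limit exception and _handle_rate_limit is a helper
    # you define; neither is provided by the base class.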
    if isinstance(error, RateLimitError):
        return await self._handle_rate_limit(context)

    # Return error result
    return RuntimeExecutionResult(
        response=f"Execution failed: {str(error)}",
        usage={},
        success=False,
        error=str(error),
        finish_reason="error",
    )
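
The on_error hook above defers to a _handle_rate_limit helper without showing it. One common fallback is to retry with exponential backoff, sketched here as a standalone function. RateLimitError is a hypothetical exception class defined only for this example, and the attempt count and delays are illustrative defaults.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical error type; substitute your provider's real exception."""


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Retry an async operation on RateLimitError, doubling the delay each time."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except RateLimitError:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    # Only reached when max_attempts is less than 1.
    raise ValueError("max_attempts must be at least 1")
```

A runtime could wrap the model call in this helper, for example `await retry_with_backoff(lambda: agent.run(prompt))`, and fall through to the error result only after the final attempt fails.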

Step 5: Register Your Runtime

Use the @RuntimeRegistry.register() decorator to make your runtime discoverable:
from control_plane_api.worker.runtimes.base import (
    BaseRuntime,
    RuntimeRegistry,
    RuntimeType,
)

# Option 1: Use existing RuntimeType
@RuntimeRegistry.register(RuntimeType.CUSTOM)
class MyCustomRuntime(BaseRuntime):
    pass

# Option 2: Add new RuntimeType to enum (recommended)
# In base.py, add:
# class RuntimeType(str, Enum):
#     DEFAULT = "default"
#     CLAUDE_CODE = "claude_code"
#     LANGCHAIN = "langchain"      # Add this
#     CREWAI = "crewai"            # Add this

@RuntimeRegistry.register(RuntimeType.LANGCHAIN)
class LangChainRuntime(BaseRuntime):
    pass
Runtime discovery:
# List all registered runtimes
available_runtimes = RuntimeRegistry.list_available()
# Output: [RuntimeType.DEFAULT, RuntimeType.CLAUDE_CODE, RuntimeType.LANGCHAIN]

# Get runtime info
runtime_info = RuntimeRegistry.get_runtime_info_all()
# Output: {
#   "default": {...},
#   "claude_code": {...},
#   "langchain": {...}
# }

Step 6: Test Your Runtime

Unit tests for your runtime implementation:
# tests/worker/runtimes/test_my_custom_runtime.py

import pytest
from unittest.mock import AsyncMock, MagicMock
from control_plane_api.worker.runtimes.my_custom.runtime import MyCustomRuntime
from control_plane_api.worker.runtimes.base import (
    RuntimeExecutionContext,
    RuntimeType,
)

@pytest.fixture
def runtime():
    """Create runtime instance with mocked dependencies."""
    control_plane = MagicMock()
    cancellation_manager = MagicMock()
    return MyCustomRuntime(control_plane, cancellation_manager)

@pytest.fixture
def context():
    """Create execution context for testing."""
    return RuntimeExecutionContext(
        execution_id="test-execution-123",
        agent_id="test-agent-456",
        organization_id="test-org-789",
        prompt="What is the capital of France?",
        system_prompt="You are a helpful assistant.",
        model_id="gpt-4",
    )

@pytest.mark.asyncio
async def test_execute_success(runtime, context):
    """Test successful execution."""
    result = await runtime.execute(context)

    assert result.success is True
    assert result.response  # Has response
    assert result.usage["total_tokens"] > 0
    assert result.finish_reason == "stop"

@pytest.mark.asyncio
async def test_execute_with_tools(runtime, context):
    """Test execution with tool calling."""
    # Add mock skills to context
    context.skills = [MagicMock()]

    result = await runtime.execute(context)

    assert result.success is True
    assert result.tool_execution_messages  # Tools were called

@pytest.mark.asyncio
async def test_stream_execute(runtime, context):
    """Test streaming execution."""
    chunks = []

    async for chunk in runtime.stream_execute(context):
        chunks.append(chunk)

    assert len(chunks) > 0
    assert chunks[-1].finish_reason == "stop"
    assert chunks[-1].usage["total_tokens"] > 0

@pytest.mark.asyncio
async def test_execute_error_handling(runtime, context):
    """Test error handling."""
    # Force an error
    context.model_id = None  # Invalid config

    result = await runtime.execute(context)

    assert result.success is False
    assert result.error is not None
    assert result.finish_reason == "error"

def test_capabilities(runtime):
    """Test capability declaration."""
    caps = runtime.get_capabilities()

    assert caps.streaming is True
    assert caps.tools is True
    assert caps.conversation_history is True

def test_runtime_type(runtime):
    """Test runtime type identification."""
    runtime_type = runtime.get_runtime_type()

    assert runtime_type == RuntimeType.CUSTOM
Integration tests with real executions:
@pytest.mark.integration
@pytest.mark.asyncio
async def test_full_execution_flow(control_plane_client, cancellation_manager):
    """Test complete execution flow with real dependencies."""
    runtime = MyCustomRuntime(control_plane_client, cancellation_manager)

    context = RuntimeExecutionContext(
        execution_id="integration-test-123",
        agent_id="agent-456",
        organization_id="org-789",
        prompt="Calculate 15 * 23",
        model_id="gpt-4",
    )

    result = await runtime.execute(context)

    assert result.success is True
    assert "345" in result.response  # Correct answer

Real-World Examples

Example 1: LangChain Runtime

Key implementation patterns for LangChain integration:
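# Import paths below match recent LangChain releases and may differ in yours.
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import StructuredTool
from langchain_openai import ChatOpenAI

@RuntimeRegistry.register(RuntimeType.LANGCHAIN)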
class LangChainRuntime(BaseRuntime):
    """LangChain runtime for chains, agents, and tools."""

    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult:
        """Execute using LangChain agent."""
        # 1. Create LangChain LLM
        llm = ChatOpenAI(model=context.model_id)

        # 2. Convert Kubiya Skills to LangChain tools
        tools = [
            StructuredTool.from_function(
                func=tool.entrypoint,
                name=tool.name,
                description=tool.description,
            )
            for skill in context.skills
            for tool in skill.get_tools().tools
        ]

        # 3. Build agent with prompt template (prompt_template is a
        #    ChatPromptTemplate you define; it needs an agent_scratchpad placeholder)
        agent = create_openai_functions_agent(llm, tools, prompt_template)
        agent_executor = AgentExecutor(agent=agent, tools=tools)

        # 4. Execute and return result
        response = await agent_executor.ainvoke({"input": context.prompt})

        return RuntimeExecutionResult(
            response=response["output"],
            usage=self._extract_usage(response),
            success=True,
        )

    def get_capabilities(self) -> RuntimeCapabilities:
        return RuntimeCapabilities(
            streaming=True,  # Via LangChain callbacks
            tools=True,
            conversation_history=True,
        )
Key concepts:
  • Convert Kubiya Skills to LangChain StructuredTool format
  • Use LangChain’s AgentExecutor for tool orchestration
  • Leverage LangChain callbacks for streaming
  • Map conversation history to LangChain message format
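
For the last point, a minimal sketch of the history mapping is shown below. It assumes history entries are dicts with "role" and "content" keys, as in the conversation-history example later in this guide, and emits (role, content) tuples, which LangChain chat models and prompt templates accept as a shorthand for message objects. The function name and the role map are illustrative.

```python
from typing import Dict, List, Tuple

# Kubiya-style role names on the left, LangChain shorthand roles on the right.
ROLE_MAP = {"user": "human", "assistant": "ai", "system": "system"}


def to_langchain_messages(history: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Convert role/content dicts to (role, content) tuples."""
    messages: List[Tuple[str, str]] = []
    for msg in history:
        role = ROLE_MAP.get(msg.get("role", ""))
        if role is None:
            continue  # skip roles this sketch does not map (e.g. tool messages)
        messages.append((role, msg.get("content", "")))
    return messages
```

Tool-call messages are dropped here for brevity; a production mapping would likely need to carry them over in the format your agent expects.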

Example 2: CrewAI Runtime

Key implementation patterns for multi-agent CrewAI:
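from crewai import Agent, Crew, Process, Task
from langchain_openai import ChatOpenAI  # import path may vary by version

@RuntimeRegistry.register(RuntimeType.CREWAI)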
class CrewAIRuntime(BaseRuntime):
    """CrewAI runtime for multi-agent collaboration."""

    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult:
        """Execute using CrewAI crew of specialized agents."""
        # 1. Define specialized agents
        researcher = Agent(
            role="Research Analyst",
            goal="Research and analyze information",
            llm=ChatOpenAI(model=context.model_id),
        )

        writer = Agent(
            role="Content Writer",
            goal="Write clear content",
            llm=ChatOpenAI(model=context.model_id),
        )

        # 2. Create task
        task = Task(
            description=context.prompt,
            agent=researcher,
        )

        # 3. Create crew and execute
        crew = Crew(
            agents=[researcher, writer],
            tasks=[task],
            process=Process.sequential,
        )

        result = crew.kickoff()

        return RuntimeExecutionResult(
            response=str(result),
            usage={"total_tokens": 0},  # Placeholder; fill in if your CrewAI version reports usage
            success=True,
        )

    def get_capabilities(self) -> RuntimeCapabilities:
        return RuntimeCapabilities(
            streaming=False,  # CrewAI doesn't stream natively
            tools=True,
            conversation_history=True,
        )
Key concepts:
  • Define multiple specialized agents with roles
  • Use CrewAI’s Process for orchestration (sequential, hierarchical)
  • Agents collaborate on tasks automatically
  • Note: this sketch returns placeholder usage; check whether your CrewAI version reports token usage metrics
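
One detail worth noting in the example above: crew.kickoff() is a blocking call made inside an async method, so it would stall the event loop while the crew runs. A hedged way around this is to push the call onto a worker thread with asyncio.to_thread (Python 3.9+). The sketch below uses a stand-in FakeCrew rather than the real CrewAI class.

```python
import asyncio
import time
from typing import Tuple


class FakeCrew:
    """Stand-in with a blocking kickoff(), like the call in the example above."""

    def kickoff(self) -> str:
        time.sleep(0.05)  # simulates slow, synchronous agent work
        return "crew result"


async def run_crew(crew: FakeCrew) -> str:
    """Run the blocking kickoff() in a worker thread so the loop stays free."""
    return await asyncio.to_thread(crew.kickoff)


async def demo() -> Tuple[str, int]:
    """Show that other coroutines keep running while the crew works."""
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    task = asyncio.create_task(ticker())
    result = await run_crew(FakeCrew())
    task.cancel()
    return result, ticks
```

The ticker advancing during the call is what indicates the loop was not blocked; calling crew.kickoff() directly would leave it at a single tick.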

Configuration Schema

Define runtime-specific configuration via runtime_config in agent configuration:
# In your runtime implementation:

def _validate_config(self, context: RuntimeExecutionContext) -> None:
    """Validate custom runtime configuration."""
    super()._validate_config(context)  # Call base validation

    runtime_config = context.runtime_config or {}

    # Validate required fields
    if "required_field" not in runtime_config:
        raise ValueError("required_field is missing from runtime_config")

    # Validate field types
    if not isinstance(runtime_config.get("max_iterations"), int):
        raise ValueError("max_iterations must be an integer")

# Configuration example via CLI:
# kubiya agent create my-agent \
#   --runtime langchain \
#   --runtime-config '{"max_iterations": 15, "agent_type": "openai-functions"}'

# Configuration example via API:
# {
#   "name": "my-agent",
#   "runtime": "langchain",
#   "runtime_config": {
#     "max_iterations": 15,
#     "agent_type": "openai-functions",
#     "memory_type": "conversation_buffer",
#     "verbose": true
#   }
# }
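
As a complement to the validation above, runtime_config can be parsed once into a typed object with defaults, so the rest of the runtime never touches the raw dict. This is a sketch under assumptions: the field names come from the examples above, while the default values and the LangChainRuntimeConfig class are invented for illustration.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class LangChainRuntimeConfig:
    # Defaults are assumptions made for this sketch.
    max_iterations: int = 10
    agent_type: str = "openai-functions"
    verbose: bool = False


def parse_runtime_config(raw: Optional[Dict[str, Any]]) -> LangChainRuntimeConfig:
    """Validate a raw runtime_config dict and apply defaults."""
    raw = dict(raw or {})

    max_iterations = raw.get("max_iterations", 10)
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(max_iterations, bool) or not isinstance(max_iterations, int):
        raise ValueError("max_iterations must be an integer")
    if max_iterations < 1:
        raise ValueError("max_iterations must be at least 1")

    agent_type = raw.get("agent_type", "openai-functions")
    if not isinstance(agent_type, str):
        raise ValueError("agent_type must be a string")

    verbose = raw.get("verbose", False)
    if not isinstance(verbose, bool):
        raise ValueError("verbose must be a boolean")

    return LangChainRuntimeConfig(max_iterations, agent_type, verbose)
```

Unknown keys are ignored in this sketch; rejecting them instead is a reasonable choice if you want typos in configuration to fail loudly.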

Best Practices

Always populate usage in RuntimeExecutionResult for analytics:
return RuntimeExecutionResult(
    response=response_text,
    usage={
        "input_tokens": prompt_tokens,
        "output_tokens": completion_tokens,
        "total_tokens": total_tokens,
        # Optional but recommended:
        "cache_read_tokens": cached_tokens,
        "model_provider": "openai",  # or "anthropic", "google", etc.
    },
    success=True,
)
If your framework doesn’t expose usage, estimate or use callbacks.
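
When estimating, a rough character-based heuristic is a common stopgap. The sketch below assumes about four characters per token, which is only a loose approximation for English text, and marks the result with an "estimated" key. That key is an assumption added for this example, not part of the documented usage format.

```python
import math
from typing import Any, Dict


def estimate_usage(
    prompt: str, completion: str, chars_per_token: float = 4.0
) -> Dict[str, Any]:
    """Approximate token usage from text length when the framework reports none."""

    def estimate(text: str) -> int:
        return math.ceil(len(text) / chars_per_token) if text else 0

    input_tokens = estimate(prompt)
    output_tokens = estimate(completion)
    return {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
        "estimated": True,  # hypothetical flag so analytics can tell guesses apart
    }
```

A tokenizer that matches your model will give better numbers than this heuristic; the point is only to avoid reporting zero usage.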
Use lifecycle hooks for graceful degradation:
async def on_error(self, context, error):
    """Handle errors gracefully."""
    self.logger.error("execution_error", error=str(error))

    # Specific error handling
    if isinstance(error, ModelNotFoundError):
        return RuntimeExecutionResult(
            response="Model not available. Please check configuration.",
            usage={},
            success=False,
            error="Model not found",
        )

    # Generic fallback
    return await super().on_error(context, error)
Convert Kubiya conversation history to your framework’s format:
def _build_messages(self, conversation_history: List[Dict]):
    """Convert to framework-specific format."""
    messages = []

    for msg in conversation_history:
        role = msg["role"]
        content = msg["content"]

        # Convert to your format
        if role == "user":
            messages.append(YourFrameworkUserMessage(content))
        elif role == "assistant":
            messages.append(YourFrameworkAssistantMessage(content))

    return messages
Use the Control Plane client for metadata, caching, and state:
# In your runtime:
async def before_execute(self, context):
    # Cache execution metadata
    self.control_plane.cache_metadata(
        context.execution_id,
        "AGENT",
    )

    # Get cached data if needed
    cached_data = await self.control_plane.get_cached_data(context.agent_id)
Provide comprehensive documentation for users:
# LangChain Runtime

## Overview
[Description of what your runtime does]

## Supported Features
- Streaming: Yes
- Tools: Yes
- MCP: No
- Conversation history: Yes

## Configuration
[Explain runtime_config options]

## Usage Examples
[CLI and API examples]

## Limitations
[Known limitations or constraints]

## Troubleshooting
[Common issues and solutions]
Write comprehensive tests:
# Unit tests
- test_execute_success
- test_execute_with_tools
- test_execute_error_handling
- test_stream_execute
- test_conversation_history
- test_capabilities
- test_configuration_validation

# Integration tests
- test_full_execution_flow
- test_tool_calling_integration
- test_control_plane_integration

# Performance tests
- test_execution_latency
- test_streaming_throughput
- test_concurrent_executions

API Reference

BaseRuntime Class

Abstract base class for all runtimes
class BaseRuntime(ABC):
    """
    Abstract base class for agent runtimes.

    Attributes:
        control_plane: Control Plane client
        cancellation_manager: Cancellation manager
        logger: Structured logger
        config: Additional configuration
    """

    # Abstract methods (must implement)
    async def _execute_impl(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult
    async def _stream_execute_impl(self, context: RuntimeExecutionContext, event_callback) -> AsyncIterator[RuntimeExecutionResult]
    def get_runtime_type(self) -> RuntimeType
    def get_capabilities(self) -> RuntimeCapabilities

    # Public interface (don't override)
    async def execute(self, context: RuntimeExecutionContext) -> RuntimeExecutionResult
    async def stream_execute(self, context: RuntimeExecutionContext, event_callback) -> AsyncIterator[RuntimeExecutionResult]
    async def cancel(self, execution_id: str) -> bool
    async def get_usage(self, execution_id: str) -> Dict[str, Any]

    # Lifecycle hooks (override as needed)
    async def before_execute(self, context: RuntimeExecutionContext) -> None
    async def after_execute(self, context: RuntimeExecutionContext, result: RuntimeExecutionResult) -> None
    async def on_error(self, context: RuntimeExecutionContext, error: Exception) -> RuntimeExecutionResult

    # Helper methods (override as needed)
    async def _cancel_impl(self, execution_id: str) -> bool
    async def _get_usage_impl(self, execution_id: str) -> Dict[str, Any]
    def _validate_config(self, context: RuntimeExecutionContext) -> None

RuntimeExecutionContext

Input context passed to runtime:
@dataclass
class RuntimeExecutionContext:
    execution_id: str              # Unique execution ID
    agent_id: str                  # Agent identifier
    organization_id: str           # Organization context
    prompt: str                    # User's input message
    system_prompt: Optional[str]   # System instructions
    conversation_history: List[Dict]  # Previous messages
    model_id: Optional[str]        # Model identifier
    model_config: Optional[Dict]   # Model configuration
    agent_config: Optional[Dict]   # Agent configuration
    skills: List[Any]              # Resolved skills
    mcp_servers: Optional[Dict]    # MCP server configs
    user_metadata: Optional[Dict]  # User metadata
    runtime_config: Optional[Dict] # Runtime-specific config
    # Enforcement context fields
    user_email: Optional[str]
    user_id: Optional[str]
    user_roles: List[str]
    team_id: Optional[str]
    team_name: Optional[str]
    environment: str = "production"

RuntimeExecutionResult

Output structure from runtime:
@dataclass
class RuntimeExecutionResult:
    response: str                  # Agent's response
    usage: Dict[str, Any]          # Token usage metrics
    success: bool                  # Execution success flag
    finish_reason: Optional[str]   # "stop", "length", "tool_use", "error"
    run_id: Optional[str]          # Run identifier
    model: Optional[str]           # Model used
    tool_execution_messages: Optional[List[Dict]]  # Tool execution details
    tool_messages: Optional[List[Dict]]            # Detailed tool messages
    error: Optional[str]           # Error message if failed
    metadata: Dict[str, Any]       # Additional metadata

RuntimeCapabilities

Runtime capability flags:
@dataclass
class RuntimeCapabilities:
    streaming: bool = False              # Supports streaming
    tools: bool = False                  # Supports tool calling
    mcp: bool = False                    # Supports MCP servers
    hooks: bool = False                  # Supports lifecycle hooks
    cancellation: bool = False           # Supports cancellation
    conversation_history: bool = False   # Supports history
    custom_tools: bool = False           # Supports custom tools
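
Capability flags are most useful when callers check them before choosing a code path. The following sketch shows one way a caller might gate on the streaming flag. The runtime classes are stand-ins that take a plain string prompt, and the run helper is hypothetical rather than part of the Kubiya API.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class Capabilities:
    """Stand-in for RuntimeCapabilities with only the flag used here."""

    streaming: bool = False


class StreamingRuntime:
    def get_capabilities(self) -> Capabilities:
        return Capabilities(streaming=True)

    async def execute(self, prompt: str) -> str:
        return prompt.upper()

    async def stream_execute(self, prompt: str) -> AsyncIterator[str]:
        for word in prompt.upper().split():
            yield word


class BlockingRuntime:
    def get_capabilities(self) -> Capabilities:
        return Capabilities(streaming=False)

    async def execute(self, prompt: str) -> str:
        return prompt.upper()


async def run(runtime, prompt: str) -> List[str]:
    """Stream when the runtime declares support, otherwise fall back to execute()."""
    if runtime.get_capabilities().streaming:
        return [chunk async for chunk in runtime.stream_execute(prompt)]
    return [await runtime.execute(prompt)]
```

Declaring a capability you have not implemented would send callers down a path that fails, so keep get_capabilities() in step with what the runtime actually does.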

RuntimeRegistry

Runtime registration and discovery:
class RuntimeRegistry:
    """Registry for runtime discovery."""

    @classmethod
    def register(cls, runtime_type: RuntimeType) -> Callable
        """Decorator to register a runtime."""

    @classmethod
    def get(cls, runtime_type: RuntimeType) -> Type[BaseRuntime]
        """Get runtime class by type."""

    @classmethod
    def create(cls, runtime_type: RuntimeType, control_plane_client, cancellation_manager, **kwargs) -> BaseRuntime
        """Create runtime instance."""

    @classmethod
    def list_available(cls) -> List[RuntimeType]
        """List all registered runtimes."""

    @classmethod
    def get_runtime_info_all(cls) -> Dict[str, Dict[str, Any]]
        """Get info about all runtimes."""

Deployment

Steps to deploy your custom runtime:
  1. Implement your runtime class following this guide
  2. Add to RuntimeType enum in base.py (if adding new type)
  3. Register with decorator: @RuntimeRegistry.register(RuntimeType.YOUR_RUNTIME)
  4. Write tests (unit + integration)
  5. Document your runtime (README.md in runtime directory)
  6. Update agent configuration to use your runtime:
# Via CLI
kubiya agent create my-agent \
  --runtime your_custom_runtime \
  --model gpt-4

# Via API
curl -X POST https://control-plane.kubiya.ai/api/v1/agents \
  -H "Authorization: Bearer $KUBIYA_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-agent",
    "runtime": "your_custom_runtime",
    "model_id": "gpt-4"
  }'
  7. Monitor execution via Kubiya dashboard and logs

Next Steps

  • Agno Runtime Source: Reference implementation of the Agno runtime
  • Claude Code Runtime Source: Reference implementation of the Claude Code runtime
  • Skills Documentation: Learn about Skills (tools) integration
  • Control Plane API: Control Plane integration and runtime registry