Comprehensive examples of integrating the Kubiya MCP server with various AI agents and custom applications.
# ~/.config/Claude/claude_desktop_config.json (Linux)
# ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
# %APPDATA%\Claude\claude_desktop_config.json (Windows)
{
"mcpServers": {
"kubiya": {
"command": "kubiya",
"args": ["mcp", "serve"],
"env": {
"KUBIYA_API_KEY": "kb-your-api-key-here",
"KUBIYA_DEFAULT_RUNNER": "auto",
"KUBIYA_OPA_ENFORCE": "false"
}
}
}
}
{
"mcpServers": {
"kubiya-production": {
"command": "kubiya",
"args": ["mcp", "serve", "--allow-platform-apis"],
"env": {
"KUBIYA_API_KEY": "kb-prod-key",
"KUBIYA_DEFAULT_RUNNER": "k8s-production",
"KUBIYA_OPA_ENFORCE": "true",
"KUBIYA_LOG_LEVEL": "INFO"
}
},
"kubiya-development": {
"command": "kubiya",
"args": ["mcp", "serve"],
"env": {
"KUBIYA_API_KEY": "kb-dev-key",
"KUBIYA_DEFAULT_RUNNER": "local-docker",
"KUBIYA_OPA_ENFORCE": "false",
"KUBIYA_LOG_LEVEL": "DEBUG"
}
}
}
}
# In Claude Desktop, you can now ask:
# 1. Infrastructure Management
"""
User: "Check the health of our Kubernetes cluster and restart any failed pods"
Claude: I'll help you check your Kubernetes cluster health and restart any failed pods. Let me break this down into steps:
1. First, I'll check for failed pods across all namespaces
2. If any failed pods are found, I'll restart them
3. I'll verify the restart was successful
Let me start by checking for failed pods:
"""
# Claude then uses execute_tool to run:
{
"tool_name": "kubectl",
"args": {
"command": "get pods --all-namespaces --field-selector=status.phase=Failed"
},
"runner": "auto"
}
# 2. Development Workflow
"""
User: "Deploy our application version 2.1.0 to staging, run tests, and if successful, deploy to production"
Claude: I'll orchestrate a complete deployment pipeline for version 2.1.0. This involves:
1. Building the application with version 2.1.0
2. Deploying to staging environment
3. Running automated tests
4. If tests pass, deploying to production
5. Monitoring the deployment
Let me start by building the application:
"""
# Claude creates and executes a workflow:
{
"tool_name": "execute_workflow",
"args": {
"workflow_definition": {
"name": "deploy-v2.1.0",
"steps": [
{
"name": "build",
"image": "docker:latest",
"command": "docker build -t myapp:2.1.0 ."
},
{
"name": "deploy-staging",
"image": "bitnami/kubectl:latest",
"command": "kubectl apply -f k8s/staging/ && kubectl set image deployment/myapp myapp=myapp:2.1.0",
"depends_on": ["build"]
},
{
"name": "test",
"image": "python:3.11",
"command": "python -m pytest tests/integration/",
"depends_on": ["deploy-staging"]
},
{
"name": "deploy-production",
"image": "bitnami/kubectl:latest",
"command": "kubectl apply -f k8s/production/ && kubectl set image deployment/myapp myapp=myapp:2.1.0",
"depends_on": ["test"]
}
]
}
}
}
# .cursor-settings.json
{
"mcp.servers": {
"kubiya": {
"command": "kubiya",
"args": ["mcp", "serve"],
"env": {
"KUBIYA_API_KEY": "kb-your-api-key",
"KUBIYA_DEFAULT_RUNNER": "local-docker"
}
}
}
}
# In Cursor Composer:
# 1. Database Operations
"""
User: "Create a PostgreSQL database, run migrations, and populate with test data"
Cursor: I'll help you set up a PostgreSQL database with migrations and test data. Let me break this down:
1. Start a PostgreSQL container
2. Run database migrations
3. Populate with test data
4. Verify the setup
Let me start by creating the PostgreSQL database:
"""
# Cursor executes:
{
"tool_name": "execute_tool",
"args": {
"tool_name": "postgres",
"args": {
"command": "docker run -d --name test-db -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:15"
}
}
}
# 2. Code Analysis and Deployment
"""
User: "Analyze the performance of this Python application and suggest optimizations"
Cursor: I'll analyze your Python application's performance and provide optimization suggestions. Let me:
1. Run performance profiling
2. Analyze dependencies
3. Check for common bottlenecks
4. Generate optimization recommendations
Starting with performance profiling:
"""
# Cursor creates a custom tool:
{
"tool_name": "create_on_demand_tool",
"args": {
"name": "python-profiler",
"description": "Profile Python application performance",
"container": {
"image": "python:3.11",
"command": ["python", "-m", "cProfile", "-o", "profile.prof", "app.py"]
}
}
}
# chatgpt_kubiya_plugin.py
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import StdioServerTransport
class KubiyaChatGPTPlugin:
    """Async helper that lets a ChatGPT plugin backend call Kubiya tools over MCP.

    The MCP connection is opened lazily on first use and stays open until
    :meth:`close` is awaited.
    """

    def __init__(self, api_key):
        """Store the Kubiya API key; no connection is opened yet."""
        self.api_key = api_key
        self.session = None
        self.transport = None
        # Owns the transport and session context managers so close() can
        # unwind them in reverse order.
        self._exit_stack = None

    async def initialize(self):
        """Initialize MCP connection"""
        from contextlib import AsyncExitStack

        # Re-initializing replaces the previous connection instead of leaking it.
        await self.close()

        server_params = StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={"KUBIYA_API_KEY": self.api_key},
        )
        # NOTE(review): the official MCP Python SDK exposes `stdio_client` for
        # stdio connections; confirm `StdioServerTransport` exists in the
        # installed SDK version.
        transport = StdioServerTransport(server_params)
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(transport)
            # The session is entered as a context manager, the same way the
            # `async with ClientSession(...)` examples in this file use it.
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
        except BaseException:
            # Do not leave a half-open connection behind on failure.
            await stack.aclose()
            raise

        self.transport = transport
        self.session = session
        self._exit_stack = stack

    async def close(self):
        """Close the MCP session and transport; safe to call when not connected."""
        stack, self._exit_stack = self._exit_stack, None
        self.session = None
        self.transport = None
        if stack is not None:
            await stack.aclose()

    async def execute_tool(self, tool_name, arguments):
        """Execute Kubiya tool via MCP"""
        if not self.session:
            await self.initialize()
        result = await self.session.call_tool(tool_name, arguments)
        return result.content

    async def list_available_tools(self):
        """Get list of available tools"""
        if not self.session:
            await self.initialize()
        tools = await self.session.list_tools()
        return [{"name": t.name, "description": t.description} for t in tools.tools]
# ChatGPT Plugin Manifest
# Static plugin metadata: human/model-facing names and descriptions, the
# bearer-token auth scheme, the OpenAPI document location, and
# logo/contact/legal links.
PLUGIN_MANIFEST = {
    "schema_version": "v1",
    "name_for_human": "Kubiya Automation",
    "name_for_model": "kubiya",
    "description_for_human": "Execute automation tools and workflows on any infrastructure",
    "description_for_model": "Execute containerized tools, manage infrastructure, run workflows, and handle enterprise automation tasks through Kubiya's MCP server",
    "auth": {
        "type": "user_http",
        "authorization_type": "bearer"
    },
    "api": {
        "type": "openapi",
        "url": "https://api.kubiya.ai/openapi.json"
    },
    "logo_url": "https://kubiya.ai/logo.png",
    "contact_email": "support@kubiya.ai",
    "legal_info_url": "https://kubiya.ai/legal"
}
from langchain.tools import BaseTool
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import StdioServerTransport
class KubiyaTool(BaseTool):
    """LangChain tool that forwards a tool name and arguments to a Kubiya MCP server."""

    # LangChain's BaseTool is a pydantic model, so everything stored on the
    # instance has to be a declared, annotated field; assigning an undeclared
    # attribute in __init__ is rejected by pydantic.
    name: str = "kubiya_executor"
    description: str = "Execute automation tools and workflows using Kubiya"
    api_key: str = ""
    # Long-lived connection objects used by the async path only.
    session: object = None
    transport: object = None

    def __init__(self, api_key):
        """Create the tool with the Kubiya API key used to start the MCP server."""
        super().__init__(api_key=api_key)

    def _server_params(self):
        """Build the stdio launch parameters for `kubiya mcp serve`."""
        return StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={"KUBIYA_API_KEY": self.api_key},
        )

    async def _arun(self, tool_name: str, arguments: dict) -> str:
        """Async version of the tool"""
        if not self.session:
            await self._initialize_session()
        result = await self.session.call_tool(tool_name, arguments)
        return str(result.content)

    def _run(self, tool_name: str, arguments: dict) -> str:
        """Sync version of the tool"""
        # asyncio.run() creates and closes a fresh event loop on every call, so
        # a session cached from an earlier call would belong to a dead loop.
        # The sync path therefore opens and closes its own connection per call.
        return asyncio.run(self._run_once(tool_name, arguments))

    async def _run_once(self, tool_name: str, arguments: dict) -> str:
        """Open a short-lived MCP session, run one tool call, and close it."""
        async with StdioServerTransport(self._server_params()) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                result = await session.call_tool(tool_name, arguments)
                return str(result.content)

    async def _initialize_session(self):
        """Open the long-lived MCP session used by the async path."""
        transport = StdioServerTransport(self._server_params())
        read, write = await transport.__aenter__()
        try:
            session = ClientSession(read, write)
            # Entered like the `async with ClientSession(...)` examples in this file.
            await session.__aenter__()
            await session.initialize()
        except BaseException:
            await transport.__aexit__(None, None, None)
            raise
        self.transport = transport
        self.session = session

    async def aclose(self):
        """Close the long-lived session and transport, if they were opened."""
        session, transport = self.session, self.transport
        self.session = None
        self.transport = None
        if session is not None:
            await session.__aexit__(None, None, None)
        if transport is not None:
            await transport.__aexit__(None, None, None)
# Example usage
# Build a zero-shot ReAct agent whose only tool is the Kubiya executor.
llm = OpenAI(temperature=0)
kubiya_tool = KubiyaTool(api_key="kb-your-key")
agent = initialize_agent(
    tools=[kubiya_tool],
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)
# Now you can use the agent
# NOTE: this statement runs when the module is executed or imported.
result = agent.run("Deploy my application to production and run health checks")
from llama_index.tools import BaseTool
from llama_index.agent import OpenAIAgent
import asyncio
from mcp import ClientSession, StdioServerParameters
class KubiyaLlamaIndexTool(BaseTool):
    """LlamaIndex tool that forwards a tool name and arguments to a Kubiya MCP server."""

    def __init__(self, api_key):
        """Store the API key and register the tool metadata with the base class."""
        # Imported locally because this snippet's import block does not bring
        # ToolMetadata into scope.
        from llama_index.tools import ToolMetadata

        self.api_key = api_key
        self.session = None
        self.transport = None
        metadata = ToolMetadata(
            name="kubiya_executor",
            description="Execute automation tools and workflows using Kubiya MCP server. "
            "Can run containerized tools, manage infrastructure, and execute workflows.",
            # ExecuteToolSchema is resolved when an instance is created, so it
            # only has to be defined before the first instantiation.
            fn_schema=ExecuteToolSchema,
        )
        super().__init__(metadata=metadata)

    async def _initialize_session(self):
        """Open the MCP session on first use (called by acall)."""
        from mcp.client.stdio import StdioServerTransport

        server_params = StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={"KUBIYA_API_KEY": self.api_key},
        )
        transport = StdioServerTransport(server_params)
        read, write = await transport.__aenter__()
        try:
            session = ClientSession(read, write)
            # Entered like the `async with ClientSession(...)` examples in this file.
            await session.__aenter__()
            await session.initialize()
        except BaseException:
            await transport.__aexit__(None, None, None)
            raise
        self.transport = transport
        self.session = session

    async def acall(self, tool_name: str, arguments: dict) -> str:
        """Run the named Kubiya tool and return its content as a string."""
        if not self.session:
            await self._initialize_session()
        result = await self.session.call_tool(tool_name, arguments)
        return str(result.content)

    def call(self, tool_name: str, arguments: dict) -> str:
        """Blocking wrapper around acall."""
        # NOTE(review): asyncio.run() uses a new event loop per call, while the
        # cached session belongs to the loop of the first call; repeated sync
        # calls may need a per-call session. Confirm against intended usage.
        return asyncio.run(self.acall(tool_name, arguments))
# Schema for tool parameters
# BaseModel and Field are imported here because nothing earlier in this
# snippet brings them into scope.
from pydantic import BaseModel, Field


class ExecuteToolSchema(BaseModel):
    """Argument schema advertised for the Kubiya tool."""

    tool_name: str = Field(description="Name of the tool to execute")
    arguments: dict = Field(description="Arguments for the tool")
# Create agent with Kubiya tool
# The agent is built from a single tool: the Kubiya executor.
kubiya_tool = KubiyaLlamaIndexTool(api_key="kb-your-key")
agent = OpenAIAgent.from_tools([kubiya_tool], verbose=True)
# Use the agent
# NOTE: this statement runs when the module is executed or imported.
response = agent.chat("Check the status of our Kubernetes cluster and scale up if needed")
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import StdioServerTransport
# FastAPI application that hosts the Kubiya MCP proxy endpoints.
app = FastAPI(title="Kubiya MCP Proxy")


class ToolRequest(BaseModel):
    """Request body for the tool-execution endpoint."""

    # Name of the tool to execute.
    tool_name: str
    # Arguments passed through to the tool.
    arguments: dict
    # Defaults to "auto". NOTE(review): the /execute-tool handler in this file
    # does not forward this value - confirm whether it should.
    runner: str = "auto"
class KubiyaMCPClient:
    """Lazily-connected MCP client shared by the proxy endpoints."""

    def __init__(self, api_key):
        """Store the API key; the connection is opened on first use."""
        self.api_key = api_key
        self.session = None
        self.transport = None
        # Owns the transport and session context managers so close() can unwind them.
        self._exit_stack = None

    async def initialize(self):
        """Start `kubiya mcp serve` over stdio and open an initialized session."""
        from contextlib import AsyncExitStack

        # Re-initializing replaces the previous connection instead of leaking it.
        await self.close()

        server_params = StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={"KUBIYA_API_KEY": self.api_key},
        )
        transport = StdioServerTransport(server_params)
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(transport)
            # Entered like the `async with ClientSession(...)` examples in this file.
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise

        self.transport = transport
        self.session = session
        self._exit_stack = stack

    async def close(self):
        """Close the session and transport; safe to call when not connected."""
        stack, self._exit_stack = self._exit_stack, None
        self.session = None
        self.transport = None
        if stack is not None:
            await stack.aclose()

    async def execute_tool(self, tool_name, arguments):
        """Call a tool and return the raw MCP result object."""
        if not self.session:
            await self.initialize()
        result = await self.session.call_tool(tool_name, arguments)
        return result

    async def execute_tool_stream(self, tool_name, arguments):
        """Yield the tool result one content item at a time.

        The call itself is not incremental: the complete result is fetched
        with execute_tool() and then emitted item by item.
        """
        result = await self.execute_tool(tool_name, arguments)
        # Assumes result.content is an iterable of content items - TODO confirm.
        for item in result.content or []:
            text = getattr(item, "text", None)
            # Items without a text attribute are sent as their string form so
            # the chunk stays JSON-serializable.
            yield text if text is not None else str(item)
# Initialize client
mcp_client = KubiyaMCPClient(api_key="kb-your-key")


@app.post("/execute-tool")
async def execute_tool(request: ToolRequest):
    """Run one tool through the shared MCP client.

    Returns ``{"success": True, "result": <tool content>}``; any failure is
    reported as HTTP 500 with the error text as the detail.
    """
    # NOTE(review): request.runner is accepted by the model but not forwarded
    # to the tool call - confirm whether it should be.
    try:
        result = await mcp_client.execute_tool(
            request.tool_name,
            request.arguments,
        )
        return {"success": True, "result": result.content}
    except Exception as e:
        # Chain the original exception so the server-side traceback keeps the
        # root cause.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/tools")
async def list_tools():
    """List the name and description of every tool the MCP server reports.

    Any failure is reported as HTTP 500 with the error text as the detail.
    """
    try:
        # Connect on first use, mirroring the tool-execution path.
        if not mcp_client.session:
            await mcp_client.initialize()
        tools = await mcp_client.session.list_tools()
        return {
            "tools": [
                {"name": t.name, "description": t.description}
                for t in tools.tools
            ]
        }
    except Exception as e:
        # Chain the original exception so the server-side traceback keeps the
        # root cause.
        raise HTTPException(status_code=500, detail=str(e)) from e
# WebSocket endpoint for streaming
# WebSocket and WebSocketDisconnect are imported here because the snippet's
# import block only brings in FastAPI and HTTPException.
from fastapi import WebSocket, WebSocketDisconnect


@app.websocket("/execute-tool-stream")
async def execute_tool_stream(websocket: WebSocket):
    """Serve tool executions over a WebSocket.

    Each incoming JSON message must carry ``tool_name`` and ``arguments``.
    Output is sent back as ``{"type": "output", "content": ...}`` messages;
    bad requests and tool failures are sent as ``{"type": "error", ...}``
    messages and the connection stays open for the next request.
    """
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            try:
                tool_name = data["tool_name"]
                arguments = data["arguments"]
            except (KeyError, TypeError):
                # A malformed request must not tear down the whole connection.
                await websocket.send_json({
                    "type": "error",
                    "content": "Request must be a JSON object with 'tool_name' and 'arguments'"
                })
                continue
            try:
                # Execute tool with streaming
                # NOTE(review): relies on mcp_client providing an
                # execute_tool_stream async generator - confirm the client
                # class defines it.
                async for chunk in mcp_client.execute_tool_stream(tool_name, arguments):
                    await websocket.send_json({
                        "type": "output",
                        "content": chunk
                    })
            except WebSocketDisconnect:
                raise
            except Exception as e:
                await websocket.send_json({
                    "type": "error",
                    "content": str(e)
                })
    except WebSocketDisconnect:
        print("Client disconnected")
# multi_tenant_kubiya.py
import asyncio
from typing import Dict, Optional
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import StdioServerTransport
class MultiTenantKubiyaManager:
    """Keeps one MCP session per tenant, each started with that tenant's settings."""

    def __init__(self):
        self.tenant_sessions: Dict[str, ClientSession] = {}
        self.tenant_configs: Dict[str, dict] = {}
        # One AsyncExitStack per tenant; it owns that tenant's transport and
        # session so they can be closed again.
        self._tenant_stacks: Dict[str, object] = {}

    async def add_tenant(self, tenant_id: str, config: dict):
        """Add a new tenant configuration"""
        # Re-adding a tenant replaces its session instead of leaking the old one.
        await self._close_tenant_session(tenant_id)
        self.tenant_configs[tenant_id] = config
        await self._initialize_tenant_session(tenant_id)

    async def _initialize_tenant_session(self, tenant_id: str):
        """Initialize MCP session for a tenant"""
        from contextlib import AsyncExitStack

        try:
            config = self.tenant_configs[tenant_id]
        except KeyError:
            # Same exception type as before, but with an actionable message.
            raise KeyError(
                f"Unknown tenant {tenant_id!r}: call add_tenant() before executing tools"
            ) from None

        server_params = StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={
                "KUBIYA_API_KEY": config["api_key"],
                "KUBIYA_DEFAULT_RUNNER": config.get("default_runner", "auto"),
                "KUBIYA_OPA_ENFORCE": config.get("opa_enforce", "true"),
            },
        )
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(
                StdioServerTransport(server_params)
            )
            # Entered like the `async with ClientSession(...)` examples in this file.
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise

        self._tenant_stacks[tenant_id] = stack
        self.tenant_sessions[tenant_id] = session

    async def _close_tenant_session(self, tenant_id: str):
        """Close one tenant's session and transport, if open."""
        self.tenant_sessions.pop(tenant_id, None)
        stack = self._tenant_stacks.pop(tenant_id, None)
        if stack is not None:
            await stack.aclose()

    async def close(self):
        """Close every tenant session; tenant configurations are kept."""
        for tenant_id in list(self._tenant_stacks):
            await self._close_tenant_session(tenant_id)

    async def execute_tool_for_tenant(self, tenant_id: str, tool_name: str, arguments: dict):
        """Execute tool for specific tenant"""
        if tenant_id not in self.tenant_sessions:
            await self._initialize_tenant_session(tenant_id)
        session = self.tenant_sessions[tenant_id]
        result = await session.call_tool(tool_name, arguments)
        return result
# Usage example
# `await` is only valid inside a coroutine, so the example lives in main()
# and is driven by asyncio.run().
async def main():
    """Register two tenants and run one tool for each of them."""
    manager = MultiTenantKubiyaManager()

    # Add tenants
    await manager.add_tenant("tenant-a", {
        "api_key": "kb-tenant-a-key",
        "default_runner": "tenant-a-k8s",
        "opa_enforce": "true"
    })
    await manager.add_tenant("tenant-b", {
        "api_key": "kb-tenant-b-key",
        "default_runner": "tenant-b-docker",
        "opa_enforce": "false"
    })

    # Execute tools for different tenants
    result_a = await manager.execute_tool_for_tenant("tenant-a", "kubectl", {
        "command": "get pods -n tenant-a"
    })
    result_b = await manager.execute_tool_for_tenant("tenant-b", "docker", {
        "command": "ps"
    })
    return result_a, result_b


if __name__ == "__main__":
    asyncio.run(main())
# .github/workflows/kubiya-integration.yml
name: Kubiya AI-Powered CI/CD
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  ai-powered-deployment:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Kubiya CLI
        run: |
          curl -fsSL https://raw.githubusercontent.com/kubiyabot/cli/main/install.sh | bash
          echo "$HOME/.kubiya/bin" >> "$GITHUB_PATH"
      # The inline script below imports the `mcp` package, so install it first.
      - name: Install MCP Python SDK
        run: python3 -m pip install mcp
      - name: AI-Powered Deployment
        run: |
          # Use Kubiya MCP to intelligently deploy
          python3 << 'EOF'
          import asyncio
          import os
          from mcp import ClientSession, StdioServerParameters
          from mcp.client.stdio import StdioServerTransport

          async def ai_deployment():
              server_params = StdioServerParameters(
                  command="kubiya",
                  args=["mcp", "serve"],
                  env={
                      # Read the key from the step's environment instead of
                      # templating the secret into the script text.
                      "KUBIYA_API_KEY": os.environ["KUBIYA_API_KEY"],
                      "KUBIYA_DEFAULT_RUNNER": "github-actions"
                  }
              )
              async with StdioServerTransport(server_params) as (read, write):
                  async with ClientSession(read, write) as session:
                      await session.initialize()
                      # AI analyzes the code changes and determines deployment strategy
                      result = await session.call_tool("execute_workflow", {
                          "workflow_definition": {
                              "name": "ai-deployment",
                              "steps": [
                                  {
                                      "name": "analyze-changes",
                                      "image": "python:3.11",
                                      "command": "python scripts/analyze_changes.py"
                                  },
                                  {
                                      "name": "smart-test",
                                      "image": "python:3.11",
                                      "command": "python -m pytest --smart-selection",
                                      "depends_on": ["analyze-changes"]
                                  },
                                  {
                                      "name": "deploy",
                                      "image": "bitnami/kubectl:latest",
                                      "command": "kubectl apply -f k8s/",
                                      "depends_on": ["smart-test"]
                                  }
                              ]
                          }
                      })
                      print(f"Deployment result: {result.content}")

          asyncio.run(ai_deployment())
          EOF
        env:
          KUBIYA_API_KEY: ${{ secrets.KUBIYA_API_KEY }}
# slack_kubiya_bot.py
import asyncio
import re
from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import StdioServerTransport
class KubiyaSlackBot:
    """Slack bot that routes messages and the /kubiya slash command to Kubiya over MCP."""

    def __init__(self, slack_bot_token, slack_app_token, kubiya_api_key):
        """Create the Slack app and register the message and slash-command handlers."""
        self.app = AsyncApp(token=slack_bot_token)
        # Kept on the instance because start() needs it for Socket Mode.
        self.slack_app_token = slack_app_token
        self.kubiya_api_key = kubiya_api_key
        self.mcp_session = None
        self.transport = None
        # Owns the transport and session context managers so close() can unwind them.
        self._exit_stack = None
        # Register event handlers
        self.app.message(re.compile(r"kubiya", re.IGNORECASE))(self.handle_kubiya_request)
        self.app.command("/kubiya")(self.handle_kubiya_command)

    async def initialize_mcp(self):
        """Initialize MCP connection"""
        from contextlib import AsyncExitStack

        # Re-initializing replaces the previous connection instead of leaking it.
        await self.close()

        server_params = StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={"KUBIYA_API_KEY": self.kubiya_api_key},
        )
        transport = StdioServerTransport(server_params)
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(transport)
            # Entered like the `async with ClientSession(...)` examples in this file.
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise

        self.transport = transport
        self.mcp_session = session
        self._exit_stack = stack

    async def close(self):
        """Close the MCP session and transport; safe to call when not connected."""
        stack = getattr(self, "_exit_stack", None)
        self._exit_stack = None
        self.mcp_session = None
        self.transport = None
        if stack is not None:
            await stack.aclose()

    async def handle_kubiya_request(self, message, say):
        """Handle natural language requests"""
        if not self.mcp_session:
            await self.initialize_mcp()
        user_message = message['text']
        # Parse natural language request
        await say(f"🤖 Processing your request: {user_message}")
        try:
            # Use AI to determine the appropriate tool and arguments
            # This is a simplified example - in practice, you'd use NLP
            lowered = user_message.lower()
            if "deploy" in lowered:
                result = await self.mcp_session.call_tool("execute_workflow", {
                    "workflow_definition": {
                        "name": "slack-deploy",
                        "steps": [
                            {
                                "name": "deploy",
                                "image": "bitnami/kubectl:latest",
                                "command": "kubectl get deployments"
                            }
                        ]
                    }
                })
            elif "pods" in lowered:
                result = await self.mcp_session.call_tool("execute_tool", {
                    "tool_name": "kubectl",
                    "args": {"command": "get pods -A"}
                })
            else:
                result = await self.mcp_session.call_tool("list_tools", {})
            await say(f"✅ Result:\n```\n{result.content}\n```")
        except Exception as e:
            await say(f"❌ Error: {str(e)}")

    async def handle_kubiya_command(self, ack, command, say):
        """Handle /kubiya slash command"""
        await ack()
        if not self.mcp_session:
            await self.initialize_mcp()
        command_text = command['text']
        # Parse command format: /kubiya tool_name arg1=value1 arg2=value2
        parts = command_text.split()
        if not parts:
            # An empty command used to surface as "list index out of range".
            await say("Usage: /kubiya tool_name arg1=value1 arg2=value2")
            return
        try:
            tool_name = parts[0]
            arguments = {}
            for part in parts[1:]:
                if '=' in part:
                    # Split on the first '=' only so values may contain '='.
                    key, value = part.split('=', 1)
                    arguments[key] = value
            result = await self.mcp_session.call_tool(tool_name, arguments)
            await say(f"✅ Command executed:\n```\n{result.content}\n```")
        except Exception as e:
            await say(f"❌ Error executing command: {str(e)}")

    async def start(self):
        """Start the Slack bot"""
        # Uses the token stored in __init__; a bare `slack_app_token` is not
        # in scope inside this method.
        handler = AsyncSocketModeHandler(self.app, self.slack_app_token)
        await handler.start_async()
# Usage
# Placeholder tokens are passed inline here; replace them with real values.
bot = KubiyaSlackBot(
    slack_bot_token="xoxb-your-token",
    slack_app_token="xapp-your-token",
    kubiya_api_key="kb-your-key"
)
# Runs the bot's start() coroutine on a new event loop.
asyncio.run(bot.start())
# voice_kubiya_assistant.py
import asyncio
import speech_recognition as sr
import pyttsx3
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import StdioServerTransport
class VoiceKubiyaAssistant:
    """Voice front end: listens for spoken commands and runs them through Kubiya over MCP."""

    def __init__(self, kubiya_api_key):
        """Set up the speech recognizer, microphone and text-to-speech engine."""
        self.kubiya_api_key = kubiya_api_key
        self.recognizer = sr.Recognizer()
        self.microphone = sr.Microphone()
        self.tts = pyttsx3.init()
        self.mcp_session = None
        self.transport = None
        # Owns the transport and session context managers so close() can unwind them.
        self._exit_stack = None

    async def initialize_mcp(self):
        """Start `kubiya mcp serve` over stdio and open an initialized session."""
        from contextlib import AsyncExitStack

        # Re-initializing replaces the previous connection instead of leaking it.
        await self.close()

        server_params = StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={"KUBIYA_API_KEY": self.kubiya_api_key},
        )
        transport = StdioServerTransport(server_params)
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(transport)
            # Entered like the `async with ClientSession(...)` examples in this file.
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise

        self.transport = transport
        self.mcp_session = session
        self._exit_stack = stack

    async def close(self):
        """Close the MCP session and transport; safe to call when not connected."""
        stack = getattr(self, "_exit_stack", None)
        self._exit_stack = None
        self.mcp_session = None
        self.transport = None
        if stack is not None:
            await stack.aclose()

    def listen_for_command(self):
        """Listen for voice commands"""
        with self.microphone as source:
            self.recognizer.adjust_for_ambient_noise(source)
        print("Listening for Kubiya commands...")
        with self.microphone as source:
            audio = self.recognizer.listen(source)
        try:
            command = self.recognizer.recognize_google(audio)
            print(f"Recognized: {command}")
            return command
        except sr.UnknownValueError:
            print("Could not understand audio")
            return None
        except sr.RequestError as e:
            # The recognition request itself failed; report it and keep the
            # assistant loop alive instead of crashing.
            print(f"Speech recognition request failed: {e}")
            return None

    def speak(self, text):
        """Convert text to speech"""
        self.tts.say(text)
        self.tts.runAndWait()

    async def process_voice_command(self, command):
        """Process voice command and execute via Kubiya"""
        if not self.mcp_session:
            await self.initialize_mcp()
        # Simple command parsing (in practice, use NLP)
        lowered = command.lower()
        if "status" in lowered and "cluster" in lowered:
            result = await self.mcp_session.call_tool("execute_tool", {
                "tool_name": "kubectl",
                "args": {"command": "get nodes"}
            })
            self.speak("Cluster status retrieved successfully")
            return result.content
        if "deploy" in lowered:
            self.speak("Starting deployment process")
            result = await self.mcp_session.call_tool("execute_workflow", {
                "workflow_definition": {
                    "name": "voice-deploy",
                    "steps": [
                        {
                            "name": "deploy",
                            "image": "bitnami/kubectl:latest",
                            "command": "kubectl apply -f k8s/"
                        }
                    ]
                }
            })
            self.speak("Deployment completed")
            return result.content
        self.speak("Command not recognized")
        return None

    async def run(self):
        """Main voice assistant loop"""
        await self.initialize_mcp()
        try:
            while True:
                command = self.listen_for_command()
                if not command:
                    continue
                if "exit" in command.lower():
                    self.speak("Goodbye!")
                    break
                result = await self.process_voice_command(command)
                if result:
                    print(f"Result: {result}")
        finally:
            # Release the MCP connection however the loop ends.
            await self.close()
# Usage
# A placeholder API key is passed inline here; replace it with a real value.
assistant = VoiceKubiyaAssistant(kubiya_api_key="kb-your-key")
# Runs the assistant's run() coroutine on a new event loop.
asyncio.run(assistant.run())
// react_native_kubiya.js
import { NativeModules } from 'react-native';
/**
 * Thin wrapper around the `KubiyaMCP` native module.
 *
 * The native connection is initialized lazily on first use and only once
 * per client instance.
 */
class KubiyaMobileClient {
  /**
   * @param {string} apiKey Kubiya API key handed to the native module.
   */
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.initialized = false;
  }

  /**
   * Initialize the MCP connection through the native module (no-op if
   * already initialized). Logs and rethrows on failure.
   */
  async initialize() {
    if (this.initialized) return;
    try {
      // Initialize MCP connection via native module
      await NativeModules.KubiyaMCP.initialize({
        apiKey: this.apiKey,
        command: 'kubiya',
        args: ['mcp', 'serve']
      });
      this.initialized = true;
    } catch (error) {
      console.error('Failed to initialize Kubiya MCP:', error);
      throw error;
    }
  }

  /**
   * Execute a tool by name.
   *
   * The second parameter is named `toolArgs` because class bodies are always
   * strict-mode code, where `arguments` is not a legal parameter name.
   *
   * @param {string} toolName Tool to call.
   * @param {object} toolArgs Arguments forwarded to the tool.
   * @returns {Promise<*>} Whatever the native module resolves with.
   */
  async executeTool(toolName, toolArgs) {
    if (!this.initialized) {
      await this.initialize();
    }
    try {
      const result = await NativeModules.KubiyaMCP.callTool(toolName, toolArgs);
      return result;
    } catch (error) {
      console.error('Tool execution failed:', error);
      throw error;
    }
  }

  /**
   * List the tools reported by the native module. Logs and rethrows on failure.
   */
  async listTools() {
    if (!this.initialized) {
      await this.initialize();
    }
    try {
      const tools = await NativeModules.KubiyaMCP.listTools();
      return tools;
    } catch (error) {
      console.error('Failed to list tools:', error);
      throw error;
    }
  }
}
// React Native Component
import React, { useState, useEffect } from 'react';
import { View, Text, Button, TextInput, FlatList, Alert } from 'react-native';
const KubiyaApp = () => {
const [client, setClient] = useState(null);
const [tools, setTools] = useState([]);
const [command, setCommand] = useState('');
const [output, setOutput] = useState('');
useEffect(() => {
const initClient = async () => {
try {
const kubiyaClient = new KubiyaMobileClient('kb-your-key');
await kubiyaClient.initialize();
setClient(kubiyaClient);
const availableTools = await kubiyaClient.listTools();
setTools(availableTools);
} catch (error) {
Alert.alert('Error', 'Failed to initialize Kubiya client');
}
};
initClient();
}, []);
const executeCommand = async () => {
if (!client || !command) return;
try {
const result = await client.executeTool('execute_tool', {
tool_name: 'kubectl',
args: { command: command }
});
setOutput(result.content);
} catch (error) {
Alert.alert('Error', 'Failed to execute command');
}
};
return (
<View style={{ flex: 1, padding: 20 }}>
<Text style={{ fontSize: 24, fontWeight: 'bold', marginBottom: 20 }}>
Kubiya Mobile
</Text>
<TextInput
style={{
borderWidth: 1,
borderColor: '#ccc',
padding: 10,
marginBottom: 10,
borderRadius: 5
}}
value={command}
onChangeText={setCommand}
placeholder="Enter kubectl command"
/>
<Button title="Execute" onPress={executeCommand} />
<Text style={{ marginTop: 20, fontSize: 16, fontWeight: 'bold' }}>
Output:
</Text>
<Text style={{ marginTop: 10, fontFamily: 'monospace' }}>
{output}
</Text>
<Text style={{ marginTop: 20, fontSize: 16, fontWeight: 'bold' }}>
Available Tools:
</Text>
<FlatList
data={tools}
keyExtractor={(item) => item.name}
renderItem={({ item }) => (
<View style={{ padding: 10, borderBottomWidth: 1, borderBottomColor: '#eee' }}>
<Text style={{ fontWeight: 'bold' }}>{item.name}</Text>
<Text>{item.description}</Text>
</View>
)}
/>
</View>
);
};
export default KubiyaApp;
# metrics_integration.py
import asyncio
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import StdioServerTransport
class KubiyaMetricsCollector:
    """Runs Kubiya tools over MCP while recording Prometheus metrics for each call."""

    def __init__(self, kubiya_api_key):
        """Create the counter, histogram and gauge used by monitored_execute_tool."""
        self.kubiya_api_key = kubiya_api_key
        # Prometheus metrics
        self.tool_executions = Counter(
            'kubiya_tool_executions_total',
            'Total number of tool executions',
            ['tool_name', 'status', 'integration']
        )
        self.execution_duration = Histogram(
            'kubiya_execution_duration_seconds',
            'Tool execution duration',
            ['tool_name', 'integration']
        )
        self.active_sessions = Gauge(
            'kubiya_active_sessions',
            'Number of active MCP sessions',
            ['integration']
        )

    async def monitored_execute_tool(self, tool_name, arguments, integration_name):
        """Execute tool with metrics collection

        Opens a fresh MCP session for the call, counts the outcome under
        ``status='success'`` or ``status='error'``, always records the elapsed
        time, and re-raises any failure to the caller.
        """
        start_time = time.time()
        # Tracks whether the gauge was incremented, so the `finally` block
        # only undoes an increment that actually happened. Without this a
        # failed connection would drive the gauge negative.
        session_counted = False
        try:
            # Initialize MCP session
            server_params = StdioServerParameters(
                command="kubiya",
                args=["mcp", "serve"],
                env={"KUBIYA_API_KEY": self.kubiya_api_key}
            )
            async with StdioServerTransport(server_params) as (read, write):
                async with ClientSession(read, write) as session:
                    await session.initialize()
                    self.active_sessions.labels(integration=integration_name).inc()
                    session_counted = True
                    result = await session.call_tool(tool_name, arguments)
                    # Record successful execution
                    self.tool_executions.labels(
                        tool_name=tool_name,
                        status='success',
                        integration=integration_name
                    ).inc()
                    return result
        except Exception:
            # Record failed execution
            self.tool_executions.labels(
                tool_name=tool_name,
                status='error',
                integration=integration_name
            ).inc()
            raise
        finally:
            # Record execution duration
            duration = time.time() - start_time
            self.execution_duration.labels(
                tool_name=tool_name,
                integration=integration_name
            ).observe(duration)
            if session_counted:
                self.active_sessions.labels(integration=integration_name).dec()
# Start metrics server
# start_http_server is called with port 8000 when this module is executed or imported.
start_http_server(8000)
# Module-level collector created with a placeholder API key.
metrics_collector = KubiyaMetricsCollector("kb-your-key")
async def robust_tool_execution(session, tool_name, arguments, max_retries=3, base_delay=1.0):
    """Robust tool execution with retry logic

    Args:
        session: Object exposing async ``call_tool(name, arguments)`` and
            ``initialize()`` methods.
        tool_name: Name of the tool to call.
        arguments: Arguments passed to the tool.
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Seconds to wait after the first failure; the wait doubles
            after each further failure. The default of 1.0 gives the
            1s, 2s, 4s, ... sequence.

    Returns:
        The result of the first successful ``call_tool``.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The error from the final attempt once all attempts fail.
    """
    if max_retries < 1:
        # With zero attempts the loop would never run and the function would
        # silently return None.
        raise ValueError("max_retries must be at least 1")

    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return await session.call_tool(tool_name, arguments)
        except Exception as e:
            if attempt == last_attempt:
                raise
            # Exponential backoff
            await asyncio.sleep(base_delay * 2 ** attempt)
            # Reinitialize session if needed
            if "connection" in str(e).lower():
                await session.initialize()
class MCPConnectionPool:
    """Fixed-size pool of MCP sessions handed out through an asyncio queue."""

    def __init__(self, api_key, pool_size=5):
        """Remember the settings; connections are created on first use."""
        self.api_key = api_key
        self.pool_size = pool_size
        self.connections = asyncio.Queue(maxsize=pool_size)
        self.initialized = False
        # Created lazily inside initialize(), i.e. while an event loop is running.
        self._init_lock = None
        # One AsyncExitStack per connection so close() can release them all.
        self._exit_stacks = []

    async def initialize(self):
        """Fill the pool up to pool_size connections; runs at most once.

        Concurrent callers are serialized by a lock. Without it each of them
        would create pool_size connections and then block forever trying to
        put them into the already-full queue.
        """
        if self._init_lock is None:
            self._init_lock = asyncio.Lock()
        async with self._init_lock:
            if self.initialized:
                return
            # Only top up what is missing, so a retry after a partial failure
            # cannot overfill the bounded queue.
            missing = self.pool_size - self.connections.qsize()
            for _ in range(missing):
                connection = await self._create_connection()
                self.connections.put_nowait(connection)
            self.initialized = True

    async def _create_connection(self):
        """Open one initialized MCP session and return it."""
        from contextlib import AsyncExitStack

        server_params = StdioServerParameters(
            command="kubiya",
            args=["mcp", "serve"],
            env={"KUBIYA_API_KEY": self.api_key},
        )
        stack = AsyncExitStack()
        try:
            read, write = await stack.enter_async_context(
                StdioServerTransport(server_params)
            )
            # Entered like the `async with ClientSession(...)` examples in this file.
            session = await stack.enter_async_context(ClientSession(read, write))
            await session.initialize()
        except BaseException:
            await stack.aclose()
            raise
        self._exit_stacks.append(stack)
        return session

    async def execute_tool(self, tool_name, arguments):
        """Borrow a connection, run the tool, and always return the connection."""
        if not self.initialized:
            await self.initialize()
        connection = await self.connections.get()
        try:
            result = await connection.call_tool(tool_name, arguments)
            return result
        finally:
            await self.connections.put(connection)

    async def close(self):
        """Close every connection created by the pool and reset it."""
        stacks, self._exit_stacks = self._exit_stacks, []
        # Drop the queued session objects; their resources are released below.
        while not self.connections.empty():
            self.connections.get_nowait()
        self.initialized = False
        for stack in reversed(stacks):
            await stack.aclose()
import os
import jwt
from datetime import datetime, timedelta
class SecureKubiyaClient:
    """Loads the Kubiya API key from a file or the environment and issues session JWTs."""

    def __init__(self, api_key_path=None):
        """Load the API key.

        Args:
            api_key_path: Optional path to a file containing the key. When
                omitted, the ``KUBIYA_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no non-empty key could be loaded.
        """
        # Load API key from secure location
        if api_key_path:
            # Explicit encoding so the result does not depend on the platform default.
            with open(api_key_path, 'r', encoding='utf-8') as f:
                self.api_key = f.read().strip()
        else:
            self.api_key = os.getenv('KUBIYA_API_KEY')
        if not self.api_key:
            raise ValueError("API key not found")

    def generate_session_token(self, user_id, permissions):
        """Generate JWT token for session"""
        from datetime import timezone

        # NOTE(review): the API key doubles as the HS256 signing secret here;
        # consider a dedicated signing key so tokens and API access can be
        # rotated independently.
        payload = {
            'user_id': user_id,
            'permissions': permissions,
            # Timezone-aware "now": datetime.utcnow() is deprecated and
            # returns a naive value.
            'exp': datetime.now(timezone.utc) + timedelta(hours=1)
        }
        return jwt.encode(payload, self.api_key, algorithm='HS256')

    def validate_session_token(self, token):
        """Validate JWT token

        Returns the decoded payload.

        Raises:
            ValueError: "Session expired" for an expired token, or
                "Invalid session token" for any other invalid token. The
                original jwt error is chained as the cause.
        """
        try:
            return jwt.decode(token, self.api_key, algorithms=['HS256'])
        except jwt.ExpiredSignatureError as exc:
            raise ValueError("Session expired") from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError("Invalid session token") from exc
Was this page helpful?