The Kubiya Tools SDK enables you to create custom, reusable tools that integrate seamlessly with Kubiya workflows. Tools are containerized, stateless operations that can be shared across teams and organizations.

Why Create Custom Tools?

Organization-Specific Logic

Encode your company’s specific processes and procedures

API Integrations

Connect to internal services and proprietary systems

Reusable Components

Share tools across teams and workflows

Community Contribution

Contribute to the @community-tools collection

Tool Structure

Every Kubiya tool follows a consistent structure:
from kubiya_sdk.tools import Tool, Arg

# Define a tool
my_tool = Tool(
    name="service-health-checker",
    description="Check health status of microservices",
    image="python:3.11-slim",
    requirements=["requests", "pyyaml"],
    
    # Tool arguments
    args=[
        Arg(
            name="service_url",
            description="Base URL of the service to check",
            type="str",
            required=True
        ),
        Arg(
            name="timeout",
            description="Request timeout in seconds", 
            type="int",
            default=30
        ),
        Arg(
            name="expected_status",
            description="Expected HTTP status code",
            type="int", 
            default=200
        )
    ],
    
    # Tool execution script
    content="""
import requests
import sys
import json

def check_service_health(url, timeout, expected_status):
    try:
        response = requests.get(f"{url}/health", timeout=timeout)
        
        result = {
            "url": url,
            "status_code": response.status_code,
            "expected_status": expected_status,
            "healthy": response.status_code == expected_status,
            "response_time_ms": response.elapsed.total_seconds() * 1000,
            "response_body": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text
        }
        
        print(json.dumps(result, indent=2))
        return 0 if result["healthy"] else 1
        
    except requests.RequestException as e:
        error_result = {
            "url": url,
            "error": str(e),
            "healthy": False
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    service_url = sys.argv[1]
    timeout = int(sys.argv[2])
    expected_status = int(sys.argv[3])
    
    exit_code = check_service_health(service_url, timeout, expected_status)
    sys.exit(exit_code)
"""
)

Community Tools Examples

Learn from real community tools in the @community-tools collection:

Database Tools

# @community-tools/postgres-query
from kubiya_sdk.tools import Tool, Arg

postgres_query = Tool(
    name="postgres-query",
    description="Execute PostgreSQL queries and return results",
    image="postgres:15-alpine",
    
    args=[
        Arg("query", "SQL query to execute", type="str", required=True),
        Arg("database", "Database name", type="str", required=True),
        Arg("host", "PostgreSQL host", type="str", default="localhost"),
        Arg("port", "PostgreSQL port", type="int", default=5432),
        Arg("username", "Database username", type="str", required=True),
        Arg("password", "Database password", type="str", required=True, sensitive=True)
    ],
    
    content="""
#!/bin/bash
set -e

export PGPASSWORD="$6"  # password argument
export PGHOST="$3"      # host argument  
export PGPORT="$4"      # port argument
export PGUSER="$5"      # username argument
export PGDATABASE="$2"  # database argument

echo "Executing query on database: $PGDATABASE"
echo "Query: $1"
echo "---"

# Execute query and format output as JSON
psql -t -c "$1" | jq -R -s 'split("\\n") | map(select(. != "")) | map(split("|") | map(gsub("^\\s+|\\s+$"; "")))'
"""
)

Cloud Tools

# @community-tools/aws-cost-analyzer
from kubiya_sdk.tools import Tool, Arg

aws_cost_analyzer = Tool(
    name="aws-cost-analyzer",
    description="Analyze AWS costs and provide optimization recommendations",
    image="amazon/aws-cli:latest",
    requirements=["boto3", "pandas", "matplotlib"],
    
    args=[
        Arg("start_date", "Start date for cost analysis (YYYY-MM-DD)", type="str", required=True),
        Arg("end_date", "End date for cost analysis (YYYY-MM-DD)", type="str", required=True),
        Arg("service_filter", "AWS service to filter by (optional)", type="str", default=""),
        Arg("group_by", "Group costs by (SERVICE, REGION, INSTANCE_TYPE)", type="str", default="SERVICE"),
        Arg("generate_chart", "Generate cost visualization chart", type="bool", default=True)
    ],
    
    env=["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION"],
    
    content="""
import boto3
import pandas as pd
import matplotlib.pyplot as plt
import json
import sys
from datetime import datetime

def analyze_aws_costs(start_date, end_date, service_filter, group_by, generate_chart):
    ce_client = boto3.client('ce')
    
    # Prepare Cost Explorer query
    query_params = {
        'TimePeriod': {
            'Start': start_date,
            'End': end_date
        },
        'Granularity': 'DAILY',
        'Metrics': ['BlendedCost'],
        'GroupBy': [{'Type': 'DIMENSION', 'Key': group_by}]
    }
    
    if service_filter:
        query_params['Filter'] = {
            'Dimensions': {
                'Key': 'SERVICE',
                'Values': [service_filter]
            }
        }
    
    try:
        response = ce_client.get_cost_and_usage(**query_params)
        
        # Process results
        cost_data = []
        total_cost = 0
        
        for result_by_time in response['ResultsByTime']:
            date = result_by_time['TimePeriod']['Start']
            
            for group in result_by_time['Groups']:
                group_key = group['Keys'][0]
                amount = float(group['Metrics']['BlendedCost']['Amount'])
                
                cost_data.append({
                    'Date': date,
                    group_by: group_key,
                    'Cost': amount
                })
                total_cost += amount
        
        # Create DataFrame for analysis
        df = pd.DataFrame(cost_data)
        
        # Generate summary
        summary = {
            'total_cost': round(total_cost, 2),
            'date_range': f"{start_date} to {end_date}",
            'group_by': group_by,
            'service_filter': service_filter or 'All Services',
            'top_cost_items': df.groupby(group_by)['Cost'].sum().sort_values(ascending=False).head(10).to_dict()
        }
        
        # Generate chart if requested
        chart_path = None
        if generate_chart and not df.empty:
            plt.figure(figsize=(12, 8))
            
            # Group by the specified dimension and sum costs
            grouped_costs = df.groupby(group_by)['Cost'].sum().sort_values(ascending=False)
            
            # Plot top 10 items
            top_10 = grouped_costs.head(10)
            plt.bar(range(len(top_10)), top_10.values)
            plt.xticks(range(len(top_10)), top_10.index, rotation=45, ha='right')
            plt.ylabel('Cost ($)')
            plt.title(f'AWS Costs by {group_by} ({start_date} to {end_date})')
            plt.tight_layout()
            
            chart_path = 'aws_cost_analysis.png'
            plt.savefig(chart_path, dpi=150, bbox_inches='tight')
            summary['chart_generated'] = chart_path
        
        # Output results
        print(json.dumps(summary, indent=2))
        
        # Recommendations
        recommendations = []
        
        if 'EC2' in summary['top_cost_items']:
            recommendations.append("Consider Reserved Instances for EC2 to save up to 75%")
        
        if total_cost > 1000:
            recommendations.append("Enable AWS Cost Anomaly Detection for this account")
        
        if len(summary['top_cost_items']) > 5:
            recommendations.append("Review unused resources in top spending services")
        
        if recommendations:
            print("\\n--- Cost Optimization Recommendations ---")
            for rec in recommendations:
                print(f"• {rec}")
        
        return 0
        
    except Exception as e:
        error_result = {
            'error': str(e),
            'message': 'Failed to analyze AWS costs'
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    start_date = sys.argv[1]
    end_date = sys.argv[2] 
    service_filter = sys.argv[3]
    group_by = sys.argv[4]
    generate_chart = sys.argv[5].lower() == 'true'
    
    exit_code = analyze_aws_costs(start_date, end_date, service_filter, group_by, generate_chart)
    sys.exit(exit_code)
"""
)

DevOps Tools

# @community-tools/github-pr-analyzer
from kubiya_sdk.tools import Tool, Arg

github_pr_analyzer = Tool(
    name="github-pr-analyzer",
    description="Analyze GitHub pull requests for code quality and review metrics",
    image="python:3.11-slim",
    requirements=["requests", "python-dateutil"],
    
    args=[
        Arg("repository", "GitHub repository (owner/repo)", type="str", required=True),
        Arg("pr_number", "Pull request number to analyze", type="int", required=True),
        Arg("github_token", "GitHub API token", type="str", required=True, sensitive=True),
        Arg("include_files", "Include changed files analysis", type="bool", default=True),
        Arg("check_tests", "Check if tests are included", type="bool", default=True)
    ],
    
    content="""
import requests
import json
import sys
from datetime import datetime
from dateutil import parser

def analyze_pr(repo, pr_number, token, include_files, check_tests):
    headers = {
        'Authorization': f'token {token}',
        'Accept': 'application/vnd.github.v3+json'
    }
    
    base_url = f'https://api.github.com/repos/{repo}'
    
    try:
        # Get PR details
        pr_response = requests.get(f'{base_url}/pulls/{pr_number}', headers=headers)
        pr_response.raise_for_status()
        pr_data = pr_response.json()
        
        # Get PR reviews
        reviews_response = requests.get(f'{base_url}/pulls/{pr_number}/reviews', headers=headers)
        reviews_response.raise_for_status()
        reviews = reviews_response.json()
        
        # Get PR files if requested
        files = []
        if include_files:
            files_response = requests.get(f'{base_url}/pulls/{pr_number}/files', headers=headers)
            files_response.raise_for_status()
            files = files_response.json()
        
        # Analyze the PR
        analysis = {
            'pr_info': {
                'number': pr_data['number'],
                'title': pr_data['title'],
                'author': pr_data['user']['login'],
                'state': pr_data['state'],
                'created_at': pr_data['created_at'],
                'updated_at': pr_data['updated_at'],
                'mergeable': pr_data['mergeable'],
                'draft': pr_data['draft']
            },
            'metrics': {
                'additions': pr_data['additions'],
                'deletions': pr_data['deletions'],
                'changed_files': pr_data['changed_files'],
                'commits': pr_data['commits']
            },
            'reviews': {
                'total_reviews': len(reviews),
                'approved_reviews': len([r for r in reviews if r['state'] == 'APPROVED']),
                'changes_requested': len([r for r in reviews if r['state'] == 'CHANGES_REQUESTED']),
                'reviewers': list(set([r['user']['login'] for r in reviews]))
            }
        }
        
        # File analysis
        if include_files:
            file_analysis = {
                'languages': {},
                'file_types': {},
                'largest_files': []
            }
            
            for file in files:
                filename = file['filename']
                changes = file['additions'] + file['deletions']
                
                # Determine language/type
                if '.' in filename:
                    ext = filename.split('.')[-1].lower()
                    file_analysis['file_types'][ext] = file_analysis['file_types'].get(ext, 0) + 1
                
                # Track largest files by changes
                file_analysis['largest_files'].append({
                    'filename': filename,
                    'changes': changes,
                    'additions': file['additions'],
                    'deletions': file['deletions']
                })
            
            # Sort by changes
            file_analysis['largest_files'].sort(key=lambda x: x['changes'], reverse=True)
            file_analysis['largest_files'] = file_analysis['largest_files'][:10]
            
            analysis['file_analysis'] = file_analysis
            
            # Check for tests if requested
            if check_tests:
                test_files = [f for f in files if 'test' in f['filename'].lower() or 'spec' in f['filename'].lower()]
                analysis['testing'] = {
                    'has_test_files': len(test_files) > 0,
                    'test_files_count': len(test_files),
                    'test_files': [f['filename'] for f in test_files]
                }
        
        # Calculate PR health score
        health_score = 100
        
        # Reduce score for large PRs
        if pr_data['changed_files'] > 20:
            health_score -= 10
        if pr_data['additions'] + pr_data['deletions'] > 1000:
            health_score -= 15
        
        # Reduce score for no reviews
        if len(reviews) == 0:
            health_score -= 20
        elif analysis['reviews']['approved_reviews'] == 0:
            health_score -= 10
        
        # Reduce score if no tests (when checking)
        if check_tests and include_files:
            if not analysis['testing']['has_test_files'] and analysis['metrics']['additions'] > 50:
                health_score -= 15
        
        analysis['health_score'] = max(0, health_score)
        
        # Recommendations
        recommendations = []
        
        if pr_data['changed_files'] > 15:
            recommendations.append("Consider breaking this PR into smaller chunks for easier review")
        
        if len(reviews) == 0:
            recommendations.append("Request reviews from team members")
        
        if analysis['reviews']['changes_requested'] > analysis['reviews']['approved_reviews']:
            recommendations.append("Address review comments before merging")
        
        if check_tests and include_files and not analysis['testing']['has_test_files'] and analysis['metrics']['additions'] > 20:
            recommendations.append("Consider adding tests for new functionality")
        
        analysis['recommendations'] = recommendations
        
        print(json.dumps(analysis, indent=2))
        return 0
        
    except requests.RequestException as e:
        error_result = {
            'error': str(e),
            'message': 'Failed to analyze GitHub PR'
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    repository = sys.argv[1]
    pr_number = int(sys.argv[2])
    github_token = sys.argv[3]
    include_files = sys.argv[4].lower() == 'true'
    check_tests = sys.argv[5].lower() == 'true'
    
    exit_code = analyze_pr(repository, pr_number, github_token, include_files, check_tests)
    sys.exit(exit_code)
"""
)

Monitoring Tools

# @community-tools/prometheus-alert-manager
from kubiya_sdk.tools import Tool, Arg

prometheus_alert_manager = Tool(
    name="prometheus-alert-manager",
    description="Query Prometheus metrics and manage alerts",
    image="prom/prometheus:latest",
    requirements=["requests", "pandas"],
    
    args=[
        Arg("prometheus_url", "Prometheus server URL", type="str", required=True),
        Arg("query", "PromQL query to execute", type="str", required=True),
        Arg("start_time", "Start time for range query (ISO format)", type="str", default=""),
        Arg("end_time", "End time for range query (ISO format)", type="str", default=""),
        Arg("step", "Query resolution step", type="str", default="15s"),
        Arg("alert_threshold", "Threshold value for alerting", type="float", default=0)
    ],
    
    content="""
import requests
import json
import sys
from datetime import datetime, timedelta
import pandas as pd

def query_prometheus(prom_url, query, start_time, end_time, step, alert_threshold):
    try:
        # Determine query type
        if start_time and end_time:
            # Range query
            params = {
                'query': query,
                'start': start_time,
                'end': end_time,
                'step': step
            }
            response = requests.get(f'{prom_url}/api/v1/query_range', params=params)
        else:
            # Instant query
            params = {'query': query}
            response = requests.get(f'{prom_url}/api/v1/query', params=params)
        
        response.raise_for_status()
        data = response.json()
        
        if data['status'] != 'success':
            raise Exception(f"Prometheus query failed: {data.get('error', 'Unknown error')}")
        
        result = data['data']['result']
        
        # Process results
        analysis = {
            'query': query,
            'query_type': 'range' if start_time and end_time else 'instant',
            'timestamp': datetime.now().isoformat(),
            'result_count': len(result),
            'results': []
        }
        
        alerts_triggered = []
        
        for series in result:
            metric = series['metric']
            
            if 'values' in series:  # Range query
                values = series['values']
                latest_value = float(values[-1][1]) if values else 0
                
                series_data = {
                    'metric': metric,
                    'latest_value': latest_value,
                    'data_points': len(values),
                    'min_value': min([float(v[1]) for v in values]) if values else 0,
                    'max_value': max([float(v[1]) for v in values]) if values else 0,
                    'avg_value': sum([float(v[1]) for v in values]) / len(values) if values else 0
                }
                
            else:  # Instant query
                timestamp, value = series['value']
                latest_value = float(value)
                
                series_data = {
                    'metric': metric,
                    'value': latest_value,
                    'timestamp': timestamp
                }
            
            analysis['results'].append(series_data)
            
            # Check alert threshold
            if alert_threshold > 0 and latest_value > alert_threshold:
                alerts_triggered.append({
                    'metric': metric,
                    'current_value': latest_value,
                    'threshold': alert_threshold,
                    'severity': 'warning' if latest_value < alert_threshold * 1.5 else 'critical'
                })
        
        if alerts_triggered:
            analysis['alerts'] = {
                'triggered': True,
                'count': len(alerts_triggered),
                'alerts': alerts_triggered
            }
        else:
            analysis['alerts'] = {'triggered': False}
        
        # Summary statistics
        if analysis['results']:
            all_values = []
            for result in analysis['results']:
                if 'latest_value' in result:
                    all_values.append(result['latest_value'])
                elif 'value' in result:
                    all_values.append(result['value'])
            
            if all_values:
                analysis['summary'] = {
                    'total_series': len(all_values),
                    'min_value': min(all_values),
                    'max_value': max(all_values),
                    'avg_value': sum(all_values) / len(all_values),
                    'values_above_threshold': len([v for v in all_values if v > alert_threshold]) if alert_threshold > 0 else 0
                }
        
        print(json.dumps(analysis, indent=2))
        
        # Return non-zero exit code if alerts were triggered
        return 1 if alerts_triggered else 0
        
    except Exception as e:
        error_result = {
            'error': str(e),
            'message': 'Failed to query Prometheus',
            'query': query
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    prometheus_url = sys.argv[1].rstrip('/')
    query = sys.argv[2]
    start_time = sys.argv[3] if len(sys.argv) > 3 and sys.argv[3] else ""
    end_time = sys.argv[4] if len(sys.argv) > 4 and sys.argv[4] else ""
    step = sys.argv[5] if len(sys.argv) > 5 else "15s"
    alert_threshold = float(sys.argv[6]) if len(sys.argv) > 6 else 0
    
    exit_code = query_prometheus(prometheus_url, query, start_time, end_time, step, alert_threshold)
    sys.exit(exit_code)
"""
)

Tool Development Best Practices

Structure and Organization

  1. Clear Naming: Use descriptive, kebab-case names
  2. Single Responsibility: Each tool should do one thing well
  3. Comprehensive Documentation: Include detailed descriptions and examples
  4. Error Handling: Always handle errors gracefully with structured output

Input Validation

from kubiya_sdk.tools import Tool, Arg

validated_tool = Tool(
    name="validated-example",
    description="Example tool with input validation",
    
    args=[
        Arg(
            name="email",
            description="User email address",
            type="str",
            required=True,
            validation=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        ),
        Arg(
            name="priority",
            description="Task priority level",
            type="str",
            required=True,
            options=["low", "medium", "high", "critical"]
        ),
        Arg(
            name="count",
            description="Number of items to process",
            type="int",
            default=10,
            min_value=1,
            max_value=1000
        )
    ],
    
    content="""
import sys
import re
import json

def validate_inputs(email, priority, count):
    errors = []
    
    # Email validation
    if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
        errors.append("Invalid email format")
    
    # Priority validation
    if priority not in ["low", "medium", "high", "critical"]:
        errors.append("Invalid priority. Must be: low, medium, high, or critical")
    
    # Count validation  
    if not (1 <= count <= 1000):
        errors.append("Count must be between 1 and 1000")
    
    return errors

if __name__ == "__main__":
    email = sys.argv[1]
    priority = sys.argv[2]
    count = int(sys.argv[3])
    
    validation_errors = validate_inputs(email, priority, count)
    
    if validation_errors:
        result = {
            "success": False,
            "errors": validation_errors
        }
        print(json.dumps(result, indent=2))
        sys.exit(1)
    
    # Process valid inputs
    result = {
        "success": True,
        "processed": {
            "email": email,
            "priority": priority,
            "count": count
        }
    }
    
    print(json.dumps(result, indent=2))
    sys.exit(0)
"""
)

Testing Tools

import unittest
from kubiya_sdk.tools.testing import ToolTest

class TestMyTool(ToolTest):
    def setUp(self):
        self.tool = Tool(
            name="test-tool",
            description="Tool for testing",
            content="echo 'Hello, World!'"
        )
    
    def test_successful_execution(self):
        """Test successful tool execution."""
        result = self.execute_tool(self.tool, {})
        
        self.assertTrue(result.success)
        self.assertIn("Hello, World!", result.output)
    
    def test_input_validation(self):
        """Test input validation."""
        tool_with_args = Tool(
            name="validation-test",
            args=[
                Arg("required_arg", "Required argument", required=True)
            ],
            content="echo $1"
        )
        
        # Should fail without required argument
        result = self.execute_tool(tool_with_args, {})
        self.assertFalse(result.success)
        
        # Should succeed with required argument
        result = self.execute_tool(tool_with_args, {"required_arg": "test"})
        self.assertTrue(result.success)

if __name__ == "__main__":
    unittest.main()

Publishing Tools

Community Tools

To contribute to @community-tools:
  1. Follow Standards: Use consistent naming and structure
  2. Include Tests: Provide comprehensive test coverage
  3. Document Well: Include clear descriptions and examples
  4. Add Categories: Tag tools appropriately (database, cloud, monitoring, etc.)
# Tool metadata for community contribution
community_tool = Tool(
    name="my-awesome-tool",
    description="Brief description of what the tool does",
    version="1.0.0",
    author="Your Name <your.email@company.com>",
    tags=["database", "postgresql", "monitoring"],
    category="database",
    license="MIT",
    
    # ... tool definition
)

Private Tool Registry

For internal tools:
from kubiya_sdk import Kubiya

client = Kubiya()

# Register tool in private registry
client.tools.register(
    tool=my_private_tool,
    visibility="private",  # or "team", "organization"
    namespace="my-company"
)

# Use in workflows
client.workflows.create({
    "name": "internal-process",
    "steps": [
        {
            "name": "custom-step",
            "tool": "my-company/my-private-tool",
            "inputs": {"param": "value"}
        }
    ]
})

Next Steps