Why Create Custom Tools?

- Organization-Specific Logic: Encode your company's specific processes and procedures
- API Integrations: Connect to internal services and proprietary systems
- Reusable Components: Share tools across teams and workflows
- Community Contribution: Contribute to the @community-tools collection
Tool Structure
Every Kubiya tool follows a consistent structure:
```python
from kubiya_sdk.tools import Tool, Arg

# Define a tool
my_tool = Tool(
    name="service-health-checker",
    description="Check health status of microservices",
    image="python:3.11-slim",
    requirements=["requests", "pyyaml"],

    # Tool arguments
    args=[
        Arg(
            name="service_url",
            description="Base URL of the service to check",
            type="str",
            required=True
        ),
        Arg(
            name="timeout",
            description="Request timeout in seconds",
            type="int",
            default=30
        ),
        Arg(
            name="expected_status",
            description="Expected HTTP status code",
            type="int",
            default=200
        )
    ],

    # Tool execution script
    content="""
import requests
import sys
import json

def check_service_health(url, timeout, expected_status):
    try:
        response = requests.get(f"{url}/health", timeout=timeout)
        result = {
            "url": url,
            "status_code": response.status_code,
            "expected_status": expected_status,
            "healthy": response.status_code == expected_status,
            "response_time_ms": response.elapsed.total_seconds() * 1000,
            "response_body": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text
        }
        print(json.dumps(result, indent=2))
        return 0 if result["healthy"] else 1
    except requests.RequestException as e:
        error_result = {
            "url": url,
            "error": str(e),
            "healthy": False
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    service_url = sys.argv[1]
    timeout = int(sys.argv[2])
    expected_status = int(sys.argv[3])
    exit_code = check_service_health(service_url, timeout, expected_status)
    sys.exit(exit_code)
"""
)
```
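
Once defined, a tool is invoked from a workflow step by name. A minimal sketch, mirroring the registry example at the end of this page (the workflow name, URL, and input values here are illustrative):

```python
from kubiya_sdk import Kubiya

client = Kubiya()

# Illustrative workflow that runs the tool defined above; the "inputs"
# keys map one-to-one onto the tool's args.
client.workflows.create({
    "name": "api-health-demo",  # hypothetical workflow name
    "steps": [
        {
            "name": "check-api",
            "tool": "service-health-checker",
            "inputs": {
                "service_url": "https://api.example.com",  # placeholder URL
                "timeout": 10,
                "expected_status": 200
            }
        }
    ]
})
```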
Community Tools Examples

Learn from real community tools in the @community-tools collection.

Database Tools
```python
# @community-tools/postgres-query
from kubiya_sdk.tools import Tool, Arg

postgres_query = Tool(
    name="postgres-query",
    description="Execute PostgreSQL queries and return results",
    image="postgres:15-alpine",
    args=[
        Arg("query", "SQL query to execute", type="str", required=True),
        Arg("database", "Database name", type="str", required=True),
        Arg("host", "PostgreSQL host", type="str", default="localhost"),
        Arg("port", "PostgreSQL port", type="int", default=5432),
        Arg("username", "Database username", type="str", required=True),
        Arg("password", "Database password", type="str", required=True, sensitive=True)
    ],
    content="""
#!/bin/bash
set -e

export PGPASSWORD="$6"   # password argument
export PGHOST="$3"       # host argument
export PGPORT="$4"       # port argument
export PGUSER="$5"       # username argument
export PGDATABASE="$2"   # database argument

echo "Executing query on database: $PGDATABASE"
echo "Query: $1"
echo "---"

# Execute query and format output as JSON (jq must be available in the image)
psql -t -c "$1" | jq -R -s 'split("\\n") | map(select(. != "")) | map(split("|") | map(gsub("^\\\\s+|\\\\s+$"; "")))'
"""
)
```
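
The jq pipeline at the end of the script is dense. As a rough Python equivalent (a sketch for illustration, not part of the tool), it splits psql's output into lines, drops empty ones, splits each row on `|`, and trims every cell:

```python
import json

def psql_rows_to_json(raw: str) -> str:
    """Roughly what the jq pipeline does: split lines, drop empties,
    split each row on '|', and strip whitespace around every cell."""
    rows = [
        [cell.strip() for cell in line.split("|")]
        for line in raw.split("\n")
        if line != ""
    ]
    return json.dumps(rows, indent=2)

# Example with psql-style aligned output
print(psql_rows_to_json(" 1 | alice \n 2 | bob "))
```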
Cloud Tools
```python
# @community-tools/aws-cost-analyzer
from kubiya_sdk.tools import Tool, Arg

aws_cost_analyzer = Tool(
    name="aws-cost-analyzer",
    description="Analyze AWS costs and provide optimization recommendations",
    image="amazon/aws-cli:latest",
    requirements=["boto3", "pandas", "matplotlib"],
    args=[
        Arg("start_date", "Start date for cost analysis (YYYY-MM-DD)", type="str", required=True),
        Arg("end_date", "End date for cost analysis (YYYY-MM-DD)", type="str", required=True),
        Arg("service_filter", "AWS service to filter by (optional)", type="str", default=""),
        Arg("group_by", "Group costs by (SERVICE, REGION, INSTANCE_TYPE)", type="str", default="SERVICE"),
        Arg("generate_chart", "Generate cost visualization chart", type="bool", default=True)
    ],
    env=["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION"],
    content="""
import boto3
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # headless backend for container environments
import matplotlib.pyplot as plt
import json
import sys

def analyze_aws_costs(start_date, end_date, service_filter, group_by, generate_chart):
    ce_client = boto3.client('ce')

    # Prepare Cost Explorer query
    query_params = {
        'TimePeriod': {
            'Start': start_date,
            'End': end_date
        },
        'Granularity': 'DAILY',
        'Metrics': ['BlendedCost'],
        'GroupBy': [{'Type': 'DIMENSION', 'Key': group_by}]
    }

    if service_filter:
        query_params['Filter'] = {
            'Dimensions': {
                'Key': 'SERVICE',
                'Values': [service_filter]
            }
        }

    try:
        response = ce_client.get_cost_and_usage(**query_params)

        # Process results
        cost_data = []
        total_cost = 0
        for result_by_time in response['ResultsByTime']:
            date = result_by_time['TimePeriod']['Start']
            for group in result_by_time['Groups']:
                group_key = group['Keys'][0]
                amount = float(group['Metrics']['BlendedCost']['Amount'])
                cost_data.append({
                    'Date': date,
                    group_by: group_key,
                    'Cost': amount
                })
                total_cost += amount

        # Create DataFrame for analysis
        df = pd.DataFrame(cost_data)

        # Generate summary
        summary = {
            'total_cost': round(total_cost, 2),
            'date_range': f"{start_date} to {end_date}",
            'group_by': group_by,
            'service_filter': service_filter or 'All Services',
            'top_cost_items': df.groupby(group_by)['Cost'].sum().sort_values(ascending=False).head(10).to_dict()
        }

        # Generate chart if requested
        chart_path = None
        if generate_chart and not df.empty:
            plt.figure(figsize=(12, 8))

            # Group by the specified dimension and sum costs
            grouped_costs = df.groupby(group_by)['Cost'].sum().sort_values(ascending=False)

            # Plot top 10 items
            top_10 = grouped_costs.head(10)
            plt.bar(range(len(top_10)), top_10.values)
            plt.xticks(range(len(top_10)), top_10.index, rotation=45, ha='right')
            plt.ylabel('Cost ($)')
            plt.title(f'AWS Costs by {group_by} ({start_date} to {end_date})')
            plt.tight_layout()

            chart_path = 'aws_cost_analysis.png'
            plt.savefig(chart_path, dpi=150, bbox_inches='tight')
            summary['chart_generated'] = chart_path

        # Output results
        print(json.dumps(summary, indent=2))

        # Recommendations (Cost Explorer returns full service names such as
        # "Amazon Elastic Compute Cloud - Compute", so match by substring)
        recommendations = []
        if any('EC2' in item or 'Elastic Compute' in item for item in summary['top_cost_items']):
            recommendations.append("Consider Reserved Instances for EC2 to save up to 75%")
        if total_cost > 1000:
            recommendations.append("Enable AWS Cost Anomaly Detection for this account")
        if len(summary['top_cost_items']) > 5:
            recommendations.append("Review unused resources in top spending services")

        if recommendations:
            print("\\n--- Cost Optimization Recommendations ---")
            for rec in recommendations:
                print(f"• {rec}")

        return 0

    except Exception as e:
        error_result = {
            'error': str(e),
            'message': 'Failed to analyze AWS costs'
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    start_date = sys.argv[1]
    end_date = sys.argv[2]
    service_filter = sys.argv[3]
    group_by = sys.argv[4]
    generate_chart = sys.argv[5].lower() == 'true'

    exit_code = analyze_aws_costs(start_date, end_date, service_filter, group_by, generate_chart)
    sys.exit(exit_code)
"""
)
```
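
The `env=[...]` declaration works because boto3 resolves credentials and the default region from those standard environment variables on its own; the script never handles credentials explicitly. A minimal, Kubiya-independent check:

```python
import boto3

# boto3 reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and
# AWS_DEFAULT_REGION from the process environment automatically.
session = boto3.Session()
print("region:", session.region_name)
print("credentials resolved:", session.get_credentials() is not None)
```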
DevOps Tools
```python
# @community-tools/github-pr-analyzer
from kubiya_sdk.tools import Tool, Arg

github_pr_analyzer = Tool(
    name="github-pr-analyzer",
    description="Analyze GitHub pull requests for code quality and review metrics",
    image="python:3.11-slim",
    requirements=["requests"],
    args=[
        Arg("repository", "GitHub repository (owner/repo)", type="str", required=True),
        Arg("pr_number", "Pull request number to analyze", type="int", required=True),
        Arg("github_token", "GitHub API token", type="str", required=True, sensitive=True),
        Arg("include_files", "Include changed files analysis", type="bool", default=True),
        Arg("check_tests", "Check if tests are included", type="bool", default=True)
    ],
    content="""
import requests
import json
import sys

def analyze_pr(repo, pr_number, token, include_files, check_tests):
    headers = {
        'Authorization': f'token {token}',
        'Accept': 'application/vnd.github.v3+json'
    }
    base_url = f'https://api.github.com/repos/{repo}'

    try:
        # Get PR details
        pr_response = requests.get(f'{base_url}/pulls/{pr_number}', headers=headers)
        pr_response.raise_for_status()
        pr_data = pr_response.json()

        # Get PR reviews
        reviews_response = requests.get(f'{base_url}/pulls/{pr_number}/reviews', headers=headers)
        reviews_response.raise_for_status()
        reviews = reviews_response.json()

        # Get PR files if requested
        files = []
        if include_files:
            files_response = requests.get(f'{base_url}/pulls/{pr_number}/files', headers=headers)
            files_response.raise_for_status()
            files = files_response.json()

        # Analyze the PR
        analysis = {
            'pr_info': {
                'number': pr_data['number'],
                'title': pr_data['title'],
                'author': pr_data['user']['login'],
                'state': pr_data['state'],
                'created_at': pr_data['created_at'],
                'updated_at': pr_data['updated_at'],
                'mergeable': pr_data['mergeable'],
                'draft': pr_data['draft']
            },
            'metrics': {
                'additions': pr_data['additions'],
                'deletions': pr_data['deletions'],
                'changed_files': pr_data['changed_files'],
                'commits': pr_data['commits']
            },
            'reviews': {
                'total_reviews': len(reviews),
                'approved_reviews': len([r for r in reviews if r['state'] == 'APPROVED']),
                'changes_requested': len([r for r in reviews if r['state'] == 'CHANGES_REQUESTED']),
                'reviewers': list(set([r['user']['login'] for r in reviews]))
            }
        }

        # File analysis
        if include_files:
            file_analysis = {
                'languages': {},
                'file_types': {},
                'largest_files': []
            }

            for file in files:
                filename = file['filename']
                changes = file['additions'] + file['deletions']

                # Determine language/type
                if '.' in filename:
                    ext = filename.split('.')[-1].lower()
                    file_analysis['file_types'][ext] = file_analysis['file_types'].get(ext, 0) + 1

                # Track largest files by changes
                file_analysis['largest_files'].append({
                    'filename': filename,
                    'changes': changes,
                    'additions': file['additions'],
                    'deletions': file['deletions']
                })

            # Sort by changes
            file_analysis['largest_files'].sort(key=lambda x: x['changes'], reverse=True)
            file_analysis['largest_files'] = file_analysis['largest_files'][:10]

            analysis['file_analysis'] = file_analysis

        # Check for tests if requested (test detection needs the file list)
        if check_tests and include_files:
            test_files = [f for f in files if 'test' in f['filename'].lower() or 'spec' in f['filename'].lower()]
            analysis['testing'] = {
                'has_test_files': len(test_files) > 0,
                'test_files_count': len(test_files),
                'test_files': [f['filename'] for f in test_files]
            }

        # Calculate PR health score
        health_score = 100

        # Reduce score for large PRs
        if pr_data['changed_files'] > 20:
            health_score -= 10
        if pr_data['additions'] + pr_data['deletions'] > 1000:
            health_score -= 15

        # Reduce score for no reviews
        if len(reviews) == 0:
            health_score -= 20
        elif analysis['reviews']['approved_reviews'] == 0:
            health_score -= 10

        # Reduce score if no tests (when checking)
        if check_tests and include_files:
            if not analysis['testing']['has_test_files'] and analysis['metrics']['additions'] > 50:
                health_score -= 15

        analysis['health_score'] = max(0, health_score)

        # Recommendations
        recommendations = []
        if pr_data['changed_files'] > 15:
            recommendations.append("Consider breaking this PR into smaller chunks for easier review")
        if len(reviews) == 0:
            recommendations.append("Request reviews from team members")
        if analysis['reviews']['changes_requested'] > analysis['reviews']['approved_reviews']:
            recommendations.append("Address review comments before merging")
        if check_tests and include_files and not analysis['testing']['has_test_files'] and analysis['metrics']['additions'] > 20:
            recommendations.append("Consider adding tests for new functionality")

        analysis['recommendations'] = recommendations

        print(json.dumps(analysis, indent=2))
        return 0

    except requests.RequestException as e:
        error_result = {
            'error': str(e),
            'message': 'Failed to analyze GitHub PR'
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    repository = sys.argv[1]
    pr_number = int(sys.argv[2])
    github_token = sys.argv[3]
    include_files = sys.argv[4].lower() == 'true'
    check_tests = sys.argv[5].lower() == 'true'

    exit_code = analyze_pr(repository, pr_number, github_token, include_files, check_tests)
    sys.exit(exit_code)
"""
)
```
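
One caveat: the `/files` endpoint is paginated (30 files per page by default, 100 at most), so the single GET above undercounts very large PRs. A sketch of full pagination using the `Link` header, which requests exposes as `response.links`:

```python
import requests

def get_all_pr_files(base_url, pr_number, headers):
    """Fetch every page of /pulls/{n}/files; GitHub caps pages at 100 items."""
    files = []
    url = f"{base_url}/pulls/{pr_number}/files?per_page=100"
    while url:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        files.extend(response.json())
        # requests parses the Link header; no "next" link means the last page
        url = response.links.get("next", {}).get("url")
    return files
```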
Monitoring Tools
```python
# @community-tools/prometheus-alert-manager
from kubiya_sdk.tools import Tool, Arg

prometheus_alert_manager = Tool(
    name="prometheus-alert-manager",
    description="Query Prometheus metrics and manage alerts",
    image="prom/prometheus:latest",
    requirements=["requests"],
    args=[
        Arg("prometheus_url", "Prometheus server URL", type="str", required=True),
        Arg("query", "PromQL query to execute", type="str", required=True),
        Arg("start_time", "Start time for range query (ISO format)", type="str", default=""),
        Arg("end_time", "End time for range query (ISO format)", type="str", default=""),
        Arg("step", "Query resolution step", type="str", default="15s"),
        Arg("alert_threshold", "Threshold value for alerting", type="float", default=0)
    ],
    content="""
import requests
import json
import sys
from datetime import datetime

def query_prometheus(prom_url, query, start_time, end_time, step, alert_threshold):
    try:
        # Determine query type
        if start_time and end_time:
            # Range query
            params = {
                'query': query,
                'start': start_time,
                'end': end_time,
                'step': step
            }
            response = requests.get(f'{prom_url}/api/v1/query_range', params=params)
        else:
            # Instant query
            params = {'query': query}
            response = requests.get(f'{prom_url}/api/v1/query', params=params)

        response.raise_for_status()
        data = response.json()

        if data['status'] != 'success':
            raise Exception(f"Prometheus query failed: {data.get('error', 'Unknown error')}")

        result = data['data']['result']

        # Process results
        analysis = {
            'query': query,
            'query_type': 'range' if start_time and end_time else 'instant',
            'timestamp': datetime.now().isoformat(),
            'result_count': len(result),
            'results': []
        }

        alerts_triggered = []

        for series in result:
            metric = series['metric']

            if 'values' in series:  # Range query
                values = series['values']
                latest_value = float(values[-1][1]) if values else 0
                series_data = {
                    'metric': metric,
                    'latest_value': latest_value,
                    'data_points': len(values),
                    'min_value': min([float(v[1]) for v in values]) if values else 0,
                    'max_value': max([float(v[1]) for v in values]) if values else 0,
                    'avg_value': sum([float(v[1]) for v in values]) / len(values) if values else 0
                }
            else:  # Instant query
                timestamp, value = series['value']
                latest_value = float(value)
                series_data = {
                    'metric': metric,
                    'value': latest_value,
                    'timestamp': timestamp
                }

            analysis['results'].append(series_data)

            # Check alert threshold
            if alert_threshold > 0 and latest_value > alert_threshold:
                alerts_triggered.append({
                    'metric': metric,
                    'current_value': latest_value,
                    'threshold': alert_threshold,
                    'severity': 'warning' if latest_value < alert_threshold * 1.5 else 'critical'
                })

        if alerts_triggered:
            analysis['alerts'] = {
                'triggered': True,
                'count': len(alerts_triggered),
                'alerts': alerts_triggered
            }
        else:
            analysis['alerts'] = {'triggered': False}

        # Summary statistics
        if analysis['results']:
            all_values = []
            for series_data in analysis['results']:
                if 'latest_value' in series_data:
                    all_values.append(series_data['latest_value'])
                elif 'value' in series_data:
                    all_values.append(series_data['value'])

            if all_values:
                analysis['summary'] = {
                    'total_series': len(all_values),
                    'min_value': min(all_values),
                    'max_value': max(all_values),
                    'avg_value': sum(all_values) / len(all_values),
                    'values_above_threshold': len([v for v in all_values if v > alert_threshold]) if alert_threshold > 0 else 0
                }

        print(json.dumps(analysis, indent=2))

        # Return non-zero exit code if alerts were triggered
        return 1 if alerts_triggered else 0

    except Exception as e:
        error_result = {
            'error': str(e),
            'message': 'Failed to query Prometheus',
            'query': query
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    prometheus_url = sys.argv[1].rstrip('/')
    query = sys.argv[2]
    start_time = sys.argv[3] if len(sys.argv) > 3 and sys.argv[3] else ""
    end_time = sys.argv[4] if len(sys.argv) > 4 and sys.argv[4] else ""
    step = sys.argv[5] if len(sys.argv) > 5 else "15s"
    alert_threshold = float(sys.argv[6]) if len(sys.argv) > 6 else 0

    exit_code = query_prometheus(prometheus_url, query, start_time, end_time, step, alert_threshold)
    sys.exit(exit_code)
"""
)
```
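
For range queries, Prometheus accepts RFC 3339 timestamps (or Unix epochs) for `start` and `end`, so the `start_time`/`end_time` arguments can be built like this (a sketch for a last-hour window):

```python
from datetime import datetime, timedelta, timezone

# Build a one-hour range in RFC 3339 format for query_range
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)
print("start:", start.isoformat())
print("end:", end.isoformat())
```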
Tool Development Best Practices
Structure and Organization
- Clear Naming: Use descriptive, kebab-case names
- Single Responsibility: Each tool should do one thing well
- Comprehensive Documentation: Include detailed descriptions and examples
- Error Handling: Always handle errors gracefully with structured output (see the sketch below)
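
A minimal sketch of that structured-output pattern: every run prints exactly one JSON document and exits non-zero on failure. The envelope fields shown are illustrative conventions, not an SDK requirement:

```python
import json
import sys

def run(task):
    """Wrap a tool body so output is always a single JSON document."""
    try:
        payload = task()
        print(json.dumps({"success": True, "result": payload}, indent=2))
        return 0
    except Exception as e:
        print(json.dumps({"success": False, "error": str(e)}, indent=2))
        return 1

if __name__ == "__main__":
    # Replace the lambda with the tool's real work
    sys.exit(run(lambda: {"message": "ok"}))
```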
Input Validation
```python
from kubiya_sdk.tools import Tool, Arg

validated_tool = Tool(
    name="validated-example",
    description="Example tool with input validation",
    args=[
        Arg(
            name="email",
            description="User email address",
            type="str",
            required=True,
            validation=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        ),
        Arg(
            name="priority",
            description="Task priority level",
            type="str",
            required=True,
            options=["low", "medium", "high", "critical"]
        ),
        Arg(
            name="count",
            description="Number of items to process",
            type="int",
            default=10,
            min_value=1,
            max_value=1000
        )
    ],
    content="""
import sys
import re
import json

def validate_inputs(email, priority, count):
    errors = []

    # Email validation
    if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
        errors.append("Invalid email format")

    # Priority validation
    if priority not in ["low", "medium", "high", "critical"]:
        errors.append("Invalid priority. Must be: low, medium, high, or critical")

    # Count validation
    if not (1 <= count <= 1000):
        errors.append("Count must be between 1 and 1000")

    return errors

if __name__ == "__main__":
    email = sys.argv[1]
    priority = sys.argv[2]
    count = int(sys.argv[3])

    validation_errors = validate_inputs(email, priority, count)
    if validation_errors:
        result = {
            "success": False,
            "errors": validation_errors
        }
        print(json.dumps(result, indent=2))
        sys.exit(1)

    # Process valid inputs
    result = {
        "success": True,
        "processed": {
            "email": email,
            "priority": priority,
            "count": count
        }
    }
    print(json.dumps(result, indent=2))
    sys.exit(0)
"""
)
```
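
A quick, self-contained sanity check of the email pattern used above (the sample addresses are made up):

```python
import re

# Same pattern as the tool's email Arg and its in-script validation
EMAIL_RE = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

assert re.match(EMAIL_RE, "dev@example.com")
assert not re.match(EMAIL_RE, "not-an-email")
print("email pattern checks passed")
```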
Testing Tools
```python
import unittest
from kubiya_sdk.tools import Tool, Arg
from kubiya_sdk.tools.testing import ToolTest

class TestMyTool(ToolTest):
    def setUp(self):
        self.tool = Tool(
            name="test-tool",
            description="Tool for testing",
            content="echo 'Hello, World!'"
        )

    def test_successful_execution(self):
        """Test successful tool execution."""
        result = self.execute_tool(self.tool, {})
        self.assertTrue(result.success)
        self.assertIn("Hello, World!", result.output)

    def test_input_validation(self):
        """Test input validation."""
        tool_with_args = Tool(
            name="validation-test",
            args=[
                Arg("required_arg", "Required argument", required=True)
            ],
            content="echo $1"
        )

        # Should fail without required argument
        result = self.execute_tool(tool_with_args, {})
        self.assertFalse(result.success)

        # Should succeed with required argument
        result = self.execute_tool(tool_with_args, {"required_arg": "test"})
        self.assertTrue(result.success)

if __name__ == "__main__":
    unittest.main()
```
Publishing Tools
Community Tools
To contribute to @community-tools:

- Follow Standards: Use consistent naming and structure
- Include Tests: Provide comprehensive test coverage
- Document Well: Include clear descriptions and examples
- Add Categories: Tag tools appropriately (database, cloud, monitoring, etc.)
```python
# Tool metadata for community contribution
community_tool = Tool(
    name="my-awesome-tool",
    description="Brief description of what the tool does",
    version="1.0.0",
    author="Your Name <your.email@company.com>",
    tags=["database", "postgresql", "monitoring"],
    category="database",
    license="MIT",
    # ... tool definition
)
```
Private Tool Registry
For internal tools:
```python
from kubiya_sdk import Kubiya

client = Kubiya()

# Register tool in private registry
client.tools.register(
    tool=my_private_tool,
    visibility="private",  # or "team", "organization"
    namespace="my-company"
)

# Use in workflows
client.workflows.create({
    "name": "internal-process",
    "steps": [
        {
            "name": "custom-step",
            "tool": "my-company/my-private-tool",
            "inputs": {"param": "value"}
        }
    ]
})
```