Basic Docker Tool Structure
Simple Docker Tool
Copy
Ask AI
from kubiya.tools import Tool, Arg
# Basic Docker tool
simple_tool = Tool(
name="file-processor",
description="Process files using Python scripts",
image="python:3.11-slim",
requirements=["pandas", "requests"],
args=[
Arg("input_file", "Path to input file", type="str", required=True),
Arg("operation", "Operation to perform", type="str",
options=["validate", "transform", "analyze"], default="analyze")
],
content="""
import pandas as pd
import sys
import json
def process_file(input_file, operation):
try:
# Read the file
if input_file.endswith('.csv'):
df = pd.read_csv(input_file)
elif input_file.endswith('.json'):
with open(input_file, 'r') as f:
data = json.load(f)
df = pd.DataFrame(data)
else:
raise ValueError(f"Unsupported file format: {input_file}")
result = {"input_file": input_file, "operation": operation}
if operation == "validate":
result["valid"] = not df.empty
result["row_count"] = len(df)
result["column_count"] = len(df.columns)
elif operation == "transform":
# Example transformation: clean null values
df_clean = df.dropna()
output_file = input_file.replace('.', '_clean.')
df_clean.to_csv(output_file, index=False)
result["output_file"] = output_file
result["rows_removed"] = len(df) - len(df_clean)
elif operation == "analyze":
result["statistics"] = {
"row_count": len(df),
"column_count": len(df.columns),
"null_values": df.isnull().sum().to_dict(),
"memory_usage": df.memory_usage(deep=True).sum()
}
print(json.dumps(result, indent=2))
return 0
except Exception as e:
error_result = {
"error": str(e),
"input_file": input_file,
"operation": operation
}
print(json.dumps(error_result, indent=2))
return 1
if __name__ == "__main__":
input_file = sys.argv[1]
operation = sys.argv[2]
exit_code = process_file(input_file, operation)
sys.exit(exit_code)
"""
)
Advanced Docker Tool Features
Multi-Stage Tool with Complex Dependencies
Copy
Ask AI
# Advanced data science tool
advanced_data_tool = Tool(
name="ml-model-trainer",
description="Train and evaluate machine learning models",
image="python:3.11-slim",
requirements=[
"scikit-learn>=1.3.0",
"pandas>=2.0.0",
"numpy>=1.24.0",
"matplotlib>=3.7.0",
"seaborn>=0.12.0",
"joblib>=1.3.0"
],
args=[
Arg("dataset_path", "Path to training dataset", type="str", required=True),
Arg("model_type", "Type of ML model", type="str",
options=["linear_regression", "random_forest", "svm", "neural_network"]),
Arg("test_split", "Test set split ratio", type="float", default=0.2),
Arg("cross_validation", "Use cross-validation", type="bool", default=True),
Arg("output_dir", "Output directory for results", type="str", default="/tmp/ml_results")
],
# Custom entrypoint for complex setup
entrypoint=["python", "-u"],
content="""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import json
import sys
import os
class ModelTrainer:
def __init__(self, dataset_path, model_type, test_split, cross_validation, output_dir):
self.dataset_path = dataset_path
self.model_type = model_type
self.test_split = test_split
self.cross_validation = cross_validation
self.output_dir = output_dir
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Initialize model
self.models = {
"linear_regression": LinearRegression(),
"random_forest": RandomForestRegressor(n_estimators=100, random_state=42),
"svm": SVR(kernel='rbf'),
"neural_network": MLPRegressor(hidden_layer_sizes=(100,), random_state=42)
}
if model_type not in self.models:
raise ValueError(f"Unsupported model type: {model_type}")
self.model = self.models[model_type]
def load_and_prepare_data(self):
\"\"\"Load and prepare the dataset\"\"\"
print(f"Loading dataset from {self.dataset_path}")
if self.dataset_path.endswith('.csv'):
df = pd.read_csv(self.dataset_path)
else:
raise ValueError("Only CSV files are supported")
# Basic data validation
if df.empty:
raise ValueError("Dataset is empty")
# Assume last column is target, rest are features
X = df.iloc[:, :-1]
y = df.iloc[:, -1]
# Handle categorical variables (simple encoding)
X_encoded = pd.get_dummies(X, drop_first=True)
print(f"Dataset shape: {df.shape}")
print(f"Features: {X_encoded.shape[1]}")
print(f"Target variable: {y.name}")
return X_encoded, y
def train_model(self, X, y):
\"\"\"Train the model\"\"\"
print(f"Training {self.model_type} model...")
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=self.test_split, random_state=42
)
# Train model
self.model.fit(X_train, y_train)
# Make predictions
y_pred_train = self.model.predict(X_train)
y_pred_test = self.model.predict(X_test)
# Calculate metrics
train_mse = mean_squared_error(y_train, y_pred_train)
test_mse = mean_squared_error(y_test, y_pred_test)
train_r2 = r2_score(y_train, y_pred_train)
test_r2 = r2_score(y_test, y_pred_test)
results = {
"model_type": self.model_type,
"training_samples": len(X_train),
"test_samples": len(X_test),
"metrics": {
"train_mse": float(train_mse),
"test_mse": float(test_mse),
"train_r2": float(train_r2),
"test_r2": float(test_r2)
}
}
# Cross-validation if requested
if self.cross_validation:
cv_scores = cross_val_score(self.model, X, y, cv=5,
scoring='neg_mean_squared_error')
results["cross_validation"] = {
"mean_mse": float(-cv_scores.mean()),
"std_mse": float(cv_scores.std()),
"scores": [-float(score) for score in cv_scores]
}
# Generate visualizations
self.create_visualizations(y_test, y_pred_test, results)
# Save model
model_path = os.path.join(self.output_dir, f"{self.model_type}_model.pkl")
joblib.dump(self.model, model_path)
results["model_path"] = model_path
return results, X_test, y_test, y_pred_test
def create_visualizations(self, y_true, y_pred, results):
\"\"\"Create visualization plots\"\"\"
plt.figure(figsize=(12, 8))
# Prediction vs Actual plot
plt.subplot(2, 2, 1)
plt.scatter(y_true, y_pred, alpha=0.6)
plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.title(f'{self.model_type.title()} - Predictions vs Actual')
# Residuals plot
plt.subplot(2, 2, 2)
residuals = y_true - y_pred
plt.scatter(y_pred, residuals, alpha=0.6)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel('Predicted Values')
plt.ylabel('Residuals')
plt.title('Residuals Plot')
# Distribution of residuals
plt.subplot(2, 2, 3)
plt.hist(residuals, bins=30, alpha=0.7, edgecolor='black')
plt.xlabel('Residuals')
plt.ylabel('Frequency')
plt.title('Distribution of Residuals')
# Metrics summary
plt.subplot(2, 2, 4)
plt.text(0.1, 0.8, f"Model: {self.model_type}", fontsize=12, weight='bold')
plt.text(0.1, 0.7, f"Test R²: {results['metrics']['test_r2']:.4f}", fontsize=10)
plt.text(0.1, 0.6, f"Test MSE: {results['metrics']['test_mse']:.4f}", fontsize=10)
plt.text(0.1, 0.5, f"Train R²: {results['metrics']['train_r2']:.4f}", fontsize=10)
plt.text(0.1, 0.4, f"Train MSE: {results['metrics']['train_mse']:.4f}", fontsize=10)
if 'cross_validation' in results:
plt.text(0.1, 0.3, f"CV MSE: {results['cross_validation']['mean_mse']:.4f} ± {results['cross_validation']['std_mse']:.4f}", fontsize=10)
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.axis('off')
plt.title('Model Performance Summary')
plt.tight_layout()
# Save plot
plot_path = os.path.join(self.output_dir, f"{self.model_type}_results.png")
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"Visualizations saved to: {plot_path}")
def run(self):
\"\"\"Main execution method\"\"\"
try:
# Load and prepare data
X, y = self.load_and_prepare_data()
# Train model
results, X_test, y_test, y_pred = self.train_model(X, y)
# Save detailed results
results_path = os.path.join(self.output_dir, f"{self.model_type}_results.json")
with open(results_path, 'w') as f:
json.dump(results, f, indent=2)
results["results_path"] = results_path
# Print results
print("\\n" + "="*50)
print("MODEL TRAINING COMPLETED")
print("="*50)
print(json.dumps(results, indent=2))
return 0
except Exception as e:
error_result = {
"error": str(e),
"model_type": self.model_type,
"dataset_path": self.dataset_path
}
print(json.dumps(error_result, indent=2))
return 1
if __name__ == "__main__":
dataset_path = sys.argv[1]
model_type = sys.argv[2]
test_split = float(sys.argv[3])
cross_validation = sys.argv[4].lower() == 'true'
output_dir = sys.argv[5]
trainer = ModelTrainer(dataset_path, model_type, test_split, cross_validation, output_dir)
exit_code = trainer.run()
sys.exit(exit_code)
"""
)
Docker Tool Configuration
Custom Docker Image
Copy
Ask AI
# Tool with custom Docker image
custom_image_tool = Tool(
name="nodejs-app-builder",
description="Build Node.js applications with custom optimizations",
image="node:18-alpine", # Use specific Node.js version
# Install additional system packages
on_build="""
apk add --no-cache git python3 make g++
npm install -g @angular/cli @nestjs/cli typescript
""",
args=[
Arg("project_path", "Path to Node.js project", type="str", required=True),
Arg("build_type", "Type of build", type="str",
options=["development", "production"], default="production"),
Arg("run_tests", "Run tests before build", type="bool", default=True)
],
content="""
#!/bin/bash
set -e
PROJECT_PATH="$1"
BUILD_TYPE="$2"
RUN_TESTS="$3"
cd "$PROJECT_PATH"
echo "Building Node.js application..."
echo "Project: $PROJECT_PATH"
echo "Build type: $BUILD_TYPE"
echo "Run tests: $RUN_TESTS"
# Install dependencies
echo "Installing dependencies..."
npm ci
# Run tests if requested
if [ "$RUN_TESTS" = "true" ]; then
echo "Running tests..."
npm test
fi
# Build application
echo "Building application..."
if [ "$BUILD_TYPE" = "production" ]; then
npm run build:prod
else
npm run build
fi
# Create build summary
cat > build-summary.json << EOF
{
"project_path": "$PROJECT_PATH",
"build_type": "$BUILD_TYPE",
"tests_run": $RUN_TESTS,
"build_timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"build_status": "success"
}
EOF
echo "Build completed successfully!"
cat build-summary.json
"""
)
Tool with Volume Mounts
Copy
Ask AI
# Tool that works with persistent volumes
volume_tool = Tool(
name="database-backup-manager",
description="Manage database backups with persistent storage",
image="postgres:15-alpine",
# Define volumes
with_volumes=[
Volume(name="backup-storage", path="/backups"),
Volume(name="config-storage", path="/config")
],
args=[
Arg("action", "Action to perform", type="str",
options=["backup", "restore", "list", "cleanup"], required=True),
Arg("database_url", "Database connection URL", type="str", required=True),
Arg("backup_name", "Name for backup file", type="str", default=""),
Arg("retention_days", "Days to retain backups", type="int", default=30)
],
content="""
#!/bin/bash
set -e
ACTION="$1"
DATABASE_URL="$2"
BACKUP_NAME="$3"
RETENTION_DAYS="$4"
BACKUP_DIR="/backups"
CONFIG_DIR="/config"
# Ensure directories exist
mkdir -p "$BACKUP_DIR" "$CONFIG_DIR"
case "$ACTION" in
"backup")
if [ -z "$BACKUP_NAME" ]; then
BACKUP_NAME="backup_$(date +%Y%m%d_%H%M%S).sql"
fi
echo "Creating backup: $BACKUP_NAME"
pg_dump "$DATABASE_URL" > "$BACKUP_DIR/$BACKUP_NAME"
# Compress backup
gzip "$BACKUP_DIR/$BACKUP_NAME"
BACKUP_FILE="$BACKUP_DIR/$BACKUP_NAME.gz"
# Save metadata
cat > "$CONFIG_DIR/last_backup.json" << EOF
{
"backup_file": "$BACKUP_FILE",
"database_url": "$DATABASE_URL",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"size_bytes": $(stat -c%s "$BACKUP_FILE")
}
EOF
echo "Backup completed: $BACKUP_FILE"
cat "$CONFIG_DIR/last_backup.json"
;;
"restore")
if [ -z "$BACKUP_NAME" ]; then
echo "Error: backup_name required for restore action"
exit 1
fi
BACKUP_FILE="$BACKUP_DIR/$BACKUP_NAME"
if [ ! -f "$BACKUP_FILE" ]; then
echo "Error: Backup file not found: $BACKUP_FILE"
exit 1
fi
echo "Restoring from backup: $BACKUP_FILE"
# Decompress if needed
if [[ "$BACKUP_FILE" == *.gz ]]; then
gunzip -c "$BACKUP_FILE" | psql "$DATABASE_URL"
else
psql "$DATABASE_URL" < "$BACKUP_FILE"
fi
echo "Restore completed successfully"
;;
"list")
echo "Available backups:"
ls -la "$BACKUP_DIR"/*.sql* 2>/dev/null || echo "No backups found"
;;
"cleanup")
echo "Cleaning up backups older than $RETENTION_DAYS days..."
find "$BACKUP_DIR" -name "*.sql*" -type f -mtime +$RETENTION_DAYS -delete
echo "Cleanup completed"
;;
*)
echo "Error: Unknown action: $ACTION"
exit 1
;;
esac
"""
)
Docker Tool Security
Secure Tool with Secrets
Copy
Ask AI
# Tool that uses secrets securely
secure_tool = Tool(
name="secure-api-client",
description="Securely interact with external APIs using secrets",
image="python:3.11-slim",
requirements=["requests", "cryptography"],
# Define required secrets
secrets=["api_key", "client_secret", "encryption_key"],
args=[
Arg("api_endpoint", "API endpoint URL", type="str", required=True),
Arg("action", "API action to perform", type="str", required=True),
Arg("data_payload", "JSON data to send", type="str", default="{}"),
Arg("encrypt_response", "Encrypt API response", type="bool", default=False)
],
content="""
import requests
import json
import os
import sys
from cryptography.fernet import Fernet
import base64
class SecureAPIClient:
def __init__(self):
# Load secrets from environment variables
self.api_key = os.environ.get('API_KEY')
self.client_secret = os.environ.get('CLIENT_SECRET')
self.encryption_key = os.environ.get('ENCRYPTION_KEY')
if not all([self.api_key, self.client_secret]):
raise ValueError("Required secrets (API_KEY, CLIENT_SECRET) not found")
# Initialize encryption if key provided
self.cipher = None
if self.encryption_key:
try:
self.cipher = Fernet(self.encryption_key.encode())
except Exception as e:
print(f"Warning: Invalid encryption key: {e}")
def encrypt_data(self, data):
\"\"\"Encrypt sensitive data\"\"\"
if not self.cipher:
return data
if isinstance(data, str):
return self.cipher.encrypt(data.encode()).decode()
return data
def decrypt_data(self, encrypted_data):
\"\"\"Decrypt data\"\"\"
if not self.cipher:
return encrypted_data
try:
return self.cipher.decrypt(encrypted_data.encode()).decode()
except Exception:
return encrypted_data # Return as-is if decryption fails
def make_request(self, endpoint, action, payload, encrypt_response=False):
\"\"\"Make secure API request\"\"\"
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
'X-Client-Secret': self.client_secret,
'User-Agent': 'Kubiya-Secure-Client/1.0'
}
try:
# Parse payload
if isinstance(payload, str):
payload = json.loads(payload)
# Make request based on action
if action.upper() == 'GET':
response = requests.get(endpoint, headers=headers, params=payload, timeout=30)
elif action.upper() == 'POST':
response = requests.post(endpoint, headers=headers, json=payload, timeout=30)
elif action.upper() == 'PUT':
response = requests.put(endpoint, headers=headers, json=payload, timeout=30)
elif action.upper() == 'DELETE':
response = requests.delete(endpoint, headers=headers, timeout=30)
else:
raise ValueError(f"Unsupported action: {action}")
# Process response
result = {
'status_code': response.status_code,
'success': response.status_code < 400,
'headers': dict(response.headers),
'endpoint': endpoint,
'action': action.upper()
}
# Handle response content
try:
response_data = response.json()
except ValueError:
response_data = response.text
# Encrypt response if requested
if encrypt_response and self.cipher:
result['response_data_encrypted'] = self.encrypt_data(json.dumps(response_data))
result['encrypted'] = True
else:
result['response_data'] = response_data
result['encrypted'] = False
# Add error details if request failed
if not result['success']:
result['error_message'] = f"HTTP {response.status_code}: {response.reason}"
return result
except requests.exceptions.Timeout:
return {
'success': False,
'error_message': 'Request timeout',
'endpoint': endpoint,
'action': action.upper()
}
except requests.exceptions.ConnectionError:
return {
'success': False,
'error_message': 'Connection error',
'endpoint': endpoint,
'action': action.upper()
}
except Exception as e:
return {
'success': False,
'error_message': str(e),
'endpoint': endpoint,
'action': action.upper()
}
def main():
try:
api_endpoint = sys.argv[1]
action = sys.argv[2]
data_payload = sys.argv[3]
encrypt_response = sys.argv[4].lower() == 'true'
client = SecureAPIClient()
result = client.make_request(api_endpoint, action, data_payload, encrypt_response)
print(json.dumps(result, indent=2))
# Exit with error code if request failed
return 0 if result.get('success', False) else 1
except Exception as e:
error_result = {
'success': False,
'error_message': str(e),
'error_type': 'client_error'
}
print(json.dumps(error_result, indent=2))
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)
"""
)
Docker Tool Performance
Optimized Tool with Caching
Copy
Ask AI
# Performance-optimized tool with caching
optimized_tool = Tool(
name="performance-analyzer",
description="High-performance log analysis with caching",
image="python:3.11-slim",
requirements=["pandas", "numpy", "redis", "matplotlib"],
# Service dependencies
with_services=[
ServiceSpec(
name="redis-cache",
image="redis:7-alpine",
exposed_ports=[6379],
env={"REDIS_PASSWORD": "cache_password"}
)
],
args=[
Arg("log_file", "Path to log file", type="str", required=True),
Arg("analysis_type", "Type of analysis", type="str",
options=["error_rate", "performance", "traffic"], default="error_rate"),
Arg("time_window", "Time window in hours", type="int", default=24),
Arg("use_cache", "Use Redis caching", type="bool", default=True),
Arg("cache_ttl", "Cache TTL in seconds", type="int", default=3600)
],
content="""
import pandas as pd
import numpy as np
import redis
import json
import sys
import hashlib
import time
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
class PerformanceAnalyzer:
def __init__(self, use_cache=True, cache_ttl=3600):
self.use_cache = use_cache
self.cache_ttl = cache_ttl
self.redis_client = None
if use_cache:
try:
# Connect to Redis service
self.redis_client = redis.Redis(
host='redis-cache-svc', # Service name from Kubernetes
port=6379,
password='cache_password',
decode_responses=True
)
# Test connection
self.redis_client.ping()
print("Connected to Redis cache")
except Exception as e:
print(f"Warning: Could not connect to Redis: {e}")
self.use_cache = False
def get_cache_key(self, *args):
\"\"\"Generate cache key from arguments\"\"\"
cache_string = '|'.join(str(arg) for arg in args)
return hashlib.md5(cache_string.encode()).hexdigest()
def get_cached_result(self, cache_key):
\"\"\"Get result from cache\"\"\"
if not self.use_cache or not self.redis_client:
return None
try:
cached_data = self.redis_client.get(cache_key)
if cached_data:
return json.loads(cached_data)
except Exception as e:
print(f"Cache read error: {e}")
return None
def cache_result(self, cache_key, result):
\"\"\"Store result in cache\"\"\"
if not self.use_cache or not self.redis_client:
return
try:
self.redis_client.setex(
cache_key,
self.cache_ttl,
json.dumps(result, default=str)
)
except Exception as e:
print(f"Cache write error: {e}")
def parse_log_file(self, log_file, time_window):
\"\"\"Parse log file efficiently\"\"\"
print(f"Parsing log file: {log_file}")
# Check cache first
cache_key = self.get_cache_key('parse_log', log_file, time_window)
cached_result = self.get_cached_result(cache_key)
if cached_result:
print("Using cached log parsing result")
return pd.DataFrame(cached_result)
# Define log parsing patterns
log_patterns = {
'timestamp': r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})',
'level': r'(ERROR|WARN|INFO|DEBUG)',
'response_time': r'response_time=(\d+)ms',
'status_code': r'status=(\d{3})',
'endpoint': r'endpoint=([^\s]+)'
}
entries = []
cutoff_time = datetime.now() - timedelta(hours=time_window)
with open(log_file, 'r') as f:
for line_num, line in enumerate(f):
try:
entry = {}
# Extract timestamp
import re
timestamp_match = re.search(log_patterns['timestamp'], line)
if timestamp_match:
timestamp = datetime.fromisoformat(timestamp_match.group(1))
if timestamp < cutoff_time:
continue # Skip old entries
entry['timestamp'] = timestamp
else:
continue # Skip lines without timestamp
# Extract other fields
for field, pattern in log_patterns.items():
if field == 'timestamp':
continue
match = re.search(pattern, line)
if match:
value = match.group(1)
if field in ['response_time', 'status_code']:
entry[field] = int(value)
else:
entry[field] = value
if entry:
entries.append(entry)
except Exception as e:
print(f"Error parsing line {line_num}: {e}")
continue
df = pd.DataFrame(entries)
# Cache the parsed data
self.cache_result(cache_key, df.to_dict('records'))
print(f"Parsed {len(df)} log entries")
return df
def analyze_error_rate(self, df):
\"\"\"Analyze error rates\"\"\"
if df.empty:
return {"error": "No data to analyze"}
# Group by hour
df['hour'] = df['timestamp'].dt.floor('H')
hourly_stats = df.groupby('hour').agg({
'status_code': ['count', lambda x: sum(x >= 400)],
'response_time': ['mean', 'max', 'std']
}).round(2)
hourly_stats.columns = ['total_requests', 'error_count', 'avg_response_time', 'max_response_time', 'response_time_std']
hourly_stats['error_rate'] = (hourly_stats['error_count'] / hourly_stats['total_requests'] * 100).round(2)
# Overall statistics
total_requests = len(df)
total_errors = len(df[df['status_code'] >= 400])
overall_error_rate = (total_errors / total_requests * 100) if total_requests > 0 else 0
result = {
'analysis_type': 'error_rate',
'total_requests': total_requests,
'total_errors': total_errors,
'overall_error_rate': round(overall_error_rate, 2),
'hourly_breakdown': hourly_stats.to_dict('index'),
'time_range': {
'start': df['timestamp'].min().isoformat(),
'end': df['timestamp'].max().isoformat()
}
}
return result
def analyze_performance(self, df):
\"\"\"Analyze performance metrics\"\"\"
if df.empty or 'response_time' not in df.columns:
return {"error": "No performance data available"}
# Performance percentiles
percentiles = df['response_time'].quantile([0.5, 0.75, 0.90, 0.95, 0.99]).round(2)
# Group by endpoint
endpoint_stats = df.groupby('endpoint')['response_time'].agg([
'count', 'mean', 'median', 'max', 'std'
]).round(2)
# Slow requests (>1000ms)
slow_requests = df[df['response_time'] > 1000]
result = {
'analysis_type': 'performance',
'response_time_percentiles': {
'p50': percentiles[0.5],
'p75': percentiles[0.75],
'p90': percentiles[0.90],
'p95': percentiles[0.95],
'p99': percentiles[0.99]
},
'endpoint_performance': endpoint_stats.to_dict('index'),
'slow_requests_count': len(slow_requests),
'avg_response_time': df['response_time'].mean().round(2),
'max_response_time': df['response_time'].max()
}
return result
def analyze_traffic(self, df):
\"\"\"Analyze traffic patterns\"\"\"
if df.empty:
return {"error": "No traffic data available"}
# Requests per hour
hourly_traffic = df.groupby(df['timestamp'].dt.floor('H')).size()
# Peak traffic analysis
peak_hour = hourly_traffic.idxmax()
peak_requests = hourly_traffic.max()
# Traffic by endpoint
endpoint_traffic = df['endpoint'].value_counts().head(10)
result = {
'analysis_type': 'traffic',
'total_requests': len(df),
'requests_per_hour': hourly_traffic.to_dict(),
'peak_traffic': {
'hour': peak_hour.isoformat(),
'requests': int(peak_requests)
},
'top_endpoints': endpoint_traffic.to_dict(),
'avg_requests_per_hour': hourly_traffic.mean().round(2)
}
return result
def create_visualizations(self, df, analysis_result):
\"\"\"Create performance visualizations\"\"\"
if df.empty:
return
plt.figure(figsize=(15, 10))
# Response time distribution
plt.subplot(2, 3, 1)
df['response_time'].hist(bins=50, alpha=0.7, edgecolor='black')
plt.xlabel('Response Time (ms)')
plt.ylabel('Frequency')
plt.title('Response Time Distribution')
# Error rate over time
plt.subplot(2, 3, 2)
if 'hourly_breakdown' in analysis_result:
hours = list(analysis_result['hourly_breakdown'].keys())
error_rates = [analysis_result['hourly_breakdown'][h]['error_rate'] for h in hours]
plt.plot(hours, error_rates, marker='o')
plt.xlabel('Hour')
plt.ylabel('Error Rate (%)')
plt.title('Error Rate Over Time')
plt.xticks(rotation=45)
# Traffic over time
plt.subplot(2, 3, 3)
hourly_counts = df.groupby(df['timestamp'].dt.floor('H')).size()
hourly_counts.plot(kind='line', marker='o')
plt.xlabel('Hour')
plt.ylabel('Requests')
plt.title('Traffic Over Time')
plt.xticks(rotation=45)
# Status code distribution
plt.subplot(2, 3, 4)
status_counts = df['status_code'].value_counts()
status_counts.plot(kind='bar')
plt.xlabel('Status Code')
plt.ylabel('Count')
plt.title('Status Code Distribution')
# Top endpoints by traffic
plt.subplot(2, 3, 5)
if 'endpoint' in df.columns:
top_endpoints = df['endpoint'].value_counts().head(10)
top_endpoints.plot(kind='barh')
plt.xlabel('Request Count')
plt.title('Top Endpoints by Traffic')
# Response time box plot by endpoint
plt.subplot(2, 3, 6)
if 'endpoint' in df.columns and 'response_time' in df.columns:
top_5_endpoints = df['endpoint'].value_counts().head(5).index
endpoint_response_times = [df[df['endpoint'] == ep]['response_time'] for ep in top_5_endpoints]
plt.boxplot(endpoint_response_times, labels=top_5_endpoints)
plt.xlabel('Endpoint')
plt.ylabel('Response Time (ms)')
plt.title('Response Time by Endpoint')
plt.xticks(rotation=45)
plt.tight_layout()
# Save visualization
plot_path = '/tmp/performance_analysis.png'
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"Visualization saved to: {plot_path}")
return plot_path
def analyze(self, log_file, analysis_type, time_window):
\"\"\"Main analysis method\"\"\"
start_time = time.time()
# Parse log file
df = self.parse_log_file(log_file, time_window)
# Perform analysis based on type
if analysis_type == 'error_rate':
result = self.analyze_error_rate(df)
elif analysis_type == 'performance':
result = self.analyze_performance(df)
elif analysis_type == 'traffic':
result = self.analyze_traffic(df)
else:
result = {
'error': f'Unknown analysis type: {analysis_type}',
'available_types': ['error_rate', 'performance', 'traffic']
}
# Add metadata
result['analysis_duration'] = round(time.time() - start_time, 2)
result['cache_used'] = self.use_cache
result['log_file'] = log_file
# Create visualizations
if not result.get('error'):
plot_path = self.create_visualizations(df, result)
result['visualization_path'] = plot_path
return result
def main():
try:
log_file = sys.argv[1]
analysis_type = sys.argv[2]
time_window = int(sys.argv[3])
use_cache = sys.argv[4].lower() == 'true'
cache_ttl = int(sys.argv[5])
analyzer = PerformanceAnalyzer(use_cache, cache_ttl)
result = analyzer.analyze(log_file, analysis_type, time_window)
print(json.dumps(result, indent=2, default=str))
return 0 if not result.get('error') else 1
except Exception as e:
error_result = {
'error': str(e),
'error_type': 'analyzer_error'
}
print(json.dumps(error_result, indent=2))
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)
"""
)
Best Practices for Docker Tools
Efficient Resource Usage
Copy
Ask AI
# Resource-efficient tool
efficient_tool = Tool(
name="efficient-processor",
description="Memory and CPU efficient data processing",
image="python:3.11-slim",
requirements=["pandas", "pyarrow"], # Use efficient libraries
# Resource limits
env=["OMP_NUM_THREADS=2", "PANDAS_MAX_MEMORY=1GB"],
content="""
import pandas as pd
import sys
import gc
import os
def process_large_file_efficiently(file_path, chunk_size=10000):
\"\"\"Process large files in chunks to manage memory\"\"\"
results = []
# Process in chunks
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# Process chunk
processed_chunk = chunk.groupby('category').agg({
'value': ['sum', 'mean', 'count']
})
results.append(processed_chunk)
# Force garbage collection
del chunk
gc.collect()
# Combine results
final_result = pd.concat(results).groupby(level=0).sum()
return final_result
# Main processing
if __name__ == "__main__":
file_path = sys.argv[1]
result = process_large_file_efficiently(file_path)
result.to_csv('/tmp/output.csv')
print(f"Processing complete. Output saved to /tmp/output.csv")
"""
)
Error Handling and Logging
Copy
Ask AI
# Tool with comprehensive error handling
robust_tool = Tool(
name="robust-data-processor",
description="Data processor with comprehensive error handling",
image="python:3.11-slim",
requirements=["pandas", "logging"],
content="""
import pandas as pd
import logging
import sys
import json
from datetime import datetime
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/tmp/processor.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class DataProcessor:
def __init__(self, input_file):
self.input_file = input_file
self.start_time = datetime.now()
def validate_input(self):
\"\"\"Validate input file\"\"\"
try:
if not os.path.exists(self.input_file):
raise FileNotFoundError(f"Input file not found: {self.input_file}")
file_size = os.path.getsize(self.input_file)
logger.info(f"Input file size: {file_size} bytes")
if file_size == 0:
raise ValueError("Input file is empty")
return True
except Exception as e:
logger.error(f"Input validation failed: {e}")
raise
def process_data(self):
\"\"\"Main data processing with error handling\"\"\"
try:
logger.info(f"Starting data processing for {self.input_file}")
# Read data with error handling
try:
df = pd.read_csv(self.input_file)
logger.info(f"Successfully loaded {len(df)} rows")
except pd.errors.EmptyDataError:
raise ValueError("CSV file is empty or invalid")
except pd.errors.ParserError as e:
raise ValueError(f"CSV parsing error: {e}")
# Data validation
if df.empty:
raise ValueError("DataFrame is empty after loading")
# Process data with error handling
results = {}
try:
# Basic statistics
results['row_count'] = len(df)
results['column_count'] = len(df.columns)
results['columns'] = list(df.columns)
# Data quality checks
results['null_values'] = df.isnull().sum().to_dict()
results['duplicate_rows'] = int(df.duplicated().sum())
# Numeric column analysis
numeric_cols = df.select_dtypes(include=['number']).columns
if len(numeric_cols) > 0:
results['numeric_summary'] = df[numeric_cols].describe().to_dict()
logger.info("Data processing completed successfully")
except Exception as e:
logger.error(f"Data processing error: {e}")
raise
# Add metadata
results['processing_time'] = (datetime.now() - self.start_time).total_seconds()
results['status'] = 'success'
results['timestamp'] = datetime.now().isoformat()
return results
except Exception as e:
logger.error(f"Fatal error during processing: {e}")
return {
'status': 'error',
'error_message': str(e),
'processing_time': (datetime.now() - self.start_time).total_seconds(),
'timestamp': datetime.now().isoformat()
}
def main():
try:
input_file = sys.argv[1]
processor = DataProcessor(input_file)
processor.validate_input()
result = processor.process_data()
# Output results
print(json.dumps(result, indent=2))
return 0 if result['status'] == 'success' else 1
except Exception as e:
error_result = {
'status': 'error',
'error_message': str(e),
'error_type': 'initialization_error'
}
print(json.dumps(error_result, indent=2))
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)
"""
)