Docker tools are the most common type of custom tools in Kubiya. They provide isolated, reproducible execution environments with full control over dependencies and runtime configurations.

Basic Docker Tool Structure

Simple Docker Tool

from kubiya_sdk.tools import Tool, Arg

# Basic Docker tool
simple_tool = Tool(
    name="file-processor",
    description="Process files using Python scripts",
    image="python:3.11-slim",
    requirements=["pandas", "requests"],
    
    args=[
        Arg("input_file", "Path to input file", type="str", required=True),
        Arg("operation", "Operation to perform", type="str", 
            options=["validate", "transform", "analyze"], default="analyze")
    ],
    
    content="""
import pandas as pd
import sys
import json

def process_file(input_file, operation):
    try:
        # Read the file
        if input_file.endswith('.csv'):
            df = pd.read_csv(input_file)
        elif input_file.endswith('.json'):
            with open(input_file, 'r') as f:
                data = json.load(f)
            df = pd.DataFrame(data)
        else:
            raise ValueError(f"Unsupported file format: {input_file}")
        
        result = {"input_file": input_file, "operation": operation}
        
        if operation == "validate":
            result["valid"] = not df.empty
            result["row_count"] = len(df)
            result["column_count"] = len(df.columns)
            
        elif operation == "transform":
            # Example transformation: clean null values
            df_clean = df.dropna()
            output_file = input_file.replace('.', '_clean.')
            df_clean.to_csv(output_file, index=False)
            result["output_file"] = output_file
            result["rows_removed"] = len(df) - len(df_clean)
            
        elif operation == "analyze":
            result["statistics"] = {
                "row_count": len(df),
                "column_count": len(df.columns),
                "null_values": df.isnull().sum().to_dict(),
                "memory_usage": df.memory_usage(deep=True).sum()
            }
        
        print(json.dumps(result, indent=2))
        return 0
        
    except Exception as e:
        error_result = {
            "error": str(e),
            "input_file": input_file,
            "operation": operation
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    input_file = sys.argv[1]
    operation = sys.argv[2]
    exit_code = process_file(input_file, operation)
    sys.exit(exit_code)
"""
)

Advanced Docker Tool Features

Multi-Stage Tool with Complex Dependencies

# Advanced data science tool
advanced_data_tool = Tool(
    name="ml-model-trainer",
    description="Train and evaluate machine learning models",
    image="python:3.11-slim",
    requirements=[
        "scikit-learn>=1.3.0",
        "pandas>=2.0.0", 
        "numpy>=1.24.0",
        "matplotlib>=3.7.0",
        "seaborn>=0.12.0",
        "joblib>=1.3.0"
    ],
    
    args=[
        Arg("dataset_path", "Path to training dataset", type="str", required=True),
        Arg("model_type", "Type of ML model", type="str", 
            options=["linear_regression", "random_forest", "svm", "neural_network"]),
        Arg("test_split", "Test set split ratio", type="float", default=0.2),
        Arg("cross_validation", "Use cross-validation", type="bool", default=True),
        Arg("output_dir", "Output directory for results", type="str", default="/tmp/ml_results")
    ],
    
    # Custom entrypoint for complex setup
    entrypoint=["python", "-u"],
    
    content="""
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import json
import sys
import os

class ModelTrainer:
    def __init__(self, dataset_path, model_type, test_split, cross_validation, output_dir):
        self.dataset_path = dataset_path
        self.model_type = model_type
        self.test_split = test_split
        self.cross_validation = cross_validation
        self.output_dir = output_dir
        
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)
        
        # Initialize model
        self.models = {
            "linear_regression": LinearRegression(),
            "random_forest": RandomForestRegressor(n_estimators=100, random_state=42),
            "svm": SVR(kernel='rbf'),
            "neural_network": MLPRegressor(hidden_layer_sizes=(100,), random_state=42)
        }
        
        if model_type not in self.models:
            raise ValueError(f"Unsupported model type: {model_type}")
        
        self.model = self.models[model_type]
    
    def load_and_prepare_data(self):
        \"\"\"Load and prepare the dataset\"\"\"
        print(f"Loading dataset from {self.dataset_path}")
        
        if self.dataset_path.endswith('.csv'):
            df = pd.read_csv(self.dataset_path)
        else:
            raise ValueError("Only CSV files are supported")
        
        # Basic data validation
        if df.empty:
            raise ValueError("Dataset is empty")
        
        # Assume last column is target, rest are features
        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]
        
        # Handle categorical variables (simple encoding)
        X_encoded = pd.get_dummies(X, drop_first=True)
        
        print(f"Dataset shape: {df.shape}")
        print(f"Features: {X_encoded.shape[1]}")
        print(f"Target variable: {y.name}")
        
        return X_encoded, y
    
    def train_model(self, X, y):
        \"\"\"Train the model\"\"\"
        print(f"Training {self.model_type} model...")
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.test_split, random_state=42
        )
        
        # Train model
        self.model.fit(X_train, y_train)
        
        # Make predictions
        y_pred_train = self.model.predict(X_train)
        y_pred_test = self.model.predict(X_test)
        
        # Calculate metrics
        train_mse = mean_squared_error(y_train, y_pred_train)
        test_mse = mean_squared_error(y_test, y_pred_test)
        train_r2 = r2_score(y_train, y_pred_train)
        test_r2 = r2_score(y_test, y_pred_test)
        
        results = {
            "model_type": self.model_type,
            "training_samples": len(X_train),
            "test_samples": len(X_test),
            "metrics": {
                "train_mse": float(train_mse),
                "test_mse": float(test_mse),
                "train_r2": float(train_r2),
                "test_r2": float(test_r2)
            }
        }
        
        # Cross-validation if requested
        if self.cross_validation:
            cv_scores = cross_val_score(self.model, X, y, cv=5, 
                                      scoring='neg_mean_squared_error')
            results["cross_validation"] = {
                "mean_mse": float(-cv_scores.mean()),
                "std_mse": float(cv_scores.std()),
                "scores": [-float(score) for score in cv_scores]
            }
        
        # Generate visualizations
        self.create_visualizations(y_test, y_pred_test, results)
        
        # Save model
        model_path = os.path.join(self.output_dir, f"{self.model_type}_model.pkl")
        joblib.dump(self.model, model_path)
        results["model_path"] = model_path
        
        return results, X_test, y_test, y_pred_test
    
    def create_visualizations(self, y_true, y_pred, results):
        \"\"\"Create visualization plots\"\"\"
        plt.figure(figsize=(12, 8))
        
        # Prediction vs Actual plot
        plt.subplot(2, 2, 1)
        plt.scatter(y_true, y_pred, alpha=0.6)
        plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
        plt.xlabel('Actual Values')
        plt.ylabel('Predicted Values')
        plt.title(f'{self.model_type.title()} - Predictions vs Actual')
        
        # Residuals plot
        plt.subplot(2, 2, 2)
        residuals = y_true - y_pred
        plt.scatter(y_pred, residuals, alpha=0.6)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel('Predicted Values')
        plt.ylabel('Residuals')
        plt.title('Residuals Plot')
        
        # Distribution of residuals
        plt.subplot(2, 2, 3)
        plt.hist(residuals, bins=30, alpha=0.7, edgecolor='black')
        plt.xlabel('Residuals')
        plt.ylabel('Frequency')
        plt.title('Distribution of Residuals')
        
        # Metrics summary
        plt.subplot(2, 2, 4)
        plt.text(0.1, 0.8, f"Model: {self.model_type}", fontsize=12, weight='bold')
        plt.text(0.1, 0.7, f"Test R²: {results['metrics']['test_r2']:.4f}", fontsize=10)
        plt.text(0.1, 0.6, f"Test MSE: {results['metrics']['test_mse']:.4f}", fontsize=10)
        plt.text(0.1, 0.5, f"Train R²: {results['metrics']['train_r2']:.4f}", fontsize=10)
        plt.text(0.1, 0.4, f"Train MSE: {results['metrics']['train_mse']:.4f}", fontsize=10)
        
        if 'cross_validation' in results:
            plt.text(0.1, 0.3, f"CV MSE: {results['cross_validation']['mean_mse']:.4f} ± {results['cross_validation']['std_mse']:.4f}", fontsize=10)
        
        plt.xlim(0, 1)
        plt.ylim(0, 1)
        plt.axis('off')
        plt.title('Model Performance Summary')
        
        plt.tight_layout()
        
        # Save plot
        plot_path = os.path.join(self.output_dir, f"{self.model_type}_results.png")
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        plt.close()
        
        print(f"Visualizations saved to: {plot_path}")
    
    def run(self):
        \"\"\"Main execution method\"\"\"
        try:
            # Load and prepare data
            X, y = self.load_and_prepare_data()
            
            # Train model
            results, X_test, y_test, y_pred = self.train_model(X, y)
            
            # Save detailed results
            results_path = os.path.join(self.output_dir, f"{self.model_type}_results.json")
            with open(results_path, 'w') as f:
                json.dump(results, f, indent=2)
            
            results["results_path"] = results_path
            
            # Print results
            print("\\n" + "="*50)
            print("MODEL TRAINING COMPLETED")
            print("="*50)
            print(json.dumps(results, indent=2))
            
            return 0
            
        except Exception as e:
            error_result = {
                "error": str(e),
                "model_type": self.model_type,
                "dataset_path": self.dataset_path
            }
            print(json.dumps(error_result, indent=2))
            return 1

if __name__ == "__main__":
    dataset_path = sys.argv[1]
    model_type = sys.argv[2]
    test_split = float(sys.argv[3])
    cross_validation = sys.argv[4].lower() == 'true'
    output_dir = sys.argv[5]
    
    trainer = ModelTrainer(dataset_path, model_type, test_split, cross_validation, output_dir)
    exit_code = trainer.run()
    sys.exit(exit_code)
"""
)

Docker Tool Configuration

Custom Docker Image

# Tool with custom Docker image
custom_image_tool = Tool(
    name="nodejs-app-builder",
    description="Build Node.js applications with custom optimizations",
    image="node:18-alpine",  # Use specific Node.js version
    
    # Install additional system packages
    on_build="""
    apk add --no-cache git python3 make g++
    npm install -g @angular/cli @nestjs/cli typescript
    """,
    
    args=[
        Arg("project_path", "Path to Node.js project", type="str", required=True),
        Arg("build_type", "Type of build", type="str", 
            options=["development", "production"], default="production"),
        Arg("run_tests", "Run tests before build", type="bool", default=True)
    ],
    
    content="""
#!/bin/bash
set -e

PROJECT_PATH="$1"
BUILD_TYPE="$2"
RUN_TESTS="$3"

cd "$PROJECT_PATH"

echo "Building Node.js application..."
echo "Project: $PROJECT_PATH"
echo "Build type: $BUILD_TYPE"
echo "Run tests: $RUN_TESTS"

# Install dependencies
echo "Installing dependencies..."
npm ci

# Run tests if requested
if [ "$RUN_TESTS" = "true" ]; then
    echo "Running tests..."
    npm test
fi

# Build application
echo "Building application..."
if [ "$BUILD_TYPE" = "production" ]; then
    npm run build:prod
else
    npm run build
fi

# Create build summary
cat > build-summary.json << EOF
{
    "project_path": "$PROJECT_PATH",
    "build_type": "$BUILD_TYPE",
    "tests_run": $RUN_TESTS,
    "build_timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
    "build_status": "success"
}
EOF

echo "Build completed successfully!"
cat build-summary.json
"""
)

Tool with Volume Mounts

# Tool that works with persistent volumes
volume_tool = Tool(
    name="database-backup-manager",
    description="Manage database backups with persistent storage",
    image="postgres:15-alpine",
    
    # Define volumes
    with_volumes=[
        Volume(name="backup-storage", path="/backups"),
        Volume(name="config-storage", path="/config")
    ],
    
    args=[
        Arg("action", "Action to perform", type="str",
            options=["backup", "restore", "list", "cleanup"], required=True),
        Arg("database_url", "Database connection URL", type="str", required=True),
        Arg("backup_name", "Name for backup file", type="str", default=""),
        Arg("retention_days", "Days to retain backups", type="int", default=30)
    ],
    
    content="""
#!/bin/bash
set -e

ACTION="$1"
DATABASE_URL="$2"
BACKUP_NAME="$3"
RETENTION_DAYS="$4"

BACKUP_DIR="/backups"
CONFIG_DIR="/config"

# Ensure directories exist
mkdir -p "$BACKUP_DIR" "$CONFIG_DIR"

case "$ACTION" in
    "backup")
        if [ -z "$BACKUP_NAME" ]; then
            BACKUP_NAME="backup_$(date +%Y%m%d_%H%M%S).sql"
        fi
        
        echo "Creating backup: $BACKUP_NAME"
        pg_dump "$DATABASE_URL" > "$BACKUP_DIR/$BACKUP_NAME"
        
        # Compress backup
        gzip "$BACKUP_DIR/$BACKUP_NAME"
        BACKUP_FILE="$BACKUP_DIR/$BACKUP_NAME.gz"
        
        # Save metadata
        cat > "$CONFIG_DIR/last_backup.json" << EOF
{
    "backup_file": "$BACKUP_FILE",
    "database_url": "$DATABASE_URL",
    "timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
    "size_bytes": $(stat -c%s "$BACKUP_FILE")
}
EOF
        
        echo "Backup completed: $BACKUP_FILE"
        cat "$CONFIG_DIR/last_backup.json"
        ;;
        
    "restore")
        if [ -z "$BACKUP_NAME" ]; then
            echo "Error: backup_name required for restore action"
            exit 1
        fi
        
        BACKUP_FILE="$BACKUP_DIR/$BACKUP_NAME"
        if [ ! -f "$BACKUP_FILE" ]; then
            echo "Error: Backup file not found: $BACKUP_FILE"
            exit 1
        fi
        
        echo "Restoring from backup: $BACKUP_FILE"
        
        # Decompress if needed
        if [[ "$BACKUP_FILE" == *.gz ]]; then
            gunzip -c "$BACKUP_FILE" | psql "$DATABASE_URL"
        else
            psql "$DATABASE_URL" < "$BACKUP_FILE"
        fi
        
        echo "Restore completed successfully"
        ;;
        
    "list")
        echo "Available backups:"
        ls -la "$BACKUP_DIR"/*.sql* 2>/dev/null || echo "No backups found"
        ;;
        
    "cleanup")
        echo "Cleaning up backups older than $RETENTION_DAYS days..."
        find "$BACKUP_DIR" -name "*.sql*" -type f -mtime +$RETENTION_DAYS -delete
        echo "Cleanup completed"
        ;;
        
    *)
        echo "Error: Unknown action: $ACTION"
        exit 1
        ;;
esac
"""
)

Docker Tool Security

Secure Tool with Secrets

# Tool that uses secrets securely
secure_tool = Tool(
    name="secure-api-client",
    description="Securely interact with external APIs using secrets",
    image="python:3.11-slim",
    requirements=["requests", "cryptography"],
    
    # Define required secrets
    secrets=["api_key", "client_secret", "encryption_key"],
    
    args=[
        Arg("api_endpoint", "API endpoint URL", type="str", required=True),
        Arg("action", "API action to perform", type="str", required=True),
        Arg("data_payload", "JSON data to send", type="str", default="{}"),
        Arg("encrypt_response", "Encrypt API response", type="bool", default=False)
    ],
    
    content="""
import requests
import json
import os
import sys
from cryptography.fernet import Fernet
import base64

class SecureAPIClient:
    def __init__(self):
        # Load secrets from environment variables
        self.api_key = os.environ.get('API_KEY')
        self.client_secret = os.environ.get('CLIENT_SECRET')
        self.encryption_key = os.environ.get('ENCRYPTION_KEY')
        
        if not all([self.api_key, self.client_secret]):
            raise ValueError("Required secrets (API_KEY, CLIENT_SECRET) not found")
        
        # Initialize encryption if key provided
        self.cipher = None
        if self.encryption_key:
            try:
                self.cipher = Fernet(self.encryption_key.encode())
            except Exception as e:
                print(f"Warning: Invalid encryption key: {e}")
    
    def encrypt_data(self, data):
        \"\"\"Encrypt sensitive data\"\"\"
        if not self.cipher:
            return data
        
        if isinstance(data, str):
            return self.cipher.encrypt(data.encode()).decode()
        return data
    
    def decrypt_data(self, encrypted_data):
        \"\"\"Decrypt data\"\"\"
        if not self.cipher:
            return encrypted_data
        
        try:
            return self.cipher.decrypt(encrypted_data.encode()).decode()
        except Exception:
            return encrypted_data  # Return as-is if decryption fails
    
    def make_request(self, endpoint, action, payload, encrypt_response=False):
        \"\"\"Make secure API request\"\"\"
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
            'X-Client-Secret': self.client_secret,
            'User-Agent': 'Kubiya-Secure-Client/1.0'
        }
        
        try:
            # Parse payload
            if isinstance(payload, str):
                payload = json.loads(payload)
            
            # Make request based on action
            if action.upper() == 'GET':
                response = requests.get(endpoint, headers=headers, params=payload, timeout=30)
            elif action.upper() == 'POST':
                response = requests.post(endpoint, headers=headers, json=payload, timeout=30)
            elif action.upper() == 'PUT':
                response = requests.put(endpoint, headers=headers, json=payload, timeout=30)
            elif action.upper() == 'DELETE':
                response = requests.delete(endpoint, headers=headers, timeout=30)
            else:
                raise ValueError(f"Unsupported action: {action}")
            
            # Process response
            result = {
                'status_code': response.status_code,
                'success': response.status_code < 400,
                'headers': dict(response.headers),
                'endpoint': endpoint,
                'action': action.upper()
            }
            
            # Handle response content
            try:
                response_data = response.json()
            except ValueError:
                response_data = response.text
            
            # Encrypt response if requested
            if encrypt_response and self.cipher:
                result['response_data_encrypted'] = self.encrypt_data(json.dumps(response_data))
                result['encrypted'] = True
            else:
                result['response_data'] = response_data
                result['encrypted'] = False
            
            # Add error details if request failed
            if not result['success']:
                result['error_message'] = f"HTTP {response.status_code}: {response.reason}"
            
            return result
            
        except requests.exceptions.Timeout:
            return {
                'success': False,
                'error_message': 'Request timeout',
                'endpoint': endpoint,
                'action': action.upper()
            }
        except requests.exceptions.ConnectionError:
            return {
                'success': False,
                'error_message': 'Connection error',
                'endpoint': endpoint,
                'action': action.upper()
            }
        except Exception as e:
            return {
                'success': False,
                'error_message': str(e),
                'endpoint': endpoint,
                'action': action.upper()
            }

def main():
    try:
        api_endpoint = sys.argv[1]
        action = sys.argv[2]
        data_payload = sys.argv[3]
        encrypt_response = sys.argv[4].lower() == 'true'
        
        client = SecureAPIClient()
        result = client.make_request(api_endpoint, action, data_payload, encrypt_response)
        
        print(json.dumps(result, indent=2))
        
        # Exit with error code if request failed
        return 0 if result.get('success', False) else 1
        
    except Exception as e:
        error_result = {
            'success': False,
            'error_message': str(e),
            'error_type': 'client_error'
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
"""
)

Docker Tool Performance

Optimized Tool with Caching

# Performance-optimized tool with caching
optimized_tool = Tool(
    name="performance-analyzer",
    description="High-performance log analysis with caching",
    image="python:3.11-slim",
    requirements=["pandas", "numpy", "redis", "matplotlib"],
    
    # Service dependencies
    with_services=[
        ServiceSpec(
            name="redis-cache",
            image="redis:7-alpine",
            exposed_ports=[6379],
            env={"REDIS_PASSWORD": "cache_password"}
        )
    ],
    
    args=[
        Arg("log_file", "Path to log file", type="str", required=True),
        Arg("analysis_type", "Type of analysis", type="str", 
            options=["error_rate", "performance", "traffic"], default="error_rate"),
        Arg("time_window", "Time window in hours", type="int", default=24),
        Arg("use_cache", "Use Redis caching", type="bool", default=True),
        Arg("cache_ttl", "Cache TTL in seconds", type="int", default=3600)
    ],
    
    content="""
import pandas as pd
import numpy as np
import redis
import json
import sys
import hashlib
import time
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

class PerformanceAnalyzer:
    def __init__(self, use_cache=True, cache_ttl=3600):
        self.use_cache = use_cache
        self.cache_ttl = cache_ttl
        self.redis_client = None
        
        if use_cache:
            try:
                # Connect to Redis service
                self.redis_client = redis.Redis(
                    host='redis-cache-svc',  # Service name from Kubernetes
                    port=6379,
                    password='cache_password',
                    decode_responses=True
                )
                # Test connection
                self.redis_client.ping()
                print("Connected to Redis cache")
            except Exception as e:
                print(f"Warning: Could not connect to Redis: {e}")
                self.use_cache = False
    
    def get_cache_key(self, *args):
        \"\"\"Generate cache key from arguments\"\"\"
        cache_string = '|'.join(str(arg) for arg in args)
        return hashlib.md5(cache_string.encode()).hexdigest()
    
    def get_cached_result(self, cache_key):
        \"\"\"Get result from cache\"\"\"
        if not self.use_cache or not self.redis_client:
            return None
        
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
        except Exception as e:
            print(f"Cache read error: {e}")
        
        return None
    
    def cache_result(self, cache_key, result):
        \"\"\"Store result in cache\"\"\"
        if not self.use_cache or not self.redis_client:
            return
        
        try:
            self.redis_client.setex(
                cache_key,
                self.cache_ttl,
                json.dumps(result, default=str)
            )
        except Exception as e:
            print(f"Cache write error: {e}")
    
    def parse_log_file(self, log_file, time_window):
        \"\"\"Parse log file efficiently\"\"\"
        print(f"Parsing log file: {log_file}")
        
        # Check cache first
        cache_key = self.get_cache_key('parse_log', log_file, time_window)
        cached_result = self.get_cached_result(cache_key)
        if cached_result:
            print("Using cached log parsing result")
            return pd.DataFrame(cached_result)
        
        # Define log parsing patterns
        log_patterns = {
            'timestamp': r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})',
            'level': r'(ERROR|WARN|INFO|DEBUG)',
            'response_time': r'response_time=(\d+)ms',
            'status_code': r'status=(\d{3})',
            'endpoint': r'endpoint=([^\s]+)'
        }
        
        entries = []
        cutoff_time = datetime.now() - timedelta(hours=time_window)
        
        with open(log_file, 'r') as f:
            for line_num, line in enumerate(f):
                try:
                    entry = {}
                    
                    # Extract timestamp
                    import re
                    timestamp_match = re.search(log_patterns['timestamp'], line)
                    if timestamp_match:
                        timestamp = datetime.fromisoformat(timestamp_match.group(1))
                        if timestamp < cutoff_time:
                            continue  # Skip old entries
                        entry['timestamp'] = timestamp
                    else:
                        continue  # Skip lines without timestamp
                    
                    # Extract other fields
                    for field, pattern in log_patterns.items():
                        if field == 'timestamp':
                            continue
                        
                        match = re.search(pattern, line)
                        if match:
                            value = match.group(1)
                            if field in ['response_time', 'status_code']:
                                entry[field] = int(value)
                            else:
                                entry[field] = value
                    
                    if entry:
                        entries.append(entry)
                    
                except Exception as e:
                    print(f"Error parsing line {line_num}: {e}")
                    continue
        
        df = pd.DataFrame(entries)
        
        # Cache the parsed data
        self.cache_result(cache_key, df.to_dict('records'))
        
        print(f"Parsed {len(df)} log entries")
        return df
    
    def analyze_error_rate(self, df):
        \"\"\"Analyze error rates\"\"\"
        if df.empty:
            return {"error": "No data to analyze"}
        
        # Group by hour
        df['hour'] = df['timestamp'].dt.floor('H')
        
        hourly_stats = df.groupby('hour').agg({
            'status_code': ['count', lambda x: sum(x >= 400)],
            'response_time': ['mean', 'max', 'std']
        }).round(2)
        
        hourly_stats.columns = ['total_requests', 'error_count', 'avg_response_time', 'max_response_time', 'response_time_std']
        hourly_stats['error_rate'] = (hourly_stats['error_count'] / hourly_stats['total_requests'] * 100).round(2)
        
        # Overall statistics
        total_requests = len(df)
        total_errors = len(df[df['status_code'] >= 400])
        overall_error_rate = (total_errors / total_requests * 100) if total_requests > 0 else 0
        
        result = {
            'analysis_type': 'error_rate',
            'total_requests': total_requests,
            'total_errors': total_errors,
            'overall_error_rate': round(overall_error_rate, 2),
            'hourly_breakdown': hourly_stats.to_dict('index'),
            'time_range': {
                'start': df['timestamp'].min().isoformat(),
                'end': df['timestamp'].max().isoformat()
            }
        }
        
        return result
    
    def analyze_performance(self, df):
        \"\"\"Analyze performance metrics\"\"\"
        if df.empty or 'response_time' not in df.columns:
            return {"error": "No performance data available"}
        
        # Performance percentiles
        percentiles = df['response_time'].quantile([0.5, 0.75, 0.90, 0.95, 0.99]).round(2)
        
        # Group by endpoint
        endpoint_stats = df.groupby('endpoint')['response_time'].agg([
            'count', 'mean', 'median', 'max', 'std'
        ]).round(2)
        
        # Slow requests (>1000ms)
        slow_requests = df[df['response_time'] > 1000]
        
        result = {
            'analysis_type': 'performance',
            'response_time_percentiles': {
                'p50': percentiles[0.5],
                'p75': percentiles[0.75],
                'p90': percentiles[0.90],
                'p95': percentiles[0.95],
                'p99': percentiles[0.99]
            },
            'endpoint_performance': endpoint_stats.to_dict('index'),
            'slow_requests_count': len(slow_requests),
            'avg_response_time': df['response_time'].mean().round(2),
            'max_response_time': df['response_time'].max()
        }
        
        return result
    
    def analyze_traffic(self, df):
        \"\"\"Analyze traffic patterns\"\"\"
        if df.empty:
            return {"error": "No traffic data available"}
        
        # Requests per hour
        hourly_traffic = df.groupby(df['timestamp'].dt.floor('H')).size()
        
        # Peak traffic analysis
        peak_hour = hourly_traffic.idxmax()
        peak_requests = hourly_traffic.max()
        
        # Traffic by endpoint
        endpoint_traffic = df['endpoint'].value_counts().head(10)
        
        result = {
            'analysis_type': 'traffic',
            'total_requests': len(df),
            'requests_per_hour': hourly_traffic.to_dict(),
            'peak_traffic': {
                'hour': peak_hour.isoformat(),
                'requests': int(peak_requests)
            },
            'top_endpoints': endpoint_traffic.to_dict(),
            'avg_requests_per_hour': hourly_traffic.mean().round(2)
        }
        
        return result
    
    def create_visualizations(self, df, analysis_result):
        \"\"\"Create performance visualizations\"\"\"
        if df.empty:
            return
        
        plt.figure(figsize=(15, 10))
        
        # Response time distribution
        plt.subplot(2, 3, 1)
        df['response_time'].hist(bins=50, alpha=0.7, edgecolor='black')
        plt.xlabel('Response Time (ms)')
        plt.ylabel('Frequency')
        plt.title('Response Time Distribution')
        
        # Error rate over time
        plt.subplot(2, 3, 2)
        if 'hourly_breakdown' in analysis_result:
            hours = list(analysis_result['hourly_breakdown'].keys())
            error_rates = [analysis_result['hourly_breakdown'][h]['error_rate'] for h in hours]
            plt.plot(hours, error_rates, marker='o')
            plt.xlabel('Hour')
            plt.ylabel('Error Rate (%)')
            plt.title('Error Rate Over Time')
            plt.xticks(rotation=45)
        
        # Traffic over time
        plt.subplot(2, 3, 3)
        hourly_counts = df.groupby(df['timestamp'].dt.floor('H')).size()
        hourly_counts.plot(kind='line', marker='o')
        plt.xlabel('Hour')
        plt.ylabel('Requests')
        plt.title('Traffic Over Time')
        plt.xticks(rotation=45)
        
        # Status code distribution
        plt.subplot(2, 3, 4)
        status_counts = df['status_code'].value_counts()
        status_counts.plot(kind='bar')
        plt.xlabel('Status Code')
        plt.ylabel('Count')
        plt.title('Status Code Distribution')
        
        # Top endpoints by traffic
        plt.subplot(2, 3, 5)
        if 'endpoint' in df.columns:
            top_endpoints = df['endpoint'].value_counts().head(10)
            top_endpoints.plot(kind='barh')
            plt.xlabel('Request Count')
            plt.title('Top Endpoints by Traffic')
        
        # Response time box plot by endpoint
        plt.subplot(2, 3, 6)
        if 'endpoint' in df.columns and 'response_time' in df.columns:
            top_5_endpoints = df['endpoint'].value_counts().head(5).index
            endpoint_response_times = [df[df['endpoint'] == ep]['response_time'] for ep in top_5_endpoints]
            plt.boxplot(endpoint_response_times, labels=top_5_endpoints)
            plt.xlabel('Endpoint')
            plt.ylabel('Response Time (ms)')
            plt.title('Response Time by Endpoint')
            plt.xticks(rotation=45)
        
        plt.tight_layout()
        
        # Save visualization
        plot_path = '/tmp/performance_analysis.png'
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        plt.close()
        
        print(f"Visualization saved to: {plot_path}")
        return plot_path
    
    def analyze(self, log_file, analysis_type, time_window):
        \"\"\"Main analysis method\"\"\"
        start_time = time.time()
        
        # Parse log file
        df = self.parse_log_file(log_file, time_window)
        
        # Perform analysis based on type
        if analysis_type == 'error_rate':
            result = self.analyze_error_rate(df)
        elif analysis_type == 'performance':
            result = self.analyze_performance(df)
        elif analysis_type == 'traffic':
            result = self.analyze_traffic(df)
        else:
            result = {
                'error': f'Unknown analysis type: {analysis_type}',
                'available_types': ['error_rate', 'performance', 'traffic']
            }
        
        # Add metadata
        result['analysis_duration'] = round(time.time() - start_time, 2)
        result['cache_used'] = self.use_cache
        result['log_file'] = log_file
        
        # Create visualizations
        if not result.get('error'):
            plot_path = self.create_visualizations(df, result)
            result['visualization_path'] = plot_path
        
        return result

def main():
    try:
        log_file = sys.argv[1]
        analysis_type = sys.argv[2]
        time_window = int(sys.argv[3])
        use_cache = sys.argv[4].lower() == 'true'
        cache_ttl = int(sys.argv[5])
        
        analyzer = PerformanceAnalyzer(use_cache, cache_ttl)
        result = analyzer.analyze(log_file, analysis_type, time_window)
        
        print(json.dumps(result, indent=2, default=str))
        
        return 0 if not result.get('error') else 1
        
    except Exception as e:
        error_result = {
            'error': str(e),
            'error_type': 'analyzer_error'
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
"""
)

Best Practices for Docker Tools

Efficient Resource Usage

# Resource-efficient tool
efficient_tool = Tool(
    name="efficient-processor",
    description="Memory and CPU efficient data processing",
    image="python:3.11-slim",
    requirements=["pandas", "pyarrow"],  # Use efficient libraries
    
    # Resource limits
    env=["OMP_NUM_THREADS=2", "PANDAS_MAX_MEMORY=1GB"],
    
    content="""
import pandas as pd
import sys
import gc
import os

def process_large_file_efficiently(file_path, chunk_size=10000):
    \"\"\"Process large files in chunks to manage memory\"\"\"
    results = []
    
    # Process in chunks
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # Process chunk
        processed_chunk = chunk.groupby('category').agg({
            'value': ['sum', 'mean', 'count']
        })
        
        results.append(processed_chunk)
        
        # Force garbage collection
        del chunk
        gc.collect()
    
    # Combine results
    final_result = pd.concat(results).groupby(level=0).sum()
    
    return final_result

# Main processing
if __name__ == "__main__":
    file_path = sys.argv[1]
    result = process_large_file_efficiently(file_path)
    result.to_csv('/tmp/output.csv')
    print(f"Processing complete. Output saved to /tmp/output.csv")
"""
)

Error Handling and Logging

# Tool with comprehensive error handling
robust_tool = Tool(
    name="robust-data-processor",
    description="Data processor with comprehensive error handling",
    image="python:3.11-slim",
    requirements=["pandas", "logging"],
    
    content="""
import pandas as pd
import logging
import sys
import json
from datetime import datetime

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/processor.log'),
        logging.StreamHandler(sys.stdout)
    ]
)

logger = logging.getLogger(__name__)

class DataProcessor:
    def __init__(self, input_file):
        self.input_file = input_file
        self.start_time = datetime.now()
        
    def validate_input(self):
        \"\"\"Validate input file\"\"\"
        try:
            if not os.path.exists(self.input_file):
                raise FileNotFoundError(f"Input file not found: {self.input_file}")
            
            file_size = os.path.getsize(self.input_file)
            logger.info(f"Input file size: {file_size} bytes")
            
            if file_size == 0:
                raise ValueError("Input file is empty")
            
            return True
            
        except Exception as e:
            logger.error(f"Input validation failed: {e}")
            raise
    
    def process_data(self):
        \"\"\"Main data processing with error handling\"\"\"
        try:
            logger.info(f"Starting data processing for {self.input_file}")
            
            # Read data with error handling
            try:
                df = pd.read_csv(self.input_file)
                logger.info(f"Successfully loaded {len(df)} rows")
            except pd.errors.EmptyDataError:
                raise ValueError("CSV file is empty or invalid")
            except pd.errors.ParserError as e:
                raise ValueError(f"CSV parsing error: {e}")
            
            # Data validation
            if df.empty:
                raise ValueError("DataFrame is empty after loading")
            
            # Process data with error handling
            results = {}
            
            try:
                # Basic statistics
                results['row_count'] = len(df)
                results['column_count'] = len(df.columns)
                results['columns'] = list(df.columns)
                
                # Data quality checks
                results['null_values'] = df.isnull().sum().to_dict()
                results['duplicate_rows'] = int(df.duplicated().sum())
                
                # Numeric column analysis
                numeric_cols = df.select_dtypes(include=['number']).columns
                if len(numeric_cols) > 0:
                    results['numeric_summary'] = df[numeric_cols].describe().to_dict()
                
                logger.info("Data processing completed successfully")
                
            except Exception as e:
                logger.error(f"Data processing error: {e}")
                raise
            
            # Add metadata
            results['processing_time'] = (datetime.now() - self.start_time).total_seconds()
            results['status'] = 'success'
            results['timestamp'] = datetime.now().isoformat()
            
            return results
            
        except Exception as e:
            logger.error(f"Fatal error during processing: {e}")
            return {
                'status': 'error',
                'error_message': str(e),
                'processing_time': (datetime.now() - self.start_time).total_seconds(),
                'timestamp': datetime.now().isoformat()
            }

def main():
    try:
        input_file = sys.argv[1]
        
        processor = DataProcessor(input_file)
        processor.validate_input()
        result = processor.process_data()
        
        # Output results
        print(json.dumps(result, indent=2))
        
        return 0 if result['status'] == 'success' else 1
        
    except Exception as e:
        error_result = {
            'status': 'error',
            'error_message': str(e),
            'error_type': 'initialization_error'
        }
        print(json.dumps(error_result, indent=2))
        return 1

if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
"""
)
Docker tools provide the most flexibility and power for creating custom automation tools in Kubiya. They offer complete control over the execution environment, dependencies, and can handle complex processing requirements while maintaining security and isolation.