Audit Service Overview

The Kubiya Audit service is the interface for querying, streaming, and searching audit logs within the Kubiya platform. Use it to track actions, monitor system activity, and investigate security events.

Features

  • Comprehensive Logging: Track all actions and events across the platform
  • Advanced Filtering: Filter by category, resource type, action type, session, and time ranges
  • Real-time Streaming: Stream audit logs in real time with configurable polling
  • Text Search: Search through audit logs with full-text search capabilities
  • Status Filtering: Filter events by success/failure status
  • Time-based Queries: Query logs within specific time ranges using RFC3339 format
  • Pagination Support: Handle large result sets with built-in pagination
  • Detailed Event Information: Get comprehensive details about specific audit events

Core Components

AuditService

The main service class provides four core operations:
  • list(): Query audit logs with filtering and pagination
  • stream(): Stream real-time audit logs with polling
  • describe(): Get detailed information about a specific audit event
  • search(): Search audit logs with text queries and advanced filtering

Quick Start

Basic Usage

from kubiya_workflow_sdk import KubiyaClient
from kubiya_workflow_sdk.kubiya_services.exceptions import AuditError

# Initialize client
client = KubiyaClient(
    api_key="your-api-key",
    base_url="https://api.kubiya.ai"
)

try:
    # List recent audit logs (last 24 hours)
    recent_logs = client.audit.list()
    print(f"Found {len(recent_logs.get('items', []))} recent audit events")
    
    # List logs from a specific category
    agent_logs = client.audit.list(
        category_type="agents",
        page_size=10
    )
    
    # Search for specific text in logs
    search_results = client.audit.search(
        text="authentication",
        category_type="users"
    )
    
    print(f"Found {len(search_results.get('items', []))} authentication events")
    
except AuditError as e:
    print(f"Audit query failed: {e}")

Time-based Filtering

Query audit logs within specific time ranges:
from datetime import datetime, timedelta, UTC  # datetime.UTC requires Python 3.11+

# Last 6 hours
start_time = (datetime.now(UTC) - timedelta(hours=6)).strftime('%Y-%m-%dT%H:%M:%SZ')
recent_logs = client.audit.list(
    start_time=start_time,
    sort_direction="desc"
)

# Specific date range
start_time = "2024-01-01T00:00:00Z"
end_time = "2024-01-02T00:00:00Z"
daily_logs = client.audit.list(
    start_time=start_time,
    end_time=end_time,
    page_size=100
)

print(f"Events from Jan 1st: {len(daily_logs.get('items', []))}")
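
The strftime pattern above can be wrapped in a small helper so that every query formats timestamps the same way. The following is a minimal sketch using only the standard library; `to_rfc3339` and `hours_ago` are hypothetical helper names, not part of the SDK, and the sketch assumes that naive datetimes are already in UTC.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def to_rfc3339(moment: datetime) -> str:
    """Format a datetime as an RFC3339 UTC string such as 2024-01-01T00:00:00Z."""
    if moment.tzinfo is None:
        # Assumption: a naive datetime is already expressed in UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    # Convert aware datetimes to UTC before formatting with the trailing Z.
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def hours_ago(hours: float, now: Optional[datetime] = None) -> str:
    """Return the RFC3339 timestamp for `hours` before `now` (default: current UTC time)."""
    reference = now if now is not None else datetime.now(timezone.utc)
    return to_rfc3339(reference - timedelta(hours=hours))
```

With these helpers, the six-hour query above could be written as client.audit.list(start_time=hours_ago(6)).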

Advanced Filtering

Filter audit logs by multiple criteria:
# Filter by category and action type
workflow_executions = client.audit.list(
    category_type="workflows",
    action_type="execute",
    start_time="2024-01-01T00:00:00Z"
)

# Filter by resource and session
session_logs = client.audit.list(
    resource_type="agent",
    session_id="session-123",
    page_size=50
)

# Combine multiple filters
complex_query = client.audit.list(
    category_type="agents",
    category_name="my-agent",
    action_type="invoke",
    start_time="2024-01-01T00:00:00Z",
    end_time="2024-01-02T00:00:00Z",
    sort_direction="asc"
)

Real-time Streaming

Stream audit logs in real time to monitor ongoing activity:
# Assumes `client` and `AuditError` from the Basic Usage example

try:
    print("Starting real-time audit log streaming...")
    
    # Stream all new audit events
    for audit_event in client.audit.stream(verbose=True):
        timestamp = audit_event.get('timestamp', 'Unknown')
        category = audit_event.get('category_type', 'Unknown')
        action = audit_event.get('action_type', 'Unknown')
        
        print(f"[{timestamp}] {category}: {action}")
        
        # Check for specific events
        if audit_event.get('action_successful') is False:
            print(f"⚠️  Failed action detected: {audit_event}")
        
except KeyboardInterrupt:
    print("Streaming stopped by user")
except AuditError as e:
    print(f"Streaming failed: {e}")

Filtered Real-time Streaming

Stream only specific types of events:
# Stream only agent-related events
for event in client.audit.stream(
    category_type="agents",
    timeout_minutes=30,
    verbose=False
):
    print(f"Agent event: {event.get('action_type')} - {event.get('category_name')}")

# Stream events from a start time and report only the failures (filtered client-side)
for event in client.audit.stream(
    start_time="2024-01-01T12:00:00Z",
    timeout_minutes=10
):
    if event.get('action_successful') is False:
        print(f"Failed event: {event}")
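
Because the stream examples above pick out failures inside the loop body, that check can be factored into a generator that accepts any iterable of event dictionaries. This is a sketch only; `only_failed` is a hypothetical helper, and it relies solely on the `action_successful` field already used above.

```python
from typing import Any, Dict, Iterable, Iterator


def only_failed(events: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield only events whose action_successful field is explicitly False."""
    for event in events:
        # Events that lack the field are skipped rather than treated as failures.
        if event.get("action_successful") is False:
            yield event


# Sample events standing in for the output of a stream (illustrative data).
sample = [
    {"action_type": "invoke", "action_successful": True},
    {"action_type": "execute", "action_successful": False},
    {"action_type": "login"},
]
failed = list(only_failed(sample))
```

The same generator could wrap a live stream, for example only_failed(client.audit.stream(timeout_minutes=10)).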

Text Search

Search through audit logs with text queries:
# Search for error-related events
error_events = client.audit.search(
    text="error",
    start_time="2024-01-01T00:00:00Z",
    page_size=20
)

# Search with status filter
failed_auth = client.audit.search(
    text="authentication",
    status="failed",
    category_type="users"
)

# Complex search query
complex_search = client.audit.search(
    text="timeout",
    category_type="workflows",
    action_type="execute",
    status="failed",
    start_time="2024-01-01T00:00:00Z"
)

print(f"Search summary: {complex_search.get('search_summary', {})}")

Event Details

Get comprehensive information about specific audit events:
# Get event details by ID
try:
    event_details = client.audit.describe("audit-event-id-123")
    
    print(f"Event timestamp: {event_details.get('timestamp')}")
    print(f"Category: {event_details.get('category_type')}")
    print(f"Action: {event_details.get('action_type')}")
    print(f"Success: {event_details.get('action_successful')}")
    
    # Check for additional metadata
    if 'metadata' in event_details:
        print(f"Metadata: {event_details['metadata']}")
    
except AuditError as e:
    print(f"Could not retrieve event details: {e}")

Error Handling

The Audit service provides specialized exceptions for different failure scenarios:

AuditError

Base exception for general audit service errors:
try:
    logs = client.audit.list(start_time="invalid-time-format")
except AuditError as e:
    print(f"Audit operation failed: {e}")
    
    # Check if it's a validation error
    if "time format" in str(e).lower():
        print("Use RFC3339 format: 2024-01-01T00:00:00Z")
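
Malformed timestamps can also be caught locally before a request is sent. The sketch below uses only the standard library; `is_rfc3339_utc` is a hypothetical helper, and it accepts only the `YYYY-MM-DDTHH:MM:SSZ` form used throughout this guide, which is narrower than everything RFC3339 allows (offsets and fractional seconds are rejected).

```python
from datetime import datetime


def is_rfc3339_utc(value: str) -> bool:
    """Return True if value parses as YYYY-MM-DDTHH:MM:SSZ, False otherwise."""
    try:
        # strptime validates field ranges (month 1-12, hour 0-23, and so on),
        # but it does not enforce zero-padding of individual fields.
        datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return False
    return True
```

A caller could run this check on start_time and end_time and skip the request when it returns False.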

Best Practices

1. Use Appropriate Time Ranges

# Good: Specific time range
logs = client.audit.list(
    start_time="2024-01-01T00:00:00Z",
    end_time="2024-01-01T23:59:59Z"
)

# Better: Use relative time for recent data
from datetime import datetime, timedelta, UTC  # datetime.UTC requires Python 3.11+

start_time = (datetime.now(UTC) - timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M:%SZ')
recent_logs = client.audit.list(start_time=start_time)

2. Handle Large Result Sets

# Use pagination for large queries
page = 1
all_events = []

while True:
    result = client.audit.list(
        start_time="2024-01-01T00:00:00Z",
        page=page,
        page_size=100
    )
    
    events = result.get('items', [])
    if not events:
        break
        
    all_events.extend(events)
    page += 1
    
    # Prevent infinite loops
    if page > 100:  # Max 10,000 events
        break

print(f"Total events collected: {len(all_events)}")
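
The pagination loop above can be packaged as a generator so that callers process events lazily instead of building one large list. This is a sketch under stated assumptions: `iter_events` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for client.audit.list so the example runs without the SDK. It assumes, as the loop above does, that a page with no `items` marks the end of the results.

```python
from typing import Any, Callable, Dict, Iterator


def iter_events(
    fetch_page: Callable[[int], Dict[str, Any]],
    max_pages: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield events page by page until an empty page or max_pages is reached."""
    for page in range(1, max_pages + 1):
        items = fetch_page(page).get("items", [])
        if not items:
            return  # An empty page is taken to mean there are no more results.
        yield from items


def fake_fetch(page: int) -> Dict[str, Any]:
    """Stand-in for client.audit.list: two pages of illustrative data."""
    data = {1: [{"id": "a"}, {"id": "b"}], 2: [{"id": "c"}]}
    return {"items": data.get(page, [])}


collected = list(iter_events(fake_fetch))
```

Against the real client, the fetch function could be a lambda such as lambda page: client.audit.list(start_time="2024-01-01T00:00:00Z", page=page, page_size=100).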

3. Efficient Streaming

# Stream with timeout to prevent indefinite running
try:
    event_count = 0
    for event in client.audit.stream(
        timeout_minutes=30,
        verbose=False
    ):
        event_count += 1
        
        # Process event
        if event.get('action_successful') is False:
            print(f"Failed event #{event_count}: {event}")
        
        # Periodic status updates
        if event_count % 100 == 0:
            print(f"Processed {event_count} events...")
            
except KeyboardInterrupt:
    print(f"Streaming stopped. Processed {event_count} events.")

4. Combine Filters for Efficiency

# Efficient: Use multiple filters to reduce result set
specific_logs = client.audit.list(
    category_type="agents",
    action_type="invoke",
    start_time="2024-01-01T00:00:00Z",
    page_size=50
)

# Less efficient: Query all and filter in code
all_logs = client.audit.list(start_time="2024-01-01T00:00:00Z")
filtered_logs = [
    log for log in all_logs.get('items', [])
    if log.get('category_type') == 'agents' and log.get('action_type') == 'invoke'
]

5. Error Recovery in Streaming

import time

max_retries = 3
retry_count = 0

while retry_count < max_retries:
    try:
        for event in client.audit.stream(timeout_minutes=60):
            print(f"Event: {event.get('action_type')}")
        break  # Exit retry loop on success
        
    except AuditError as e:
        retry_count += 1
        print(f"Streaming failed (attempt {retry_count}/{max_retries}): {e}")
        
        if retry_count < max_retries:
            print("Retrying in 10 seconds...")
            time.sleep(10)
        else:
            print("Max retries reached. Streaming failed.")
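
The fixed 10-second wait above can be swapped for exponential backoff, which spaces out retries when a failure persists. The following is a generic sketch rather than SDK functionality: `retry_with_backoff` is a hypothetical helper, `ConnectionError` stands in for AuditError so the example runs without the SDK, and the `sleep` parameter exists only so the delays can be observed without actually waiting.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying on the given exceptions with doubling delays."""
    for attempt in range(1, max_retries + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_retries:
                raise  # Out of attempts: surface the last error to the caller.
            sleep(base_delay * 2 ** (attempt - 1))  # 1x, 2x, 4x, ... base_delay
    raise ValueError("max_retries must be at least 1")


attempts: List[int] = []
delays: List[float] = []


def flaky() -> str:
    """Fail twice, then succeed (illustrative stand-in for a streaming call)."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = retry_with_backoff(flaky, (ConnectionError,), sleep=delays.append)
```

In this run the operation succeeds on the third attempt after waits of 1 and 2 seconds. To apply it to streaming, the operation would be a function that consumes client.audit.stream(...) and retry_on would be (AuditError,).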

Integration Examples

The examples below build common monitoring workflows on top of audit queries:

Monitor Agent Activities

# Track all agent invocations and their success rates
agent_logs = client.audit.list(
    category_type="agents",
    action_type="invoke",
    start_time="2024-01-01T00:00:00Z"
)

successful = sum(1 for log in agent_logs.get('items', []) if log.get('action_successful'))
total = len(agent_logs.get('items', []))
success_rate = (successful / total * 100) if total > 0 else 0

print(f"Agent success rate: {success_rate:.1f}% ({successful}/{total})")
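
The overall rate above can be broken down per agent by grouping on `category_name`. This is a minimal sketch: `success_rates` is a hypothetical helper, the agent names in the sample data are invented for illustration, and it assumes each event carries the `category_name` and `action_successful` fields shown earlier.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def success_rates(events: Iterable[Dict[str, Any]]) -> Dict[str, Tuple[int, int, float]]:
    """Return {category_name: (successful, total, percent)} for the given events."""
    counts: Dict[str, List[int]] = defaultdict(lambda: [0, 0])
    for event in events:
        name = event.get("category_name", "Unknown")
        counts[name][1] += 1  # total events for this name
        if event.get("action_successful"):
            counts[name][0] += 1  # successful events for this name
    return {name: (ok, total, ok / total * 100) for name, (ok, total) in counts.items()}


# Illustrative events; the agent names are made up.
sample = [
    {"category_name": "deploy-bot", "action_successful": True},
    {"category_name": "deploy-bot", "action_successful": False},
    {"category_name": "triage-bot", "action_successful": True},
]
rates = success_rates(sample)
```

Passing agent_logs.get('items', []) instead of the sample list would produce the same breakdown for real query results.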

Security Monitoring

# Monitor failed authentication attempts
failed_auth = client.audit.search(
    text="authentication",
    status="failed",
    start_time="2024-01-01T00:00:00Z"
)

# Alert on suspicious activity
if len(failed_auth.get('items', [])) > 10:
    print("⚠️  High number of failed authentication attempts detected!")

Next Steps

  • Review the API Reference for detailed method documentation
  • Explore the examples directory for complete working examples
  • Check out integration patterns with other Kubiya services