Policies Service API Reference

Complete reference documentation for all classes, methods, and exceptions in the Kubiya Policies service.

Classes

PolicyService

Main service class for managing OPA policies and access control.
class PolicyService(BaseService):
    """Service for managing OPA policies"""

Methods

list() -> Union[List[Dict[str, Any]], str]
List all OPA policies in the system.
Parameters:
  • None
Returns:
  • Union[List[Dict[str, Any]], str]: List of policy objects or JSON string
Example:
try:
    policies = client.policies.list()
    
    print(f"Found {len(policies)} policies:")
    for policy in policies:
        name = policy.get("name", "Unknown")
        envs = policy.get("env", [])
        print(f"- {name}: {', '.join(envs) if envs else 'No environments'}")
        
except PolicyError as e:
    print(f"Failed to list policies: {e}")

get(policy_name: str) -> Union[Dict[str, Any], str]
Retrieve details for a specific policy.
Parameters:
  • policy_name (str): Name of the policy to retrieve
Returns:
  • Union[Dict[str, Any], str]: Policy details or JSON string
Raises:
  • PolicyError: If policy is not found or request fails
Example:
try:
    policy = client.policies.get("admin-access-policy")
    
    print(f"Policy: {policy['name']}")
    print(f"Environments: {policy.get('env', [])}")
    print(f"Content length: {len(policy.get('policy', ''))} characters")
    
except PolicyError as e:
    print(f"Failed to get policy: {e}")

create(name: str, policy: Optional[str] = None, file: Optional[str] = None, env: Optional[List[str]] = None, validate: bool = True) -> Dict[str, Any]
Create a new OPA policy with optional validation.
Parameters:
  • name (str): Policy name
  • policy (Optional[str]): Policy content directly as string
  • file (Optional[str]): Policy file path (alternative to policy parameter)
  • env (Optional[List[str]]): Target environments (list of environment names)
  • validate (bool): Validate policy before creating (default: True)
Returns:
  • Dict[str, Any]: Created policy details
Raises:
  • PolicyValidationError: If policy validation fails when validate=True
  • PolicyError: If neither policy nor file parameter is provided, or file cannot be read
Example:
# Create with inline policy content
policy_content = '''
package tools

import future.keywords.if

allow if {
    input.user.role == "admin"
}

allow if {
    input.user.role == "user"
    input.tool_name in ["read-tool", "list-tool"]
}
'''

try:
    result = client.policies.create(
        name="basic-tool-access",
        policy=policy_content,
        env=["production", "staging"],
        validate=True
    )
    
    print(f"Policy created: {result['name']}")
    print(f"Status: {result.get('status')}")
    
except PolicyValidationError as e:
    print(f"Policy validation failed: {e}")
    
except PolicyError as e:
    print(f"Policy creation failed: {e}")

# Create from file
try:
    result = client.policies.create(
        name="file-based-policy",
        file="./policies/access-control.rego",
        env=["production"],
        validate=True
    )
    
    print(f"Policy created from file: {result['name']}")
    
except PolicyError as e:
    print(f"Failed to create policy from file: {e}")

update(policy_name: str, policy: Optional[str] = None, file: Optional[str] = None, env: Optional[List[str]] = None, validate: bool = True) -> Dict[str, Any]
Update an existing OPA policy.
Parameters:
  • policy_name (str): Name of the policy to update
  • policy (Optional[str]): New policy content directly as string
  • file (Optional[str]): Policy file path (alternative to policy parameter)
  • env (Optional[List[str]]): Target environments (list of environment names)
  • validate (bool): Validate policy before updating (default: True)
Returns:
  • Dict[str, Any]: Updated policy details
Raises:
  • PolicyValidationError: If policy validation fails when validate=True
  • PolicyError: If policy doesn’t exist or update fails
Example:
# Update policy content and environments
updated_policy = '''
package tools

import future.keywords.if

# Enhanced policy with time restrictions
allow if {
    input.user.role == "admin"
}

allow if {
    input.user.role == "user"
    input.tool_name in ["read-tool", "list-tool"]
    input.time.hour >= 9
    input.time.hour <= 17
}
'''

try:
    result = client.policies.update(
        policy_name="basic-tool-access",
        policy=updated_policy,
        env=["production", "staging", "development"],
        validate=True
    )
    
    print(f"Policy updated: {result['name']}")
    print(f"New environments: {result.get('env', [])}")
    
except PolicyValidationError as e:
    print(f"Policy validation failed: {e}")
    
except PolicyError as e:
    print(f"Policy update failed: {e}")

delete(policy_name: str, confirm: bool = False) -> Dict[str, Any]
Delete an OPA policy with confirmation requirement.
Parameters:
  • policy_name (str): Name of the policy to delete
  • confirm (bool): Confirm deletion (default: False, must be True to proceed)
Returns:
  • Dict[str, Any]: Deletion status
Raises:
  • PolicyError: If confirm is False or policy deletion fails
Example:
try:
    result = client.policies.delete(
        policy_name="old-policy",
        confirm=True  # Required for safety
    )
    
    print(f"Policy deleted: {result['name']}")
    print(f"Deleted at: {result.get('deleted_at')}")
    
except PolicyError as e:
    print(f"Deletion failed: {e}")

# This will fail - confirmation required
try:
    client.policies.delete("policy-name", confirm=False)
except PolicyError as e:
    print(f"Expected error: {e}")  # "Deletion not confirmed"

validate(name: str, policy: Optional[str] = None, file: Optional[str] = None, env: Optional[List[str]] = None) -> Dict[str, Any]
Validate an OPA policy for syntax and logical correctness.
Parameters:
  • name (str): Policy name for validation context
  • policy (Optional[str]): Policy content directly as string
  • file (Optional[str]): Policy file path (alternative to policy parameter)
  • env (Optional[List[str]]): Target environments for validation context
Returns:
  • Dict[str, Any]: Validation result with 'valid' boolean and 'errors' list
Raises:
  • PolicyError: If neither policy nor file parameter is provided, or file cannot be read
Example:
# Validate before creating
policy_content = '''
package tools

allow {
    input.user.role == "admin"
    # Missing closing brace - syntax error
'''

try:
    validation_result = client.policies.validate(
        name="test-policy",
        policy=policy_content,
        env=["staging"]
    )
    
    if validation_result.get("valid"):
        print("✅ Policy is valid!")
        
        # Proceed with creation
        client.policies.create(
            name="test-policy",
            policy=policy_content,
            env=["staging"],
            validate=False  # Skip validation since we already checked
        )
    else:
        print("❌ Policy validation failed:")
        for error in validation_result.get("errors", []):
            print(f"  - {error}")
            
        warnings = validation_result.get("warnings", [])
        if warnings:
            print("⚠️ Warnings:")
            for warning in warnings:
                print(f"  - {warning}")
        
except PolicyError as e:
    print(f"Validation request failed: {e}")

evaluate(policy: Optional[str] = None, policy_file: Optional[str] = None, input: Optional[Union[Dict[str, Any], str]] = None, input_file: Optional[str] = None, data: Optional[Union[Dict[str, Any], str]] = None, data_file: Optional[str] = None, query: str = "data") -> Dict[str, Any]
Evaluate a policy with input data and optional additional data context.
Parameters:
  • policy (Optional[str]): Policy content directly as string
  • policy_file (Optional[str]): Policy file path (alternative to policy parameter)
  • input (Optional[Union[Dict[str, Any], str]]): Input JSON (dict or JSON string)
  • input_file (Optional[str]): Input JSON file path
  • data (Optional[Union[Dict[str, Any], str]]): Additional data JSON (dict or JSON string)
  • data_file (Optional[str]): Additional data JSON file path
  • query (str): Query string to evaluate (default: "data")
Returns:
  • Dict[str, Any]: Evaluation result with 'result' field and optional 'error' field
Raises:
  • PolicyError: If policy/file cannot be read or JSON parsing fails
Example:
# Basic policy evaluation
policy_content = '''
package tools

import future.keywords.if

allow if {
    input.user.role == "admin"
}

allow if {
    input.user.role == "user"
    input.tool_name in data.allowed_tools
}
'''

# Test input
test_input = {
    "user": {"role": "user", "name": "john"},
    "tool_name": "kubectl"
}

# Additional data context
test_data = {
    "allowed_tools": ["kubectl", "helm", "docker"]
}

try:
    result = client.policies.evaluate(
        policy=policy_content,
        input=test_input,
        data=test_data,
        query="data.tools.allow"
    )
    
    if result.get("error"):
        print(f"❌ Evaluation error: {result['error']}")
    else:
        evaluation_result = result.get("result", False)
        if evaluation_result:
            print("✅ Access granted!")
        else:
            print("❌ Access denied!")
            
    print(f"Query: {result.get('query')}")
    
except PolicyError as e:
    print(f"Evaluation failed: {e}")

# Evaluate with files
try:
    result = client.policies.evaluate(
        policy_file="./policies/complex-policy.rego",
        input_file="./test-data/input.json",
        data_file="./test-data/data.json",
        query="data.complex.access.allow"
    )
    
    print(f"File-based evaluation result: {result}")
    
except PolicyError as e:
    print(f"File-based evaluation failed: {e}")

test(tool_name: Optional[str] = None, workflow_file: Optional[str] = None, args: Optional[Union[Dict[str, Any], str]] = None, args_file: Optional[str] = None, params: Optional[Union[Dict[str, Any], str]] = None, params_file: Optional[str] = None, runner: str = "default") -> Dict[str, Any]
Test tool or workflow execution permission against configured policies.
Parameters:
  • tool_name (Optional[str]): Tool name (for tool testing)
  • workflow_file (Optional[str]): Workflow definition file (for workflow testing)
  • args (Optional[Union[Dict[str, Any], str]]): Tool arguments (dict or JSON string)
  • args_file (Optional[str]): Tool arguments JSON file
  • params (Optional[Union[Dict[str, Any], str]]): Workflow parameters (dict or JSON string)
  • params_file (Optional[str]): Workflow parameters JSON file
  • runner (str): Runner name (default: "default")
Returns:
  • Dict[str, Any]: Test result with 'allowed' boolean and optional 'message' or 'issues'
Raises:
  • PolicyError: If neither tool_name nor workflow_file is provided, or file reading fails
Example:
# Test tool execution permission
try:
    permission_result = client.policies.test(
        tool_name="kubectl",
        args={
            "namespace": "production",
            "action": "get pods"
        },
        runner="production-runner"
    )
    
    if permission_result.get("allowed"):
        print(f"✅ Tool permission granted: {permission_result.get('message')}")
        
        # Proceed with tool execution
        tool_result = client.tools.execute(
            tool_name="kubectl",
            args={"namespace": "production", "action": "get pods"},
            runner="production-runner"
        )
    else:
        print(f"❌ Tool permission denied: {permission_result.get('message')}")
        
except PolicyError as e:
    print(f"Tool permission test failed: {e}")

# Test workflow execution permission
try:
    workflow_permission = client.policies.test(
        workflow_file="./workflows/deployment.json",
        params={"environment": "production", "version": "v1.2.0"},
        runner="default"
    )
    
    if workflow_permission.get("allowed"):
        print("✅ Workflow execution allowed")
        
        # Proceed with workflow execution
        workflow_result = client.workflows.execute(
            workflow_file="./workflows/deployment.json",
            params={"environment": "production", "version": "v1.2.0"}
        )
    else:
        print("❌ Workflow execution blocked:")
        for issue in workflow_permission.get("issues", []):
            print(f"  - {issue}")
            
except PolicyError as e:
    print(f"Workflow permission test failed: {e}")

# Test with argument files
try:
    permission_result = client.policies.test(
        tool_name="helm",
        args_file="./test-configs/helm-args.json",
        runner="staging-runner"
    )
    
    print(f"Helm permission result: {permission_result}")
    
except PolicyError as e:
    print(f"File-based permission test failed: {e}")

Exceptions

PolicyError (Base Exception)

Base exception class for all policy-related errors.
class PolicyError(KubiyaSDKError):
    """Policy-related errors"""

Attributes

  • message (str): Error message
  • details (Dict[str, Any]): Additional error context and metadata

Example

try:
    client.policies.get("non-existent-policy")
except PolicyError as e:
    print(f"Policy error: {e}")
    print(f"Details: {e.details}")

PolicyValidationError

Specialized exception for policy validation failures.
class PolicyValidationError(ValidationError):
    """Policy validation errors"""

Attributes

  • message (str): Validation error message
  • field (Optional[str]): Field that failed validation
  • value (Optional[Any]): Value that failed validation
  • details (Dict[str, Any]): Complete validation error context

Error Details Structure

{
    "field": "policy",
    "value": "invalid rego content",
    "validation_errors": [
        "line 5: rego_parse_error: unexpected token",
        "line 8: undefined variable 'unknownvar'"
    ]
}

Example

try:
    client.policies.create(
        name="invalid-policy",
        policy="package invalid { syntax error",
        validate=True
    )
except PolicyValidationError as e:
    print(f"Policy validation failed: {e}")
    print(f"Field: {e.field}")
    print(f"Value length: {len(str(e.value)) if e.value else 0}")
    
    # Access detailed validation errors
    validation_errors = e.details.get("validation_errors", [])
    if validation_errors:
        print("Specific validation errors:")
        for error in validation_errors:
            print(f"  - {error}")

Helper Methods

The PolicyService includes several private helper methods that handle common operations:

_get_policy_content(policy: Optional[str], file: Optional[str]) -> str

Internal method to retrieve policy content from either a direct string or a file path.

_parse_json_input(json_data: Optional[Union[Dict[str, Any], str]], json_file: Optional[str], default: Optional[Dict[str, Any]]) -> Dict[str, Any]

Internal method to parse JSON input from various sources with proper error handling.

_test_tool_permission(tool_name: str, args: Optional[Union[Dict[str, Any], str]], args_file: Optional[str], runner: str) -> Dict[str, Any]

Internal method to test tool execution permission by evaluating against configured policies.

_test_workflow_permission(workflow_file: str, params: Optional[Union[Dict[str, Any], str]], params_file: Optional[str], runner: str) -> Dict[str, Any]

Internal method to test workflow execution permission by validating each step and overall workflow permissions.

Complete Usage Example

Here’s a comprehensive example demonstrating the main PolicyService operations (list, validate, create, evaluate, test, update, and get):
from kubiya_workflow_sdk import KubiyaClient
from kubiya_workflow_sdk.kubiya_services.exceptions import PolicyError, PolicyValidationError

# Initialize client
client = KubiyaClient(
    api_key="your-api-key",
    base_url="https://api.kubiya.ai"
)

# 1. List existing policies
print("=== Listing Policies ===")
try:
    policies = client.policies.list()
    print(f"Found {len(policies)} existing policies")
    for policy in policies:
        print(f"- {policy.get('name')}: {policy.get('env', [])}")
except PolicyError as e:
    print(f"Failed to list policies: {e}")

# 2. Create a new policy
print("\n=== Creating Policy ===")
policy_content = '''
package tools

import future.keywords.if

# Admin users have full access
allow if {
    input.user.role == "admin"
}

# Regular users have limited access during business hours
allow if {
    input.user.role == "user"
    input.tool_name in ["read-tool", "list-tool", "status-tool"]
    input.time.hour >= 9
    input.time.hour <= 17
}

# Emergency access for specific tools
allow if {
    input.user.role == "on-call"
    input.tool_name in ["emergency-restart", "health-check"]
}
'''

try:
    # Validate first
    validation_result = client.policies.validate(
        name="business-hours-access",
        policy=policy_content,
        env=["staging"]
    )
    
    if validation_result.get("valid"):
        print("✅ Policy validation passed")
        
        # Create the policy
        create_result = client.policies.create(
            name="business-hours-access",
            policy=policy_content,
            env=["staging"],
            validate=False  # Already validated
        )
        print(f"✅ Policy created: {create_result['name']}")
    else:
        print("❌ Policy validation failed:")
        for error in validation_result.get("errors", []):
            print(f"  - {error}")
            
except PolicyValidationError as e:
    print(f"❌ Validation failed: {e}")
except PolicyError as e:
    print(f"❌ Creation failed: {e}")

# 3. Test policy evaluation
print("\n=== Testing Policy Evaluation ===")
test_cases = [
    {
        "user": {"role": "admin", "name": "admin1"},
        "tool_name": "production-deploy",
        "time": {"hour": 14}
    },
    {
        "user": {"role": "user", "name": "dev1"},
        "tool_name": "read-tool",
        "time": {"hour": 14}
    },
    {
        "user": {"role": "user", "name": "dev2"},
        "tool_name": "production-deploy",
        "time": {"hour": 14}
    },
    {
        "user": {"role": "on-call", "name": "oncall1"},
        "tool_name": "emergency-restart",
        "time": {"hour": 2}
    }
]

for i, test_case in enumerate(test_cases, 1):
    try:
        result = client.policies.evaluate(
            policy=policy_content,
            input=test_case,
            query="data.tools.allow"
        )
        
        allowed = result.get("result", False)
        user_role = test_case["user"]["role"]
        tool = test_case["tool_name"]
        
        print(f"Test {i}: {user_role} + {tool} = {'✅ ALLOWED' if allowed else '❌ DENIED'}")
        
    except PolicyError as e:
        print(f"Test {i} failed: {e}")

# 4. Test tool permissions
print("\n=== Testing Tool Permissions ===")
try:
    tool_permission = client.policies.test(
        tool_name="kubectl",
        args={"namespace": "staging", "action": "get pods"},
        runner="staging-runner"
    )
    
    if tool_permission.get("allowed"):
        print(f"✅ kubectl permission: {tool_permission.get('message')}")
    else:
        print(f"❌ kubectl denied: {tool_permission.get('message')}")
        
except PolicyError as e:
    print(f"Tool permission test failed: {e}")

# 5. Update policy for production
print("\n=== Updating Policy for Production ===")
try:
    update_result = client.policies.update(
        policy_name="business-hours-access",
        env=["staging", "production"],  # Add production environment
        validate=True
    )
    print(f"✅ Policy updated for environments: {update_result.get('env', [])}")
    
except PolicyError as e:
    print(f"❌ Update failed: {e}")

# 6. Get updated policy details
print("\n=== Getting Policy Details ===")
try:
    policy_details = client.policies.get("business-hours-access")
    print(f"Policy: {policy_details['name']}")
    print(f"Environments: {policy_details.get('env', [])}")
    print(f"Content length: {len(policy_details.get('policy', ''))} characters")
    
except PolicyError as e:
    print(f"Failed to get policy details: {e}")

print("\n=== Policy Management Complete ===")
This API reference provides complete documentation for all public interfaces in the Policies service. Use the examples and error handling patterns to build robust policy management and access control workflows.