Testing workflows catches errors before they reach production and builds confidence in changes. This guide covers how to test workflows locally, mock external dependencies, and integrate testing into CI/CD pipelines.

Overview

Testing Kubiya workflows involves several strategies:
  • Unit Testing: Test individual workflow steps in isolation
  • Integration Testing: Test complete workflows with real or mocked services
  • Validation Testing: Verify workflow syntax and structure
  • End-to-End Testing: Test workflows in realistic environments
  • CI/CD Integration: Automate testing in deployment pipelines
Start with unit tests for quick feedback, then add integration tests for confidence, and finally end-to-end tests for production readiness.

Quick Start

import pytest
from kubiya import ControlPlaneClient

@pytest.fixture
def client():
    """Create test client."""
    return ControlPlaneClient(api_key="test-api-key")

def test_workflow_validation(client):
    """Test that workflow structure is valid."""
    workflow_spec = {
        "name": "test-workflow",
        "steps": [
            {"name": "step1", "action": "echo", "params": {"message": "Hello"}}
        ]
    }

    # Validate workflow structure
    assert "name" in workflow_spec
    assert "steps" in workflow_spec
    assert len(workflow_spec["steps"]) > 0
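Save the test in your test directory (tests/test_quick_start.py here is just an example path) and run it with pytest:
pytest tests/test_quick_start.py -v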

Testing Strategies

1. Unit Testing Workflow Steps

Test individual steps in isolation:
import pytest

def execute_step(step_config: dict) -> dict:
    """Execute a single workflow step."""
    action = step_config.get("action")
    params = step_config.get("params", {})

    if action == "echo":
        return {"output": params.get("message")}
    elif action == "http_request":
        # Implementation
        pass

    return {}

def test_echo_step():
    """Test echo step execution."""
    step = {
        "action": "echo",
        "params": {"message": "test message"}
    }

    result = execute_step(step)

    assert result["output"] == "test message"

def test_step_with_missing_params():
    """Test step gracefully handles missing parameters."""
    step = {"action": "echo", "params": {}}

    result = execute_step(step)

    assert "output" in result

2. Mocking External Dependencies

Mock external services and APIs:
import pytest
from unittest.mock import Mock, patch
from kubiya import ControlPlaneClient

@pytest.fixture
def mock_client():
    """Create mock Control Plane client."""
    client = Mock(spec=ControlPlaneClient)

    # Mock graph operations
    client.graph.intelligent_search.return_value = {
        "answer": "Test answer",
        "nodes": [],
        "confidence": "high"
    }

    # Mock dataset operations
    client.datasets.list_datasets.return_value = [
        {"id": "test-dataset", "name": "test"}
    ]

    return client

def test_workflow_with_mock_client(mock_client):
    """Test workflow using mocked client."""
    # Execute workflow logic
    result = mock_client.graph.intelligent_search(keywords="test")

    # Verify
    assert result["answer"] == "Test answer"
    mock_client.graph.intelligent_search.assert_called_once_with(keywords="test")
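The patch helper imported above is useful when the workflow code constructs its own client instead of receiving one. A minimal sketch, assuming a run_sync entry point in workflows/data_sync.py (both names are hypothetical; the patch target must match the module where the workflow imports ControlPlaneClient):
from unittest.mock import patch

def test_workflow_with_patched_client():
    """Patch ControlPlaneClient where the workflow module looks it up."""
    with patch("workflows.data_sync.ControlPlaneClient") as mock_cls:
        mock_cls.return_value.graph.intelligent_search.return_value = {
            "answer": "Test answer",
            "nodes": []
        }

        from workflows.data_sync import run_sync  # hypothetical workflow entry point
        result = run_sync(keywords="test")

    assert result["answer"] == "Test answer"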

3. Integration Testing

Test workflows with real SDK calls in test environment:
import pytest
from kubiya import ControlPlaneClient
import os

@pytest.fixture
def test_client():
    """Create client for test environment."""
    api_key = os.getenv("KUBIYA_TEST_API_KEY")
    if not api_key:
        pytest.skip("Test API key not available")

    return ControlPlaneClient(api_key=api_key)

@pytest.fixture
def test_dataset(test_client):
    """Create test dataset."""
    dataset = test_client.datasets.create_dataset(
        name=f"test-dataset-{os.urandom(4).hex()}",
        scope="user"
    )

    yield dataset

    # Cleanup
    test_client.datasets.delete_dataset(dataset_id=dataset['id'])

def test_workflow_end_to_end(test_client, test_dataset):
    """Test complete workflow execution."""
    # Store test data
    memory = test_client.graph.store_memory(
        dataset_id=test_dataset['id'],
        context="Test data for integration test"
    )

    assert memory['status'] == 'completed'

    # Recall data
    memories = test_client.graph.recall_memory(
        query="integration test",
        limit=1
    )

    assert len(memories) > 0
    assert "test" in memories[0]['content'].lower()

4. Parameterized Testing

Test workflows with various inputs:
import pytest

@pytest.mark.parametrize("input,expected", [
    ("hello", "HELLO"),
    ("world", "WORLD"),
    ("", ""),
    ("Test123", "TEST123")
])
def test_uppercase_transformation(input, expected):
    """Test string transformation with various inputs."""
    result = input.upper()
    assert result == expected

@pytest.mark.parametrize("query,min_results", [
    ("production databases", 1),
    ("kubernetes services", 2),
    ("aws ec2 instances", 3)
])
def test_search_results(test_client, query, min_results):
    """Test search returns minimum expected results."""
    result = test_client.graph.intelligent_search(keywords=query)

    assert len(result['nodes']) >= min_results

Testing Patterns

Arrange-Act-Assert Pattern

from kubiya import ControlPlaneClient

def test_dataset_creation():
    """Test dataset creation follows AAA pattern."""
    # Arrange
    client = ControlPlaneClient(api_key="test-key")
    dataset_name = "test-dataset"

    # Act
    dataset = client.datasets.create_dataset(
        name=dataset_name,
        scope="user"
    )

    # Assert
    assert dataset['name'] == dataset_name
    assert dataset['scope'] == "user"
    assert 'id' in dataset

Setup and Teardown

import pytest

@pytest.fixture(scope="session")
def setup_test_environment():
    """Setup test environment once per session."""
    print("\nSetting up test environment...")
    # Setup code
    yield
    print("\nTearing down test environment...")
    # Cleanup code

@pytest.fixture(scope="function")
def test_data():
    """Provide test data for each test."""
    data = {"test": "value"}
    yield data
    # Cleanup after each test
    data.clear()

def test_with_fixtures(setup_test_environment, test_data):
    """Test using fixtures."""
    assert test_data["test"] == "value"

Error Testing

import pytest
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

def test_error_handling():
    """Test that errors are handled properly."""
    client = ControlPlaneClient(api_key="invalid-key")

    with pytest.raises(GraphError):
        client.graph.intelligent_search(keywords="test")

def test_validation_errors():
    """Test validation error handling."""
    client = ControlPlaneClient(api_key="test-key")

    with pytest.raises(GraphError) as exc_info:
        client.ingestion.ingest_node(
            id="",  # Invalid empty ID
            labels=[],
            properties={}
        )

    assert "invalid" in str(exc_info.value).lower()

CI/CD Integration

GitHub Actions Example

name: Test Workflows

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install kubiya-sdk pytest pytest-cov

      - name: Run tests
        env:
          KUBIYA_TEST_API_KEY: ${{ secrets.KUBIYA_TEST_API_KEY }}
        run: |
          pytest tests/ -v --cov=workflows --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3

pytest Configuration

# pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
    -v
    --tb=short
    --strict-markers
    --disable-warnings

markers =
    unit: Unit tests
    integration: Integration tests
    e2e: End-to-end tests
    slow: Slow-running tests

Test Organization

Directory Structure

project/
├── workflows/
│   ├── __init__.py
│   ├── data_sync.py
│   └── incident_response.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py           # Shared fixtures
│   ├── unit/
│   │   ├── test_data_sync.py
│   │   └── test_incident_response.py
│   ├── integration/
│   │   ├── test_workflows_integration.py
│   │   └── test_graph_operations.py
│   └── e2e/
│       └── test_complete_workflows.py
├── pytest.ini
└── requirements-test.txt
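The requirements-test.txt referenced above could list the test dependencies used throughout this guide:
# requirements-test.txt
kubiya-sdk
pytest
pytest-cov
pytest-rerunfailures  # provides the flaky marker used later in this guide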

Shared Fixtures (conftest.py)

# tests/conftest.py
import pytest
from kubiya import ControlPlaneClient
import os

@pytest.fixture(scope="session")
def api_key():
    """Get test API key."""
    key = os.getenv("KUBIYA_TEST_API_KEY")
    if not key:
        pytest.skip("KUBIYA_TEST_API_KEY not set")
    return key

@pytest.fixture(scope="session")
def client(api_key):
    """Create shared Control Plane client."""
    return ControlPlaneClient(api_key=api_key)

@pytest.fixture
def test_dataset_id():
    """Provide test dataset ID."""
    return "test-dataset"

def pytest_configure(config):
    """Configure pytest."""
    config.addinivalue_line("markers", "unit: Unit tests")
    config.addinivalue_line("markers", "integration: Integration tests")
    config.addinivalue_line("markers", "e2e: End-to-end tests")

Best Practices

1. Use Test Markers

import pytest

@pytest.mark.unit
def test_unit_logic():
    """Fast unit test."""
    assert True

@pytest.mark.integration
def test_integration_with_api(client):
    """Integration test with real API."""
    result = client.datasets.list_datasets()
    assert isinstance(result, list)

@pytest.mark.e2e
@pytest.mark.slow
def test_full_workflow(client):
    """Slow end-to-end test."""
    # Complete workflow test
    pass
Run specific test types:
# Run only unit tests
pytest -m unit

# Run integration and e2e tests
pytest -m "integration or e2e"

# Skip slow tests
pytest -m "not slow"

2. Clean Up Test Data

@pytest.fixture
def cleanup_datasets(client):
    """Clean up test datasets after tests."""
    created_datasets = []

    yield created_datasets

    # Cleanup
    for dataset_id in created_datasets:
        try:
            client.datasets.delete_dataset(dataset_id=dataset_id)
        except Exception:
            pass  # Already deleted or doesn't exist

def test_with_cleanup(client, cleanup_datasets):
    """Test that cleans up after itself."""
    dataset = client.datasets.create_dataset(name="temp", scope="user")
    cleanup_datasets.append(dataset['id'])

    # Test logic...

3. Test Error Paths

import pytest

def test_graceful_error_handling(client):
    """Test that invalid input raises a useful error."""
    # pytest.raises fails the test if no exception is raised,
    # so no manual assert False is needed
    with pytest.raises(Exception) as exc_info:
        client.graph.intelligent_search(keywords="", max_turns=-1)

    # Verify the error message is actionable
    message = str(exc_info.value).lower()
    assert "invalid" in message or "error" in message

4. Use Descriptive Test Names

# ❌ BAD - Unclear test names
def test_1():
    pass

def test_workflow():
    pass

# ✅ GOOD - Descriptive test names
def test_dataset_creation_with_org_scope():
    """Test creating organization-scoped dataset."""
    pass

def test_intelligent_search_returns_relevant_results():
    """Test that intelligent search returns results with high relevance scores."""
    pass

Troubleshooting Tests

Debug Failing Tests

import pytest

@pytest.fixture
def debug_mode():
    """Enable debug output."""
    import logging
    logging.basicConfig(level=logging.DEBUG)
    return True

def test_with_debug(client, debug_mode):
    """Test with debug output."""
    result = client.graph.intelligent_search(keywords="test")

    # Add debug output
    print(f"\nResult: {result}")
    print(f"Nodes: {len(result.get('nodes', []))}")

    assert len(result['nodes']) > 0
Run with verbose output:
pytest -v -s tests/test_workflow.py
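pytest can also stream log output to the console, which pairs well with the debug fixture above:
pytest -v -s --log-cli-level=DEBUG tests/test_workflow.py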

Handle Flaky Tests

import pytest

@pytest.mark.flaky(reruns=3, reruns_delay=2)
def test_sometimes_fails():
    """Test that might fail due to external factors."""
    # Test that depends on external service availability
    pass
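The flaky marker comes from the pytest-rerunfailures plugin, which needs to be installed alongside pytest:
pip install pytest-rerunfailures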

Next Steps