Testing workflows ensures reliability and correctness before deploying to production. Learn how to test workflows locally, mock dependencies, and integrate with CI/CD pipelines.

Overview

Testing Kubiya workflows involves several strategies:
  • Unit Testing: Test individual workflow steps in isolation
  • Integration Testing: Test complete workflows with real or mocked services
  • Validation Testing: Verify workflow syntax and structure
  • End-to-End Testing: Test workflows in realistic environments
  • CI/CD Integration: Automate testing in deployment pipelines
Start with unit tests for quick feedback, then add integration tests for confidence, and finally end-to-end tests for production readiness.

Quick Start

import pytest
from kubiya import ControlPlaneClient

@pytest.fixture
def client():
    """Create test client."""
    return ControlPlaneClient(api_key="test-api-key")

def test_workflow_validation(client):
    """Test that workflow structure is valid."""
    workflow_spec = {
        "name": "test-workflow",
        "steps": [
            {"name": "step1", "action": "echo", "params": {"message": "Hello"}}
        ]
    }

    # Validate workflow structure
    assert "name" in workflow_spec
    assert "steps" in workflow_spec
    assert len(workflow_spec["steps"]) > 0
When you define workflows with the Kubiya Workflow DSL instead of raw dicts, you can use the DSL’s built-in validation to catch structural issues before submitting them to the Control Plane.
from kubiya.dsl import chain


def test_dsl_workflow_validation():
    """Validate a DSL-defined workflow using Workflow.validate()."""
    wf = (
        chain("nightly-backup")
            .description("Nightly database backup")
            .schedule("0 2 * * *")
            .queue("backups", max_active_runs=1)
            .step("dump-db", "pg_dump mydb > backup.sql")
            .step("upload", callback=lambda s:
                s.shell("aws s3 cp backup.sql s3://my-bucket/backups/")
            )
    )

    result = wf.validate()

    assert result["valid"] is True
    assert result["errors"] == []
    assert result["warnings"] == []
This kind of test runs entirely locally: it never executes the workflow, but it guarantees that the compiled spec has a name, at least one step, unique step names, and no obvious structural problems for a given execution type (chain vs graph).
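
To see validation catch a real problem, it is worth asserting the negative case as well. A minimal sketch, assuming (per the checks described above) that duplicate step names make the result invalid and populate the errors list:
from kubiya.dsl import chain


def test_duplicate_step_names_fail_validation():
    """A chain with two steps sharing a name should fail validation."""
    wf = (
        chain("broken-backup")
            .step("dump-db", "pg_dump mydb > backup.sql")
            .step("dump-db", "pg_dump otherdb > other.sql")  # duplicate name
    )

    result = wf.validate()

    assert result["valid"] is False
    assert len(result["errors"]) > 0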

Testing Strategies

1. Unit Testing Workflow Steps

Test individual steps in isolation:
import pytest

def execute_step(step_config: dict) -> dict:
    """Execute a single workflow step."""
    action = step_config.get("action")
    params = step_config.get("params", {})

    if action == "echo":
        return {"output": params.get("message")}
    elif action == "http_request":
        # Implementation omitted for brevity
        pass

    return {}

def test_echo_step():
    """Test echo step execution."""
    step = {
        "action": "echo",
        "params": {"message": "test message"}
    }

    result = execute_step(step)

    assert result["output"] == "test message"

def test_step_with_missing_params():
    """Test step gracefully handles missing parameters."""
    step = {"action": "echo", "params": {}}

    result = execute_step(step)

    assert "output" in result

Testing DSL workflow definitions

When you build workflows with the Kubiya Workflow DSL (workflow, chain, or graph), you can unit-test the specification itself without running any containers or hitting the Control Plane. This is useful for catching breaking changes early (for example, when someone renames a step or removes a queue configuration).

Validating workflows with Workflow.validate()

from kubiya.dsl import chain


def build_backup_workflow():
    """Helper that returns a fully configured workflow instance."""
    return (
        chain("nightly-backup")
            .description("Nightly database backup")
            .schedule("0 2 * * *")
            .queue("backups", max_active_runs=1)
            .step("dump-db", "pg_dump mydb > backup.sql")
            .step("upload", callback=lambda s:
                s.shell("aws s3 cp backup.sql s3://my-bucket/backups/")
            )
    )


def test_backup_workflow_is_valid():
    wf = build_backup_workflow()
    result = wf.validate()

    assert result["valid"] is True
    assert result["errors"] == []
    assert result["warnings"] == []
This mirrors how Workflow.validate() is implemented: it ensures the workflow has a name, at least one step, no duplicate step names, and that each step has either a command, run, or type field set. In chain mode it also flags explicit dependencies with warnings, which you can turn into failed tests if your team prefers to keep chains strictly sequential.
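
If your team does want chains to stay strictly sequential, a small test can promote those warnings to failures. A sketch reusing the build_backup_workflow() helper above, assuming validate() returns warnings as a list:
def test_backup_workflow_has_no_warnings():
    """Fail if validate() reports any warnings (e.g. explicit dependencies in chain mode)."""
    wf = build_backup_workflow()

    result = wf.validate()

    assert not result["warnings"], f"Unexpected validation warnings: {result['warnings']}"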

Asserting compiled workflow structure with to_dict()

You can also assert against the compiled workflow spec to make sure it includes key fields like type, schedule, queue, and executor configuration.
from kubiya.dsl import chain


def test_backup_workflow_shape():
    wf = (
        chain("nightly-backup")
            .schedule("0 2 * * *")
            .queue("backups", max_active_runs=1)
            .step("dump-db", "pg_dump mydb > backup.sql")
            .step("upload-to-s3", callback=lambda s:
                s.docker_run(
                    "amazon/aws-cli:2.15.0",
                    command=["s3", "cp", "backup.sql", "s3://my-bucket/backups/"],
                    env={"AWS_REGION": "us-east-1"},
                )
            )
    )

    spec = wf.to_dict()

    # High-level workflow properties
    assert spec["name"] == "nightly-backup"
    assert spec["type"] == "chain"
    assert spec["schedule"] == "0 2 * * *"
    assert spec["queue"] == "backups"

    # Step list and executor details
    assert [s["name"] for s in spec["steps"]] == ["dump-db", "upload-to-s3"]

    upload_step = spec["steps"][1]
    assert upload_step["executor"]["type"] == "docker_run"
    assert upload_step["executor"]["config"]["image"].startswith("amazon/aws-cli")
These tests stay fast and deterministic: they only touch in-memory Python objects, but still protect you against accidental changes to your workflow shape (for example, changing the workflow type from chain to graph or dropping the schedule/queue configuration).
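
For broader protection against shape drift, you can also compare the compiled spec to a checked-in snapshot. A sketch assuming to_dict() output is JSON-serializable (non-JSON values fall back to str()) and reusing the build_backup_workflow() helper from above:
import json
from pathlib import Path

SNAPSHOT = Path(__file__).parent / "snapshots" / "nightly-backup.json"


def test_backup_workflow_matches_snapshot():
    """Diff the compiled spec against a reviewed, committed snapshot."""
    spec = build_backup_workflow().to_dict()
    current = json.dumps(spec, indent=2, sort_keys=True, default=str)

    if not SNAPSHOT.exists():
        # First run: write the snapshot so it can be reviewed and committed.
        SNAPSHOT.parent.mkdir(parents=True, exist_ok=True)
        SNAPSHOT.write_text(current)

    assert current == SNAPSHOT.read_text(), "Workflow spec changed; review and update the snapshot"
Regenerate the snapshot file deliberately (delete it and re-run the test) whenever the workflow is meant to change.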

2. Mocking External Dependencies

Mock external services and APIs:
import pytest
from unittest.mock import Mock, patch
from kubiya import ControlPlaneClient

@pytest.fixture
def mock_client():
    """Create mock Control Plane client."""
    client = Mock(spec=ControlPlaneClient)

    # Mock graph operations
    client.graph.intelligent_search.return_value = {
        "answer": "Test answer",
        "nodes": [],
        "confidence": "high"
    }

    # Mock dataset operations
    client.datasets.list_datasets.return_value = [
        {"id": "test-dataset", "name": "test"}
    ]

    return client

def test_workflow_with_mock_client(mock_client):
    """Test workflow using mocked client."""
    # Execute workflow logic
    result = mock_client.graph.intelligent_search(keywords="test")

    # Verify
    assert result["answer"] == "Test answer"
    mock_client.graph.intelligent_search.assert_called_once_with(keywords="test")
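When the workflow module constructs its own client, patch() substitutes the mock at the point of use instead of passing it in. A sketch in which workflows.data_sync and its run_sync() entry point are illustrative names for your own module, assumed to create its client inside the function:
from unittest.mock import patch


def test_workflow_module_uses_patched_client():
    """Patch ControlPlaneClient in the module that uses it, not in kubiya itself."""
    with patch("workflows.data_sync.ControlPlaneClient") as MockClient:
        MockClient.return_value.graph.intelligent_search.return_value = {
            "answer": "Test answer",
            "nodes": [],
            "confidence": "high",
        }

        # Hypothetical entry point; replace with your own workflow function
        from workflows.data_sync import run_sync
        run_sync(keywords="stale hosts")

    MockClient.return_value.graph.intelligent_search.assert_called_once()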

3. Integration Testing

Test workflows with real SDK calls in test environment:
import pytest
from kubiya import ControlPlaneClient
import os

@pytest.fixture
def test_client():
    """Create client for test environment."""
    api_key = os.getenv("KUBIYA_TEST_API_KEY")
    if not api_key:
        pytest.skip("Test API key not available")

    return ControlPlaneClient(api_key=api_key)

@pytest.fixture
def test_dataset(test_client):
    """Create test dataset."""
    dataset = test_client.datasets.create_dataset(
        name=f"test-dataset-{os.urandom(4).hex()}",
        scope="user"
    )

    yield dataset

    # Cleanup
    test_client.datasets.delete_dataset(dataset_id=dataset['id'])

def test_workflow_end_to_end(test_client, test_dataset):
    """Test complete workflow execution."""
    # Store test data
    memory = test_client.graph.store_memory(
        dataset_id=test_dataset['id'],
        context="Test data for integration test"
    )

    assert memory['status'] == 'completed'

    # Recall data
    memories = test_client.graph.recall_memory(
        query="integration test",
        limit=1
    )

    assert len(memories) > 0
    assert "test" in memories[0]['content'].lower()

4. Parameterized Testing

Test workflows with various inputs:
import pytest

@pytest.mark.parametrize("input,expected", [
    ("hello", "HELLO"),
    ("world", "WORLD"),
    ("", ""),
    ("Test123", "TEST123")
])
def test_uppercase_transformation(input, expected):
    """Test string transformation with various inputs."""
    result = input.upper()
    assert result == expected

@pytest.mark.parametrize("query,min_results", [
    ("production databases", 1),
    ("kubernetes services", 2),
    ("aws ec2 instances", 3)
])
def test_search_results(test_client, query, min_results):
    """Test search returns minimum expected results."""
    result = test_client.graph.intelligent_search(keywords=query)

    assert len(result['nodes']) >= min_results
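The same approach works for workflow step configurations. A sketch assuming the execute_step helper from the unit-testing section above is importable in your test module:
import pytest


@pytest.mark.parametrize("step,expected_output", [
    ({"action": "echo", "params": {"message": "hello"}}, "hello"),
    ({"action": "echo", "params": {"message": ""}}, ""),
    ({"action": "echo", "params": {}}, None),
])
def test_echo_step_outputs(step, expected_output):
    """Echo steps should pass their message through unchanged."""
    result = execute_step(step)

    assert result["output"] == expected_output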

Testing Patterns

Arrange-Act-Assert Pattern

from kubiya import ControlPlaneClient

def test_dataset_creation():
    """Test dataset creation follows AAA pattern."""
    # Arrange
    client = ControlPlaneClient(api_key="test-key")
    dataset_name = "test-dataset"

    # Act
    dataset = client.datasets.create_dataset(
        name=dataset_name,
        scope="user"
    )

    # Assert
    assert dataset['name'] == dataset_name
    assert dataset['scope'] == "user"
    assert 'id' in dataset

Setup and Teardown

import pytest

@pytest.fixture(scope="session")
def setup_test_environment():
    """Setup test environment once per session."""
    print("\nSetting up test environment...")
    # Setup code
    yield
    print("\nTearing down test environment...")
    # Cleanup code

@pytest.fixture(scope="function")
def test_data():
    """Provide test data for each test."""
    data = {"test": "value"}
    yield data
    # Cleanup after each test
    data.clear()

def test_with_fixtures(setup_test_environment, test_data):
    """Test using fixtures."""
    assert test_data["test"] == "value"

Error Testing

import pytest
from kubiya import ControlPlaneClient
from kubiya.resources.exceptions import GraphError

def test_error_handling():
    """Test that errors are handled properly."""
    client = ControlPlaneClient(api_key="invalid-key")

    with pytest.raises(GraphError):
        client.graph.intelligent_search(keywords="test")

def test_validation_errors():
    """Test validation error handling."""
    client = ControlPlaneClient(api_key="test-key")

    with pytest.raises(GraphError) as exc_info:
        client.ingestion.ingest_node(
            id="",  # Invalid empty ID
            labels=[],
            properties={}
        )

    assert "invalid" in str(exc_info.value).lower()

CI/CD Integration

GitHub Actions Example

name: Test Workflows

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install kubiya-sdk pytest pytest-cov

      - name: Run tests
        env:
          KUBIYA_TEST_API_KEY: ${{ secrets.KUBIYA_TEST_API_KEY }}
        run: |
          pytest tests/ -v --cov=workflows --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3

pytest Configuration

# pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
    -v
    --tb=short
    --strict-markers
    --disable-warnings

markers =
    unit: Unit tests
    integration: Integration tests
    e2e: End-to-end tests
    slow: Slow-running tests

Test Organization

Directory Structure

project/
├── workflows/
│   ├── __init__.py
│   ├── data_sync.py
│   └── incident_response.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py           # Shared fixtures
│   ├── unit/
│   │   ├── test_data_sync.py
│   │   └── test_incident_response.py
│   ├── integration/
│   │   ├── test_workflows_integration.py
│   │   └── test_graph_operations.py
│   └── e2e/
│       └── test_complete_workflows.py
├── pytest.ini
└── requirements-test.txt

Shared Fixtures (conftest.py)

# tests/conftest.py
import pytest
from kubiya import ControlPlaneClient
import os

@pytest.fixture(scope="session")
def api_key():
    """Get test API key."""
    key = os.getenv("KUBIYA_TEST_API_KEY")
    if not key:
        pytest.skip("KUBIYA_TEST_API_KEY not set")
    return key

@pytest.fixture(scope="session")
def client(api_key):
    """Create shared Control Plane client."""
    return ControlPlaneClient(api_key=api_key)

@pytest.fixture
def test_dataset_id():
    """Provide test dataset ID."""
    return "test-dataset"

def pytest_configure(config):
    """Configure pytest."""
    config.addinivalue_line("markers", "unit: Unit tests")
    config.addinivalue_line("markers", "integration: Integration tests")
    config.addinivalue_line("markers", "e2e: End-to-end tests")

Best Practices

1. Use Test Markers

import pytest

@pytest.mark.unit
def test_unit_logic():
    """Fast unit test."""
    assert True

@pytest.mark.integration
def test_integration_with_api(client):
    """Integration test with real API."""
    result = client.datasets.list_datasets()
    assert isinstance(result, list)

@pytest.mark.e2e
@pytest.mark.slow
def test_full_workflow(client):
    """Slow end-to-end test."""
    # Complete workflow test
    pass
Run specific test types:
# Run only unit tests
pytest -m unit

# Run integration and e2e tests
pytest -m "integration or e2e"

# Skip slow tests
pytest -m "not slow"

2. Clean Up Test Data

@pytest.fixture
def cleanup_datasets(client):
    """Clean up test datasets after tests."""
    created_datasets = []

    yield created_datasets

    # Cleanup
    for dataset_id in created_datasets:
        try:
            client.datasets.delete_dataset(dataset_id=dataset_id)
        except Exception:
            pass  # Already deleted or doesn't exist

def test_with_cleanup(client, cleanup_datasets):
    """Test that cleans up after itself."""
    dataset = client.datasets.create_dataset(name="temp", scope="user")
    cleanup_datasets.append(dataset['id'])

    # Test logic...

3. Test Error Paths

def test_graceful_error_handling(client):
    """Test that errors are handled gracefully."""
    try:
        # Attempt invalid operation
        client.graph.intelligent_search(keywords="", max_turns=-1)
    except Exception as e:
        # Verify error is handled appropriately
        assert "invalid" in str(e).lower() or "error" in str(e).lower()
    else:
        raise AssertionError("Expected the invalid search to raise an error")

4. Use Descriptive Test Names

# ❌ BAD - Unclear test names
def test_1():
    pass

def test_workflow():
    pass

# ✅ GOOD - Descriptive test names
def test_dataset_creation_with_org_scope():
    """Test creating organization-scoped dataset."""
    pass

def test_intelligent_search_returns_relevant_results():
    """Test that intelligent search returns results with high relevance scores."""
    pass

Troubleshooting Tests

Debug Failing Tests

import pytest

@pytest.fixture
def debug_mode():
    """Enable debug output."""
    import logging
    logging.basicConfig(level=logging.DEBUG)
    return True

def test_with_debug(client, debug_mode):
    """Test with debug output."""
    result = client.graph.intelligent_search(keywords="test")

    # Add debug output
    print(f"\nResult: {result}")
    print(f"Nodes: {len(result.get('nodes', []))}")

    assert len(result['nodes']) > 0
Run with verbose output:
pytest -v -s tests/test_workflow.py

Handle Flaky Tests

import pytest

# Requires the pytest-rerunfailures plugin (pip install pytest-rerunfailures)
@pytest.mark.flaky(reruns=3, reruns_delay=2)
def test_sometimes_fails():
    """Test that might fail due to external factors."""
    # Test that depends on external service availability
    pass

Next Steps