Skip to main content
Testing workflows is essential for ensuring reliability and correctness. The Kubiya Workflow DSL provides multiple testing approaches from unit testing individual steps to full end-to-end workflow validation.

Unit Testing Steps

Basic Step Testing

import unittest
from kubiya.testing import StepTest

class TestDeploymentSteps(StepTest):
    def test_build_docker_image_step(self):
        """Test Docker image build step"""
        
        # Create step to test
        build_step = Step("build-image").tool("docker-builder").inputs(
            dockerfile_path="./Dockerfile",
            image_name="myapp",
            tag="v1.0.0"
        )
        
        # Mock tool response
        self.mock_tool("docker-builder", {
            "status": "success",
            "image_id": "sha256:abc123...",
            "image_size": "245MB"
        })
        
        # Execute step
        result = self.execute_step(build_step, {
            "BUILD_VERSION": "v1.0.0"
        })
        
        # Assertions
        self.assertEqual(result.status, "success")
        self.assertIn("image_id", result.output)
        self.assertEqual(result.output["image_size"], "245MB")

Testing Step Conditions

class TestConditionalSteps(StepTest):
    def test_production_only_step(self):
        """Test step that only runs in production"""
        
        production_step = Step("production-backup").tool("backup-tool").inputs(
            database="prod-db"
        ).condition("${ENVIRONMENT} == 'production'")
        
        # Test with production environment
        result = self.execute_step(production_step, {
            "ENVIRONMENT": "production"
        })
        self.assertTrue(result.executed)
        
        # Test with staging environment
        result = self.execute_step(production_step, {
            "ENVIRONMENT": "staging"
        })
        self.assertFalse(result.executed)
        self.assertEqual(result.skip_reason, "Condition not met")
    
    def test_complex_condition_logic(self):
        """Test step with complex conditional logic"""
        
        complex_step = Step("complex-deployment").tool("deployer").condition("""
            (${ENVIRONMENT} == 'production' and ${APPROVAL_RECEIVED} == 'true') or
            (${ENVIRONMENT} == 'staging' and ${TEST_RESULTS.success_rate} > 0.95)
        """)
        
        # Test production with approval
        result = self.execute_step(complex_step, {
            "ENVIRONMENT": "production",
            "APPROVAL_RECEIVED": "true"
        })
        self.assertTrue(result.executed)
        
        # Test staging with high success rate
        result = self.execute_step(complex_step, {
            "ENVIRONMENT": "staging", 
            "TEST_RESULTS": {"success_rate": 0.98}
        })
        self.assertTrue(result.executed)
        
        # Test staging with low success rate
        result = self.execute_step(complex_step, {
            "ENVIRONMENT": "staging",
            "TEST_RESULTS": {"success_rate": 0.85}
        })
        self.assertFalse(result.executed)

Testing Error Handling

class TestErrorHandling(StepTest):
    def test_step_retry_mechanism(self):
        """Test step retry behavior"""
        
        retry_step = Step("flaky-operation").tool("unreliable-service").retry(
            max_attempts=3,
            backoff_strategy="exponential"
        )
        
        # Mock service to fail twice then succeed
        self.mock_tool_sequence("unreliable-service", [
            {"status": "error", "message": "Service temporarily unavailable"},
            {"status": "error", "message": "Service temporarily unavailable"},
            {"status": "success", "data": "Operation completed"}
        ])
        
        result = self.execute_step(retry_step)
        
        self.assertEqual(result.status, "success")
        self.assertEqual(result.attempts, 3)
        self.assertEqual(result.output["data"], "Operation completed")
    
    def test_error_handler_execution(self):
        """Test error handler execution on step failure"""
        
        failing_step = Step("failing-operation").tool("faulty-service").on_error(
            ErrorHandler().on_failure([
                Step("cleanup").tool("cleanup-service"),
                Step("notify").tool("notification-service").inputs(
                    message="Operation failed, cleanup completed"
                )
            ])
        )
        
        # Mock main service to fail
        self.mock_tool("faulty-service", {
            "status": "error",
            "message": "Critical system failure"
        })
        
        # Mock error handling services
        self.mock_tool("cleanup-service", {"status": "success"})
        self.mock_tool("notification-service", {"status": "success"})
        
        result = self.execute_step(failing_step)
        
        self.assertEqual(result.status, "error")
        self.assertTrue(result.error_handlers_executed)
        self.assertEqual(len(result.error_handler_results), 2)

Workflow Testing

Integration Testing

from kubiya.testing import WorkflowTest

class TestDeploymentWorkflow(WorkflowTest):
    def test_successful_deployment_flow(self):
        """Test complete successful deployment workflow"""
        
        workflow = self.create_deployment_workflow()
        
        # Mock all tools for success scenario
        self.mock_tool("docker-builder", {"status": "success", "image": "myapp:v1.0.0"})
        self.mock_tool("test-runner", {"status": "success", "tests_passed": 45, "tests_failed": 0})
        self.mock_tool("kubernetes-deployer", {"status": "success", "deployment": "myapp-v1.0.0"})
        self.mock_tool("health-checker", {"status": "healthy", "response_time": 120})
        self.mock_tool("slack", {"status": "sent", "message_id": "msg_123"})
        
        # Execute workflow
        result = self.execute_workflow(workflow, {
            "SERVICE_NAME": "myapp",
            "VERSION": "v1.0.0",
            "ENVIRONMENT": "staging"
        })
        
        # Verify workflow completion
        self.assertEqual(result.status, "completed")
        self.assertEqual(len(result.completed_steps), 5)
        
        # Verify step execution order
        step_names = [step.name for step in result.completed_steps]
        expected_order = ["build-image", "run-tests", "deploy-service", "health-check", "notify-success"]
        self.assertEqual(step_names, expected_order)
    
    def test_deployment_with_test_failures(self):
        """Test workflow behavior when tests fail"""
        
        workflow = self.create_deployment_workflow()
        
        # Mock successful build but failed tests
        self.mock_tool("docker-builder", {"status": "success"})
        self.mock_tool("test-runner", {
            "status": "failed", 
            "tests_passed": 40, 
            "tests_failed": 5,
            "failed_tests": ["test_payment_flow", "test_user_auth"]
        })
        
        result = self.execute_workflow(workflow, {
            "SERVICE_NAME": "myapp",
            "VERSION": "v1.0.0"
        })
        
        # Workflow should stop at test failure
        self.assertEqual(result.status, "failed")
        self.assertEqual(len(result.completed_steps), 2)  # build + tests only
        self.assertEqual(result.failed_step.name, "run-tests")
        
        # Deployment and health check should not execute
        executed_step_names = [step.name for step in result.completed_steps]
        self.assertNotIn("deploy-service", executed_step_names)
        self.assertNotIn("health-check", executed_step_names)

Testing Branching Logic

class TestWorkflowBranching(WorkflowTest):
    def test_environment_based_branching(self):
        """Test different execution paths based on environment"""
        
        workflow = self.create_environment_aware_workflow()
        
        # Test production path (requires approval)
        self.mock_tool("approval-gate", {"status": "approved", "approved_by": "admin@company.com"})
        self.mock_tool("production-deployer", {"status": "success"})
        
        prod_result = self.execute_workflow(workflow, {
            "ENVIRONMENT": "production",
            "SERVICE_NAME": "critical-service"
        })
        
        self.assertEqual(prod_result.status, "completed")
        self.assertIn("request-approval", [s.name for s in prod_result.completed_steps])
        
        # Test staging path (automatic deployment)
        self.mock_tool("staging-deployer", {"status": "success"})
        
        staging_result = self.execute_workflow(workflow, {
            "ENVIRONMENT": "staging",
            "SERVICE_NAME": "critical-service"
        })
        
        self.assertEqual(staging_result.status, "completed")
        self.assertNotIn("request-approval", [s.name for s in staging_result.completed_steps])

Parallel Step Testing

class TestParallelExecution(WorkflowTest):
    def test_parallel_step_execution(self):
        """Test parallel step execution and synchronization"""
        
        # Create workflow with parallel health checks
        parallel_checks = Parallel([
            Step("check-frontend").tool("health-checker").inputs(service="frontend"),
            Step("check-api").tool("health-checker").inputs(service="api"),
            Step("check-database").tool("health-checker").inputs(service="database")
        ])
        
        workflow = Workflow([parallel_checks])
        
        # Mock all health checks to succeed
        self.mock_tool("health-checker", {"status": "healthy"})
        
        result = self.execute_workflow(workflow)
        
        self.assertEqual(result.status, "completed")
        
        # Verify all parallel steps executed
        parallel_step_names = {step.name for step in result.completed_steps}
        expected_steps = {"check-frontend", "check-api", "check-database"}
        self.assertEqual(parallel_step_names, expected_steps)
        
        # Verify parallel execution timing
        self.assertLess(result.total_duration, result.sum_of_step_durations)
    
    def test_parallel_step_failure_handling(self):
        """Test workflow behavior when one parallel step fails"""
        
        parallel_checks = Parallel([
            Step("check-frontend").tool("health-checker"),
            Step("check-api").tool("health-checker"),
            Step("check-database").tool("health-checker")
        ])
        
        # Mock database check to fail
        self.mock_tool_by_inputs("health-checker", 
            {"service": "frontend"}, {"status": "healthy"}
        )
        self.mock_tool_by_inputs("health-checker",
            {"service": "api"}, {"status": "healthy"}
        )
        self.mock_tool_by_inputs("health-checker",
            {"service": "database"}, {"status": "unhealthy", "error": "Connection timeout"}
        )
        
        workflow = Workflow([parallel_checks])
        result = self.execute_workflow(workflow)
        
        # Workflow should fail due to database health check
        self.assertEqual(result.status, "failed")
        self.assertEqual(result.failed_step.name, "check-database")

Mock and Stub Utilities

Tool Mocking

class TestWithMocks(WorkflowTest):
    def setUp(self):
        """Set up common mocks"""
        super().setUp()
        
        # Mock common tools
        self.mock_tool("logger", {"status": "logged"})
        self.mock_tool("metrics-collector", {"status": "collected"})
        
        # Set up realistic external service mocks
        self.setup_kubernetes_mocks()
        self.setup_slack_mocks()
    
    def setup_kubernetes_mocks(self):
        """Set up Kubernetes-related tool mocks"""
        self.mock_tool("kubectl", {
            "status": "success",
            "resources_applied": 3,
            "namespace": "default"
        })
        
        self.mock_tool("helm", {
            "status": "success",
            "release_name": "myapp-v1.0.0",
            "revision": 1
        })
    
    def setup_slack_mocks(self):
        """Set up Slack notification mocks"""
        self.mock_tool("slack", {
            "status": "sent",
            "channel": "#deployments",
            "timestamp": "2024-01-15T10:30:00Z"
        })
    
    def test_with_realistic_mocks(self):
        """Test workflow with realistic tool responses"""
        
        deployment_step = Step("deploy-with-helm").tool("helm").inputs(
            chart="./myapp-chart",
            release_name="myapp",
            namespace="production"
        )
        
        result = self.execute_step(deployment_step)
        
        self.assertEqual(result.status, "success")
        self.assertEqual(result.output["release_name"], "myapp-v1.0.0")

Dynamic Mocks

class TestDynamicMocking(WorkflowTest):
    def test_stateful_service_interaction(self):
        """Test workflow with stateful service interactions"""
        
        # Create mock that changes behavior based on previous calls
        database_mock = StatefulMock()
        database_mock.on_call("backup", returns={"backup_id": "backup_123"})
        database_mock.on_call("migrate", 
            condition="backup_exists",
            returns={"migration_status": "success"}
        )
        database_mock.on_call("migrate",
            condition="no_backup",
            returns={"status": "error", "message": "No backup available"}
        )
        
        self.register_stateful_mock("database-manager", database_mock)
        
        workflow = Workflow([
            Step("create-backup").tool("database-manager").action("backup"),
            Step("run-migration").tool("database-manager").action("migrate")
        ])
        
        result = self.execute_workflow(workflow)
        
        self.assertEqual(result.status, "completed")
        self.assertIn("backup_123", str(result.step_outputs))

Performance Testing

Load Testing Workflows

class TestWorkflowPerformance(WorkflowTest):
    def test_workflow_performance_under_load(self):
        """Test workflow performance with concurrent executions"""
        
        workflow = self.create_simple_workflow()
        
        # Mock tools with realistic response times
        self.mock_tool_with_latency("http-client", 
            response={"status": "success"},
            latency_ms=100
        )
        
        # Execute workflow multiple times concurrently
        concurrent_executions = 10
        results = self.execute_workflow_concurrent(
            workflow, 
            count=concurrent_executions,
            inputs={"SERVICE_NAME": "test-service"}
        )
        
        # Analyze performance results
        successful_executions = [r for r in results if r.status == "completed"]
        self.assertEqual(len(successful_executions), concurrent_executions)
        
        # Check average execution time
        avg_duration = sum(r.duration for r in results) / len(results)
        self.assertLess(avg_duration, 5.0)  # Should complete in under 5 seconds
        
        # Check resource usage
        max_memory_usage = max(r.peak_memory for r in results)
        self.assertLess(max_memory_usage, 500 * 1024 * 1024)  # Under 500MB
    
    def test_workflow_scalability(self):
        """Test workflow behavior with varying load"""
        
        workflow = self.create_scalable_workflow()
        
        load_levels = [1, 5, 10, 20, 50]
        performance_results = {}
        
        for load in load_levels:
            results = self.execute_workflow_concurrent(workflow, count=load)
            
            performance_results[load] = {
                "avg_duration": sum(r.duration for r in results) / len(results),
                "success_rate": len([r for r in results if r.status == "completed"]) / len(results),
                "peak_memory": max(r.peak_memory for r in results)
            }
        
        # Verify scalability characteristics
        for load in load_levels:
            perf = performance_results[load]
            self.assertGreater(perf["success_rate"], 0.95)  # 95% success rate
            
            # Duration shouldn't increase linearly with load
            if load > 1:
                prev_perf = performance_results[load // 2] if load // 2 in performance_results else performance_results[1]
                duration_increase = perf["avg_duration"] / prev_perf["avg_duration"]
                self.assertLess(duration_increase, 2.0)  # Less than linear increase

Integration Testing

End-to-End Testing

import pytest
from kubiya import Kubiya

class TestWorkflowIntegration:
    @pytest.fixture
    def kubiya_client(self):
        """Create Kubiya client for testing"""
        return Kubiya(
            api_key=os.environ["KUBIYA_TEST_API_KEY"],
            base_url="https://test-api.kubiya.ai"
        )
    
    def test_real_workflow_execution(self, kubiya_client):
        """Test workflow execution against real Kubiya environment"""
        
        # Create test workflow
        workflow = self.create_test_workflow()
        
        # Register workflow
        registered_workflow = kubiya_client.workflows.register(workflow)
        
        try:
            # Execute workflow
            execution = kubiya_client.workflows.execute(
                workflow_id=registered_workflow.id,
                inputs={
                    "SERVICE_NAME": "test-service",
                    "ENVIRONMENT": "test"
                },
                wait=True,
                timeout=300  # 5 minutes
            )
            
            # Verify results
            assert execution.status == "completed"
            assert execution.error is None
            
            # Check step outputs
            step_outputs = kubiya_client.workflows.get_execution_outputs(execution.id)
            assert "deployment_id" in step_outputs
            assert step_outputs["health_status"] == "healthy"
            
        finally:
            # Cleanup
            kubiya_client.workflows.delete(registered_workflow.id)
    
    def test_workflow_with_real_integrations(self, kubiya_client):
        """Test workflow using real external integrations"""
        
        # Verify test integrations are available
        integrations = kubiya_client.integrations.list(tags=["test"])
        test_k8s = next((i for i in integrations if i.type == "kubernetes"), None)
        test_slack = next((i for i in integrations if i.type == "slack"), None)
        
        assert test_k8s is not None, "Test Kubernetes integration not found"
        assert test_slack is not None, "Test Slack integration not found"
        
        # Create workflow using real integrations
        workflow = Workflow([
            Step("test-k8s-connection").tool("kubectl").inputs(
                command="get pods",
                namespace="test"
            ),
            Step("send-test-notification").tool("slack").inputs(
                channel="#test-notifications",
                message="Integration test completed successfully"
            )
        ])
        
        # Execute and verify
        registered = kubiya_client.workflows.register(workflow)
        execution = kubiya_client.workflows.execute(
            registered.id,
            wait=True,
            timeout=120
        )
        
        assert execution.status == "completed"

Test Data Management

Test Fixtures

class WorkflowTestFixtures:
    @staticmethod
    def create_sample_deployment_workflow():
        """Create a sample deployment workflow for testing"""
        return Workflow([
            Step("validate-input").tool("validator").inputs(
                schema="deployment-schema.json"
            ),
            Step("build-application").tool("docker-builder").inputs(
                dockerfile="Dockerfile",
                tag="${VERSION}"
            ),
            Step("run-tests").tool("test-runner").inputs(
                test_suite="integration"
            ),
            Step("deploy-service").tool("kubernetes-deployer").depends_on([
                "build-application", "run-tests"
            ]),
            Step("verify-deployment").tool("health-checker").depends_on("deploy-service")
        ])
    
    @staticmethod
    def create_test_data():
        """Create test data for workflow execution"""
        return {
            "SERVICE_NAME": "test-service",
            "VERSION": "v1.0.0-test",
            "ENVIRONMENT": "test",
            "NAMESPACE": "test-namespace",
            "REPLICAS": 1
        }
    
    @staticmethod
    def create_mock_responses():
        """Create standard mock responses for common tools"""
        return {
            "validator": {"status": "valid", "errors": []},
            "docker-builder": {"status": "success", "image_id": "sha256:test123"},
            "test-runner": {"status": "success", "tests_passed": 25, "tests_failed": 0},
            "kubernetes-deployer": {"status": "success", "deployment_name": "test-service"},
            "health-checker": {"status": "healthy", "response_time": 150}
        }

Best Practices

Test Organization

  • Separate unit tests from integration tests
  • Use descriptive test names that explain the scenario
  • Group related tests in test classes
  • Use fixtures for common test data and mocks

Mock Strategy

  • Mock external dependencies consistently
  • Use realistic mock responses
  • Test both success and failure scenarios
  • Implement stateful mocks for complex interactions

Test Coverage

  • Test all workflow paths and branches
  • Test error handling and recovery scenarios
  • Test performance under load
  • Test with realistic data volumes

Continuous Testing

  • Run tests in CI/CD pipelines
  • Include performance regression tests
  • Test against multiple environments
  • Validate backwards compatibility
I