Unit Testing Steps
Basic Step Testing
import unittest

from kubiya_sdk.testing import StepTest


class TestDeploymentSteps(StepTest):
    def test_build_docker_image_step(self):
        """Test Docker image build step"""
        # Create step to test
        build_step = Step("build-image").tool("docker-builder").inputs(
            dockerfile_path="./Dockerfile",
            image_name="myapp",
            tag="v1.0.0"
        )

        # Mock tool response
        self.mock_tool("docker-builder", {
            "status": "success",
            "image_id": "sha256:abc123...",
            "image_size": "245MB"
        })

        # Execute step
        result = self.execute_step(build_step, {
            "BUILD_VERSION": "v1.0.0"
        })

        # Assertions
        self.assertEqual(result.status, "success")
        self.assertIn("image_id", result.output)
        self.assertEqual(result.output["image_size"], "245MB")
Testing Step Conditions
class TestConditionalSteps(StepTest):
    def test_production_only_step(self):
        """Test step that only runs in production"""
        production_step = Step("production-backup").tool("backup-tool").inputs(
            database="prod-db"
        ).condition("${ENVIRONMENT} == 'production'")

        # Test with production environment
        result = self.execute_step(production_step, {
            "ENVIRONMENT": "production"
        })
        self.assertTrue(result.executed)

        # Test with staging environment
        result = self.execute_step(production_step, {
            "ENVIRONMENT": "staging"
        })
        self.assertFalse(result.executed)
        self.assertEqual(result.skip_reason, "Condition not met")

    def test_complex_condition_logic(self):
        """Test step with complex conditional logic"""
        complex_step = Step("complex-deployment").tool("deployer").condition("""
            (${ENVIRONMENT} == 'production' and ${APPROVAL_RECEIVED} == 'true') or
            (${ENVIRONMENT} == 'staging' and ${TEST_RESULTS.success_rate} > 0.95)
        """)

        # Test production with approval
        result = self.execute_step(complex_step, {
            "ENVIRONMENT": "production",
            "APPROVAL_RECEIVED": "true"
        })
        self.assertTrue(result.executed)

        # Test staging with high success rate
        result = self.execute_step(complex_step, {
            "ENVIRONMENT": "staging",
            "TEST_RESULTS": {"success_rate": 0.98}
        })
        self.assertTrue(result.executed)

        # Test staging with low success rate
        result = self.execute_step(complex_step, {
            "ENVIRONMENT": "staging",
            "TEST_RESULTS": {"success_rate": 0.85}
        })
        self.assertFalse(result.executed)
Testing Error Handling
class TestErrorHandling(StepTest):
    def test_step_retry_mechanism(self):
        """Test step retry behavior"""
        retry_step = Step("flaky-operation").tool("unreliable-service").retry(
            max_attempts=3,
            backoff_strategy="exponential"
        )

        # Mock service to fail twice, then succeed
        self.mock_tool_sequence("unreliable-service", [
            {"status": "error", "message": "Service temporarily unavailable"},
            {"status": "error", "message": "Service temporarily unavailable"},
            {"status": "success", "data": "Operation completed"}
        ])

        result = self.execute_step(retry_step)
        self.assertEqual(result.status, "success")
        self.assertEqual(result.attempts, 3)
        self.assertEqual(result.output["data"], "Operation completed")

    def test_error_handler_execution(self):
        """Test error handler execution on step failure"""
        failing_step = Step("failing-operation").tool("faulty-service").on_error(
            ErrorHandler().on_failure([
                Step("cleanup").tool("cleanup-service"),
                Step("notify").tool("notification-service").inputs(
                    message="Operation failed, cleanup completed"
                )
            ])
        )

        # Mock main service to fail
        self.mock_tool("faulty-service", {
            "status": "error",
            "message": "Critical system failure"
        })

        # Mock error handling services
        self.mock_tool("cleanup-service", {"status": "success"})
        self.mock_tool("notification-service", {"status": "success"})

        result = self.execute_step(failing_step)
        self.assertEqual(result.status, "error")
        self.assertTrue(result.error_handlers_executed)
        self.assertEqual(len(result.error_handler_results), 2)
Workflow Testing
Integration Testing
from kubiya_sdk.testing import WorkflowTest


class TestDeploymentWorkflow(WorkflowTest):
    def test_successful_deployment_flow(self):
        """Test complete successful deployment workflow"""
        workflow = self.create_deployment_workflow()

        # Mock all tools for success scenario
        self.mock_tool("docker-builder", {"status": "success", "image": "myapp:v1.0.0"})
        self.mock_tool("test-runner", {"status": "success", "tests_passed": 45, "tests_failed": 0})
        self.mock_tool("kubernetes-deployer", {"status": "success", "deployment": "myapp-v1.0.0"})
        self.mock_tool("health-checker", {"status": "healthy", "response_time": 120})
        self.mock_tool("slack", {"status": "sent", "message_id": "msg_123"})

        # Execute workflow
        result = self.execute_workflow(workflow, {
            "SERVICE_NAME": "myapp",
            "VERSION": "v1.0.0",
            "ENVIRONMENT": "staging"
        })

        # Verify workflow completion
        self.assertEqual(result.status, "completed")
        self.assertEqual(len(result.completed_steps), 5)

        # Verify step execution order
        step_names = [step.name for step in result.completed_steps]
        expected_order = ["build-image", "run-tests", "deploy-service", "health-check", "notify-success"]
        self.assertEqual(step_names, expected_order)

    def test_deployment_with_test_failures(self):
        """Test workflow behavior when tests fail"""
        workflow = self.create_deployment_workflow()

        # Mock successful build but failed tests
        self.mock_tool("docker-builder", {"status": "success"})
        self.mock_tool("test-runner", {
            "status": "failed",
            "tests_passed": 40,
            "tests_failed": 5,
            "failed_tests": ["test_payment_flow", "test_user_auth"]
        })

        result = self.execute_workflow(workflow, {
            "SERVICE_NAME": "myapp",
            "VERSION": "v1.0.0"
        })

        # Workflow should stop at test failure
        self.assertEqual(result.status, "failed")
        self.assertEqual(len(result.completed_steps), 2)  # build + tests only
        self.assertEqual(result.failed_step.name, "run-tests")

        # Deployment and health check should not execute
        executed_step_names = [step.name for step in result.completed_steps]
        self.assertNotIn("deploy-service", executed_step_names)
        self.assertNotIn("health-check", executed_step_names)
Testing Branching Logic
class TestWorkflowBranching(WorkflowTest):
    def test_environment_based_branching(self):
        """Test different execution paths based on environment"""
        workflow = self.create_environment_aware_workflow()

        # Test production path (requires approval)
        self.mock_tool("approval-gate", {"status": "approved", "approved_by": "admin@company.com"})
        self.mock_tool("production-deployer", {"status": "success"})

        prod_result = self.execute_workflow(workflow, {
            "ENVIRONMENT": "production",
            "SERVICE_NAME": "critical-service"
        })
        self.assertEqual(prod_result.status, "completed")
        self.assertIn("request-approval", [s.name for s in prod_result.completed_steps])

        # Test staging path (automatic deployment)
        self.mock_tool("staging-deployer", {"status": "success"})

        staging_result = self.execute_workflow(workflow, {
            "ENVIRONMENT": "staging",
            "SERVICE_NAME": "critical-service"
        })
        self.assertEqual(staging_result.status, "completed")
        self.assertNotIn("request-approval", [s.name for s in staging_result.completed_steps])
Parallel Step Testing
class TestParallelExecution(WorkflowTest):
    def test_parallel_step_execution(self):
        """Test parallel step execution and synchronization"""
        # Create workflow with parallel health checks
        parallel_checks = Parallel([
            Step("check-frontend").tool("health-checker").inputs(service="frontend"),
            Step("check-api").tool("health-checker").inputs(service="api"),
            Step("check-database").tool("health-checker").inputs(service="database")
        ])
        workflow = Workflow([parallel_checks])

        # Mock all health checks to succeed
        self.mock_tool("health-checker", {"status": "healthy"})

        result = self.execute_workflow(workflow)
        self.assertEqual(result.status, "completed")

        # Verify all parallel steps executed
        parallel_step_names = {step.name for step in result.completed_steps}
        expected_steps = {"check-frontend", "check-api", "check-database"}
        self.assertEqual(parallel_step_names, expected_steps)

        # Verify parallel execution timing
        self.assertLess(result.total_duration, result.sum_of_step_durations)

    def test_parallel_step_failure_handling(self):
        """Test workflow behavior when one parallel step fails"""
        parallel_checks = Parallel([
            Step("check-frontend").tool("health-checker").inputs(service="frontend"),
            Step("check-api").tool("health-checker").inputs(service="api"),
            Step("check-database").tool("health-checker").inputs(service="database")
        ])

        # Mock the database check to fail and the others to pass
        self.mock_tool_by_inputs("health-checker",
            {"service": "frontend"}, {"status": "healthy"}
        )
        self.mock_tool_by_inputs("health-checker",
            {"service": "api"}, {"status": "healthy"}
        )
        self.mock_tool_by_inputs("health-checker",
            {"service": "database"}, {"status": "unhealthy", "error": "Connection timeout"}
        )

        workflow = Workflow([parallel_checks])
        result = self.execute_workflow(workflow)

        # Workflow should fail due to the database health check
        self.assertEqual(result.status, "failed")
        self.assertEqual(result.failed_step.name, "check-database")
Mock and Stub Utilities
Tool Mocking
class TestWithMocks(WorkflowTest):
    def setUp(self):
        """Set up common mocks"""
        super().setUp()

        # Mock common tools
        self.mock_tool("logger", {"status": "logged"})
        self.mock_tool("metrics-collector", {"status": "collected"})

        # Set up realistic external service mocks
        self.setup_kubernetes_mocks()
        self.setup_slack_mocks()

    def setup_kubernetes_mocks(self):
        """Set up Kubernetes-related tool mocks"""
        self.mock_tool("kubectl", {
            "status": "success",
            "resources_applied": 3,
            "namespace": "default"
        })
        self.mock_tool("helm", {
            "status": "success",
            "release_name": "myapp-v1.0.0",
            "revision": 1
        })

    def setup_slack_mocks(self):
        """Set up Slack notification mocks"""
        self.mock_tool("slack", {
            "status": "sent",
            "channel": "#deployments",
            "timestamp": "2024-01-15T10:30:00Z"
        })

    def test_with_realistic_mocks(self):
        """Test workflow with realistic tool responses"""
        deployment_step = Step("deploy-with-helm").tool("helm").inputs(
            chart="./myapp-chart",
            release_name="myapp",
            namespace="production"
        )

        result = self.execute_step(deployment_step)
        self.assertEqual(result.status, "success")
        self.assertEqual(result.output["release_name"], "myapp-v1.0.0")
Dynamic Mocks
class TestDynamicMocking(WorkflowTest):
    def test_stateful_service_interaction(self):
        """Test workflow with stateful service interactions"""
        # Create a mock that changes behavior based on previous calls
        database_mock = StatefulMock()
        database_mock.on_call("backup", returns={"backup_id": "backup_123"})
        database_mock.on_call("migrate",
            condition="backup_exists",
            returns={"migration_status": "success"}
        )
        database_mock.on_call("migrate",
            condition="no_backup",
            returns={"status": "error", "message": "No backup available"}
        )
        self.register_stateful_mock("database-manager", database_mock)

        workflow = Workflow([
            Step("create-backup").tool("database-manager").action("backup"),
            Step("run-migration").tool("database-manager").action("migrate")
        ])

        result = self.execute_workflow(workflow)
        self.assertEqual(result.status, "completed")
        self.assertIn("backup_123", str(result.step_outputs))
Performance Testing
Load Testing Workflows
class TestWorkflowPerformance(WorkflowTest):
    def test_workflow_performance_under_load(self):
        """Test workflow performance with concurrent executions"""
        workflow = self.create_simple_workflow()

        # Mock tools with realistic response times
        self.mock_tool_with_latency("http-client",
            response={"status": "success"},
            latency_ms=100
        )

        # Execute workflow multiple times concurrently
        concurrent_executions = 10
        results = self.execute_workflow_concurrent(
            workflow,
            count=concurrent_executions,
            inputs={"SERVICE_NAME": "test-service"}
        )

        # Analyze performance results
        successful_executions = [r for r in results if r.status == "completed"]
        self.assertEqual(len(successful_executions), concurrent_executions)

        # Check average execution time
        avg_duration = sum(r.duration for r in results) / len(results)
        self.assertLess(avg_duration, 5.0)  # Should complete in under 5 seconds

        # Check resource usage
        max_memory_usage = max(r.peak_memory for r in results)
        self.assertLess(max_memory_usage, 500 * 1024 * 1024)  # Under 500MB

    def test_workflow_scalability(self):
        """Test workflow behavior with varying load"""
        workflow = self.create_scalable_workflow()

        load_levels = [1, 5, 10, 20, 50]
        performance_results = {}
        for load in load_levels:
            results = self.execute_workflow_concurrent(workflow, count=load)
            performance_results[load] = {
                "avg_duration": sum(r.duration for r in results) / len(results),
                "success_rate": len([r for r in results if r.status == "completed"]) / len(results),
                "peak_memory": max(r.peak_memory for r in results)
            }

        # Verify scalability characteristics
        for load in load_levels:
            perf = performance_results[load]
            self.assertGreater(perf["success_rate"], 0.95)  # 95% success rate

            # Duration shouldn't increase linearly with load
            if load > 1:
                prev_perf = performance_results.get(load // 2, performance_results[1])
                duration_increase = perf["avg_duration"] / prev_perf["avg_duration"]
                self.assertLess(duration_increase, 2.0)  # Less than linear increase
Integration Testing
End-to-End Testing
import os

import pytest

from kubiya_sdk import Kubiya


class TestWorkflowIntegration:
    @pytest.fixture
    def kubiya_client(self):
        """Create Kubiya client for testing"""
        return Kubiya(
            api_key=os.environ["KUBIYA_TEST_API_KEY"],
            base_url="https://test-api.kubiya.ai"
        )

    def test_real_workflow_execution(self, kubiya_client):
        """Test workflow execution against a real Kubiya environment"""
        # Create test workflow
        workflow = self.create_test_workflow()

        # Register workflow
        registered_workflow = kubiya_client.workflows.register(workflow)
        try:
            # Execute workflow
            execution = kubiya_client.workflows.execute(
                workflow_id=registered_workflow.id,
                inputs={
                    "SERVICE_NAME": "test-service",
                    "ENVIRONMENT": "test"
                },
                wait=True,
                timeout=300  # 5 minutes
            )

            # Verify results
            assert execution.status == "completed"
            assert execution.error is None

            # Check step outputs
            step_outputs = kubiya_client.workflows.get_execution_outputs(execution.id)
            assert "deployment_id" in step_outputs
            assert step_outputs["health_status"] == "healthy"
        finally:
            # Cleanup
            kubiya_client.workflows.delete(registered_workflow.id)

    def test_workflow_with_real_integrations(self, kubiya_client):
        """Test workflow using real external integrations"""
        # Verify test integrations are available
        integrations = kubiya_client.integrations.list(tags=["test"])
        test_k8s = next((i for i in integrations if i.type == "kubernetes"), None)
        test_slack = next((i for i in integrations if i.type == "slack"), None)
        assert test_k8s is not None, "Test Kubernetes integration not found"
        assert test_slack is not None, "Test Slack integration not found"

        # Create workflow using real integrations
        workflow = Workflow([
            Step("test-k8s-connection").tool("kubectl").inputs(
                command="get pods",
                namespace="test"
            ),
            Step("send-test-notification").tool("slack").inputs(
                channel="#test-notifications",
                message="Integration test completed successfully"
            )
        ])

        # Execute and verify
        registered = kubiya_client.workflows.register(workflow)
        execution = kubiya_client.workflows.execute(
            registered.id,
            wait=True,
            timeout=120
        )
        assert execution.status == "completed"
Test Data Management
Test Fixtures
class WorkflowTestFixtures:
    @staticmethod
    def create_sample_deployment_workflow():
        """Create a sample deployment workflow for testing"""
        return Workflow([
            Step("validate-input").tool("validator").inputs(
                schema="deployment-schema.json"
            ),
            Step("build-application").tool("docker-builder").inputs(
                dockerfile="Dockerfile",
                tag="${VERSION}"
            ),
            Step("run-tests").tool("test-runner").inputs(
                test_suite="integration"
            ),
            Step("deploy-service").tool("kubernetes-deployer").depends_on([
                "build-application", "run-tests"
            ]),
            Step("verify-deployment").tool("health-checker").depends_on("deploy-service")
        ])

    @staticmethod
    def create_test_data():
        """Create test data for workflow execution"""
        return {
            "SERVICE_NAME": "test-service",
            "VERSION": "v1.0.0-test",
            "ENVIRONMENT": "test",
            "NAMESPACE": "test-namespace",
            "REPLICAS": 1
        }

    @staticmethod
    def create_mock_responses():
        """Create standard mock responses for common tools"""
        return {
            "validator": {"status": "valid", "errors": []},
            "docker-builder": {"status": "success", "image_id": "sha256:test123"},
            "test-runner": {"status": "success", "tests_passed": 25, "tests_failed": 0},
            "kubernetes-deployer": {"status": "success", "deployment_name": "test-service"},
            "health-checker": {"status": "healthy", "response_time": 150}
        }
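The fixtures above can be combined in a single test. The following is a short usage sketch that only reuses the WorkflowTestFixtures helpers together with the WorkflowTest mocking and execution methods shown earlier; the test class name and final assertion are illustrative.

class TestWithSharedFixtures(WorkflowTest):
    def test_sample_deployment_with_fixtures(self):
        """Run the sample deployment workflow with the standard test data and mocks"""
        workflow = WorkflowTestFixtures.create_sample_deployment_workflow()

        # Register the standard mock response for every tool the fixture workflow uses
        for tool_name, response in WorkflowTestFixtures.create_mock_responses().items():
            self.mock_tool(tool_name, response)

        result = self.execute_workflow(workflow, WorkflowTestFixtures.create_test_data())
        self.assertEqual(result.status, "completed")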
Best Practices
Test Organization
- Separate unit tests from integration tests (see the marker sketch after this list)
- Use descriptive test names that explain the scenario
- Group related tests in test classes
- Use fixtures for common test data and mocks
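One way to keep unit and integration suites separate is with standard pytest markers; the sketch below assumes StepTest is a unittest.TestCase subclass, and the marker names, file layout, and class names are illustrative rather than part of the Kubiya SDK.

# conftest.py (illustrative): register markers so suites can be selected
# independently, e.g. `pytest -m unit` or `pytest -m integration`.
def pytest_configure(config):
    config.addinivalue_line("markers", "unit: fast tests with every tool mocked")
    config.addinivalue_line("markers", "integration: tests needing a real Kubiya test environment")


# test_deployment.py (illustrative): tag test classes so each CI stage
# can select only the suite it needs.
import pytest

from kubiya_sdk.testing import StepTest


@pytest.mark.unit
class TestBuildStepUnit(StepTest):
    """Unit tests: all tools are mocked, no external calls."""


@pytest.mark.integration
class TestDeploymentEndToEnd:
    """Integration tests: require KUBIYA_TEST_API_KEY and test integrations."""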
Mock Strategy
- Mock external dependencies consistently
- Use realistic mock responses
- Test both success and failure scenarios (sketched after this list)
- Implement stateful mocks for complex interactions
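A minimal sketch of covering both outcomes for the same tool, using the mock_tool and execute_step helpers from this page; the registry tool name, step, and response fields are illustrative.

class TestRegistryMockStrategy(StepTest):
    def _push_step(self):
        # Illustrative step that pushes an image through a hypothetical registry tool
        return Step("push-image").tool("image-registry").inputs(image="myapp:v1.0.0")

    def test_push_image_success(self):
        """A realistic success response keeps assertions close to production shapes"""
        self.mock_tool("image-registry", {
            "status": "success",
            "digest": "sha256:feedface",
            "pushed_at": "2024-01-15T10:30:00Z"
        })
        result = self.execute_step(self._push_step())
        self.assertEqual(result.status, "success")
        self.assertIn("digest", result.output)

    def test_push_image_failure(self):
        """The same tool mocked to fail, so the error path is exercised as well"""
        self.mock_tool("image-registry", {
            "status": "error",
            "message": "Registry quota exceeded"
        })
        result = self.execute_step(self._push_step())
        self.assertEqual(result.status, "error")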
Test Coverage
- Test all workflow paths and branches (see the branch-coverage sketch after this list)
- Test error handling and recovery scenarios
- Test performance under load
- Test with realistic data volumes
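To cover every branch, the environment-based branching test above can be made table-driven. This sketch assumes WorkflowTest is a unittest.TestCase subclass (for subTest) and reuses the create_environment_aware_workflow helper and mocks from the branching example.

class TestBranchCoverage(WorkflowTest):
    def test_every_environment_branch(self):
        """Drive each environment through the workflow and check the expected path"""
        cases = [
            # (environment, should the approval step run?)
            ("production", True),
            ("staging", False),
        ]
        for environment, approval_expected in cases:
            with self.subTest(environment=environment):
                workflow = self.create_environment_aware_workflow()
                self.mock_tool("approval-gate", {"status": "approved"})
                self.mock_tool("production-deployer", {"status": "success"})
                self.mock_tool("staging-deployer", {"status": "success"})

                result = self.execute_workflow(workflow, {
                    "ENVIRONMENT": environment,
                    "SERVICE_NAME": "critical-service"
                })
                executed = [s.name for s in result.completed_steps]

                self.assertEqual(result.status, "completed")
                if approval_expected:
                    self.assertIn("request-approval", executed)
                else:
                    self.assertNotIn("request-approval", executed)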
Continuous Testing
- Run tests in CI/CD pipelines (see the entry-point sketch after this list)
- Include performance regression tests
- Test against multiple environments
- Validate backwards compatibility
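A sketch of a CI entry point that runs the fast unit suite on every commit and the integration suite only where a test API key is configured. It assumes the unit/integration markers suggested under Test Organization; the script name and stage variable are illustrative.

# ci_run_tests.py (illustrative CI entry point)
import os
import sys

import pytest


def main() -> int:
    stage = os.environ.get("CI_TEST_STAGE", "unit")

    if stage == "unit":
        # Fast, fully mocked suite on every commit
        return int(pytest.main(["-m", "unit", "--maxfail=1"]))

    # Integration suite only where a test environment is available
    if "KUBIYA_TEST_API_KEY" not in os.environ:
        print("KUBIYA_TEST_API_KEY not set; skipping integration stage")
        return 0
    return int(pytest.main(["-m", "integration"]))


if __name__ == "__main__":
    sys.exit(main())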