Many step types are available, including shell commands, shell scripts, and Python code.
Here are some simple common examples of how to define steps using the DSL:
As we saw in the examples above, we can define workflows with different types of steps, including shell commands, scripts, and Python code.
Now let’s combine them. Here is an example of a more complex workflow with multiple steps:
```python
from kubiya_workflow_sdk.dsl import Workflow

# Here you can see a 3-step workflow, where the first step is independent,
# however the third step depends on the output of the second step and uses it as an input.
wf = (
    Workflow("multistep-workflow")
    .description("This is a multistep workflow")
    .step("initial-step", "echo 'This is the first step!'")
    .step("produce-data", callback=lambda s:
        s.shell("echo '42'")
        .output("RESULT_VAR")
    )
    .step("consume-data", callback=lambda s:
        s.python("print(int('{{RESULT_VAR}}') * 2)")
        .depends("produce-data")
    )
)
```
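To make the dependency and output mechanics concrete, here is a minimal sketch in plain Python that does not use the SDK. It only models the idea: steps run in dependency order, a step's result is stored under its output name, and `{{NAME}}` placeholders are filled from earlier outputs. It is not how the Kubiya SDK is implemented; `steps`, `render`, `run_workflow`, and the `DOUBLED` output name are hypothetical and exist only for illustration.

```python
import re
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def render(template, env):
    """Replace each {{NAME}} placeholder with the value stored under NAME."""
    return re.sub(r"\{\{(\w+)\}\}", lambda m: env[m.group(1)], template)


# Hypothetical stand-in for a workflow definition (not the SDK's data model):
# each step has a callable, an optional output variable, and its dependencies.
steps = {
    "initial-step": {
        "run": lambda env: "This is the first step!",
        "output": None,
        "depends": [],
    },
    "produce-data": {
        "run": lambda env: "42",
        "output": "RESULT_VAR",
        "depends": [],
    },
    "consume-data": {
        "run": lambda env: str(int(render("{{RESULT_VAR}}", env)) * 2),
        "output": "DOUBLED",  # hypothetical name, not in the original example
        "depends": ["produce-data"],
    },
}


def run_workflow(steps):
    """Run steps in dependency order and return the captured output variables."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {name: set(spec["depends"]) for name, spec in steps.items()}
    env = {}
    for name in TopologicalSorter(graph).static_order():
        result = steps[name]["run"](env)
        if steps[name]["output"]:
            env[steps[name]["output"]] = result
    return env


outputs = run_workflow(steps)
print(outputs)  # {'RESULT_VAR': '42', 'DOUBLED': '84'}
```

Because `consume-data` declares `produce-data` as a dependency, it always runs after `RESULT_VAR` has been captured, regardless of the order in which the steps were declared.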
In this example, we will create a more complex workflow that includes multiple steps, dependencies, and outputs.
The workflow will fetch data from an API, perform a health check on a server, and generate a final summary based on the results.
The last two steps use Docker containers to run shell scripts. Any Docker image can be used; this example uses the alpine:latest image for simplicity.
Finally, we use the async StreamingKubiyaClient to execute the workflow and stream the results.
```python
import asyncio

from kubiya_workflow_sdk import StreamingKubiyaClient, workflow

wf = (
    workflow("advanced-workflow")
    .description("This is a multistep workflow")
    # Step 1: fetch the list of runners from the Kubiya API.
    .step("get-runners", callback=lambda s:
        s.kubiya(url="api/v3/runners", method="GET")
        .output("RUNNERS")
    )
    # Step 2: run a health check inside an Alpine container.
    # The script reads SERVER_URL from the step's environment.
    .step("server-healthcheck", callback=lambda s:
        s.description("Make a server health check")
        .docker(
            image="alpine:latest",
            content="""
                #!/bin/sh
                set -e
                if ! command -v curl >/dev/null 2>&1; then
                    apk add --no-cache curl
                fi
                echo "Checking health of $SERVER_URL ..."
                if curl -fsS "$SERVER_URL" >/dev/null; then
                    exit 0
                else
                    exit 1
                fi
            """
        )
        .output("SERVER_HEALTH")
        .depends("get-runners")
    )
    # Step 3: summarize the outputs of the previous two steps.
    .step("final-summary", callback=lambda s:
        s.tool_def(
            name="generate_summary",
            description="Generate final summary",
            type="docker",
            image="alpine:latest",
            content="""
                #!/bin/sh
                if health; then
                    echo "✅ Server is healthy"
                else
                    echo "❌ Server health check failed"
                fi
                echo "Runners: $runners"
            """,
            args={
                "health": "${SERVER_HEALTH}",
                "runners": "${RUNNERS}",
            },
        )
        .output("FINAL_SUMMARY")
        .depends("server-healthcheck")
    )
)


async def main():
    # Execute the workflow using the async client.
    # `async with` and `async for` must run inside a coroutine.
    try:
        async with StreamingKubiyaClient(api_key="your Kubiya API key") as client:
            async for event in client.execute_workflow_stream(wf.to_dict()):
                print(f"Event: {event}")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    asyncio.run(main())
```
```python
# Good: Graceful degradation
try:
    result = step.primary_service()
except ServiceError:
    result = step.fallback_service()
    step.alert_team("Using fallback service")

# Bad: Let it crash
result = step.primary_service()  # No error handling
```
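The snippet above uses placeholder names (`step.primary_service`, `step.fallback_service`, `step.alert_team`, `ServiceError`) that are not defined in this section. As a self-contained sketch of the same graceful-degradation pattern, the example below defines hypothetical stand-ins for all of them; none of these names come from the Kubiya SDK.

```python
class ServiceError(Exception):
    """Raised when a (hypothetical) service call fails."""


def primary_service():
    # Simulate an outage of the preferred service.
    raise ServiceError("primary service unavailable")


def fallback_service():
    # Simulate a degraded but working alternative.
    return "fallback-result"


def call_with_fallback(primary, fallback, alert):
    """Try the primary callable; on ServiceError, alert and use the fallback."""
    try:
        return primary()
    except ServiceError as exc:
        alert(f"Using fallback service: {exc}")
        return fallback()


alerts = []  # stand-in for a real alerting channel
result = call_with_fallback(primary_service, fallback_service, alerts.append)
print(result)
print(alerts)
```

Only `ServiceError` is caught here, so unexpected exceptions still surface instead of being silently replaced by the fallback result.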