```python
@workflow
def main_pipeline():
    # Run data ingestion workflow
    ingest = step("run-ingestion").workflow(
        name="data-ingestion",
        inputs={"source": "production-db"}
    )

    # Run processing workflow
    process = step("run-processing").workflow(
        name="data-processing",
        inputs={"data_path": "${ingest.output_path}"}
    ).depends("run-ingestion")

    # Run ML training workflow
    step("run-training").workflow(
        name="ml-training",
        inputs={"features": "${process.features}"}
    ).depends("run-processing")
```
```python
@workflow
def dynamic_processing():
    # Get list of datasets
    datasets = step("list-datasets").docker(
        image="awscli/aws-cli:latest",
        command="aws s3 ls s3://data-bucket/ --recursive | grep .csv"
    ).output("DATASET_LIST")

    # AI agent creates processing plan
    plan = step("create-plan").inline_agent(
        message="Create a processing plan for these datasets: ${DATASET_LIST}",
        agent_name="data-planner",
        tools=[
            {
                "name": "analyze-dataset",
                "image": "python:3.11",
                "script": "analyze_data_schema.py"
            }
        ]
    ).depends("list-datasets").output("PROCESSING_PLAN")

    # Execute the generated plan
    step("execute-plan").dynamic_workflow(
        plan="${PROCESSING_PLAN}"
    ).depends("create-plan")
```
- **Use specific timeouts:** Set reasonable timeouts for each step to prevent hanging (a hedged sketch follows this list).
- **Implement proper cleanup:** Always include cleanup steps that run even on failure.
- **Monitor resource usage:** Set appropriate CPU and memory limits for each step.
- **Handle secrets securely:** Pass sensitive data through environment variables; never hardcode it.
- **Test failure scenarios:** Verify that your error handling and retry logic work correctly.
- **Use meaningful step names:** Make workflow execution logs easy to understand.
- **Optimize for parallelism:** Identify independent steps that can run simultaneously.
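The sketch below applies several of these practices in the same DSL style as the examples above. It is illustrative only: the `.timeout()`, `.retry()`, `.env()`, and `.continue_on()` modifiers, the `${secrets.*}` reference, and passing multiple step names to `.depends()` are assumptions not shown elsewhere in this guide, so check your SDK reference for the actual names and signatures.

```python
# A minimal sketch of the best practices above, assuming hypothetical
# .timeout(), .retry(), .env(), and .continue_on() modifiers exist in the SDK.
@workflow
def resilient_pipeline():
    # Meaningful step name plus a timeout so a hung container cannot stall the run
    step("extract-data").docker(
        image="python:3.11",
        command="python extract.py"
    ).timeout("10m").retry(limit=3)

    # These two steps share only the extract dependency, so the engine
    # is free to schedule them in parallel
    step("validate-schema").docker(
        image="python:3.11",
        command="python validate_schema.py"
    ).depends("extract-data").timeout("5m")

    step("profile-data").docker(
        image="python:3.11",
        command="python profile_data.py"
    ).depends("extract-data").timeout("5m")

    # Secrets arrive through environment variables, never hardcoded literals
    step("load-warehouse").docker(
        image="python:3.11",
        command="python load.py"
    ).depends("validate-schema", "profile-data").env(
        WAREHOUSE_TOKEN="${secrets.WAREHOUSE_TOKEN}"
    )

    # Cleanup is declared to run even if an upstream step fails
    step("cleanup-temp").docker(
        image="awscli/aws-cli:latest",
        command="aws s3 rm s3://data-bucket/tmp/ --recursive"
    ).depends("load-warehouse").continue_on(failure=True)
```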