Generate workflows from natural language using Google’s Agent Development Kit
The ADK (Agent Development Kit) provider enables AI-powered workflow generation using advanced language models. It’s the first official provider for the Kubiya Workflow SDK.
from kubiya_workflow_sdk.providers import get_provider

# Initialize ADK provider
adk = get_provider("adk")


async def plan_disk_space_alert():
    """Generate (but do not execute) a workflow from a natural-language task.

    ``compose`` is awaited, so this has to run inside a coroutine; a bare
    top-level ``await`` is a SyntaxError in a regular Python script.

    Returns:
        The workflow object found under the ``"workflow"`` key of the result.
    """
    # Generate a workflow from natural language
    result = await adk.compose(
        task="Create a workflow that checks disk space and sends an alert if usage is above 80%",
        mode="plan",  # Just generate, don't execute
    )

    # View the generated workflow
    workflow = result["workflow"]
    print(f"Generated workflow: {workflow.name}")
    print(f"Steps: {len(workflow.steps)}")

    # Print the workflow as YAML
    print(workflow.to_yaml())
    return workflow


if __name__ == "__main__":
    import asyncio

    asyncio.run(plan_disk_space_alert())
async def generate_and_execute():
    """Generate AND execute a workflow in one go, printing events as they stream.

    Uses the ``adk`` provider object, which this snippet expects to exist
    already (e.g. ``adk = get_provider("adk")``). ``async for`` is only valid
    inside a coroutine, hence the wrapper function.
    """
    events = adk.compose(
        task="List all Python files in /tmp and count lines of code",
        mode="act",  # Generate and execute
        stream=True,  # Stream execution events
    )
    async for event in events:
        if event.get("type") == "text":
            # Generation progress
            print(f"AI: {event.get('content')}")
        elif event.get("type") == "execution":
            # Execution events
            print(f"Execution: {event.get('data')}")


if __name__ == "__main__":
    import asyncio

    asyncio.run(generate_and_execute())
task = """Create a comprehensive backup and disaster recovery workflow that:
1. Backs up all PostgreSQL databases
2. Compresses backups with timestamps
3. Uploads to S3 with encryption
4. Validates backup integrity
5. Removes backups older than 30 days
6. Sends detailed report to Slack with:
   - Backup sizes
   - Duration
   - Success/failure status
7. Triggers PagerDuty alert if backup fails"""


async def plan_backup_workflow(task):
    """Ask the ADK provider to plan (not execute) a workflow for *task*.

    Uses the ``adk`` provider object, which this snippet expects to exist
    already (e.g. ``adk = get_provider("adk")``). The task text is passed in
    explicitly so the coroutine does not depend on a module-level name that
    other snippets may rebind.

    Args:
        task: Natural-language description of the workflow to generate.

    Returns:
        Whatever ``adk.compose`` resolves to in ``"plan"`` mode.
    """
    result = await adk.compose(task=task, mode="plan")

    # The AI will generate a complete workflow with:
    # - Proper error handling
    # - Retry logic for network operations
    # - Parallel execution where appropriate
    # - Integration with S3, Slack, and PagerDuty
    return result


if __name__ == "__main__":
    import asyncio

    asyncio.run(plan_backup_workflow(task))
task = """Create a Kubernetes deployment workflow that:
1. Validates the deployment manifest
2. Checks if the namespace exists, creates if not
3. Applies the deployment with rolling update strategy
4. Waits for all pods to be ready (timeout 5 minutes)
5. Runs smoke tests against the new deployment
6. If tests fail, automatically rollback to previous version
7. Update deployment status in GitHub PR"""


async def deploy_to_kubernetes(task):
    """Generate and execute the workflow for *task*, handling streamed events.

    Uses the ``adk`` provider object, which this snippet expects to exist
    already (e.g. ``adk = get_provider("adk")``). ``async for`` is only valid
    inside a coroutine, hence the wrapper function.

    Args:
        task: Natural-language description of the workflow to generate.
    """
    # Generate and execute with streaming
    async for event in adk.compose(task=task, mode="act", stream=True):
        # Handle events as they come
        # NOTE(review): handle_event is not defined in this snippet; it is
        # presumably a callback supplied by the reader - confirm or define it.
        handle_event(event)


if __name__ == "__main__":
    import asyncio

    asyncio.run(deploy_to_kubernetes(task))
task = """Create an incident response workflow that:
1. Fetches alert details from PagerDuty
2. Checks system health metrics for the affected service
3. Attempts automatic remediation based on alert type:
   - For high CPU: restart the service
   - For high memory: clear caches and restart
   - For disk space: clean up logs and temp files
4. Verifies the issue is resolved
5. Creates a JIRA ticket with incident details
6. Posts incident summary to #incidents Slack channel"""

# With custom context
context = {
    "services": ["api", "web", "worker"],
    "monitoring_tool": "datadog",
    "ticketing_system": "jira",
}


async def plan_incident_response(task, context):
    """Ask the ADK provider to plan a workflow for *task* with extra context.

    Uses the ``adk`` provider object, which this snippet expects to exist
    already (e.g. ``adk = get_provider("adk")``).

    Args:
        task: Natural-language description of the workflow to generate.
        context: Dict of additional hints forwarded to ``compose`` unchanged.

    Returns:
        Whatever ``adk.compose`` resolves to in ``"plan"`` mode.
    """
    result = await adk.compose(
        task=task,
        context=context,
        mode="plan",
    )
    return result


if __name__ == "__main__":
    import asyncio

    asyncio.run(plan_incident_response(task, context))