The Task Planning Service provides AI-powered task planning capabilities, automatically breaking down complex tasks into actionable steps. It helps convert high-level task descriptions into structured execution plans.
Quick Start
from kubiya import ControlPlaneClient

client = ControlPlaneClient()

# Generate a task plan
plan = client.task_planning.plan(
    task_description="Deploy a new microservice to production",
    context={
        "service_name": "payment-api",
        "environment": "production"
    }
)
print(f"Plan: {plan['plan']}")
print(f"Steps: {len(plan['steps'])}")

# Generate plan with streaming
for update in client.task_planning.plan_stream(
    task_description="Troubleshoot database performance issues"
):
    print(update['data'])
Features
Task Decomposition: Break complex tasks into steps
AI-Powered Planning: Leverage AI for intelligent planning
Streaming Plans: Real-time plan generation updates
Context-Aware: Plans consider provided context
Generate Task Plan
Create a structured plan from a task description:
# Basic task planning
plan = client.task_planning.plan(
    task_description="Set up monitoring for the application"
)
print("Generated Plan:")
print(f"  Title: {plan['title']}")
print(f"  Description: {plan['description']}")
print("  Steps:")
for i, step in enumerate(plan['steps'], 1):
    print(f"    {i}. {step['description']}")
    print(f"       Estimated time: {step.get('estimated_time', 'N/A')}")
Parameters:
task_description (str, required): High-level description of the task
context (dict, optional): Additional context for planning
Returns: Dictionary containing the generated plan with steps
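Because the return value is a plain dictionary, the display logic can be wrapped in a small helper. The following is a minimal sketch, assuming the keys used in the example above (`title`, `description`, `steps`, and per-step `description` and `estimated_time`). `format_plan` is a hypothetical helper, not part of the SDK, and the sample dictionary is made up for illustration rather than taken from a real response.

```python
def format_plan(plan: dict) -> str:
    """Render a plan dictionary as readable text.

    Assumes the keys shown in the example above; missing keys fall back
    to placeholders instead of raising KeyError.
    """
    lines = [
        f"Title: {plan.get('title', 'N/A')}",
        f"Description: {plan.get('description', 'N/A')}",
        "Steps:",
    ]
    for i, step in enumerate(plan.get("steps", []), 1):
        lines.append(f"  {i}. {step.get('description', '(no description)')}")
        lines.append(f"     Estimated time: {step.get('estimated_time', 'N/A')}")
    return "\n".join(lines)


# Made-up sample data standing in for a real response.
sample_plan = {
    "title": "Set up monitoring",
    "description": "Add metrics and alerts",
    "steps": [
        {"description": "Install the metrics agent", "estimated_time": "1h"},
        {"description": "Configure alert rules"},
    ],
}
print(format_plan(sample_plan))
```

Using `.get()` with defaults keeps the helper usable even if some optional fields are absent from a given plan.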
Plan with Context
Provide context to generate more accurate plans:
# Plan with detailed context
plan = client.task_planning.plan(
    task_description="Migrate database to new version",
    context={
        "database_type": "PostgreSQL",
        "current_version": "12.0",
        "target_version": "15.0",
        "environment": "production",
        "backup_required": True,
        "downtime_allowed": False
    }
)
print("Steps for migration:")
for step in plan['steps']:
    print(f"  - {step['description']}")
    if 'prerequisites' in step:
        print(f"    Prerequisites: {step['prerequisites']}")
    if 'risks' in step:
        print(f"    Risks: {step['risks']}")
Context Fields (examples):
environment: Target environment (dev, staging, production)
service_name: Service or application name
resources: Available resources
constraints: Time, budget, or other constraints
requirements: Specific requirements or standards
existing_state: Current state information
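When context comes from several sources, some fields may be unknown at call time. One way to handle this is sketched below: a hypothetical `build_context` helper (not part of the SDK) that assembles the dictionary and drops fields that were never set, so the planner only receives real information. The field names follow the examples listed above.

```python
from typing import Any, Optional


def build_context(
    environment: Optional[str] = None,
    service_name: Optional[str] = None,
    **extra: Any,
) -> dict:
    """Assemble a planning context, omitting fields that were not provided."""
    context = {"environment": environment, "service_name": service_name, **extra}
    # Drop unset fields; False and 0 are kept because they carry meaning.
    return {key: value for key, value in context.items() if value is not None}


context = build_context(
    environment="staging",
    constraints={"timeline": "2 weeks"},
    downtime_allowed=False,
    resources=None,  # unknown, so it is left out
)
print(context)
```

The resulting dictionary can then be passed as the `context` argument to `plan()` or `plan_stream()`.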
Streaming Task Plans
Generate plans with real-time streaming updates:
# Stream plan generation
print("Generating plan...")
for update in client.task_planning.plan_stream(
    task_description="Implement CI/CD pipeline for microservices",
    context={
        "tech_stack": "Kubernetes, Docker, GitHub Actions",
        "team_size": 5,
        "timeline": "2 weeks"
    }
):
    # Each update contains plan generation progress
    data = update.get('data', '')
    print(data, end='', flush=True)
print("\nPlan generation complete!")
Parameters:
task_description (str, required): Task to plan
context (dict, optional): Planning context
Yields: Plan generation updates as dictionaries
Use streaming for complex tasks that take longer to plan. This provides real-time feedback to users as the plan is being generated.
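If the full text is needed after streaming finishes (for logging or storage, say), the updates can be collected while they are displayed. The sketch below assumes each update is a dictionary whose `data` value is a text chunk, as in the example above. `collect_stream` is a hypothetical helper, and the fake updates stand in for a real `plan_stream()` call.

```python
from typing import Iterable


def collect_stream(updates: Iterable[dict]) -> str:
    """Print each chunk as it arrives and return the full text."""
    chunks = []
    for update in updates:
        data = update.get("data", "")
        if not data:
            continue  # skip updates that carry no text
        print(data, end="", flush=True)
        chunks.append(str(data))
    print()
    return "".join(chunks)


# Stand-in for client.task_planning.plan_stream(...); the chunks are made up.
fake_updates = [{"data": "Step 1: "}, {}, {"data": "build image\n"}]
full_text = collect_stream(fake_updates)
```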
Complete Example
Here’s a complete example of a task planning workflow:
from kubiya import ControlPlaneClient

client = ControlPlaneClient()

# 1. Define a complex task
task = "Implement zero-downtime deployment for our e-commerce platform"
context = {
    "current_setup": {
        "infrastructure": "Kubernetes on AWS",
        "deployment_method": "manual kubectl apply",
        "services": 12,
        "traffic": "10k req/min peak"
    },
    "requirements": {
        "downtime": "none allowed",
        "rollback": "must be instant",
        "monitoring": "comprehensive metrics required"
    },
    "constraints": {
        "budget": "medium",
        "timeline": "1 month",
        "team_size": 3
    }
}

# 2. Generate the plan
print("Generating deployment plan...")
plan = client.task_planning.plan(
    task_description=task,
    context=context
)

# 3. Display the plan
print(f"\n{'=' * 60}")
print(f"Task Plan: {plan['title']}")
print(f"{'=' * 60}\n")
print(f"Overview: {plan['description']}\n")
print("Execution Steps:")
for i, step in enumerate(plan['steps'], 1):
    print(f"\n{i}. {step['name']}")
    print(f"   Description: {step['description']}")
    if 'subtasks' in step:
        print("   Subtasks:")
        for j, subtask in enumerate(step['subtasks'], 1):
            print(f"     {i}.{j}. {subtask}")
    if 'estimated_time' in step:
        print(f"   Est. Time: {step['estimated_time']}")
    if 'required_skills' in step:
        print(f"   Skills: {', '.join(step['required_skills'])}")
    if 'dependencies' in step:
        print(f"   Dependencies: {step['dependencies']}")

# 4. Generate follow-up plan for a specific step
print("\n\nGenerating detailed plan for first step...")
detailed_plan = client.task_planning.plan(
    task_description=f"Detailed breakdown: {plan['steps'][0]['name']}",
    context={
        **context,
        "parent_task": task,
        "step_number": 1
    }
)
print(f"Detailed steps for '{plan['steps'][0]['name']}':")
for substep in detailed_plan['steps']:
    print(f"  - {substep['description']}")
Error Handling
Handle task planning errors:
from kubiya.resources.exceptions import TaskPlanningError

try:
    plan = client.task_planning.plan(
        task_description="Deploy the thing"  # Too vague
    )
except TaskPlanningError as e:
    print(f"Planning error: {e}")
    print("Tip: Provide more specific task description")
except Exception as e:
    print(f"Unexpected error: {e}")
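Some failures, such as a dropped network connection, may succeed on a second attempt, while a vague task description will fail every time. The sketch below shows one possible retry wrapper with exponential backoff. `plan_with_retry` is a hypothetical helper, and which exceptions count as transient is an assumption left to the caller through `retry_on`. The fake planner stands in for a real `client.task_planning.plan(...)` call.

```python
import time
from typing import Callable, Tuple, Type


def plan_with_retry(
    plan_fn: Callable[[], dict],
    retry_on: Tuple[Type[Exception], ...] = (Exception,),
    attempts: int = 3,
    base_delay: float = 1.0,
) -> dict:
    """Call `plan_fn`, retrying with exponential backoff on `retry_on` errors."""
    for attempt in range(1, attempts + 1):
        try:
            return plan_fn()
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts; let the caller handle it
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)
    raise ValueError("attempts must be at least 1")


# Stand-in for: lambda: client.task_planning.plan(task_description="...")
calls = {"count": 0}


def flaky_plan() -> dict:
    calls["count"] += 1
    if calls["count"] < 2:
        raise ConnectionError("temporary network failure")
    return {"steps": []}


plan = plan_with_retry(flaky_plan, retry_on=(ConnectionError,), base_delay=0.01)
```

Passing the call as a zero-argument function keeps the wrapper independent of the SDK, so it can be tested without network access.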
Use Cases
Deployment Planning
plan = client.task_planning.plan(
    task_description="Deploy new version of payment service",
    context={
        "service": "payment-service",
        "version": "2.0.0",
        "environment": "production",
        "current_version": "1.5.0",
        "rollback_plan": "required"
    }
)
Incident Response
plan = client.task_planning.plan(
    task_description="Respond to database outage in production",
    context={
        "severity": "critical",
        "affected_services": ["api", "web", "mobile"],
        "symptoms": "connection timeouts",
        "time_detected": "2024-12-11 14:30 UTC"
    }
)
Infrastructure Setup
plan = client.task_planning.plan(
    task_description="Set up new Kubernetes cluster for staging",
    context={
        "cloud_provider": "AWS",
        "region": "us-west-2",
        "node_count": 5,
        "requirements": ["monitoring", "logging", "auto-scaling"]
    }
)
Onboarding
plan = client.task_planning.plan(
    task_description="Onboard new developer to the platform team",
    context={
        "role": "Senior DevOps Engineer",
        "experience_level": "senior",
        "tech_stack": ["Kubernetes", "Terraform", "Python"],
        "timeline": "2 weeks"
    }
)
Migration Planning
plan = client.task_planning.plan(
    task_description="Migrate from monolith to microservices",
    context={
        "current_architecture": "monolithic Ruby on Rails app",
        "target_architecture": "microservices with Node.js",
        "team_size": 10,
        "timeline": "6 months",
        "business_continuity": "critical"
    }
)
Best Practices
Task Descriptions
Be Specific: Provide clear, detailed task descriptions
Define Success: Include success criteria in the description
Set Constraints: Mention time, budget, or resource constraints
Specify Dependencies: Note any dependencies or prerequisites
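The four points above can be applied mechanically when descriptions are assembled in code. The following is a minimal sketch: `compose_task_description` is a hypothetical helper (not part of the SDK) that joins a goal with optional success criteria, constraints, and dependencies into a single string suitable for `task_description`. The example values are illustrative.

```python
from typing import Sequence


def compose_task_description(
    goal: str,
    success_criteria: Sequence[str] = (),
    constraints: Sequence[str] = (),
    dependencies: Sequence[str] = (),
) -> str:
    """Combine a goal and its supporting details into one description string."""
    parts = [goal.strip().rstrip(".") + "."]
    for label, items in (
        ("Success criteria", success_criteria),
        ("Constraints", constraints),
        ("Dependencies", dependencies),
    ):
        if items:  # only mention sections that have content
            parts.append(f"{label}: {'; '.join(items)}.")
    return " ".join(parts)


description = compose_task_description(
    goal="Deploy payment-api v2.0.0 to production",
    success_criteria=["error rate stays below 0.1%"],
    constraints=["complete within a 2-hour window"],
)
print(description)
```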
Context Provision
Relevant Information: Include all relevant context
Current State: Describe the current state clearly
Target State: Define the desired end state
Constraints: List limitations and requirements
Risk Factors: Mention known risks or concerns
Plan Usage
Review Plans: Always review generated plans before execution
Iterative Refinement: Generate detailed plans for complex steps
Team Input: Share plans with team for feedback
Adapt as Needed: Plans are guidelines; adapt based on reality
Streaming
Long Tasks: Use streaming for tasks that take time to plan
User Feedback: Show progress to users during planning
Handle Interruptions: Handle stream interruptions gracefully
Buffer Output: Buffer output for better user experience
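Buffering and interruption handling can be combined in one consumer. The sketch below is one possible approach, not the SDK's own mechanism: a hypothetical `consume_buffered` helper emits text only once at least `min_chars` characters have accumulated, and if the stream raises partway through it flushes what arrived and reports that the plan is incomplete. It assumes each update carries a text chunk under `data`. The exception type raised by a broken stream is not documented here, so the sketch catches `Exception` broadly.

```python
from typing import Callable, Iterable, Tuple


def consume_buffered(
    updates: Iterable[dict],
    emit: Callable[[str], None] = print,
    min_chars: int = 80,
) -> Tuple[str, bool]:
    """Emit streamed text in chunks of at least `min_chars` characters.

    Returns (text_received, completed). `completed` is False if the stream
    raised before finishing, so the caller can decide whether to retry.
    """
    received = []
    buffer = ""
    completed = True
    try:
        for update in updates:
            buffer += str(update.get("data", ""))
            if len(buffer) >= min_chars:
                emit(buffer)
                received.append(buffer)
                buffer = ""
    except Exception:  # assumption: the actual streaming error type is unknown
        completed = False
    if buffer:
        # Flush whatever arrived before the stream ended or broke.
        emit(buffer)
        received.append(buffer)
    return "".join(received), completed


def flaky_stream():
    """Made-up stream that fails after two chunks."""
    yield {"data": "Plan: "}
    yield {"data": "step one"}
    raise ConnectionError("stream dropped")


text, completed = consume_buffered(flaky_stream(), min_chars=10)
```

Returning the partial text alongside a completion flag lets the caller show what was generated so far instead of discarding it.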
Integration with Execution
Task planning integrates with execution services:
# 1. Generate plan
plan = client.task_planning.plan(
    task_description="Deploy application update"
)

# 2. Use plan with agent execution
for step in plan['steps']:
    # Execute each step with an agent
    result = client.agents.execute(
        agent_id="deployment-agent-uuid",
        execution_data={
            "prompt": step['description'],
            "context": step.get('context', {})
        }
    )
    print(f"Step '{step['name']}': {result['status']}")

# 3. Or use with team execution
team_result = client.teams.execute(
    team_id="deployment-team-uuid",
    execution_data={
        "prompt": f"Execute this plan: {plan}",
        "system_prompt": "Follow the plan step by step"
    }
)
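When steps run one after another, it is usually safer to stop at the first failure than to continue with steps that may depend on it. The sketch below illustrates that pattern under stated assumptions: `run_plan_steps` is a hypothetical helper, `execute_step` is any callable that performs a step (for example, a wrapper around `client.agents.execute`), and the failure status strings are guesses that should be checked against what the execution result actually contains.

```python
from typing import Callable, List


def run_plan_steps(
    steps: List[dict],
    execute_step: Callable[[dict], dict],
    failure_statuses: frozenset = frozenset({"failed", "error"}),
) -> List[dict]:
    """Execute steps in order and stop at the first failure."""
    results = []
    for step in steps:
        result = execute_step(step)
        results.append(result)
        if result.get("status") in failure_statuses:
            break  # later steps may depend on this one
    return results


def fake_execute(step: dict) -> dict:
    """Made-up executor: fails any step that mentions a migration."""
    status = "failed" if "migrate" in step["description"] else "completed"
    return {"status": status}


steps = [
    {"description": "build image"},
    {"description": "migrate schema"},
    {"description": "switch traffic"},
]
results = run_plan_steps(steps, fake_execute)
print([r["status"] for r in results])
```

In this run the third step is never attempted, because the second one reports a failure.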
API Reference
Method | Endpoint Pattern | Description
plan() | POST /task-planning/plan | Generate task plan
plan_stream() | POST /task-planning/plan/stream | Generate plan with streaming