The Task Planning Service provides AI-powered task planning capabilities, automatically breaking down complex tasks into actionable steps. It helps convert high-level task descriptions into structured execution plans.
Quick Start
from kubiya import ControlPlaneClient
client = ControlPlaneClient()
# Generate a task plan
plan = client.task_planning.plan(
task_description = "Deploy a new microservice to production" ,
context = {
"service_name" : "payment-api" ,
"environment" : "production"
}
)
print ( f "Plan: { plan[ 'plan' ] } " )
print ( f "Steps: { len (plan[ 'steps' ]) } " )
# Generate plan with streaming
for update in client.task_planning.plan_stream(
task_description = "Troubleshoot database performance issues"
):
print (update[ 'data' ])
Features
Task Decomposition Break complex tasks into steps
AI-Powered Planning Leverage AI for intelligent planning
Streaming Plans Real-time plan generation updates
Context-Aware Plans consider provided context
Generate Task Plan
Create a structured plan from a task description:
# Basic task planning
plan = client.task_planning.plan(
task_description = "Set up monitoring for the application"
)
print ( f "Generated Plan:" )
print ( f " Title: { plan[ 'title' ] } " )
print ( f " Description: { plan[ 'description' ] } " )
print ( f " Steps:" )
for i, step in enumerate (plan[ 'steps' ], 1 ):
print ( f " { i } . { step[ 'description' ] } " )
print ( f " Estimated time: { step.get( 'estimated_time' , 'N/A' ) } " )
Parameters:
task_description (str, required): High-level description of the task
context (dict, optional): Additional context for planning
Returns: Dictionary containing the generated plan with steps
Plan with Context
Provide context to generate more accurate plans:
# Plan with detailed context
plan = client.task_planning.plan(
task_description = "Migrate database to new version" ,
context = {
"database_type" : "PostgreSQL" ,
"current_version" : "12.0" ,
"target_version" : "15.0" ,
"environment" : "production" ,
"backup_required" : True ,
"downtime_allowed" : False
}
)
print ( f "Steps for migration:" )
for step in plan[ 'steps' ]:
print ( f " - { step[ 'description' ] } " )
if 'prerequisites' in step:
print ( f " Prerequisites: { step[ 'prerequisites' ] } " )
if 'risks' in step:
print ( f " Risks: { step[ 'risks' ] } " )
Context Fields (examples):
environment: Target environment (dev, staging, production)
service_name: Service or application name
resources: Available resources
constraints: Time, budget, or other constraints
requirements: Specific requirements or standards
existing_state: Current state information
Streaming Task Plans
Generate plans with real-time streaming updates:
# Stream plan generation
print ( "Generating plan..." )
for update in client.task_planning.plan_stream(
task_description = "Implement CI/CD pipeline for microservices" ,
context = {
"tech_stack" : "Kubernetes, Docker, GitHub Actions" ,
"team_size" : 5 ,
"timeline" : "2 weeks"
}
):
# Each update contains plan generation progress
data = update.get( 'data' , '' )
print (data, end = '' , flush = True )
print ( " \n Plan generation complete!" )
Parameters:
task_description (str, required): Task to plan
context (dict, optional): Planning context
Yields: Plan generation updates as dictionaries
Use streaming for complex tasks that take longer to plan. This provides real-time feedback to users as the plan is being generated.
Complete Example
Here’s a complete example showing task planning workflow:
from kubiya import ControlPlaneClient
client = ControlPlaneClient()
# 1. Define a complex task
task = "Implement zero-downtime deployment for our e-commerce platform"
context = {
"current_setup" : {
"infrastructure" : "Kubernetes on AWS" ,
"deployment_method" : "manual kubectl apply" ,
"services" : 12 ,
"traffic" : "10k req/min peak"
},
"requirements" : {
"downtime" : "none allowed" ,
"rollback" : "must be instant" ,
"monitoring" : "comprehensive metrics required"
},
"constraints" : {
"budget" : "medium" ,
"timeline" : "1 month" ,
"team_size" : 3
}
}
# 2. Generate the plan
print ( "Generating deployment plan..." )
plan = client.task_planning.plan(
task_description = task,
context = context
)
# 3. Display the plan
print ( f " \n { '=' * 60 } " )
print ( f "Task Plan: { plan[ 'title' ] } " )
print ( f " { '=' * 60 } \n " )
print ( f "Overview: { plan[ 'description' ] } \n " )
print ( "Execution Steps:" )
for i, step in enumerate (plan[ 'steps' ], 1 ):
print ( f " \n { i } . { step[ 'name' ] } " )
print ( f " Description: { step[ 'description' ] } " )
if 'subtasks' in step:
print ( f " Subtasks:" )
for j, subtask in enumerate (step[ 'subtasks' ], 1 ):
print ( f " { i } . { j } . { subtask } " )
if 'estimated_time' in step:
print ( f " Est. Time: { step[ 'estimated_time' ] } " )
if 'required_skills' in step:
print ( f " Skills: { ', ' .join(step[ 'required_skills' ]) } " )
if 'dependencies' in step:
print ( f " Dependencies: { step[ 'dependencies' ] } " )
# 4. Generate follow-up plan for a specific step
print ( " \n\n Generating detailed plan for first step..." )
detailed_plan = client.task_planning.plan(
task_description = f "Detailed breakdown: { plan[ 'steps' ][ 0 ][ 'name' ] } " ,
context = {
** context,
"parent_task" : task,
"step_number" : 1
}
)
print ( f "Detailed steps for ' { plan[ 'steps' ][ 0 ][ 'name' ] } ':" )
for substep in detailed_plan[ 'steps' ]:
print ( f " - { substep[ 'description' ] } " )
Error Handling
Handle task planning errors:
from kubiya.resources.exceptions import TaskPlanningError
try :
plan = client.task_planning.plan(
task_description = "Deploy the thing" # Too vague
)
except TaskPlanningError as e:
print ( f "Planning error: { e } " )
print ( "Tip: Provide more specific task description" )
except Exception as e:
print ( f "Unexpected error: { e } " )
Use Cases
Deployment Planning
plan = client.task_planning.plan(
task_description = "Deploy new version of payment service" ,
context = {
"service" : "payment-service" ,
"version" : "2.0.0" ,
"environment" : "production" ,
"current_version" : "1.5.0" ,
"rollback_plan" : "required"
}
)
Incident Response
plan = client.task_planning.plan(
task_description = "Respond to database outage in production" ,
context = {
"severity" : "critical" ,
"affected_services" : [ "api" , "web" , "mobile" ],
"symptoms" : "connection timeouts" ,
"time_detected" : "2024-12-11 14:30 UTC"
}
)
Infrastructure Setup
plan = client.task_planning.plan(
task_description = "Set up new Kubernetes cluster for staging" ,
context = {
"cloud_provider" : "AWS" ,
"region" : "us-west-2" ,
"node_count" : 5 ,
"requirements" : [ "monitoring" , "logging" , "auto-scaling" ]
}
)
Onboarding
plan = client.task_planning.plan(
task_description = "Onboard new developer to the platform team" ,
context = {
"role" : "Senior DevOps Engineer" ,
"experience_level" : "senior" ,
"tech_stack" : [ "Kubernetes" , "Terraform" , "Python" ],
"timeline" : "2 weeks"
}
)
Migration Planning
plan = client.task_planning.plan(
task_description = "Migrate from monolith to microservices" ,
context = {
"current_architecture" : "monolithic Ruby on Rails app" ,
"target_architecture" : "microservices with Node.js" ,
"team_size" : 10 ,
"timeline" : "6 months" ,
"business_continuity" : "critical"
}
)
Best Practices
Task Descriptions
Be Specific : Provide clear, detailed task descriptions
Define Success : Include success criteria in the description
Set Constraints : Mention time, budget, or resource constraints
Specify Dependencies : Note any dependencies or prerequisites
Context Provision
Relevant Information : Include all relevant context
Current State : Describe the current state clearly
Target State : Define the desired end state
Constraints : List limitations and requirements
Risk Factors : Mention known risks or concerns
Plan Usage
Review Plans : Always review generated plans before execution
Iterative Refinement : Generate detailed plans for complex steps
Team Input : Share plans with team for feedback
Adapt as Needed : Plans are guidelines; adapt based on reality
Streaming
Long Tasks : Use streaming for tasks that take time to plan
User Feedback : Show progress to users during planning
Handle Interruptions : Handle stream interruptions gracefully
Buffer Output : Buffer output for better user experience
Integration with Execution
Task planning integrates with execution services:
# 1. Generate plan
plan = client.task_planning.plan(
task_description = "Deploy application update"
)
# 2. Use plan with agent execution
for step in plan[ 'steps' ]:
# Execute each step with an agent
result = client.agents.execute(
agent_id = "deployment-agent-uuid" ,
execution_data = {
"prompt" : step[ 'description' ],
"context" : step.get( 'context' , {})
}
)
print ( f "Step ' { step[ 'name' ] } ': { result[ 'status' ] } " )
# 3. Or use with team execution
team_result = client.teams.execute(
team_id = "deployment-team-uuid" ,
execution_data = {
"prompt" : f "Execute this plan: { plan } " ,
"system_prompt" : "Follow the plan step by step"
}
)
API Reference
Method Endpoint Pattern Description plan()POST /task-planning/plan Generate task plan plan_stream()POST /task-planning/plan/stream Generate plan with streaming
Next Steps
Agents Service Execute planned tasks with agents
Teams Service Execute complex plans with teams
Jobs Service Schedule planned tasks
Context Graph Use graph context in planning