Integrate your existing LangGraph agents with Kubiya’s execution platform
Run LangGraph inside Kubiya workflow steps by pulling a pre-built LangGraph image, or inject your agent code into a base image with `with_files` for dynamic execution.

```python
from kubiya_workflow_sdk import workflow, step
@workflow
def langgraph_research_agent():
    """Research agent using pre-built LangGraph image"""
    research = step("langgraph-research").docker(
        image="langchain/langgraph:latest",
        command="python -c \"$LANGGRAPH_CODE\"",
        env={
            "LANGGRAPH_CODE": """
import os

from langchain.schema import SystemMessage
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from tavily import TavilyClient

# Initialize components
llm = ChatOpenAI(model="gpt-4o")
tavily = TavilyClient(api_key=os.environ['TAVILY_API_KEY'])

# Create LangGraph agent
agent = create_react_agent(
    llm,
    [tavily.search],
    state_modifier=SystemMessage(content="You are a research assistant")
)

# Execute research
result = agent.invoke({
    "messages": [{"role": "user", "content": "${INPUT_QUERY}"}]
})
print(result['messages'][-1].content)
""",
            "OPENAI_API_KEY": "${OPENAI_API_KEY}",
            "TAVILY_API_KEY": "${TAVILY_API_KEY}",
            "INPUT_QUERY": "Research the latest trends in AI automation"
        }
    )

    return research
```
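The example above hands the `TavilyClient.search` bound method straight to `create_react_agent`. If you want the agent to see an explicit tool name, docstring, and schema, you can wrap the call in a LangChain tool first; a minimal sketch (the `search_web` name and the result formatting are illustrative, and the response fields follow tavily-python's `search()` output):

```python
import os

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from tavily import TavilyClient


@tool
def search_web(query: str) -> str:
    """Search the web and return the top results as plain text."""
    client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
    response = client.search(query)  # dict with a "results" list
    return "\n\n".join(
        f"{r['title']}\n{r['url']}\n{r['content']}"
        for r in response.get("results", [])
    )


llm = ChatOpenAI(model="gpt-4o")
agent = create_react_agent(llm, [search_web])
```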
To inject your own agent code and dependencies into a base image, use `with_files`:

```python
from kubiya_workflow_sdk import workflow, step

@workflow
def langgraph_multi_agent():
    """Multi-agent system with injected LangGraph code"""
    multi_agent = step("langgraph-team").tool_def(
        name="langgraph-multi-agent",
        type="docker",
        image="python:3.11-slim",
        description="LangGraph multi-agent collaboration",
        # Install dependencies and run
        content="""#!/bin/bash
set -e
pip install -r /app/requirements.txt
python /app/multi_agent.py
""",
        # Inject LangGraph code and dependencies
        with_files=[
            {
                "destination": "/app/requirements.txt",
                "content": """
langgraph>=0.0.20
langchain>=0.1.0
langchain-openai>=0.0.5
tavily-python>=0.3.0
"""
            },
            {
                "destination": "/app/multi_agent.py",
                "content": """
import os
import json
from typing import TypedDict, List

from langchain.schema import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI


class TeamState(TypedDict):
    messages: List[BaseMessage]
    task: str
    research_data: str
    analysis: str
    report: str


def researcher_node(state: TeamState):
    llm = ChatOpenAI(model="gpt-4o")
    prompt = f'''
You are a research specialist. Research this task: {state["task"]}
Provide comprehensive research data.
'''
    response = llm.invoke([SystemMessage(content=prompt)])
    return {
        "research_data": response.content,
        "messages": state["messages"] + [response]
    }


def analyst_node(state: TeamState):
    llm = ChatOpenAI(model="gpt-4o")
    prompt = f'''
You are a data analyst. Analyze this research: {state["research_data"]}
Provide insights and recommendations.
'''
    response = llm.invoke([SystemMessage(content=prompt)])
    return {
        "analysis": response.content,
        "messages": state["messages"] + [response]
    }


def writer_node(state: TeamState):
    llm = ChatOpenAI(model="gpt-4o")
    prompt = f'''
You are a technical writer. Create a report based on:
Research: {state["research_data"]}
Analysis: {state["analysis"]}
Write a comprehensive report.
'''
    response = llm.invoke([SystemMessage(content=prompt)])
    return {
        "report": response.content,
        "messages": state["messages"] + [response]
    }


# Create the LangGraph workflow
def create_team_workflow():
    workflow = StateGraph(TeamState)

    # Add nodes
    workflow.add_node("researcher", researcher_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("writer", writer_node)

    # Define flow
    workflow.add_edge(START, "researcher")
    workflow.add_edge("researcher", "analyst")
    workflow.add_edge("analyst", "writer")
    workflow.add_edge("writer", END)

    # Compile with memory
    memory = MemorySaver()
    return workflow.compile(checkpointer=memory)


# Main execution
if __name__ == "__main__":
    task = os.environ.get("TASK", "Analyze current AI market trends")

    graph = create_team_workflow()
    result = graph.invoke(
        {
            "messages": [HumanMessage(content=task)],
            "task": task,
            "research_data": "",
            "analysis": "",
            "report": ""
        },
        # The checkpointer requires a thread_id to scope saved state
        config={"configurable": {"thread_id": "kubiya-team-run"}}
    )

    print(json.dumps({
        "final_report": result["report"],
        "research_summary": result["research_data"][:200] + "...",
        "analysis_summary": result["analysis"][:200] + "..."
    }, indent=2))
"""
            }
        ],
        args=[
            {
                "name": "task",
                "type": "string",
                "required": True,
                "description": "Research task for the team"
            }
        ]
    )

    return multi_agent
```
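The flow above is strictly linear. LangGraph's conditional edges let you add a review loop without changing anything on the Kubiya side; a sketch, assuming `TeamState` and the node functions from the injected `multi_agent.py` are in scope (the `reviewer_node`, `review_verdict` field, and routing labels are illustrative additions):

```python
from langchain.schema import SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END


class ReviewedTeamState(TeamState):
    review_verdict: str


def reviewer_node(state: ReviewedTeamState):
    # Ask the model for a one-word verdict on the draft report
    llm = ChatOpenAI(model="gpt-4o")
    verdict = llm.invoke([SystemMessage(content=(
        "Answer APPROVE or REVISE only. Review this report:\n" + state["report"]
    ))])
    return {"review_verdict": verdict.content.strip().upper()}


def route_after_review(state: ReviewedTeamState) -> str:
    # Send the work back to the researcher when the reviewer asks for changes
    return "revise" if "REVISE" in state.get("review_verdict", "") else "done"


builder = StateGraph(ReviewedTeamState)
builder.add_node("researcher", researcher_node)
builder.add_node("analyst", analyst_node)
builder.add_node("writer", writer_node)
builder.add_node("reviewer", reviewer_node)
builder.add_edge(START, "researcher")
builder.add_edge("researcher", "analyst")
builder.add_edge("analyst", "writer")
builder.add_edge("writer", "reviewer")
builder.add_conditional_edges(
    "reviewer", route_after_review, {"revise": "researcher", "done": END}
)
graph = builder.compile()
```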
Use `with_services` to run supporting infrastructure alongside the step:
```python
from kubiya_workflow_sdk import workflow, step

@workflow
def langgraph_with_vectordb():
    """LangGraph agent with vector database service"""
    rag_agent = step("langgraph-rag").tool_def(
        name="langgraph-rag-agent",
        type="docker",
        image="python:3.11-slim",
        description="LangGraph RAG agent with Chroma",
        content="""#!/bin/bash
set -e
pip install langgraph langchain langchain-openai chromadb
sleep 5  # Wait for Chroma service
python /app/rag_agent.py
""",
        with_files=[
            {
                "destination": "/app/rag_agent.py",
                "content": """
import os

import chromadb
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain.tools import Tool

# Connect to Chroma service
chroma_client = chromadb.HttpClient(host="vectordb", port=8000)

# Create retrieval tool
def retrieval_tool(query: str) -> str:
    embeddings = OpenAIEmbeddings()
    vectorstore = Chroma(
        client=chroma_client,
        embedding_function=embeddings
    )
    docs = vectorstore.similarity_search(query, k=3)
    return "\\n".join([doc.page_content for doc in docs])

# Create LangGraph agent with RAG
llm = ChatOpenAI(model="gpt-4o")
tools = [
    Tool(
        name="retrieve_documents",
        description="Retrieve relevant documents",
        func=retrieval_tool
    )
]
agent = create_react_agent(llm, tools)

# Execute query
result = agent.invoke({
    "messages": [{"role": "user", "content": "${QUERY}"}]
})
print(result["messages"][-1].content)
"""
            }
        ],
        # Chroma vector database service
        with_services=[
            {
                "name": "vectordb",
                "image": "chromadb/chroma:latest",
                "exposed_ports": [8000],
                "env": {
                    "CHROMA_SERVER_HOST": "0.0.0.0",
                    "CHROMA_SERVER_HTTP_PORT": "8000"
                }
            }
        ],
        args=[
            {
                "name": "query",
                "type": "string",
                "required": True,
                "description": "Query for the RAG agent"
            }
        ]
    )

    return rag_agent
```
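The retrieval tool assumes the Chroma collection already holds documents. One way to handle that is a small seeding script run against the same `vectordb` service before the agent queries it; a minimal sketch (the sample texts are placeholders, and the default LangChain collection name is assumed so it matches the retrieval tool above):

```python
import chromadb
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

# Same service name as in with_services above
chroma_client = chromadb.HttpClient(host="vectordb", port=8000)

vectorstore = Chroma(
    client=chroma_client,
    embedding_function=OpenAIEmbeddings(),  # must match the embeddings used at query time
)

# Placeholder documents; load your own sources in practice
vectorstore.add_texts([
    "Kubiya workflows run each step in an isolated container.",
    "LangGraph models agent logic as a stateful graph of nodes and edges.",
])
```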
To surface progress while the graph runs, print JSON updates from inside the node and flush stdout so Kubiya can stream them:

```python
from kubiya_workflow_sdk import workflow, step

@workflow
def streaming_langgraph():
    """Stream LangGraph execution updates"""
    streaming_agent = step("stream-langgraph").tool_def(
        name="langgraph-streaming",
        type="docker",
        image="python:3.11-slim",
        description="Streaming LangGraph execution",
        content="""#!/bin/bash
pip install langgraph langchain-openai
python /app/streaming_agent.py | tee /proc/1/fd/1
""",
        with_files=[
            {
                "destination": "/app/streaming_agent.py",
                "content": """
import json
import sys
from typing import TypedDict

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI


class StreamState(TypedDict):
    query: str
    result: str


def streaming_node(state: StreamState):
    llm = ChatOpenAI(model="gpt-4o", streaming=True)

    print(json.dumps({"status": "starting", "step": "analysis"}))
    sys.stdout.flush()

    chunks = []
    for chunk in llm.stream([{"role": "user", "content": state["query"]}]):
        chunks.append(chunk.content)
        print(json.dumps({
            "status": "streaming",
            "chunk": chunk.content,
            "partial_result": "".join(chunks)
        }))
        sys.stdout.flush()

    result = "".join(chunks)
    print(json.dumps({"status": "complete", "result": result}))
    return {"result": result}


# Execute streaming workflow
workflow = StateGraph(StreamState)
workflow.add_node("stream", streaming_node)
workflow.add_edge(START, "stream")
workflow.add_edge("stream", END)

graph = workflow.compile()
graph.invoke({"query": "${STREAMING_QUERY}"})
"""
            }
        ],
        args=[
            {
                "name": "streaming_query",
                "type": "string",
                "required": True,
                "description": "Query to process with streaming"
            }
        ]
    )

    return streaming_agent
```
For human-in-the-loop flows, split the work into separate steps and gate the later ones on an external approval API:

```python
from kubiya_workflow_sdk import workflow, step

@workflow
def langgraph_human_approval():
    """LangGraph workflow with human approval via API polling"""

    # Phase 1: Initial research
    research = step("research").tool_def(
        name="langgraph-research",
        type="docker",
        image="python:3.11-slim",
        description="Research phase with LangGraph",
        content="""#!/bin/bash
pip install langgraph langchain langchain-openai requests
python /app/research_agent.py
""",
        with_files=[
            {
                "destination": "/app/research_agent.py",
                "content": """
import json
import os
from typing import TypedDict

import requests
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage


class ResearchState(TypedDict):
    query: str
    research: str
    approval_id: str
    status: str


def research_step(state: ResearchState):
    llm = ChatOpenAI(model="gpt-4o")
    result = llm.invoke([
        HumanMessage(content=f"Research: {state['query']}")
    ])

    # Submit research for approval via API
    approval_data = {
        "workflow_id": "${WORKFLOW_ID}",
        "step": "research",
        "content": result.content,
        "requires_approval": True
    }

    approval_api = os.environ.get("APPROVAL_API_URL", "https://api.example.com/approvals")
    response = requests.post(approval_api, json=approval_data)
    approval_id = response.json().get("approval_id")

    return {
        "research": result.content,
        "approval_id": approval_id,
        "status": "pending_approval"
    }


# Execute research and submit for approval
workflow = StateGraph(ResearchState)
workflow.add_node("research", research_step)
workflow.add_edge(START, "research")
workflow.add_edge("research", END)

graph = workflow.compile()
result = graph.invoke({"query": "${RESEARCH_QUERY}"})
print(json.dumps(result, indent=2))
"""
            }
        ],
        args=[
            {
                "name": "research_query",
                "type": "string",
                "required": True,
                "description": "What to research"
            },
            {
                "name": "workflow_id",
                "type": "string",
                "required": True,
                "description": "Unique workflow identifier"
            }
        ]
    )
    # Poll for approval status
    approval_check = step("check-approval").tool_def(
        name="approval-checker",
        type="docker",
        image="python:3.11-slim",
        description="Poll API for approval status",
        content="""#!/bin/bash
pip install requests
python /app/approval_checker.py
""",
        with_files=[
            {
                "destination": "/app/approval_checker.py",
                "content": """
import json
import os
import time

import requests


def check_approval_status(approval_id, max_wait=300):
    approval_api = os.environ.get("APPROVAL_API_URL", "https://api.example.com/approvals")
    start_time = time.time()

    while time.time() - start_time < max_wait:
        try:
            response = requests.get(f"{approval_api}/{approval_id}")
            if response.status_code == 200:
                approval_data = response.json()
                status = approval_data.get("status")

                if status == "approved":
                    print(json.dumps({
                        "status": "approved",
                        "message": "Research approved by human reviewer",
                        "approved_at": approval_data.get("approved_at")
                    }))
                    return True
                elif status == "rejected":
                    print(json.dumps({
                        "status": "rejected",
                        "message": "Research rejected by human reviewer",
                        "reason": approval_data.get("rejection_reason")
                    }))
                    exit(1)
                else:
                    print(f"Status: {status}, waiting...")
        except Exception as e:
            print(f"Error checking approval: {e}")

        time.sleep(10)  # Poll every 10 seconds

    print(json.dumps({
        "status": "timeout",
        "message": "Approval timeout - no response within time limit"
    }))
    exit(1)


# Extract approval_id from previous step
research_output = json.loads('${research}')
approval_id = research_output.get("approval_id")

if not approval_id:
    print("No approval ID found in research output")
    exit(1)

# Honor the optional approval_timeout argument
check_approval_status(approval_id, max_wait=int(os.environ.get("APPROVAL_TIMEOUT", "300")))
"""
            }
        ],
        args=[
            {
                "name": "approval_timeout",
                "type": "string",
                "required": False,
                "default": "300",
                "description": "Timeout in seconds for approval"
            }
        ]
    ).depends("research")
    # Continue processing after approval
    final_report = step("generate-report").tool_def(
        name="report-generator",
        type="docker",
        image="python:3.11-slim",
        description="Generate final report after approval",
        content="""#!/bin/bash
pip install langchain langchain-openai
python /app/report_generator.py
""",
        with_files=[
            {
                "destination": "/app/report_generator.py",
                "content": """
import json

from langchain_openai import ChatOpenAI

# Extract approved research
research_data = json.loads('${research}')
approved_research = research_data.get("research", "")

# Generate final report
llm = ChatOpenAI(model="gpt-4o")
result = llm.invoke([{
    "role": "user",
    "content": f"Create a comprehensive final report based on this approved research: {approved_research}"
}])

final_output = {
    "final_report": result.content,
    "based_on_research": approved_research[:200] + "...",
    "approval_status": "approved"
}

print(json.dumps(final_output, indent=2))
"""
            }
        ]
    ).depends("check-approval")

    return final_report
```
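The approval service itself lives outside Kubiya; `https://api.example.com/approvals` above is a placeholder, and any service exposing the same contract works: POST returns an `approval_id`, and GET `/{approval_id}` returns a `status` of pending, approved, or rejected. A minimal FastAPI mock of that contract for local testing (entirely illustrative):

```python
import uuid
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
approvals: dict = {}  # in-memory store; use a real database in production


class ApprovalRequest(BaseModel):
    workflow_id: str
    step: str
    content: str
    requires_approval: bool = True


@app.post("/approvals")
def create_approval(req: ApprovalRequest):
    approval_id = str(uuid.uuid4())
    approvals[approval_id] = {"status": "pending", **req.model_dump()}
    return {"approval_id": approval_id}


@app.get("/approvals/{approval_id}")
def get_approval(approval_id: str):
    if approval_id not in approvals:
        raise HTTPException(status_code=404, detail="Unknown approval")
    return approvals[approval_id]


@app.post("/approvals/{approval_id}/approve")
def approve(approval_id: str):
    # A human reviewer (or a UI acting for them) calls this to unblock the workflow
    approvals[approval_id].update(
        status="approved",
        approved_at=datetime.now(timezone.utc).isoformat(),
    )
    return approvals[approval_id]


@app.post("/approvals/{approval_id}/reject")
def reject(approval_id: str, reason: str = "No reason given"):
    approvals[approval_id].update(status="rejected", rejection_reason=reason)
    return approvals[approval_id]
```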
🏗️ State Management

Externalize LangGraph state so it survives container restarts:

```python
# Good: Externalized state
with_services=[{
    "name": "postgres",
    "image": "postgres:15",
    "env": {
        "POSTGRES_DB": "langgraph_state",
        "POSTGRES_PASSWORD": "postgres"  # the postgres image will not start without auth config
    }
}]
```
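With a Postgres service in place, LangGraph can checkpoint into it instead of in-process memory; a sketch using the `langgraph-checkpoint-postgres` package (the connection string matches the illustrative credentials above, and `builder`/`initial_state` stand in for whatever StateGraph and input you construct):

```python
# pip install langgraph-checkpoint-postgres "psycopg[binary]"
from langgraph.checkpoint.postgres import PostgresSaver

# "postgres" is the service name from with_services; credentials are illustrative
DB_URI = "postgresql://postgres:postgres@postgres:5432/langgraph_state"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first run

    # `builder` is your uncompiled StateGraph (e.g. the team workflow above)
    graph = builder.compile(checkpointer=checkpointer)

    graph.invoke(
        initial_state,
        # thread_id scopes the saved state so reruns can resume the same conversation
        config={"configurable": {"thread_id": "workflow-123"}},
    )
```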
🔄 Resource Management

```python
# Efficient resource allocation
resources={
    "requests": {"cpu": "500m", "memory": "1Gi"},
    "limits": {"cpu": "2000m", "memory": "4Gi"}
}
```
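Where this block attaches depends on your SDK version; one plausible placement, shown purely as an assumption to verify against your own setup, is as a keyword argument on the step's tool definition:

```python
# Hypothetical placement of the resources block above
heavy_agent = step("langgraph-heavy").tool_def(
    name="langgraph-heavy-agent",
    type="docker",
    image="python:3.11-slim",
    description="LangGraph agent with explicit resource requests and limits",
    content="#!/bin/bash\npython /app/agent.py\n",
    resources={
        "requests": {"cpu": "500m", "memory": "1Gi"},
        "limits": {"cpu": "2000m", "memory": "4Gi"}
    }
)
```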
🌊 Streaming Integration

```python
print(json.dumps({"step": "research", "progress": 0.3}))
sys.stdout.flush()
```
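For multi-node graphs you can emit one progress line per node by iterating the compiled graph's stream; a sketch, assuming `graph` and `initial_state` come from one of the examples above:

```python
import json
import sys

# stream_mode="updates" yields one dict per completed node, keyed by node name
for update in graph.stream(initial_state, stream_mode="updates"):
    for node_name, node_output in update.items():
        print(json.dumps({
            "step": node_name,
            "status": "completed",
            "updated_keys": list(node_output or {})
        }))
        sys.stdout.flush()
```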
To migrate an existing LangGraph agent:

1. Analyze Current Setup
2. Choose Integration Pattern
3. Containerize Dependencies
4. Add Kubiya Orchestration
5. Test and Iterate