LangGraph Python 适配器
让 LangGraph 工作流支持 AG-UI 协议。
什么是 AG-UI?
AG-UI 是一个开放、轻量级、基于事件的协议,用于标准化 AI Agent 与用户界面的交互。它让 Agent 可以:
- 实时流式对话
- 双向状态同步
- 前端工具集成(Client Tools)
- 人机协作(Human-in-the-loop)
这个包解决什么问题?
- 让 LangGraph 工作流支持 AG-UI 协议:将编译后的 LangGraph StateGraph 适配为 AG-UI 兼容的 Agent
- 客户端状态管理:提供工具来接收前端传递的工具和消息
核心概念
| 导出 | 说明 |
|---|---|
LangGraphAgent | 将编译后的 LangGraph 工作流包装为 AG-UI 兼容的 Agent |
配合使用
| 包名 | 作用 |
|---|---|
cloudbase-agent-server | 将 Agent 部署为 AG-UI 兼容的 HTTP 服务 |
langgraph | LangGraph 工作流框架 |
langchain-openai | OpenAI 兼容模型接入 |
架构图
安装
pip install cloudbase-agent-langgraph cloudbase-agent-server langgraph langchain-openai
快速开始
1. 创建 LangGraph 工作流
# agent.py
import os
from typing import Any, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, AIMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from cloudbase_agent.langgraph import LangGraphAgent
# 定义状态
class State(MessagesState):
"""聊天 Agent 状态,包含消息和工具"""
tools: List[Any]
# 定义聊天节点
def chat_node(state: State, config: RunnableConfig = None) -> dict:
"""使用 OpenAI 兼容模型生成 AI 响应"""
model = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
)
# 绑定客户端工具
tools = state.get("tools", [])
if tools:
model_with_tools = model.bind_tools(tools)
else:
model_with_tools = model
system_message = SystemMessage(content="你是一位乐于助人的 AI 助手。")
messages = [system_message, *state["messages"]]
try:
response = model_with_tools.invoke(messages, config)
return {"messages": [response]}
except Exception as e:
return {"messages": [AIMessage(content=f"错误: {str(e)}")]}
# 构建工作流
def build_workflow():
graph = StateGraph(State)
# 添加节点
graph.add_node("chat_node", chat_node)
# 定义边
graph.add_edge(START, "chat_node")
graph.add_edge("chat_node", END)
# 编译并添加内存
return graph.compile(checkpointer=MemorySaver())
# 导出 create_agent 函数
def create_agent():
workflow = build_workflow()
agent = LangGraphAgent(
name="ChatBot",
description="一个乐于助人的对话助手",
graph=workflow,
)
return {"agent": agent}
2. 部署为 HTTP 服务
# server.py
from cloudbase_agent.server import AgentServiceApp
from agent import create_agent
# 一行代码部署
AgentServiceApp().run(create_agent, port=9000)
3. 配置环境变量
创建 .env 文件:
OPENAI_API_KEY=your-api-key
OPENAI_BASE_URL=https://api.deepseek.com/v1
OPENAI_MODEL=deepseek-chat
4. 启动服务
python server.py
完整的项目配置(依赖等)请参考示例项目。
API 参考
LangGraphAgent
将编译后的 LangGraph 工作流包装为 AG-UI 兼容的 Agent。
from cloudbase_agent.langgraph import LangGraphAgent
agent = LangGraphAgent(
name="ChatBot",
description="一个乐于助人的助手",
graph=compiled_graph, # StateGraph.compile() 的返回值
)
构造参数:
| 参数 | 类型 | 说明 |
|---|---|---|
graph | CompiledStateGraph | 编译后的 LangGraph 工作流 |
name | str | 人类可读的 Agent 名称(默认:"") |
description | str | 详细的 Agent 描述(默认:"") |
use_callbacks | bool | 启用回调处理(默认:False) |
fix_event_ids | bool | 启用自动事件 ID 修复(默认:True) |
使用回调的示例:
from cloudbase_agent.langgraph import LangGraphAgent
# 创建支持回调的 Agent
agent = LangGraphAgent(
graph=compiled_graph,
name="ChatBot",
description="一个乐于助人的助手",
use_callbacks=True,
)
# 添加日志回调
class ConsoleLogger:
async def on_text_message_content(self, event, buffer):
print(f"[AI] {buffer}")
agent.add_callback(ConsoleLogger())
状态定义
为了兼容 AG-UI,你的状态应该包含一个 tools 字段来接收客户端工具:
from langgraph.graph import MessagesState
from typing import Any, List
class State(MessagesState):
"""支持客户端工具的状态"""
tools: List[Any]
# 在工作流中使用
graph = StateGraph(State)
状态字段:
| 字段 | 类型 | 说明 |
|---|---|---|
messages | List[BaseMessage] | 消息历史(来自 MessagesState) |
tools | List[Any] | 从前端传递的客户端工具 |
高级用法
带资源清理
def create_agent():
# 初始化资源
db = connect_database()
workflow = build_workflow()
agent = LangGraphAgent(
name="ChatBot",
description="带数据库访问的 Agent",
graph=workflow,
)
# 定义清理函数
def cleanup():
db.close()
print("资源已清理")
return {"agent": agent, "cleanup": cleanup}
多个 Agent
from fastapi import FastAPI
from cloudbase_agent.server import create_send_message_adapter, RunAgentInput
app = FastAPI()
@app.post("/chat/send-message")
async def chat_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_chat_agent, request)
@app.post("/assistant/send-message")
async def assistant_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_assistant_agent, request)