跳到主要内容

LangGraph Python 适配器

让 LangGraph 工作流支持 AG-UI 协议

什么是 AG-UI?

AG-UI 是一个开放、轻量级、基于事件的协议,用于标准化 AI Agent 与用户界面的交互。它让 Agent 可以:

  • 实时流式对话
  • 双向状态同步
  • 前端工具集成(Client Tools)
  • 人机协作(Human-in-the-loop)

这个包解决什么问题?

  • 让 LangGraph 工作流支持 AG-UI 协议:将编译后的 LangGraph StateGraph 适配为 AG-UI 兼容的 Agent
  • 客户端状态管理:提供工具来接收前端传递的工具和消息

核心概念

导出说明
LangGraphAgent将编译后的 LangGraph 工作流包装为 AG-UI 兼容的 Agent

配合使用

包名作用
cloudbase-agent-server将 Agent 部署为 AG-UI 兼容的 HTTP 服务
langgraphLangGraph 工作流框架
langchain-openaiOpenAI 兼容模型接入

架构图

安装

pip install cloudbase-agent-langgraph cloudbase-agent-server langgraph langchain-openai

快速开始

1. 创建 LangGraph 工作流

# agent.py
import os
from typing import Any, List

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, AIMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver

from cloudbase_agent.langgraph import LangGraphAgent


# 定义状态
class State(MessagesState):
"""聊天 Agent 状态,包含消息和工具"""
tools: List[Any]


# 定义聊天节点
def chat_node(state: State, config: RunnableConfig = None) -> dict:
"""使用 OpenAI 兼容模型生成 AI 响应"""
model = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
)

# 绑定客户端工具
tools = state.get("tools", [])
if tools:
model_with_tools = model.bind_tools(tools)
else:
model_with_tools = model

system_message = SystemMessage(content="你是一位乐于助人的 AI 助手。")
messages = [system_message, *state["messages"]]

try:
response = model_with_tools.invoke(messages, config)
return {"messages": [response]}
except Exception as e:
return {"messages": [AIMessage(content=f"错误: {str(e)}")]}


# 构建工作流
def build_workflow():
graph = StateGraph(State)

# 添加节点
graph.add_node("chat_node", chat_node)

# 定义边
graph.add_edge(START, "chat_node")
graph.add_edge("chat_node", END)

# 编译并添加内存
return graph.compile(checkpointer=MemorySaver())


# 导出 create_agent 函数
def create_agent():
workflow = build_workflow()
agent = LangGraphAgent(
name="ChatBot",
description="一个乐于助人的对话助手",
graph=workflow,
)
return {"agent": agent}

2. 部署为 HTTP 服务

# server.py
from cloudbase_agent.server import AgentServiceApp
from agent import create_agent

# 一行代码部署
AgentServiceApp().run(create_agent, port=9000)

3. 配置环境变量

创建 .env 文件:

OPENAI_API_KEY=your-api-key
OPENAI_BASE_URL=https://api.deepseek.com/v1
OPENAI_MODEL=deepseek-chat

4. 启动服务

python server.py

完整的项目配置(依赖等)请参考示例项目

API 参考

LangGraphAgent

将编译后的 LangGraph 工作流包装为 AG-UI 兼容的 Agent。

from cloudbase_agent.langgraph import LangGraphAgent

agent = LangGraphAgent(
name="ChatBot",
description="一个乐于助人的助手",
graph=compiled_graph, # StateGraph.compile() 的返回值
)

构造参数:

参数类型说明
graphCompiledStateGraph编译后的 LangGraph 工作流
namestr人类可读的 Agent 名称(默认:"")
descriptionstr详细的 Agent 描述(默认:"")
use_callbacksbool启用回调处理(默认:False)
fix_event_idsbool启用自动事件 ID 修复(默认:True)

使用回调的示例:

from cloudbase_agent.langgraph import LangGraphAgent

# 创建支持回调的 Agent
agent = LangGraphAgent(
graph=compiled_graph,
name="ChatBot",
description="一个乐于助人的助手",
use_callbacks=True,
)

# 添加日志回调
class ConsoleLogger:
async def on_text_message_content(self, event, buffer):
print(f"[AI] {buffer}")

agent.add_callback(ConsoleLogger())

状态定义

为了兼容 AG-UI,你的状态应该包含一个 tools 字段来接收客户端工具:

from langgraph.graph import MessagesState
from typing import Any, List

class State(MessagesState):
"""支持客户端工具的状态"""
tools: List[Any]

# 在工作流中使用
graph = StateGraph(State)

状态字段:

字段类型说明
messagesList[BaseMessage]消息历史(来自 MessagesState)
toolsList[Any]从前端传递的客户端工具

高级用法

带资源清理

def create_agent():
# 初始化资源
db = connect_database()

workflow = build_workflow()
agent = LangGraphAgent(
name="ChatBot",
description="带数据库访问的 Agent",
graph=workflow,
)

# 定义清理函数
def cleanup():
db.close()
print("资源已清理")

return {"agent": agent, "cleanup": cleanup}

多个 Agent

from fastapi import FastAPI
from cloudbase_agent.server import create_send_message_adapter, RunAgentInput

app = FastAPI()

@app.post("/chat/send-message")
async def chat_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_chat_agent, request)

@app.post("/assistant/send-message")
async def assistant_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_assistant_agent, request)

相关资源