CrewAI Python 适配器
让 CrewAI Flow 支持 AG-UI 协议。
什么是 AG-UI?
AG-UI 是一个开放、轻量级、基于事件的协议,用于标准化 AI Agent 与用户界面的交互。它让 Agent 可以:
- 实时流式对话
- 双向状态同步
- 前端工具集成(Client Tools)
- 人机协作(Human-in-the-loop)
这个包解决什么问题?
- 让 CrewAI Flow 支持 AG-UI 协议:将 CrewAI Flow 实例适配为 AG-UI 兼容的 Agent
- 事件桥接:将 CrewAI flow 事件转换为 AG-UI 事件
- 状态管理:提供状态转换器和 CopilotKit 集成辅助工具
核心概念
| 导出 | 说明 |
|---|---|
CrewAIAgent | 将 CrewAI Flow 实例包装为 AG-UI 兼容的 Agent |
CopilotKitState | 支持 AG-UI 的 CrewAI flow 状态模型 |
copilotkit_stream | LiteLLM 响应流式处理辅助函数 |
crewai_prepare_inputs | 将 AG-UI 输入转换为 CrewAI 格式 |
配合使用
| 包名 | 作用 |
|---|---|
cloudbase-agent-server | 将 Agent 部署为 AG-UI 兼容的 HTTP 服务 |
crewai | CrewAI flow 框架 |
litellm | 多提供商 LLM 集成 |
架构图
安装
pip install cloudbase-agent-crewai cloudbase-agent-server crewai litellm
快速开始
1. 创建 CrewAI Flow
# agent.py
import os
from dotenv import load_dotenv
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from litellm import completion
load_dotenv()
from cloudbase_agent.crewai import CopilotKitState, copilotkit_stream, CrewAIAgent
@persist()
class ChatFlow(Flow[CopilotKitState]):
"""使用 CrewAI 的对话聊天流程"""
@start()
async def chat(self) -> None:
"""处理聊天消息并生成流式响应"""
system_prompt = "你是一位乐于助人的助手。"
try:
# 准备完成参数
completion_params = {
"model": f"openai/{os.getenv('OPENAI_MODEL', 'gpt-4o-mini')}",
"messages": [
{"role": "system", "content": system_prompt},
*self.state.messages
],
"stream": True,
}
# 如果有可用工具则添加
if self.state.copilotkit.actions:
completion_params["tools"] = [*self.state.copilotkit.actions]
completion_params["parallel_tool_calls"] = False
# 使用 copilotkit_stream 包装器流式响应
response = await copilotkit_stream(completion(**completion_params))
message = response.choices[0].message
# 添加到对话历史
self.state.messages.append(message)
except Exception as e:
print(f"[CrewAI Flow Chat] {e}")
def build_workflow():
"""构建并返回新的聊天工作流实例"""
return ChatFlow()
# 导出 create_agent 函数
def create_agent():
flow = build_workflow()
agent = CrewAIAgent(
name="ChatBot",
description="一个乐于助人的对话助手",
flow=flow,
)
return {"agent": agent}
2. 部署为 HTTP 服务
# server.py
from cloudbase_agent.server import AgentServiceApp
from agent import create_agent
# 一行代码部署
AgentServiceApp().run(create_agent, port=9000)
3. 配置环境变量
创建 .env 文件:
OPENAI_API_KEY=your-api-key
OPENAI_BASE_URL=https://api.deepseek.com/v1
OPENAI_MODEL=deepseek-chat
4. 启动服务
python server.py
完整的项目配置(依赖等)请参考示例项目。
API 参考
CrewAIAgent
将 CrewAI Flow 实例包装为 AG-UI 兼容的 Agent。
from cloudbase_agent.crewai import CrewAIAgent
agent = CrewAIAgent(
name="ChatBot",
description="一个乐于助人的助手",
flow=flow_instance, # CrewAI Flow 实例
)
构造参数:
| 参数 | 类型 | 说明 |
|---|---|---|
flow | Flow | CrewAI Flow 实例 |
name | str | 人类可读的 Agent 名称(默认:"") |
description | str | 详细的 Agent 描述(默认:"") |
fix_event_ids | bool | 启用自动事件 ID 修复(默认:True) |
CopilotKitState
支持 AG-UI 的 CrewAI flow 状态模型。
from cloudbase_agent.crewai import CopilotKitState
from crewai.flow.flow import Flow
@persist()
class MyFlow(Flow[CopilotKitState]):
"""支持 AG-UI 状态的 Flow"""
@start()
async def process(self):
# 访问对话消息
messages = self.state.messages
# 访问客户端工具
tools = self.state.copilotkit.actions
状态字段:
| 字段 | 类型 | 说明 |
|---|---|---|
messages | List[dict] | 对话消息历史 |
copilotkit.actions | List[dict] | 从前端传递的客户端工具 |
copilotkit_stream
LiteLLM 响应流式处理辅助函数。
from cloudbase_agent.crewai import copilotkit_stream
from litellm import completion
# 使用正确格式流式响应
response = await copilotkit_stream(
completion(
model="openai/gpt-4o-mini",
messages=[...],
stream=True,
)
)
参数:
| 参数 | 类型 | 说明 |
|---|---|---|
stream | AsyncIterator | LiteLLM 流式响应 |
返回值: 合并分块后的完整响应对象
crewai_prepare_inputs
将 AG-UI 输入转换为 CrewAI 格式。
from cloudbase_agent.crewai import crewai_prepare_inputs
from ag_ui.core import RunAgentInput
def create_agent():
async def run(run_input: RunAgentInput):
# 转换输入
inputs = crewai_prepare_inputs(run_input)
# 在 flow 中使用
flow.state.messages = inputs["messages"]
flow.state.copilotkit.actions = inputs["copilotkit"]["actions"]
高级用法
带资源清理
def create_agent():
# 初始化资源
db = connect_database()
flow = build_workflow()
agent = CrewAIAgent(
name="ChatBot",
description="带数据库访问的 Agent",
flow=flow,
)
# 定义清理函数
def cleanup():
db.close()
print("资源已清理")
return {"agent": agent, "cleanup": cleanup}
多个 Agent
from fastapi import FastAPI
from cloudbase_agent.server import create_send_message_adapter, RunAgentInput
app = FastAPI()
@app.post("/chat/send-message")
async def chat_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_chat_agent, request)
@app.post("/assistant/send-message")
async def assistant_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_assistant_agent, request)
自定义事件处理
from cloudbase_agent.crewai import CrewFlowEventListener, BridgedTextMessageChunkEvent
class CustomEventListener(CrewFlowEventListener):
"""flow 事件的自定义事件监听器"""
async def on_text_chunk(self, event: BridgedTextMessageChunkEvent):
"""处理文本消息分块"""
print(f"收到分块: {event.content}")
# 在 flow 中使用
listener = CustomEventListener()
# 事件自动桥接到 AG-UI 格式
状态转换器
本包提供了几个转换器工具:
| 函数 | 说明 |
|---|---|
copilotkit_emit_state | 向客户端发送状态更新 |
copilotkit_predict_state | 预测并更新状态 |
copilotkit_exit | 发送 flow 完成信号 |
示例:
from cloudbase_agent.crewai import copilotkit_emit_state
@persist()
class MyFlow(Flow[CopilotKitState]):
@start()
async def process(self):
# 向客户端发送中间状态更新
await copilotkit_emit_state({"progress": 50})