跳到主要内容

CrewAI Python 适配器

让 CrewAI Flow 支持 AG-UI 协议

什么是 AG-UI?

AG-UI 是一个开放、轻量级、基于事件的协议,用于标准化 AI Agent 与用户界面的交互。它让 Agent 可以:

  • 实时流式对话
  • 双向状态同步
  • 前端工具集成(Client Tools)
  • 人机协作(Human-in-the-loop)

这个包解决什么问题?

  • 让 CrewAI Flow 支持 AG-UI 协议:将 CrewAI Flow 实例适配为 AG-UI 兼容的 Agent
  • 事件桥接:将 CrewAI flow 事件转换为 AG-UI 事件
  • 状态管理:提供状态转换器和 CopilotKit 集成辅助工具

核心概念

导出说明
CrewAIAgent将 CrewAI Flow 实例包装为 AG-UI 兼容的 Agent
CopilotKitState支持 AG-UI 的 CrewAI flow 状态模型
copilotkit_streamLiteLLM 响应流式处理辅助函数
crewai_prepare_inputs将 AG-UI 输入转换为 CrewAI 格式

配合使用

包名作用
cloudbase-agent-server将 Agent 部署为 AG-UI 兼容的 HTTP 服务
crewaiCrewAI flow 框架
litellm多提供商 LLM 集成

架构图

安装

pip install cloudbase-agent-crewai cloudbase-agent-server crewai litellm

快速开始

1. 创建 CrewAI Flow

# agent.py
import os
from dotenv import load_dotenv
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from litellm import completion

load_dotenv()

from cloudbase_agent.crewai import CopilotKitState, copilotkit_stream, CrewAIAgent


@persist()
class ChatFlow(Flow[CopilotKitState]):
"""使用 CrewAI 的对话聊天流程"""

@start()
async def chat(self) -> None:
"""处理聊天消息并生成流式响应"""
system_prompt = "你是一位乐于助人的助手。"

try:
# 准备完成参数
completion_params = {
"model": f"openai/{os.getenv('OPENAI_MODEL', 'gpt-4o-mini')}",
"messages": [
{"role": "system", "content": system_prompt},
*self.state.messages
],
"stream": True,
}

# 如果有可用工具则添加
if self.state.copilotkit.actions:
completion_params["tools"] = [*self.state.copilotkit.actions]
completion_params["parallel_tool_calls"] = False

# 使用 copilotkit_stream 包装器流式响应
response = await copilotkit_stream(completion(**completion_params))

message = response.choices[0].message

# 添加到对话历史
self.state.messages.append(message)
except Exception as e:
print(f"[CrewAI Flow Chat] {e}")


def build_workflow():
"""构建并返回新的聊天工作流实例"""
return ChatFlow()


# 导出 create_agent 函数
def create_agent():
flow = build_workflow()
agent = CrewAIAgent(
name="ChatBot",
description="一个乐于助人的对话助手",
flow=flow,
)
return {"agent": agent}

2. 部署为 HTTP 服务

# server.py
from cloudbase_agent.server import AgentServiceApp
from agent import create_agent

# 一行代码部署
AgentServiceApp().run(create_agent, port=9000)

3. 配置环境变量

创建 .env 文件:

OPENAI_API_KEY=your-api-key
OPENAI_BASE_URL=https://api.deepseek.com/v1
OPENAI_MODEL=deepseek-chat

4. 启动服务

python server.py

完整的项目配置(依赖等)请参考示例项目

API 参考

CrewAIAgent

将 CrewAI Flow 实例包装为 AG-UI 兼容的 Agent。

from cloudbase_agent.crewai import CrewAIAgent

agent = CrewAIAgent(
name="ChatBot",
description="一个乐于助人的助手",
flow=flow_instance, # CrewAI Flow 实例
)

构造参数:

参数类型说明
flowFlowCrewAI Flow 实例
namestr人类可读的 Agent 名称(默认:"")
descriptionstr详细的 Agent 描述(默认:"")
fix_event_idsbool启用自动事件 ID 修复(默认:True)

CopilotKitState

支持 AG-UI 的 CrewAI flow 状态模型。

from cloudbase_agent.crewai import CopilotKitState
from crewai.flow.flow import Flow

@persist()
class MyFlow(Flow[CopilotKitState]):
"""支持 AG-UI 状态的 Flow"""

@start()
async def process(self):
# 访问对话消息
messages = self.state.messages

# 访问客户端工具
tools = self.state.copilotkit.actions

状态字段:

字段类型说明
messagesList[dict]对话消息历史
copilotkit.actionsList[dict]从前端传递的客户端工具

copilotkit_stream

LiteLLM 响应流式处理辅助函数。

from cloudbase_agent.crewai import copilotkit_stream
from litellm import completion

# 使用正确格式流式响应
response = await copilotkit_stream(
completion(
model="openai/gpt-4o-mini",
messages=[...],
stream=True,
)
)

参数:

参数类型说明
streamAsyncIteratorLiteLLM 流式响应

返回值: 合并分块后的完整响应对象

crewai_prepare_inputs

将 AG-UI 输入转换为 CrewAI 格式。

from cloudbase_agent.crewai import crewai_prepare_inputs
from ag_ui.core import RunAgentInput

def create_agent():
async def run(run_input: RunAgentInput):
# 转换输入
inputs = crewai_prepare_inputs(run_input)

# 在 flow 中使用
flow.state.messages = inputs["messages"]
flow.state.copilotkit.actions = inputs["copilotkit"]["actions"]

高级用法

带资源清理

def create_agent():
# 初始化资源
db = connect_database()

flow = build_workflow()
agent = CrewAIAgent(
name="ChatBot",
description="带数据库访问的 Agent",
flow=flow,
)

# 定义清理函数
def cleanup():
db.close()
print("资源已清理")

return {"agent": agent, "cleanup": cleanup}

多个 Agent

from fastapi import FastAPI
from cloudbase_agent.server import create_send_message_adapter, RunAgentInput

app = FastAPI()

@app.post("/chat/send-message")
async def chat_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_chat_agent, request)

@app.post("/assistant/send-message")
async def assistant_endpoint(request: RunAgentInput):
return await create_send_message_adapter(create_assistant_agent, request)

自定义事件处理

from cloudbase_agent.crewai import CrewFlowEventListener, BridgedTextMessageChunkEvent

class CustomEventListener(CrewFlowEventListener):
"""flow 事件的自定义事件监听器"""

async def on_text_chunk(self, event: BridgedTextMessageChunkEvent):
"""处理文本消息分块"""
print(f"收到分块: {event.content}")

# 在 flow 中使用
listener = CustomEventListener()
# 事件自动桥接到 AG-UI 格式

状态转换器

本包提供了几个转换器工具:

函数说明
copilotkit_emit_state向客户端发送状态更新
copilotkit_predict_state预测并更新状态
copilotkit_exit发送 flow 完成信号

示例:

from cloudbase_agent.crewai import copilotkit_emit_state

@persist()
class MyFlow(Flow[CopilotKitState]):
@start()
async def process(self):
# 向客户端发送中间状态更新
await copilotkit_emit_state({"progress": 50})

相关资源