# CrewAI Python Adapter

Make CrewAI Flows compatible with the AG-UI Protocol.
## What is AG-UI?

AG-UI is an open, lightweight, event-based protocol that standardizes how AI agents interact with user interfaces. It enables:

- Real-time streaming conversations
- Bidirectional state synchronization
- Frontend tool integration (client tools)
- Human-in-the-loop workflows
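Concretely, an AG-UI run is a sequence of typed events streamed to the client. The event type names below follow the AG-UI protocol, but the payload fields are a simplified illustration, not the normative schema:

```python
# Illustrative sketch of the events emitted for one streamed assistant reply.
# Event type names follow the AG-UI protocol; payload fields are simplified.
events = [
    {"type": "RUN_STARTED", "threadId": "t1", "runId": "r1"},
    {"type": "TEXT_MESSAGE_START", "messageId": "m1", "role": "assistant"},
    {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "Hello, "},
    {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "world!"},
    {"type": "TEXT_MESSAGE_END", "messageId": "m1"},
    {"type": "RUN_FINISHED", "threadId": "t1", "runId": "r1"},
]

# A client reconstructs the full message by concatenating the deltas.
text = "".join(e["delta"] for e in events if e["type"] == "TEXT_MESSAGE_CONTENT")
print(text)  # Hello, world!
```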
## What does this package solve?

- AG-UI support for CrewAI Flows: adapts CrewAI Flow instances into AG-UI compatible agents
- Event bridging: converts CrewAI flow events into AG-UI events
- State management: provides state converters and helpers for CopilotKit integration
## Core Concepts

| Export | Description |
|---|---|
| CrewAIAgent | Wraps CrewAI Flow instances as AG-UI compatible agents |
| CopilotKitState | State model for CrewAI flows with AG-UI support |
| copilotkit_stream | Streaming helper for LiteLLM responses |
| crewai_prepare_inputs | Converts AG-UI inputs to CrewAI format |
## Works with

| Package | Purpose |
|---|---|
| cloudbase-agent-server | Deploy agents as AG-UI compatible HTTP services |
| crewai | CrewAI flow framework |
| litellm | Multi-provider LLM integration |
## Installation

```shell
pip install cloudbase-agent-crewai cloudbase-agent-server crewai litellm
```
## Quick Start
### 1. Create a CrewAI Flow

```python
# agent.py
import os

from dotenv import load_dotenv
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from litellm import completion

load_dotenv()

from cloudbase_agent.crewai import CopilotKitState, copilotkit_stream, CrewAIAgent


@persist()
class ChatFlow(Flow[CopilotKitState]):
    """Conversational chat flow using CrewAI."""

    @start()
    async def chat(self) -> None:
        """Process chat messages and generate streaming responses."""
        system_prompt = "You are a helpful assistant."

        try:
            # Prepare completion parameters
            completion_params = {
                "model": f"openai/{os.getenv('OPENAI_MODEL', 'gpt-4o-mini')}",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    *self.state.messages,
                ],
                "stream": True,
            }

            # Add tools if available
            if self.state.copilotkit.actions:
                completion_params["tools"] = [*self.state.copilotkit.actions]
                completion_params["parallel_tool_calls"] = False

            # Stream the response through the copilotkit_stream wrapper
            response = await copilotkit_stream(completion(**completion_params))
            message = response.choices[0].message

            # Append to the conversation history
            self.state.messages.append(message)
        except Exception as e:
            print(f"[CrewAI Flow Chat] {e}")


def build_workflow():
    """Build and return a new chat workflow instance."""
    return ChatFlow()


# Export the create_agent function
def create_agent():
    flow = build_workflow()
    agent = CrewAIAgent(
        name="ChatBot",
        description="A helpful conversational assistant",
        flow=flow,
    )
    return {"agent": agent}
```
### 2. Deploy as an HTTP Service

```python
# server.py
from cloudbase_agent.server import AgentServiceApp

from agent import create_agent

# One-line deployment
AgentServiceApp().run(create_agent, port=9000)
```
### 3. Configure Environment Variables

Create a .env file:

```shell
OPENAI_API_KEY=your-api-key
OPENAI_BASE_URL=https://api.deepseek.com/v1
OPENAI_MODEL=deepseek-chat
```
### 4. Start the Service

```shell
python server.py
```
For complete project configuration (dependencies, etc.), see the example project.
## API Reference
### CrewAIAgent

Wraps a CrewAI Flow instance as an AG-UI compatible agent.

```python
from cloudbase_agent.crewai import CrewAIAgent

agent = CrewAIAgent(
    name="ChatBot",
    description="A helpful assistant",
    flow=flow_instance,  # CrewAI Flow instance
)
```
Constructor parameters:

| Parameter | Type | Description |
|---|---|---|
| flow | Flow | CrewAI Flow instance |
| name | str | Human-readable agent name (default: "") |
| description | str | Detailed agent description (default: "") |
| fix_event_ids | bool | Enable automatic event ID fixing (default: True) |
### CopilotKitState

State model for CrewAI flows with AG-UI support.

```python
from cloudbase_agent.crewai import CopilotKitState
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist


@persist()
class MyFlow(Flow[CopilotKitState]):
    """Flow with AG-UI state support."""

    @start()
    async def process(self):
        # Access the conversation messages
        messages = self.state.messages

        # Access the client tools
        tools = self.state.copilotkit.actions
```
State fields:

| Field | Type | Description |
|---|---|---|
| messages | List[dict] | Conversation message history |
| copilotkit.actions | List[dict] | Client tools passed from the frontend |
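Since these dicts are fed straight into LiteLLM in the Quick Start flow, they follow OpenAI-style chat-message and tool-schema shapes. A plain-Python sketch of typical state contents (the specific values and the get_weather tool are illustrative, not taken from the package):

```python
# Illustrative state contents; shapes mirror OpenAI-style chat messages
# and tool schemas, which is what LiteLLM consumes downstream.
messages = [
    {"role": "user", "content": "What's the weather in Paris?"},
    {"role": "assistant", "content": "Let me check that for you."},
]

actions = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",  # hypothetical frontend tool
            "description": "Fetch the current weather for a city",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }
]

# A flow would read these as self.state.messages and self.state.copilotkit.actions.
print(len(messages), actions[0]["function"]["name"])
```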
### copilotkit_stream

Helper function for streaming LiteLLM responses.

```python
from cloudbase_agent.crewai import copilotkit_stream
from litellm import completion

# Stream the response with proper formatting
response = await copilotkit_stream(
    completion(
        model="openai/gpt-4o-mini",
        messages=[...],
        stream=True,
    )
)
```
Parameters:

| Parameter | Type | Description |
|---|---|---|
| stream | AsyncIterator | LiteLLM streaming response |

Returns: the complete response object with merged chunks
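To make "merged chunks" concrete: a helper like this consumes the stream chunk by chunk and accumulates the content deltas into one complete message. The sketch below illustrates the idea with a stand-in stream; it is not the package's actual implementation, which also forwards each chunk as an AG-UI event:

```python
import asyncio


async def merge_stream(stream):
    """Accumulate streamed content deltas into one complete message."""
    parts = []
    async for chunk in stream:
        # A real helper would also emit an AG-UI text-chunk event here.
        parts.append(chunk.get("delta", ""))
    return {"role": "assistant", "content": "".join(parts)}


async def fake_stream():
    # Stand-in for a LiteLLM streaming response.
    for delta in ["The answer ", "is ", "42."]:
        yield {"delta": delta}


message = asyncio.run(merge_stream(fake_stream()))
print(message["content"])  # The answer is 42.
```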
### crewai_prepare_inputs

Converts AG-UI inputs to CrewAI format.

```python
from cloudbase_agent.crewai import crewai_prepare_inputs
from ag_ui.core import RunAgentInput


def create_agent():
    async def run(run_input: RunAgentInput):
        # Convert the AG-UI inputs
        inputs = crewai_prepare_inputs(run_input)

        # Apply them to the flow (a Flow instance built elsewhere)
        flow.state.messages = inputs["messages"]
        flow.state.copilotkit.actions = inputs["copilotkit"]["actions"]
```
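As an illustration of the kind of mapping this performs, the sketch below turns a minimal AG-UI-style input (messages plus frontend tool definitions) into the dict shape consumed above. The output keys match the usage shown; the input shape and conversion details are assumptions for illustration, not the package's actual logic:

```python
def prepare_inputs_sketch(run_input: dict) -> dict:
    """Hypothetical converter mirroring the output shape used above."""
    return {
        "messages": [
            {"role": m["role"], "content": m["content"]}
            for m in run_input.get("messages", [])
        ],
        "copilotkit": {
            # Frontend tools become the actions list the flow reads.
            "actions": run_input.get("tools", []),
        },
    }


run_input = {
    "messages": [{"role": "user", "content": "hi"}],
    "tools": [{"type": "function", "function": {"name": "get_weather"}}],
}
inputs = prepare_inputs_sketch(run_input)
print(inputs["copilotkit"]["actions"][0]["function"]["name"])  # get_weather
```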
## Advanced Usage

### With Resource Cleanup

```python
from cloudbase_agent.crewai import CrewAIAgent


def create_agent():
    # Initialize resources
    db = connect_database()
    flow = build_workflow()

    agent = CrewAIAgent(
        name="ChatBot",
        description="Agent with database access",
        flow=flow,
    )

    # Define a cleanup function
    def cleanup():
        db.close()
        print("Resources cleaned up")

    return {"agent": agent, "cleanup": cleanup}
```
### Multiple Agents

```python
from fastapi import FastAPI
from cloudbase_agent.server import create_send_message_adapter, RunAgentInput

# create_chat_agent and create_assistant_agent are factory functions
# of the same shape as create_agent above.
app = FastAPI()


@app.post("/chat/send-message")
async def chat_endpoint(request: RunAgentInput):
    return await create_send_message_adapter(create_chat_agent, request)


@app.post("/assistant/send-message")
async def assistant_endpoint(request: RunAgentInput):
    return await create_send_message_adapter(create_assistant_agent, request)
```
### Custom Event Handling

```python
from cloudbase_agent.crewai import CrewFlowEventListener, BridgedTextMessageChunkEvent


class CustomEventListener(CrewFlowEventListener):
    """Custom event listener for flow events."""

    async def on_text_chunk(self, event: BridgedTextMessageChunkEvent):
        """Handle text message chunks."""
        print(f"Received chunk: {event.content}")


# Use with a flow; events are automatically bridged to AG-UI format
listener = CustomEventListener()
```
### State Converters

The package provides several converter utilities:

| Function | Description |
|---|---|
| copilotkit_emit_state | Emit state updates to the client |
| copilotkit_predict_state | Predict and update state |
| copilotkit_exit | Signal flow completion |
Example:

```python
from cloudbase_agent.crewai import CopilotKitState, copilotkit_emit_state
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist


@persist()
class MyFlow(Flow[CopilotKitState]):
    @start()
    async def process(self):
        # Emit an intermediate state update to the client
        await copilotkit_emit_state({"progress": 50})
```