CrewAI Python Adapter

Make CrewAI Flows compatible with the AG-UI Protocol.

What is AG-UI?

AG-UI is an open, lightweight, event-based protocol for standardizing AI agent interactions with user interfaces. It enables:

  • Real-time streaming conversations
  • Bidirectional state synchronization
  • Frontend tool integration (Client Tools)
  • Human-in-the-loop workflows
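
Concretely, an AG-UI agent run is a stream of typed events. The sketch below is illustrative only (plain dicts rather than the ag_ui SDK's event classes) and shows the rough lifecycle of one streamed text reply:

```python
# Illustrative only: the real ag_ui SDK emits typed event objects,
# but a streamed text reply follows roughly this event sequence.
events = [
    {"type": "RUN_STARTED"},
    {"type": "TEXT_MESSAGE_START", "message_id": "msg_1", "role": "assistant"},
    {"type": "TEXT_MESSAGE_CONTENT", "message_id": "msg_1", "delta": "Hello, "},
    {"type": "TEXT_MESSAGE_CONTENT", "message_id": "msg_1", "delta": "world!"},
    {"type": "TEXT_MESSAGE_END", "message_id": "msg_1"},
    {"type": "RUN_FINISHED"},
]

# A client reassembles the message by concatenating the content deltas.
text = "".join(e["delta"] for e in events if e["type"] == "TEXT_MESSAGE_CONTENT")
print(text)  # Hello, world!
```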

What does this package solve?

  • AG-UI support for CrewAI Flows: wraps CrewAI Flow instances as AG-UI compatible agents
  • Event bridging: converts CrewAI flow events into AG-UI events
  • State management: provides state converters and helpers for CopilotKit integration

Core Concepts

Export                  Description
CrewAIAgent             Wraps CrewAI Flow instances as AG-UI compatible agents
CopilotKitState         State model for CrewAI flows with AG-UI support
copilotkit_stream       Streaming helper for LiteLLM responses
crewai_prepare_inputs   Converts AG-UI inputs to CrewAI format

Works with

Package                  Purpose
cloudbase-agent-server   Deploy agents as AG-UI compatible HTTP services
crewai                   CrewAI flow framework
litellm                  Multi-provider LLM integration

Architecture Diagram

Installation

pip install cloudbase-agent-crewai cloudbase-agent-server crewai litellm

Quick Start

1. Create CrewAI Flow

# agent.py
import os
from dotenv import load_dotenv
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from litellm import completion

load_dotenv()

from cloudbase_agent.crewai import CopilotKitState, copilotkit_stream, CrewAIAgent


@persist()
class ChatFlow(Flow[CopilotKitState]):
    """Conversational chat flow using CrewAI."""

    @start()
    async def chat(self) -> None:
        """Process chat messages and generate streaming responses."""
        system_prompt = "You are a helpful assistant."

        try:
            # Prepare completion parameters
            completion_params = {
                "model": f"openai/{os.getenv('OPENAI_MODEL', 'gpt-4o-mini')}",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    *self.state.messages,
                ],
                "stream": True,
            }

            # Add client tools if the frontend provided any
            if self.state.copilotkit.actions:
                completion_params["tools"] = [*self.state.copilotkit.actions]
                completion_params["parallel_tool_calls"] = False

            # Stream the response using the copilotkit_stream wrapper
            response = await copilotkit_stream(completion(**completion_params))

            message = response.choices[0].message

            # Append to conversation history
            self.state.messages.append(message)
        except Exception as e:
            print(f"[CrewAI Flow Chat] {e}")


def build_workflow():
    """Build and return a new chat workflow instance."""
    return ChatFlow()


# Export create_agent function
def create_agent():
    flow = build_workflow()
    agent = CrewAIAgent(
        name="ChatBot",
        description="A helpful conversational assistant",
        flow=flow,
    )
    return {"agent": agent}

2. Deploy as HTTP Service

# server.py
from cloudbase_agent.server import AgentServiceApp
from agent import create_agent

# One-line deployment
AgentServiceApp().run(create_agent, port=9000)

3. Configure Environment Variables

Create a .env file:

OPENAI_API_KEY=your-api-key
OPENAI_BASE_URL=https://api.deepseek.com/v1
OPENAI_MODEL=deepseek-chat

4. Start Service

python server.py

For complete project configuration (dependencies, etc.), see the example project.

API Reference

CrewAIAgent

Wraps CrewAI Flow instances as AG-UI compatible agents.

from cloudbase_agent.crewai import CrewAIAgent

agent = CrewAIAgent(
    name="ChatBot",
    description="A helpful assistant",
    flow=flow_instance,  # CrewAI Flow instance
)

Constructor Parameters:

Parameter       Type   Description
flow            Flow   CrewAI Flow instance
name            str    Human-readable agent name (default: "")
description     str    Detailed agent description (default: "")
fix_event_ids   bool   Enable automatic event ID fixing (default: True)

CopilotKitState

State model for CrewAI flows with AG-UI support.

from cloudbase_agent.crewai import CopilotKitState
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist

@persist()
class MyFlow(Flow[CopilotKitState]):
    """Flow with AG-UI state support."""

    @start()
    async def process(self):
        # Access conversation messages
        messages = self.state.messages

        # Access client tools
        tools = self.state.copilotkit.actions

State Fields:

Field                Type         Description
messages             List[dict]   Conversation message history
copilotkit.actions   List[dict]   Client tools passed from the frontend
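
The entries in messages follow the OpenAI chat-message format. A hypothetical history, and the append the adapter performs after each turn, looks like this:

```python
# Hypothetical conversation history in the OpenAI chat-message format
messages = [
    {"role": "user", "content": "What is AG-UI?"},
    {"role": "assistant", "content": "An event-based protocol for agent UIs."},
]

# Appending the next user turn, as a flow does with self.state.messages
messages.append({"role": "user", "content": "Thanks!"})
print(len(messages))  # 3
```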

copilotkit_stream

Helper function for streaming LiteLLM responses.

from cloudbase_agent.crewai import copilotkit_stream
from litellm import completion

# Stream response with proper formatting
# Stream response with proper formatting
response = await copilotkit_stream(
    completion(
        model="openai/gpt-4o-mini",
        messages=[...],
        stream=True,
    )
)

Parameters:

Parameter   Type            Description
stream      AsyncIterator   LiteLLM streaming response

Returns: Complete response object with merged chunks
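
To illustrate what "merged chunks" means, here is a simplified, pure-Python stand-in (not the package's actual implementation) that folds OpenAI-style streaming deltas into a single message:

```python
import asyncio

async def merge_chunks(stream):
    """Fold OpenAI-style streaming deltas into one message dict.
    Simplified stand-in for the merging copilotkit_stream performs."""
    parts = []
    async for chunk in stream:
        delta = chunk["choices"][0]["delta"]
        if delta.get("content"):
            parts.append(delta["content"])
    return {"role": "assistant", "content": "".join(parts)}

async def fake_stream():
    # Simulated LiteLLM streaming chunks
    for part in ("Hel", "lo!"):
        yield {"choices": [{"delta": {"content": part}}]}

message = asyncio.run(merge_chunks(fake_stream()))
print(message["content"])  # Hello!
```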

crewai_prepare_inputs

Convert AG-UI inputs to CrewAI format.

from cloudbase_agent.crewai import crewai_prepare_inputs
from ag_ui.core import RunAgentInput

def create_agent():
    async def run(run_input: RunAgentInput):
        # Convert inputs
        inputs = crewai_prepare_inputs(run_input)

        # Use with flow
        flow.state.messages = inputs["messages"]
        flow.state.copilotkit.actions = inputs["copilotkit"]["actions"]
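
Conceptually, the conversion splits the AG-UI run input into the message history and the client tools that the flow state expects. A simplified stand-in, assuming run_input is a plain dict rather than the real RunAgentInput model:

```python
def prepare_inputs(run_input: dict) -> dict:
    """Simplified stand-in for crewai_prepare_inputs: split an AG-UI
    run input into messages and client tools for the flow state."""
    return {
        "messages": run_input.get("messages", []),
        "copilotkit": {"actions": run_input.get("tools", [])},
    }

inputs = prepare_inputs({
    "messages": [{"role": "user", "content": "hi"}],
    "tools": [{"name": "sayHello"}],
})
print(inputs["copilotkit"]["actions"][0]["name"])  # sayHello
```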

Advanced Usage

With Resource Cleanup

def create_agent():
    # Initialize resources
    db = connect_database()

    flow = build_workflow()
    agent = CrewAIAgent(
        name="ChatBot",
        description="Agent with database access",
        flow=flow,
    )

    # Define cleanup function
    def cleanup():
        db.close()
        print("Resources cleaned up")

    return {"agent": agent, "cleanup": cleanup}

Multiple Agents

from fastapi import FastAPI
from cloudbase_agent.server import create_send_message_adapter, RunAgentInput

app = FastAPI()

@app.post("/chat/send-message")
async def chat_endpoint(request: RunAgentInput):
    return await create_send_message_adapter(create_chat_agent, request)

@app.post("/assistant/send-message")
async def assistant_endpoint(request: RunAgentInput):
    return await create_send_message_adapter(create_assistant_agent, request)

Custom Event Handling

from cloudbase_agent.crewai import CrewFlowEventListener, BridgedTextMessageChunkEvent

class CustomEventListener(CrewFlowEventListener):
    """Custom event listener for flow events."""

    async def on_text_chunk(self, event: BridgedTextMessageChunkEvent):
        """Handle text message chunks."""
        print(f"Received chunk: {event.content}")

# Use with flow
listener = CustomEventListener()
# Events are automatically bridged to AG-UI format

State Converters

The package provides several converter utilities:

Function                   Description
copilotkit_emit_state      Emit state updates to the client
copilotkit_predict_state   Predict and update state
copilotkit_exit            Signal flow completion

Example:

from cloudbase_agent.crewai import CopilotKitState, copilotkit_emit_state
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist

@persist()
class MyFlow(Flow[CopilotKitState]):
    @start()
    async def process(self):
        # Emit intermediate state update to client
        await copilotkit_emit_state({"progress": 50})