使用 SSE
本文介绍如何在 HTTP 云函数中实现 SSE(Server-Sent Events)服务端推送,支持服务端向客户端单向推送实时数据流。
什么是 SSE
「SSE(Server-Sent Events)」是一种服务器向客户端推送实时数据的技术,基于 HTTP 协议实现。与 WebSocket 不同,SSE 是单向通信,只支持服务端向客户端推送数据,客户端通过普通的 HTTP 请求发送数据。
主要特点:
- 单向推送:服务端主动向客户端推送数据,客户端只能接收
- 基于 HTTP:使用标准 HTTP 协议,兼容性好,易于实现
- 自动重连:客户端断线后会自动重新连接
- 文本格式:传输的数据为文本格式(通常是 JSON)
- 更轻量:相比 WebSocket,实现更简单,资源占用更少
典型应用场景:
- AI 对话流式输出
- 实时日志推送
- 进度更新通知
- 服务端状态监控
- 实时数据看板
基本使用
1. 创建 SSE 云函数
在云函数中通过 context.sse() 获取 SSE 实例,然后使用 send() 方法推送数据。
exports.main = async function (event, context) {
// 获取 SSE 实例
const sse = context.sse?.();
if (sse && !sse.closed) {
// 发送消息
sse.send({
data: "Hello from SSE!"
});
// 结束连接
sse.end({
data: "Goodbye!"
});
}
return "";
};
2. 发送多条消息
exports.main = async function (event, context) {
const sse = context.sse?.();
if (sse && !sse.closed) {
// 发送第一条消息
sse.send({
data: "第一条消息"
});
// 发送第二条消息
sse.send({
data: "第二条消息"
});
// 发送 JSON 数据
sse.send({
data: { message: "这是 JSON 数据", timestamp: Date.now() }
});
// 结束连接
sse.end({
data: "所有消息发送完毕"
});
}
return "";
};
3. 流式推送数据
模拟流式推送场景,如 AI 对话:
exports.main = async function (event, context) {
const sse = context.sse?.();
if (sse && !sse.closed) {
const text = "这是一段流式输出的文本,每个字符都会逐个推送给客户端。";
// 逐字推送
for (let i = 0; i < text.length; i++) {
if (sse.closed) break; // 检查连接是否关闭
sse.send({
data: text[i]
});
// 模拟延迟
await new Promise(resolve => setTimeout(resolve, 100));
}
// 结束连接
sse.end({
data: "[完成]"
});
}
return "";
};
SSE API
context.sse() 方法
调用 context.sse() 可以获取 SSE 实例。支持传入配置参数:
const sse = context.sse?.({
keepalive: false, // 是否保持连接,默认 true
headers: {
'X-Custom-Header': 'value',
'X-Session-ID': 'session-123',
}
});
配置参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
keepalive | boolean | true | 是否保持连接活跃 |
headers | Record<string, string \| string[]> | {} | 自定义响应头 |
ISeverSentEvent 接口
context.sse() 返回的 SSE 实例提供以下方法和属性:
方法
interface ISeverSentEvent {
setEncoder(encoder: TextEncoder): void;
send(event: SseEvent | SseEvent[] | string | string[]): boolean;
end(msg?: SseEvent): void;
on(event: 'close', callback: () => void): void;
}
| 方法 | 说明 | 返回值 |
|---|---|---|
send(event) | 发送 SSE 事件 | boolean - 是否发送成功 |
end(msg?) | 结束 SSE 连接,可选发送最后一条消息 | void |
setEncoder(encoder) | 设置文本编码器 | void |
on('close', callback) | 监听连接关闭事件 | void |
属性
| 属性 | 类型 | 说明 |
|---|---|---|
closed | boolean | 连接是否已关闭 |
SseEvent 数据结构
interface SseEvent<T = string | Record<string, unknown>> {
data: T; // 发送的数据
comment?: string; // 注释(客户端不处理)
event?: string; // 事件类型
id?: string; // 事件 ID(用于重连)
retry?: number; // 重连时间(毫秒)
}
完整示例
1. AI 对话流式输出
exports.main = async function (event, context) {
const sse = context.sse?.();
if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}
// 监听连接关闭
sse.on("close", () => {
console.log("客户端断开连接");
});
try {
// 解析用户消息
const { message } = event;
// 发送开始标记
sse.send({
event: "start",
data: { status: "开始生成回复" }
});
// 模拟 AI 生成回复(实际可调用 AI 模型)
const reply = "这是 AI 生成的回复内容,会逐字推送给用户。";
for (let i = 0; i < reply.length; i++) {
if (sse.closed) break;
sse.send({
event: "message",
data: { content: reply[i], index: i }
});
await new Promise(resolve => setTimeout(resolve, 50));
}
// 发送完成标记
sse.end({
event: "done",
data: { status: "生成完成", totalLength: reply.length }
});
} catch (error) {
// 发送错误信息
sse.end({
event: "error",
data: { message: error.message }
});
}
return "";
};
2. 实时进度推送
exports.main = async function (event, context) {
const sse = context.sse?.();
if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}
// 模拟长时间任务
async function longRunningTask() {
const totalSteps = 10;
for (let step = 1; step <= totalSteps; step++) {
if (sse.closed) break;
// 发送进度更新
sse.send({
event: "progress",
data: {
step,
total: totalSteps,
percent: Math.round((step / totalSteps) * 100),
message: `正在处理第 ${step} 步...`
}
});
// 模拟任务执行
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
await longRunningTask();
// 任务完成
sse.end({
event: "complete",
data: { status: "任务完成", timestamp: Date.now() }
});
return "";
};
3. 实时日志推送
// 日志队列(实际可使用消息队列)
const logQueue = [];
exports.main = async function (event, context) {
const sse = context.sse?.();
if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}
// 监听连接关闭
let intervalId;
sse.on("close", () => {
clearInterval(intervalId);
console.log("停止推送日志");
});
// 定期推送日志
intervalId = setInterval(() => {
if (sse.closed) {
clearInterval(intervalId);
return;
}
// 从队列中获取日志
const logs = getLatestLogs(); // 实际实现
if (logs.length > 0) {
// 批量发送日志
sse.send(logs.map(log => ({
event: "log",
data: {
level: log.level,
message: log.message,
timestamp: log.timestamp
}
})));
}
}, 1000);
// 保持连接
return "";
};
// 模拟获取日志
function getLatestLogs() {
// 实际实现中可以从数据库或日志系统获取
return [
{ level: "info", message: "应用启动", timestamp: Date.now() },
{ level: "debug", message: "处理请求", timestamp: Date.now() }
];
}
4. 服务端状态监控
exports.main = async function (event, context) {
const sse = context.sse?.({
headers: {
'X-Monitoring-Session': `session-${Date.now()}`
}
});
if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}
// 监控数据推送
let intervalId = setInterval(() => {
if (sse.closed) {
clearInterval(intervalId);
return;
}
// 获取系统状态
const status = {
cpu: Math.random() * 100,
memory: Math.random() * 100,
activeConnections: Math.floor(Math.random() * 1000),
timestamp: Date.now()
};
sse.send({
event: "metrics",
data: status
});
}, 2000);
// 设置超时(可选)
setTimeout(() => {
clearInterval(intervalId);
sse.end({
event: "timeout",
data: { message: "监控会话超时" }
});
}, 300000); // 5 分钟超时
return "";
};
高级用法
1. 发送原始 SSE 消息
SSE 协议支持多种字段(data、event、id、retry 等),可以发送原始消息字符串:
exports.main = async function (event, context) {
const sse = context.sse?.();
if (sse && !sse.closed) {
// 发送原始消息(注意:末尾必须有换行符)
sse.send("data: 这是原始消息\n\n");
// 批量发送原始消息
sse.send([
"event: custom\n",
"data: 自定义事件\n",
"id: 123\n",
"retry: 1000\n",
"\n"
]);
sse.end();
}
return "";
};
2. 批量发送多个事件
exports.main = async function (event, context) {
const sse = context.sse?.();
if (sse && !sse.closed) {
// 一次发送多个事件
sse.send([
{ event: "message", data: "第一条消息" },
{ event: "message", data: "第二条消息" },
{ event: "message", data: "第三条消息" }
]);
sse.end();
}
return "";
};
3. 处理换行符
SSE 协议中换行符有特殊含义,发送包含换行符的数据时需要注意:
exports.main = async function (event, context) {
const sse = context.sse?.();
if (sse && !sse.closed) {
// 包含换行符的消息
sse.send({
data: "第一行\\n第二行\\n第三行"
});
// 多行文本
const multilineText = `这是第一行
这是第二行
这是第三行`;
sse.send({
data: multilineText
});
sse.end();
}
return "";
};
4. 自定义事件类型
exports.main = async function (event, context) {
const sse = context.sse?.();
if (sse && !sse.closed) {
// 不同类型的事件
sse.send({
event: "notification",
data: { type: "info", message: "这是一条通知" }
});
sse.send({
event: "alert",
data: { type: "warning", message: "这是一条警告" }
});
sse.send({
event: "update",
data: { type: "success", message: "更新成功" }
});
sse.end();
}
return "";
};
客户端连接示例
浏览器 JavaScript
// 创建 SSE 连接
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/sse');
// 监听默认消息事件
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
try {
const data = JSON.parse(event.data);
console.log('解析的数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};
// 监听自定义事件
eventSource.addEventListener('notification', (event) => {
console.log('收到通知:', event.data);
});
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`进度: ${data.percent}%`);
});
// 监听连接打开
eventSource.onopen = () => {
console.log('SSE 连接已建立');
};
// 监听错误
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
// 可以选择关闭连接
// eventSource.close();
};
// 关闭连接
function closeConnection() {
eventSource.close();
console.log('SSE 连接已关闭');
}
React 示例
import { useEffect, useState } from 'react';
function StreamingChat() {
const [messages, setMessages] = useState([]);
const [currentMessage, setCurrentMessage] = useState('');
useEffect(() => {
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/sse');
eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
setCurrentMessage(prev => prev + data.content);
});
eventSource.addEventListener('done', () => {
setMessages(prev => [...prev, currentMessage]);
setCurrentMessage('');
eventSource.close();
});
eventSource.onerror = (error) => {
console.error('连接错误:', error);
eventSource.close();
};
return () => {
eventSource.close();
};
}, []);
return (
<div>
{messages.map((msg, index) => (
<div key={index}>{msg}</div>
))}
{currentMessage && <div>{currentMessage}</div>}
</div>
);
}
Node.js 客户端
const EventSource = require('eventsource');
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/sse');
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
};
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`进度: ${data.percent}%`);
});
eventSource.onerror = (error) => {
console.error('错误:', error);
};
使用 TypeScript
import { TcbEventFunction } from "@cloudbase/functions-typings";
interface MessageData {
content: string;
timestamp: number;
}
export const main: TcbEventFunction<void, Promise<string>> = async function (
event,
context
) {
const sse = context.sse?.({
keepalive: true,
headers: {
'X-Session-ID': 'session-123'
}
});
if (!sse || sse.closed) {
return JSON.stringify({ error: "无法建立 SSE 连接" });
}
sse.on("close", () => {
console.log("SSE 连接已关闭");
});
// 发送消息
sse.send({
event: "message",
data: {
content: "Hello from SSE!",
timestamp: Date.now()
} as MessageData
});
// 结束连接
sse.end({
event: "complete",
data: { status: "完成" }
});
return "";
};
注意事项
⚠️ 注意:以下事项在使用 SSE 时需要特别注意
- 单向通信:SSE 只支持服务端向客户端推送,客户端需要通过其他方式(如 HTTP POST)向服务端发送数据
- 连接保持:SSE 连接会长期占用资源,需要设置合理的超时机制和连接数限制
- 浏览器限制:同一域名下浏览器对 SSE 连接数有限制(通常为 6 个),需要注意连接管理
- 自动重连:客户端会自动重连,服务端需要能够处理重复连接
- 文本格式:SSE 只能传输文本数据,复杂数据需要序列化为 JSON
- 错误处理:需要正确处理连接关闭和错误情况,避免资源泄漏
- 返回值:使用 SSE 的云函数必须返回空字符串或不返回内容
- Content-Type:SSE 响应的
Content-Type为text/event-stream,由框架自动设置
SSE vs WebSocket
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方式 | 单向(服务端→客户端) | 双向(服务端↔客户端) |
| 协议 | HTTP | WebSocket 协议 |
| 数据格式 | 文本(通常 JSON) | 文本或二进制 |
| 自动重连 | 是 | 否(需手动实现) |
| 浏览器兼容性 | 好(IE 除外) | 好 |
| 实现复杂度 | 简单 | 相对复杂 |
| 资源占用 | 较低 | 较高 |
| 适用场景 | 单向数据推送 | 实时双向通信 |
选择建议:
- 只需要服务端推送数据:选择 SSE
- 需要双向实时通信:选择 WebSocket
- 追求简单实现:选择 SSE
- 需要传输二进制数据:选择 WebSocket