跳到主要内容

使用 SSE

本文介绍如何在 HTTP 云函数中实现 SSE(Server-Sent Events)服务端推送,支持服务端向客户端单向推送实时数据流。

什么是 SSE

「SSE(Server-Sent Events)」是一种服务器向客户端推送实时数据的技术,基于 HTTP 协议实现。与 WebSocket 不同,SSE 是单向通信,只支持服务端向客户端推送数据,客户端通过普通的 HTTP 请求发送数据。

主要特点

  • 单向推送:服务端主动向客户端推送数据,客户端只能接收
  • 基于 HTTP:使用标准 HTTP 协议,兼容性好,易于实现
  • 自动重连:客户端断线后会自动重新连接
  • 文本格式:传输的数据为文本格式(通常是 JSON)
  • 更轻量:相比 WebSocket,实现更简单,资源占用更少

典型应用场景

  • AI 对话流式输出
  • 实时日志推送
  • 进度更新通知
  • 服务端状态监控
  • 实时数据看板

基本使用

1. 创建 SSE 云函数

在云函数中通过 context.sse() 获取 SSE 实例,然后使用 send() 方法推送数据。

exports.main = async function (event, context) {
// 获取 SSE 实例
const sse = context.sse?.();

if (sse && !sse.closed) {
// 发送消息
sse.send({
data: "Hello from SSE!"
});

// 结束连接
sse.end({
data: "Goodbye!"
});
}

return "";
};

2. 发送多条消息

exports.main = async function (event, context) {
const sse = context.sse?.();

if (sse && !sse.closed) {
// 发送第一条消息
sse.send({
data: "第一条消息"
});

// 发送第二条消息
sse.send({
data: "第二条消息"
});

// 发送 JSON 数据
sse.send({
data: { message: "这是 JSON 数据", timestamp: Date.now() }
});

// 结束连接
sse.end({
data: "所有消息发送完毕"
});
}

return "";
};

3. 流式推送数据

模拟流式推送场景,如 AI 对话:

exports.main = async function (event, context) {
const sse = context.sse?.();

if (sse && !sse.closed) {
const text = "这是一段流式输出的文本,每个字符都会逐个推送给客户端。";

// 逐字推送
for (let i = 0; i < text.length; i++) {
if (sse.closed) break; // 检查连接是否关闭

sse.send({
data: text[i]
});

// 模拟延迟
await new Promise(resolve => setTimeout(resolve, 100));
}

// 结束连接
sse.end({
data: "[完成]"
});
}

return "";
};

SSE API

context.sse() 方法

调用 context.sse() 可以获取 SSE 实例。支持传入配置参数:

const sse = context.sse?.({
keepalive: false, // 是否保持连接,默认 true
headers: {
'X-Custom-Header': 'value',
'X-Session-ID': 'session-123',
}
});

配置参数

参数类型默认值说明
keepalivebooleantrue是否保持连接活跃
headersRecord<string, string \| string[]>{}自定义响应头

ISeverSentEvent 接口

context.sse() 返回的 SSE 实例提供以下方法和属性:

方法

interface ISeverSentEvent {
setEncoder(encoder: TextEncoder): void;
send(event: SseEvent | SseEvent[] | string | string[]): boolean;
end(msg?: SseEvent): void;
on(event: 'close', callback: () => void): void;
}
方法说明返回值
send(event)发送 SSE 事件boolean - 是否发送成功
end(msg?)结束 SSE 连接,可选发送最后一条消息void
setEncoder(encoder)设置文本编码器void
on('close', callback)监听连接关闭事件void

属性

属性类型说明
closedboolean连接是否已关闭

SseEvent 数据结构

interface SseEvent<T = string | Record<string, unknown>> {
data: T; // 发送的数据
comment?: string; // 注释(客户端不处理)
event?: string; // 事件类型
id?: string; // 事件 ID(用于重连)
retry?: number; // 重连时间(毫秒)
}

完整示例

1. AI 对话流式输出

exports.main = async function (event, context) {
const sse = context.sse?.();

if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}

// 监听连接关闭
sse.on("close", () => {
console.log("客户端断开连接");
});

try {
// 解析用户消息
const { message } = event;

// 发送开始标记
sse.send({
event: "start",
data: { status: "开始生成回复" }
});

// 模拟 AI 生成回复(实际可调用 AI 模型)
const reply = "这是 AI 生成的回复内容,会逐字推送给用户。";

for (let i = 0; i < reply.length; i++) {
if (sse.closed) break;

sse.send({
event: "message",
data: { content: reply[i], index: i }
});

await new Promise(resolve => setTimeout(resolve, 50));
}

// 发送完成标记
sse.end({
event: "done",
data: { status: "生成完成", totalLength: reply.length }
});

} catch (error) {
// 发送错误信息
sse.end({
event: "error",
data: { message: error.message }
});
}

return "";
};

2. 实时进度推送

exports.main = async function (event, context) {
const sse = context.sse?.();

if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}

// 模拟长时间任务
async function longRunningTask() {
const totalSteps = 10;

for (let step = 1; step <= totalSteps; step++) {
if (sse.closed) break;

// 发送进度更新
sse.send({
event: "progress",
data: {
step,
total: totalSteps,
percent: Math.round((step / totalSteps) * 100),
message: `正在处理第 ${step} 步...`
}
});

// 模拟任务执行
await new Promise(resolve => setTimeout(resolve, 1000));
}
}

await longRunningTask();

// 任务完成
sse.end({
event: "complete",
data: { status: "任务完成", timestamp: Date.now() }
});

return "";
};

3. 实时日志推送

// 日志队列(实际可使用消息队列)
const logQueue = [];

exports.main = async function (event, context) {
const sse = context.sse?.();

if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}

// 监听连接关闭
let intervalId;
sse.on("close", () => {
clearInterval(intervalId);
console.log("停止推送日志");
});

// 定期推送日志
intervalId = setInterval(() => {
if (sse.closed) {
clearInterval(intervalId);
return;
}

// 从队列中获取日志
const logs = getLatestLogs(); // 实际实现

if (logs.length > 0) {
// 批量发送日志
sse.send(logs.map(log => ({
event: "log",
data: {
level: log.level,
message: log.message,
timestamp: log.timestamp
}
})));
}
}, 1000);

// 保持连接
return "";
};

// 模拟获取日志
function getLatestLogs() {
// 实际实现中可以从数据库或日志系统获取
return [
{ level: "info", message: "应用启动", timestamp: Date.now() },
{ level: "debug", message: "处理请求", timestamp: Date.now() }
];
}

4. 服务端状态监控

exports.main = async function (event, context) {
const sse = context.sse?.({
headers: {
'X-Monitoring-Session': `session-${Date.now()}`
}
});

if (!sse || sse.closed) {
return { error: "无法建立 SSE 连接" };
}

// 监控数据推送
let intervalId = setInterval(() => {
if (sse.closed) {
clearInterval(intervalId);
return;
}

// 获取系统状态
const status = {
cpu: Math.random() * 100,
memory: Math.random() * 100,
activeConnections: Math.floor(Math.random() * 1000),
timestamp: Date.now()
};

sse.send({
event: "metrics",
data: status
});
}, 2000);

// 设置超时(可选)
setTimeout(() => {
clearInterval(intervalId);
sse.end({
event: "timeout",
data: { message: "监控会话超时" }
});
}, 300000); // 5 分钟超时

return "";
};

高级用法

1. 发送原始 SSE 消息

SSE 协议支持多种字段(dataeventidretry 等),可以发送原始消息字符串:

exports.main = async function (event, context) {
const sse = context.sse?.();

if (sse && !sse.closed) {
// 发送原始消息(注意:末尾必须有换行符)
sse.send("data: 这是原始消息\n\n");

// 批量发送原始消息
sse.send([
"event: custom\n",
"data: 自定义事件\n",
"id: 123\n",
"retry: 1000\n",
"\n"
]);

sse.end();
}

return "";
};

2. 批量发送多个事件

exports.main = async function (event, context) {
const sse = context.sse?.();

if (sse && !sse.closed) {
// 一次发送多个事件
sse.send([
{ event: "message", data: "第一条消息" },
{ event: "message", data: "第二条消息" },
{ event: "message", data: "第三条消息" }
]);

sse.end();
}

return "";
};

3. 处理换行符

SSE 协议中换行符有特殊含义,发送包含换行符的数据时需要注意:

exports.main = async function (event, context) {
const sse = context.sse?.();

if (sse && !sse.closed) {
// 包含换行符的消息
sse.send({
data: "第一行\\n第二行\\n第三行"
});

// 多行文本
const multilineText = `这是第一行
这是第二行
这是第三行`;

sse.send({
data: multilineText
});

sse.end();
}

return "";
};

4. 自定义事件类型

exports.main = async function (event, context) {
const sse = context.sse?.();

if (sse && !sse.closed) {
// 不同类型的事件
sse.send({
event: "notification",
data: { type: "info", message: "这是一条通知" }
});

sse.send({
event: "alert",
data: { type: "warning", message: "这是一条警告" }
});

sse.send({
event: "update",
data: { type: "success", message: "更新成功" }
});

sse.end();
}

return "";
};

客户端连接示例

浏览器 JavaScript

// 创建 SSE 连接
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/sse');

// 监听默认消息事件
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);

try {
const data = JSON.parse(event.data);
console.log('解析的数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};

// 监听自定义事件
eventSource.addEventListener('notification', (event) => {
console.log('收到通知:', event.data);
});

eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`进度: ${data.percent}%`);
});

// 监听连接打开
eventSource.onopen = () => {
console.log('SSE 连接已建立');
};

// 监听错误
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);

// 可以选择关闭连接
// eventSource.close();
};

// 关闭连接
function closeConnection() {
eventSource.close();
console.log('SSE 连接已关闭');
}

React 示例

import { useEffect, useState } from 'react';

function StreamingChat() {
const [messages, setMessages] = useState([]);
const [currentMessage, setCurrentMessage] = useState('');

useEffect(() => {
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/sse');

eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
setCurrentMessage(prev => prev + data.content);
});

eventSource.addEventListener('done', () => {
setMessages(prev => [...prev, currentMessage]);
setCurrentMessage('');
eventSource.close();
});

eventSource.onerror = (error) => {
console.error('连接错误:', error);
eventSource.close();
};

return () => {
eventSource.close();
};
}, []);

return (
<div>
{messages.map((msg, index) => (
<div key={index}>{msg}</div>
))}
{currentMessage && <div>{currentMessage}</div>}
</div>
);
}

Node.js 客户端

const EventSource = require('eventsource');

const eventSource = new EventSource('https://your-service.run.tcloudbase.com/sse');

eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
};

eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`进度: ${data.percent}%`);
});

eventSource.onerror = (error) => {
console.error('错误:', error);
};

使用 TypeScript

import { TcbEventFunction } from "@cloudbase/functions-typings";

interface MessageData {
content: string;
timestamp: number;
}

export const main: TcbEventFunction<void, Promise<string>> = async function (
event,
context
) {
const sse = context.sse?.({
keepalive: true,
headers: {
'X-Session-ID': 'session-123'
}
});

if (!sse || sse.closed) {
return JSON.stringify({ error: "无法建立 SSE 连接" });
}

sse.on("close", () => {
console.log("SSE 连接已关闭");
});

// 发送消息
sse.send({
event: "message",
data: {
content: "Hello from SSE!",
timestamp: Date.now()
} as MessageData
});

// 结束连接
sse.end({
event: "complete",
data: { status: "完成" }
});

return "";
};

注意事项

⚠️ 注意:以下事项在使用 SSE 时需要特别注意

  • 单向通信:SSE 只支持服务端向客户端推送,客户端需要通过其他方式(如 HTTP POST)向服务端发送数据
  • 连接保持:SSE 连接会长期占用资源,需要设置合理的超时机制和连接数限制
  • 浏览器限制:同一域名下浏览器对 SSE 连接数有限制(通常为 6 个),需要注意连接管理
  • 自动重连:客户端会自动重连,服务端需要能够处理重复连接
  • 文本格式:SSE 只能传输文本数据,复杂数据需要序列化为 JSON
  • 错误处理:需要正确处理连接关闭和错误情况,避免资源泄漏
  • 返回值:使用 SSE 的云函数必须返回空字符串或不返回内容
  • Content-Type:SSE 响应的 Content-Typetext/event-stream,由框架自动设置

SSE vs WebSocket

特性SSEWebSocket
通信方式单向(服务端→客户端)双向(服务端↔客户端)
协议HTTPWebSocket 协议
数据格式文本(通常 JSON)文本或二进制
自动重连否(需手动实现)
浏览器兼容性好(IE 除外)
实现复杂度简单相对复杂
资源占用较低较高
适用场景单向数据推送实时双向通信

选择建议

  • 只需要服务端推送数据:选择 SSE
  • 需要双向实时通信:选择 WebSocket
  • 追求简单实现:选择 SSE
  • 需要传输二进制数据:选择 WebSocket

相关文档