跳到主要内容

SSE 协议支持

本文介绍如何在 Web 云函数中实现 SSE(Server-Sent Events)服务端推送,支持服务端向客户端单向推送实时数据流。

什么是 SSE

「SSE(Server-Sent Events)」是一种服务器向客户端推送实时数据的技术,基于 HTTP 协议实现。与 WebSocket 不同,SSE 是单向通信,只支持服务端向客户端推送数据。

主要特点

  • 单向推送:服务端主动向客户端推送数据,客户端只能接收
  • 基于 HTTP:使用标准 HTTP 协议,兼容性好,易于实现
  • 自动重连:客户端断线后会自动重新连接
  • 文本格式:传输的数据为文本格式(通常是 JSON)
  • 轻量实现:相比 WebSocket,实现更简单,资源占用更少

典型应用场景

  • AI 对话流式输出
  • 实时日志推送
  • 进度更新通知
  • 服务端状态监控
  • 实时数据看板

工作原理

协议启用

SSE 协议在 Web 云函数中默认支持,无需在控制台进行任何额外配置即可使用。

建立连接

  1. 客户端通过标准 HTTP 请求建立 SSE 连接
  2. 服务端返回 Content-Type: text/event-stream 响应头
  3. 连接保持打开状态,服务端持续推送数据

连接生命周期

  • 调用对应关系:一次 SSE 连接的生命周期等同于一次函数调用请求
    • 连接建立 = 请求发起
    • 连接断开 = 请求结束
  • 实例映射:函数实例与 SSE 连接是一一对应的,同一实例在某一时刻仅处理一个 SSE 连接,新连接会启动新的实例
  • 连接保持:连接建立后,实例持续运行,通过流式响应推送数据
  • 连接结束:当 SSE 连接断开或服务端调用 end() 时,对应的函数实例停止运行

使用限制

在使用 SSE 时,需要注意以下限制:

限制项说明
执行超时时间连接持续时间受函数最大运行时长限制
并发连接每个连接启动独立实例,受账户并发配额限制
浏览器连接数限制同一域名下浏览器对 SSE 连接数有限制(通常为 6 个)
数据格式只能传输文本数据,复杂数据需序列化为 JSON

操作步骤

步骤1:编写服务端代码

SSE 协议默认支持,无需在控制台开启。根据您使用的编程语言和框架,编写 SSE 服务端代码。

安装依赖

安装 Express 框架:

npm install express

package.json 中添加依赖:

{
"dependencies": {
"express": "^4.18.0"
}
}

编写服务端代码

使用 Express 实现 SSE 流式响应:

const express = require('express');
const app = express();

// SSE 路由
app.get('/stream', (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

console.log('新的 SSE 连接建立');

// 流式推送数据
const msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!'];

let index = 0;
const intervalId = setInterval(() => {
if (index < msg.length) {
// 发送 SSE 消息
const data = {
id: index,
content: msg[index],
};
res.write(`data: ${JSON.stringify(data)}\n\n`);
index++;
} else {
// 完成推送,关闭连接
clearInterval(intervalId);
res.end();
}
}, 1000);

// 客户端断开连接时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('SSE 连接已关闭');
});
});

// 启动服务,监听 9000 端口
app.listen(9000, () => {
console.log('SSE 服务已启动,监听端口 9000');
});

步骤2:部署云函数

完成代码编写后,部署云函数:

  • 在控制台点击「部署」按钮
  • 或使用 CloudBase CLI:tcb fn deploy --httpFn

步骤3:客户端连接测试

使用浏览器或命令行工具连接 SSE 服务:

使用 curl 测试

curl -v -H 'Accept:text/event-stream' https://your-function.run.tcloudbase.com/stream

预期返回结果

服务器将以流的形式分块返回数据,每条数据间隔约 1 秒:

data: {"id": 0, "content": "SSE"}

data: {"id": 1, "content": "empowering"}

data: {"id": 2, "content": "GPT"}

...

使用浏览器 JavaScript 测试

// 创建 SSE 连接
const eventSource = new EventSource('https://your-function.run.tcloudbase.com/stream');

eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);

try {
const data = JSON.parse(event.data);
console.log('解析数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};

eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
};

高级用法

1. AI 对话流式输出

实现类似 ChatGPT 的流式对话效果:

const express = require('express');
const app = express();

app.use(express.json());

// AI 对话流式输出
app.post('/chat', async (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

const { message } = req.body;

// 发送开始事件
res.write(`event: start\ndata: ${JSON.stringify({ status: '开始生成回复' })}\n\n`);

// 模拟 AI 生成回复(实际应调用 AI 模型 API)
const reply = '这是 AI 生成的回复内容,会逐字推送给用户。';

for (let i = 0; i < reply.length; i++) {
const data = {
content: reply[i],
index: i,
};
res.write(`event: message\ndata: ${JSON.stringify(data)}\n\n`);

// 模拟生成延迟
await new Promise((resolve) => setTimeout(resolve, 50));
}

// 发送完成事件
res.write(
`event: done\ndata: ${JSON.stringify({ status: '生成完成', totalLength: reply.length })}\n\n`
);
res.end();
});

app.listen(9000);

2. 实时进度推送

推送任务执行进度:

const express = require('express');
const app = express();

app.get('/progress', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

const totalSteps = 10;

for (let step = 1; step <= totalSteps; step++) {
const data = {
step,
total: totalSteps,
percent: Math.round((step / totalSteps) * 100),
message: `正在处理第 ${step} 步...`,
};

res.write(`event: progress\ndata: ${JSON.stringify(data)}\n\n`);

// 模拟任务执行
await new Promise((resolve) => setTimeout(resolve, 1000));
}

// 任务完成
res.write(
`event: complete\ndata: ${JSON.stringify({ status: '任务完成', timestamp: Date.now() })}\n\n`
);
res.end();
});

app.listen(9000);

3. 实时日志推送

推送服务端日志到客户端:

const express = require('express');
const app = express();

// 模拟日志队列
const logQueue = [];

// 模拟生成日志
setInterval(() => {
logQueue.push({
level: ['info', 'debug', 'warn'][Math.floor(Math.random() * 3)],
message: `日志消息 ${Date.now()}`,
timestamp: Date.now(),
});

// 限制队列大小
if (logQueue.length > 100) {
logQueue.shift();
}
}, 2000);

app.get('/logs', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

console.log('开始推送日志');

// 定期推送日志
const intervalId = setInterval(() => {
if (logQueue.length > 0) {
const logs = logQueue.splice(0, 5); // 每次推送最多 5 条

logs.forEach((log) => {
res.write(`event: log\ndata: ${JSON.stringify(log)}\n\n`);
});
}
}, 1000);

// 客户端断开时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('停止推送日志');
});
});

app.listen(9000);

4. 服务端状态监控

实时推送服务器状态指标:

const express = require('express');
const app = express();

app.get('/metrics', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Monitoring-Session', `session-${Date.now()}`);

console.log('开始推送监控数据');

// 定期推送监控数据
const intervalId = setInterval(() => {
const metrics = {
cpu: Math.random() * 100,
memory: Math.random() * 100,
activeConnections: Math.floor(Math.random() * 1000),
timestamp: Date.now(),
};

res.write(`event: metrics\ndata: ${JSON.stringify(metrics)}\n\n`);
}, 2000);

// 5 分钟后超时
const timeoutId = setTimeout(() => {
clearInterval(intervalId);
res.write(`event: timeout\ndata: ${JSON.stringify({ message: '监控会话超时' })}\n\n`);
res.end();
}, 300000);

// 客户端断开时清理
req.on('close', () => {
clearInterval(intervalId);
clearTimeout(timeoutId);
console.log('停止推送监控数据');
});
});

app.listen(9000);

客户端连接示例

// 创建 SSE 连接
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');

// 监听默认消息事件
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);

try {
const data = JSON.parse(event.data);
console.log('解析的数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};

// 监听自定义事件
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`进度: ${data.percent}%`);
});

eventSource.addEventListener('done', (event) => {
console.log('任务完成');
eventSource.close(); // 关闭连接
});

// 连接打开
eventSource.onopen = () => {
console.log('SSE 连接已建立');
};

// 错误处理
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
// 可选择关闭连接
// eventSource.close();
};

// 手动关闭连接
function closeConnection() {
eventSource.close();
console.log('SSE 连接已关闭');
}

SSE 消息格式

SSE 协议使用特定的文本格式发送消息,每条消息由一个或多个字段组成:

event: message
data: {"content": "Hello"}
id: 123
retry: 1000

标准字段

字段说明是否必需
data消息数据内容
event事件类型(默认为 message
id事件 ID,用于客户端重连时恢复
retry重连间隔时间(毫秒)
: 注释注释行,客户端会忽略(用于保持连接)

消息示例

基本消息

data: Hello World

JSON 数据

data: {"message": "Hello", "timestamp": 1234567890}

带事件类型

event: notification
data: {"type": "info", "content": "新消息"}

多行数据

data: 第一行
data: 第二行
data: 第三行

完整消息

event: update
id: 123
retry: 1000
data: {"status": "success"}

常见问题

1. 连接无法建立怎么办?

  • 确认云函数已正确部署和运行
  • 检查响应头是否设置了 Content-Type: text/event-stream
  • 确认使用正确的函数访问地址
  • 查看云函数日志是否有错误信息
  • 检查客户端浏览器是否支持 SSE(IE 不支持)

2. 连接频繁断开怎么办?

  • 增加函数执行超时时间配置
  • 实现心跳机制,定期发送注释行保持连接:res.write(': keepalive\n\n')
  • 检查客户端网络是否稳定
  • 查看云函数日志,确认是否有异常错误导致连接断开
  • 客户端实现自动重连机制(EventSource 默认支持)

3. 如何实现断线重连?

EventSource 客户端会自动重连,服务端可以通过 idretry 字段配合:

服务端发送 ID 和重连间隔

res.write(`id: ${messageId}\nretry: 3000\ndata: ${JSON.stringify(data)}\n\n`);

客户端获取最后的事件 ID

const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');

eventSource.onopen = () => {
console.log('连接已建立');
};

eventSource.onerror = () => {
console.log('连接断开,自动重连中...');
};

4. 如何处理大量并发连接?

  • 实例扩展:云函数会自动为每个 SSE 连接创建独立实例
  • 连接数限制:注意浏览器同域名连接数限制(通常为 6 个)
  • 状态管理:使用 Redis 等外部存储共享连接状态
  • 消息队列:使用消息队列处理广播和异步消息
  • 资源监控:关注函数实例数量和资源使用情况

5. SSE 和 WebSocket 如何选择?

选择 SSE 的场景

  • 只需要服务端向客户端推送数据
  • 追求简单实现和快速开发
  • 需要自动重连功能
  • 兼容性要求较高(HTTP 协议)

选择 WebSocket 的场景

  • 需要双向实时通信
  • 需要传输二进制数据
  • 需要更低的延迟和更高的效率
  • 客户端需要频繁向服务端发送数据

6. 如何发送心跳保持连接?

定期发送注释行(以 : 开头)保持连接活跃:

const express = require('express');
const app = express();

app.get('/stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

// 每 30 秒发送心跳
const heartbeatInterval = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);

// 发送实际数据
const dataInterval = setInterval(() => {
res.write(`data: ${JSON.stringify({ timestamp: Date.now() })}\n\n`);
}, 5000);

req.on('close', () => {
clearInterval(heartbeatInterval);
clearInterval(dataInterval);
});
});

app.listen(9000);

SSE vs WebSocket

特性SSEWebSocket
通信方式单向(服务端→客户端)双向(服务端↔客户端)
协议HTTPWebSocket 协议
数据格式文本(通常 JSON)文本或二进制
自动重连否(需手动实现)
浏览器兼容性好(IE 除外)
实现复杂度简单相对复杂
资源占用较低较高
配置要求无需配置需要在控制台开启
适用场景单向数据推送实时双向通信

选择建议

  • 只需要服务端推送数据:选择 SSE
  • 需要双向实时通信:选择 WebSocket
  • 追求简单实现:选择 SSE
  • 需要传输二进制数据:选择 WebSocket

相关文档