SSE 协议支持
本文介绍如何在 Web 云函数中实现 SSE(Server-Sent Events)服务端推送,支持服务端向客户端单向推送实时数据流。
什么是 SSE
「SSE(Server-Sent Events)」是一种服务器向客户端推送实时数据的技术,基于 HTTP 协议实现。与 WebSocket 不同,SSE 是单向通信,只支持服务端向客户端推送数据。
主要特点:
- 单向推送:服务端主动向客户端推送数据,客户端只能接收
- 基于 HTTP:使用标准 HTTP 协议,兼容性好,易于实现
- 自动重连:客户端断线后会自动重新连接
- 文本格式:传输的数据为文本格式(通常是 JSON)
- 轻量实现:相比 WebSocket,实现更简单,资源占用更少
典型应用场景:
- AI 对话流式输出
- 实时日志推送
- 进度更新通知
- 服务端状态监控
- 实时数据看板
工作原理
协议启用
SSE 协议在 Web 云函数中默认支持,无需在控制台进行任何额外配置即可使用。
建立连接
- 客户端通过标准 HTTP 请求建立 SSE 连接
- 服务端返回
Content-Type: text/event-stream响应头 - 连接保持打开状态,服务端持续推送数据
连接生命周期
- 调用对应关系:一次 SSE 连接的生命周期等同于一次函数调用请求
- 连接建立 = 请求发起
- 连接断开 = 请求结束
- 实例映射:函数实例与 SSE 连接是一一对应的,同一实例在某一时刻仅处理一个 SSE 连接,新连接会启动新的实例
- 连接保持:连接建立后,实例持续运行,通过流式响应推送数据
- 连接结束:当 SSE 连接断开或服务端调用
end()时,对应的函数实例停止运行
使用限制
在使用 SSE 时,需要注意以下限制:
| 限制项 | 说明 |
|---|---|
| 执行超时时间 | 连接持续时间受函数最大运行时长限制 |
| 并发连接 | 每个连接启动独立实例,受账户并发配额限制 |
| 浏览器连接数限制 | 同一域名下浏览器对 SSE 连接数有限制(通常为 6 个) |
| 数据格式 | 只能传输文本数据,复杂数据需序列化为 JSON |
操作步骤
步骤1:编写服务端代码
SSE 协议默认支持,无需在控制台开启。根据您使用的编程语言和框架,编写 SSE 服务端代码。
- Node.js (Express)
- Node.js (Koa)
- Python (Flask)
- Python (FastAPI)
安装依赖
安装 Express 框架:
npm install express
在 package.json 中添加依赖:
{
"dependencies": {
"express": "^4.18.0"
}
}
编写服务端代码
使用 Express 实现 SSE 流式响应:
const express = require('express');
const app = express();
// SSE 路由
app.get('/stream', (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
console.log('新的 SSE 连接建立');
// 流式推送数据
const msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!'];
let index = 0;
const intervalId = setInterval(() => {
if (index < msg.length) {
// 发送 SSE 消息
const data = {
id: index,
content: msg[index],
};
res.write(`data: ${JSON.stringify(data)}\n\n`);
index++;
} else {
// 完成推送,关闭连接
clearInterval(intervalId);
res.end();
}
}, 1000);
// 客户端断开连接时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('SSE 连接已关闭');
});
});
// 启动服务,监听 9000 端口
app.listen(9000, () => {
console.log('SSE 服务已启动,监听端口 9000');
});
安装依赖
安装 Koa 框架:
npm install koa koa-router
编写服务端代码
使用 Koa 实现 SSE 流式响应:
const Koa = require('koa');
const Router = require('koa-router');
const app = new Koa();
const router = new Router();
router.get('/stream', async (ctx) => {
// 设置 SSE 响应头
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
console.log('新的 SSE 连接建立');
const msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!'];
// 创建可读流
const stream = new require('stream').PassThrough();
ctx.body = stream;
let index = 0;
const intervalId = setInterval(() => {
if (index < msg.length) {
const data = {
id: index,
content: msg[index],
};
stream.write(`data: ${JSON.stringify(data)}\n\n`);
index++;
} else {
clearInterval(intervalId);
stream.end();
}
}, 1000);
// 客户端断开连接时清理
ctx.req.on('close', () => {
clearInterval(intervalId);
console.log('SSE 连接已关闭');
});
});
app.use(router.routes()).use(router.allowedMethods());
app.listen(9000, () => {
console.log('SSE 服务已启动,监听端口 9000');
});
安装依赖
在 requirements.txt 中添加 Flask:
Flask
编写服务端代码
使用 Flask 实现 SSE 流式响应:
import json
import time
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/stream')
def stream_data():
"""SSE 流式推送"""
msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!']
def generate_response_data():
for i, word in enumerate(msg):
json_data = json.dumps({'id': i, 'content': word})
yield f"data: {json_data}\n\n"
time.sleep(1)
return Response(
stream_with_context(generate_response_data()),
mimetype="text/event-stream",
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
安装依赖
在 requirements.txt 中添加 FastAPI 和 uvicorn:
fastapi
uvicorn
编写服务端代码
使用 FastAPI 实现 SSE 流式响应:
import json
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get('/stream')
async def stream_data():
"""SSE 流式推送"""
async def generate_response_data():
msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!']
for i, word in enumerate(msg):
json_data = json.dumps({'id': i, 'content': word})
yield f"data: {json_data}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
generate_response_data(),
media_type="text/event-stream",
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=9000)
步骤2:部署云函数
完成代码编写后,部署云函数:
- 在控制台点击「部署」按钮
- 或使用 CloudBase CLI:
tcb fn deploy --httpFn
步骤3:客户端连接测试
使用浏览器或命令行工具连接 SSE 服务:
使用 curl 测试
curl -v -H 'Accept:text/event-stream' https://your-function.run.tcloudbase.com/stream
预期返回结果
服务器将以流的形式分块返回数据,每条数据间隔约 1 秒:
data: {"id": 0, "content": "SSE"}
data: {"id": 1, "content": "empowering"}
data: {"id": 2, "content": "GPT"}
...
使用浏览器 JavaScript 测试
// 创建 SSE 连接
const eventSource = new EventSource('https://your-function.run.tcloudbase.com/stream');
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
try {
const data = JSON.parse(event.data);
console.log('解析数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
};