SSE 协议支持
本文介绍如何在 Web 云函数中实现 SSE(Server-Sent Events)服务端推送,支持服务端向客户端单向推送实时数据流。
什么是 SSE
「SSE(Server-Sent Events)」是一种服务器向客户端推送实时数据的技术,基于 HTTP 协议实现。与 WebSocket 不同,SSE 是单向通信,只支持服务端向客户端推送数据。
主要特点:
- 单向推送:服务端主动向客户端推送数据,客户端只能接收
- 基于 HTTP:使用标准 HTTP 协议,兼容性好,易于实现
- 自动重连:客户端断线后会自动重新连接
- 文本格式:传输的数据为文本格式(通常是 JSON)
- 轻量实现:相比 WebSocket,实现更简单,资源占用更少
典型应用场景:
- AI 对话流式输出
- 实时日志推送
- 进度更新通知
- 服务端状态监控
- 实时数据看板
工作原理
协议启用
SSE 协议在 Web 云函数中默认支持,无需在控制台进行任何额外配置即可使用。
建立连接
- 客户端通过标准 HTTP 请求建立 SSE 连接
- 服务端返回
Content-Type: text/event-stream响应头 - 连接保持打开状态,服务端持续推送数据
连接生命周期
- 调用对应关系:一次 SSE 连接的生命周期等同于一次函数调用请求
- 连接建立 = 请求发起
- 连接断开 = 请求结束
- 实例映射:函数实例与 SSE 连接是一一对应的,同一实例在某一时刻仅处理一个 SSE 连接,新连接会启动新的实例
- 连接保持:连接建立后,实例持续运行,通过流式响应推送数据
- 连接结束:当 SSE 连接断开或服务端调用
end()时,对应的函数实例停止运行
使用限制
在使用 SSE 时,需要注意以下限制:
| 限制项 | 说明 |
|---|---|
| 执行超时时间 | 连接持续时间受函数最大运行时长限制 |
| 并发连接 | 每个连接启动独立实例,受账户并发配额限制 |
| 浏览器连接数限制 | 同一域名下浏览器对 SSE 连接数有限制(通常为 6 个) |
| 数据格式 | 只能传输文本数据,复杂数据需序列化为 JSON |
操作步骤
步骤1:编写服务端代码
SSE 协议默认支持,无需在控制台开启。根据您使用的编程语言和框架,编写 SSE 服务端代码。
- Node.js (Express)
- Node.js (Koa)
- Python (Flask)
- Python (FastAPI)
安装依赖
安装 Express 框架:
npm install express
在 package.json 中添加依赖:
{
"dependencies": {
"express": "^4.18.0"
}
}
编写服务端代码
使用 Express 实现 SSE 流式响应:
const express = require('express');
const app = express();
// SSE 路由
app.get('/stream', (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
console.log('新的 SSE 连接建立');
// 流式推送数据
const msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!'];
let index = 0;
const intervalId = setInterval(() => {
if (index < msg.length) {
// 发送 SSE 消息
const data = {
id: index,
content: msg[index],
};
res.write(`data: ${JSON.stringify(data)}\n\n`);
index++;
} else {
// 完成推送,关闭连接
clearInterval(intervalId);
res.end();
}
}, 1000);
// 客户端断开连接时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('SSE 连接已关闭');
});
});
// 启动服务,监听 9000 端口
app.listen(9000, () => {
console.log('SSE 服务已启动,监听端口 9000');
});
安装依赖
安装 Koa 框架:
npm install koa koa-router
编写服务端代码
使用 Koa 实现 SSE 流式响应:
const Koa = require('koa');
const Router = require('koa-router');
const app = new Koa();
const router = new Router();
router.get('/stream', async (ctx) => {
// 设置 SSE 响应头
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
console.log('新的 SSE 连接建立');
const msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!'];
// 创建可读流
const stream = new require('stream').PassThrough();
ctx.body = stream;
let index = 0;
const intervalId = setInterval(() => {
if (index < msg.length) {
const data = {
id: index,
content: msg[index],
};
stream.write(`data: ${JSON.stringify(data)}\n\n`);
index++;
} else {
clearInterval(intervalId);
stream.end();
}
}, 1000);
// 客户端断开连接时清理
ctx.req.on('close', () => {
clearInterval(intervalId);
console.log('SSE 连接已关闭');
});
});
app.use(router.routes()).use(router.allowedMethods());
app.listen(9000, () => {
console.log('SSE 服务已启动,监听端口 9000');
});
安装依赖
在 requirements.txt 中添加 Flask:
Flask
编写服务端代码
使用 Flask 实现 SSE 流式响应:
import json
import time
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/stream')
def stream_data():
"""SSE 流式推送"""
msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!']
def generate_response_data():
for i, word in enumerate(msg):
json_data = json.dumps({'id': i, 'content': word})
yield f"data: {json_data}\n\n"
time.sleep(1)
return Response(
stream_with_context(generate_response_data()),
mimetype="text/event-stream",
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
安装依赖
在 requirements.txt 中添加 FastAPI 和 uvicorn:
fastapi
uvicorn
编写服务端代码
使用 FastAPI 实现 SSE 流式响应:
import json
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get('/stream')
async def stream_data():
"""SSE 流式推送"""
async def generate_response_data():
msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!']
for i, word in enumerate(msg):
json_data = json.dumps({'id': i, 'content': word})
yield f"data: {json_data}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
generate_response_data(),
media_type="text/event-stream",
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=9000)
步骤2:部署云函数
完成代码编写后,部署云函数:
- 在控制台点击「部署」按钮
- 或使用 CloudBase CLI:
tcb fn deploy --httpFn
步骤3:客户端连接测试
使用浏览器或命令行工具连接 SSE 服务:
使用 curl 测试
curl -v -H 'Accept:text/event-stream' https://your-function.run.tcloudbase.com/stream
预期返回结果
服务器将以流的形式分块返回数据,每条数据间隔约 1 秒:
data: {"id": 0, "content": "SSE"}
data: {"id": 1, "content": "empowering"}
data: {"id": 2, "content": "GPT"}
...
使用浏览器 JavaScript 测试
// 创建 SSE 连接
const eventSource = new EventSource('https://your-function.run.tcloudbase.com/stream');
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
try {
const data = JSON.parse(event.data);
console.log('解析数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
};
高级用法
1. AI 对话流式输出
实现类似 ChatGPT 的流式对话效果:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
app.use(express.json());
// AI 对话流式输出
app.post('/chat', async (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const { message } = req.body;
// 发送开始事件
res.write(`event: start\ndata: ${JSON.stringify({ status: '开始生成回复' })}\n\n`);
// 模拟 AI 生成回复(实际应调用 AI 模型 API)
const reply = '这是 AI 生成的回复内容,会逐字推送给用户。';
for (let i = 0; i < reply.length; i++) {
const data = {
content: reply[i],
index: i,
};
res.write(`event: message\ndata: ${JSON.stringify(data)}\n\n`);
// 模拟生成延迟
await new Promise((resolve) => setTimeout(resolve, 50));
}
// 发送完成事件
res.write(
`event: done\ndata: ${JSON.stringify({ status: '生成完成', totalLength: reply.length })}\n\n`
);
res.end();
});
app.listen(9000);
import json
import time
from flask import Flask, Response, request, stream_with_context
app = Flask(__name__)
@app.route('/chat', methods=['POST'])
def chat():
"""AI 对话流式输出"""
data = request.get_json()
message = data.get('message', '')
def generate_chat_response():
# 发送开始事件
yield f"event: start\ndata: {json.dumps({'status': '开始生成回复'})}\n\n"
# 模拟 AI 生成回复
reply = "这是 AI 生成的回复内容,会逐字推送给用户。"
for i, char in enumerate(reply):
data = {'content': char, 'index': i}
yield f"event: message\ndata: {json.dumps(data)}\n\n"
time.sleep(0.05)
# 发送完成事件
yield f"event: done\ndata: {json.dumps({'status': '生成完成', 'totalLength': len(reply)})}\n\n"
return Response(
stream_with_context(generate_chat_response()),
mimetype="text/event-stream"
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
2. 实时进度推送
推送任务执行进度:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
app.get('/progress', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const totalSteps = 10;
for (let step = 1; step <= totalSteps; step++) {
const data = {
step,
total: totalSteps,
percent: Math.round((step / totalSteps) * 100),
message: `正在处理第 ${step} 步...`,
};
res.write(`event: progress\ndata: ${JSON.stringify(data)}\n\n`);
// 模拟任务执行
await new Promise((resolve) => setTimeout(resolve, 1000));
}
// 任务完成
res.write(
`event: complete\ndata: ${JSON.stringify({ status: '任务完成', timestamp: Date.now() })}\n\n`
);
res.end();
});
app.listen(9000);
import json
import time
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/progress')
def progress():
"""实时进度推送"""
def generate_progress():
total_steps = 10
for step in range(1, total_steps + 1):
data = {
'step': step,
'total': total_steps,
'percent': round((step / total_steps) * 100),
'message': f'正在处理第 {step} 步...'
}
yield f"event: progress\ndata: {json.dumps(data)}\n\n"
time.sleep(1)
# 任务完成
yield f"event: complete\ndata: {json.dumps({'status': '任务完成', 'timestamp': int(time.time() * 1000)})}\n\n"
return Response(
stream_with_context(generate_progress()),
mimetype="text/event-stream"
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
3. 实时日志推送
推送服务端日志到客户端:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
// 模拟日志队列
const logQueue = [];
// 模拟生成日志
setInterval(() => {
logQueue.push({
level: ['info', 'debug', 'warn'][Math.floor(Math.random() * 3)],
message: `日志消息 ${Date.now()}`,
timestamp: Date.now(),
});
// 限制队列大小
if (logQueue.length > 100) {
logQueue.shift();
}
}, 2000);
app.get('/logs', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
console.log('开始推送日志');
// 定期推送日志
const intervalId = setInterval(() => {
if (logQueue.length > 0) {
const logs = logQueue.splice(0, 5); // 每次推送最多 5 条
logs.forEach((log) => {
res.write(`event: log\ndata: ${JSON.stringify(log)}\n\n`);
});
}
}, 1000);
// 客户端断开时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('停止推送日志');
});
});
app.listen(9000);
import json
import time
import random
from flask import Flask, Response, stream_with_context
from collections import deque
app = Flask(__name__)
# 模拟日志队列
log_queue = deque(maxlen=100)
def generate_logs():
"""模拟生成日志"""
while True:
log_queue.append({
'level': random.choice(['info', 'debug', 'warn']),
'message': f'日志消息 {int(time.time() * 1000)}',
'timestamp': int(time.time() * 1000)
})
time.sleep(2)
# 在后台线程生成日志
import threading
threading.Thread(target=generate_logs, daemon=True).start()
@app.route('/logs')
def logs():
"""实时日志推送"""
def generate_log_stream():
while True:
if len(log_queue) > 0:
# 每次推送最多 5 条
logs_to_send = [log_queue.popleft() for _ in range(min(5, len(log_queue)))]
for log in logs_to_send:
yield f"event: log\ndata: {json.dumps(log)}\n\n"
time.sleep(1)
return Response(
stream_with_context(generate_log_stream()),
mimetype="text/event-stream"
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
4. 服务端状态监控
实时推送服务器状态指标:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
app.get('/metrics', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Monitoring-Session', `session-${Date.now()}`);
console.log('开始推送监控数据');
// 定期推送监控数据
const intervalId = setInterval(() => {
const metrics = {
cpu: Math.random() * 100,
memory: Math.random() * 100,
activeConnections: Math.floor(Math.random() * 1000),
timestamp: Date.now(),
};
res.write(`event: metrics\ndata: ${JSON.stringify(metrics)}\n\n`);
}, 2000);
// 5 分钟后超时
const timeoutId = setTimeout(() => {
clearInterval(intervalId);
res.write(`event: timeout\ndata: ${JSON.stringify({ message: '监控会话超时' })}\n\n`);
res.end();
}, 300000);
// 客户端断开时清理
req.on('close', () => {
clearInterval(intervalId);
clearTimeout(timeoutId);
console.log('停止推送监控数据');
});
});
app.listen(9000);
import json
import time
import random
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/metrics')
def metrics():
"""服务端状态监控"""
def generate_metrics():
start_time = time.time()
while True:
# 5 分钟后超时
if time.time() - start_time > 300:
yield f"event: timeout\ndata: {json.dumps({'message': '监控会话超时'})}\n\n"
break
metrics_data = {
'cpu': random.random() * 100,
'memory': random.random() * 100,
'activeConnections': random.randint(0, 1000),
'timestamp': int(time.time() * 1000)
}
yield f"event: metrics\ndata: {json.dumps(metrics_data)}\n\n"
time.sleep(2)
return Response(
stream_with_context(generate_metrics()),
mimetype="text/event-stream",
headers={
'X-Monitoring-Session': f'session-{int(time.time() * 1000)}'
}
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
客户端连接示例
- 浏览器
- React
- Node.js 客户端
// 创建 SSE 连接
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');
// 监听默认消息事件
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
try {
const data = JSON.parse(event.data);
console.log('解析的数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};
// 监听自定义事件
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`进度: ${data.percent}%`);
});
eventSource.addEventListener('done', (event) => {
console.log('任务完成');
eventSource.close(); // 关闭连接
});
// 连接打开
eventSource.onopen = () => {
console.log('SSE 连接已建立');
};
// 错误处理
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
// 可选择关闭连接
// eventSource.close();
};
// 手动关闭连接
function closeConnection() {
eventSource.close();
console.log('SSE 连接已关闭');
}
import { useEffect, useState } from 'react';
function StreamingChat() {
const [messages, setMessages] = useState([]);
const [currentMessage, setCurrentMessage] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const startChat = () => {
setIsStreaming(true);
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/chat');
eventSource.addEventListener('start', (event) => {
console.log('开始接收消息');
setCurrentMessage('');
});
eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
setCurrentMessage((prev) => prev + data.content);
});
eventSource.addEventListener('done', (event) => {
setMessages((prev) => [...prev, currentMessage]);
setCurrentMessage('');
setIsStreaming(false);
eventSource.close();
});
eventSource.onerror = (error) => {
console.error('连接错误:', error);
setIsStreaming(false);
eventSource.close();
};
// 组件卸载时清理
return () => {
eventSource.close();
};
};
return (
<div>
<button onClick={startChat} disabled={isStreaming}>
开始对话
</button>
<div>
{messages.map((msg, index) => (
<div key={index}>{msg}</div>
))}
{currentMessage && <div>{currentMessage}</div>}
</div>
</div>
);
}
const EventSource = require('eventsource');
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
try {
const data = JSON.parse(event.data);
console.log('解析的数据:', data);
} catch (e) {
// 处理非 JSON 数据
}
};
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`进度: ${data.percent}%`);
});
eventSource.onerror = (error) => {
console.error('错误:', error);
};
// 10 秒后关闭连接
setTimeout(() => {
eventSource.close();
console.log('连接已关闭');
}, 10000);
SSE 消息格式
SSE 协议使用特定的文本格式发送消息,每条消息由一个或多个字段组成:
event: message
data: {"content": "Hello"}
id: 123
retry: 1000
标准字段
| 字段 | 说明 | 是否必需 |
|---|---|---|
data | 消息数据内容 | 是 |
event | 事件类型(默认为 message) | 否 |
id | 事件 ID,用于客户端重连时恢复 | 否 |
retry | 重连间隔时间(毫秒) | 否 |
: 注释 | 注释行,客户端会忽略(用于保持连接) | 否 |
消息示例
基本消息
data: Hello World
JSON 数据
data: {"message": "Hello", "timestamp": 1234567890}
带事件类型
event: notification
data: {"type": "info", "content": "新消息"}
多行数据
data: 第一行
data: 第二行
data: 第三行
完整消息
event: update
id: 123
retry: 1000
data: {"status": "success"}
常见问题
1. 连接无法建立怎么办?
- 确认云函数已正确部署和运行
- 检查响应头是否设置了
Content-Type: text/event-stream - 确认使用正确的函数访问地址
- 查看云函数日志是否有错误信息
- 检查客户端浏览器是否支持 SSE(IE 不支持)
2. 连接频繁断开怎么办?
- 增加函数执行超时时间配置
- 实现心跳机制,定期发送注释行保持连接:
res.write(': keepalive\n\n') - 检查客户端网络是否稳定
- 查看云函数日志,确认是否有异常错误导致连接断开
- 客户端实现自动重连机制(EventSource 默认支持)
3. 如何实现断线重连?
EventSource 客户端会自动重连,服务端可以通过 id 和 retry 字段配合:
服务端发送 ID 和重连间隔
res.write(`id: ${messageId}\nretry: 3000\ndata: ${JSON.stringify(data)}\n\n`);
客户端获取最后的事件 ID
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');
eventSource.onopen = () => {
console.log('连接已建立');
};
eventSource.onerror = () => {
console.log('连接断开,自动重连中...');
};
4. 如何处理大量并发连接?
- 实例扩展:云函数会自动为每个 SSE 连接创建独立实例
- 连接数限制:注意浏览器同域名连接数限制(通常为 6 个)
- 状态管理:使用 Redis 等外部存储共享连接状态
- 消息队列:使用消息队列处理广播和异步消息
- 资源监控:关注函数实例数量和资源使用情况
5. SSE 和 WebSocket 如何选择?
选择 SSE 的场景:
- 只需要服务端向客户端推送数据
- 追求简单实现和快速开发
- 需要自动重连功能
- 兼容性要求较高(HTTP 协议)
选择 WebSocket 的场景:
- 需要双向实时通信
- 需要传输二进制数据
- 需要更低的延迟和更高的效率
- 客户端需要频繁向服务端发送数据
6. 如何发送心跳保持连接?
定期发送注释行(以 : 开头)保持连接活跃:
const express = require('express');
const app = express();
app.get('/stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 每 30 秒发送心跳
const heartbeatInterval = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
// 发送实际数据
const dataInterval = setInterval(() => {
res.write(`data: ${JSON.stringify({ timestamp: Date.now() })}\n\n`);
}, 5000);
req.on('close', () => {
clearInterval(heartbeatInterval);
clearInterval(dataInterval);
});
});
app.listen(9000);
SSE vs WebSocket
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方式 | 单向(服务端→客户端) | 双向(服务端↔客户端) |
| 协议 | HTTP | WebSocket 协议 |
| 数据格式 | 文本(通常 JSON) | 文本或二进制 |
| 自动重连 | 是 | 否(需手动实现) |
| 浏览器兼容性 | 好(IE 除外) | 好 |
| 实现复杂度 | 简单 | 相对复杂 |
| 资源占用 | 较低 | 较高 |
| 配置要求 | 无需配置 | 需要在控制台开启 |
| 适用场景 | 单向数据推送 | 实时双向通信 |
选择建议:
- 只需要服务端推送数据:选择 SSE
- 需要双向实时通信:选择 WebSocket
- 追求简单实现:选择 SSE
- 需要传输二进制数据:选择 WebSocket