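Streaming output
Streaming lets a large language model return text incrementally while it is still generating, instead of making the caller wait for the complete response. Users see the content as it is produced, which makes the interaction feel much more responsive.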
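How it works
Streaming is built on the SSE (Server-Sent Events) protocol. While the model is generating, the server keeps pushing data chunks, and each chunk carries a small piece of newly generated text:
```text
data: {"choices":[{"delta":{"content":"你"}}]}
data: {"choices":[{"delta":{"content":"好"}}]}
data: {"choices":[{"delta":{"content":"!"}}]}
data: [DONE]
```
The client receives these chunks one at a time and appends each one to the displayed text, which produces a "typewriter" effect. The final `data: [DONE]` event signals that the stream has ended.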
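The client-side reassembly can be sketched without any SDK. This is a minimal illustration that assumes the whole SSE body is already available as a string and that every event is a single `data:` line in the OpenAI-compatible format shown above; `parseSSEText` is a hypothetical helper name, not part of any SDK.

```javascript
// Sketch: rebuild the full reply from a complete SSE body (no SDK involved).
// Assumes one JSON payload per "data:" line and "[DONE]" as the end marker.
function parseSSEText(body) {
  let text = "";
  for (const rawLine of body.split("\n")) {
    const line = rawLine.trim(); // also strips a trailing "\r" from CRLF line endings
    if (!line.startsWith("data:")) continue; // skip blank separator lines and non-data fields
    const payload = line.slice("data:".length).trim();
    if (payload === "[DONE]") break; // end-of-stream marker
    const event = JSON.parse(payload);
    // Each chunk carries only the newly generated text
    text += event.choices?.[0]?.delta?.content ?? "";
  }
  return text;
}

const body = [
  'data: {"choices":[{"delta":{"content":"你"}}]}',
  'data: {"choices":[{"delta":{"content":"好"}}]}',
  'data: {"choices":[{"delta":{"content":"!"}}]}',
  "data: [DONE]",
].join("\n\n"); // on the wire, SSE events are separated by a blank line

console.log(parseSSEText(body)); // prints 你好!
```

In a real client the body arrives in pieces, so events must be buffered until their terminating blank line before parsing; the SDKs in the examples below do this for you.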
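Streaming vs. non-streaming
| Aspect | Non-streaming (`generateText`) | Streaming (`streamText`) |
|---|---|---|
| Response | Returned all at once after generation finishes | Returned incrementally during generation |
| Time to first token | High (waits for the full generation) | Very low (the first token typically appears within a few hundred milliseconds) |
| User experience | Noticeable wait | Real-time feedback, smooth experience |
| Typical use cases | Backend processing, batch jobs | Real-time chat, long-form generation |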
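To see the first-token difference on your own workload, one option is to time the first chunk and the whole stream separately. The sketch below works with any async iterable of text chunks (for example a `textStream` like the one in the quick-start examples); `measureStream` and the simulated `fakeStream` generator are hypothetical and only stand in for a real model call.

```javascript
// Sketch: time to first chunk vs. total time for any async iterable of strings.
async function measureStream(textStream) {
  const start = Date.now();
  let firstChunkMs = null; // stays null if the stream yields nothing
  let text = "";
  for await (const chunk of textStream) {
    if (firstChunkMs === null) firstChunkMs = Date.now() - start; // first visible output
    text += chunk;
  }
  return { firstChunkMs, totalMs: Date.now() - start, text };
}

// Simulated stream: emits one chunk every `delayMs` (stand-in for a model response).
async function* fakeStream(chunks, delayMs = 50) {
  for (const c of chunks) {
    await new Promise((r) => setTimeout(r, delayMs));
    yield c;
  }
}

measureStream(fakeStream(["Hello", ", ", "world"])).then((r) => {
  // With streaming the user sees output after firstChunkMs;
  // a non-streaming call would show nothing until totalMs.
  console.log(r);
});
```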
Quick start
The same request is shown for four clients: CloudBase SDK, OpenAI SDK, cURL, and WeChat Mini Program.
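CloudBase SDK:
```javascript
// Assumes `ai` comes from an initialized CloudBase app (`const ai = app.ai();`).
// Uses top-level await: run inside an async function or an ES module.
const model = ai.createModel("cloudbase");
const res = await model.streamText({
  model: "deepseek-v4-flash",
  messages: [{ role: "user", content: "Write a poem about spring" }]
});

// Option 1: iterate the text stream (recommended when you only need the text)
for await (const text of res.textStream) {
  process.stdout.write(text);
}

// Option 2, used instead of option 1: iterate the data stream for full chunk objects
// for await (const data of res.dataStream) {
//   console.log(data.choices[0]?.delta?.content);
// }

// After the stream ends, read the aggregated results
const messages = await res.messages;
const usage = await res.usage;
```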
OpenAI SDK:
```javascript
import OpenAI from "openai"; // ES module, so the top-level await below is valid

const client = new OpenAI({
  apiKey: "<YOUR_API_KEY>",
  baseURL: "https://<ENV_ID>.api.tcloudbasegateway.com/v1/ai/cloudbase"
});

const stream = await client.chat.completions.create({
  model: "deepseek-v4-flash",
  messages: [{ role: "user", content: "Write a poem about spring" }],
  stream: true
});

// Each chunk carries the newly generated text in choices[0].delta.content
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || "";
  process.stdout.write(content);
}
```
cURL (`-N` disables curl's output buffering so chunks are printed as they arrive):
```bash
curl -N -X POST 'https://<ENV_ID>.api.tcloudbasegateway.com/v1/ai/cloudbase/chat/completions' \
  -H 'Authorization: Bearer <YOUR_API_KEY>' \
  -H 'Content-Type: application/json' \
  -d '{
    "model": "deepseek-v4-flash",
    "messages": [{"role": "user", "content": "Write a poem about spring"}],
    "stream": true
  }'
```
Response format (SSE):
```text
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"春"},"index":0}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"风"},"index":0}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"拂面"},"index":0}]}
...
data: [DONE]
```
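WeChat Mini Program:
```javascript
// Runs inside a Page method, so `this.setData` updates the page.
// Note that the request parameters are wrapped in a `data` object here.
const model = wx.cloud.extend.AI.createModel("cloudbase");
const res = await model.streamText({
  data: {
    model: "deepseek-v4-flash",
    messages: [{ role: "user", content: "Write a poem about spring" }]
  }
});

let fullText = "";
for await (const text of res.textStream) {
  fullText += text;
  this.setData({ reply: fullText }); // show the text received so far
}
```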
Consuming streams on the front end
Streaming rendering in a web app
In a web app, we recommend combining the CloudBase SDK's `textStream` with DOM updates:
```javascript
import cloudbase from "@cloudbase/js-sdk";

const app = cloudbase.init({
  env: "<YOUR_ENV_ID>",
  accessKey: "<YOUR_PUBLISHABLE_KEY>"
});

async function streamChat(userInput) {
  const ai = app.ai();
  const model = ai.createModel("cloudbase");
  const res = await model.streamText({
    model: "deepseek-v4-flash",
    messages: [{ role: "user", content: userInput }]
  });

  const outputEl = document.getElementById("output");
  let fullText = "";
  for await (const text of res.textStream) {
    fullText += text;
    // textContent (not innerHTML) renders model output as plain text
    outputEl.textContent = fullText;
  }
}
```
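Streaming rendering in React
```jsx
import { useState, useCallback } from "react";

// `ai` is the object returned by app.ai(), as in the web example above
function ChatComponent({ ai }) {
  const [input, setInput] = useState("");
  const [reply, setReply] = useState("");
  const [loading, setLoading] = useState(false);

  const sendMessage = useCallback(async (text) => {
    setLoading(true);
    setReply("");
    try {
      const model = ai.createModel("cloudbase");
      const res = await model.streamText({
        model: "deepseek-v4-flash",
        messages: [{ role: "user", content: text }]
      });
      let accumulated = "";
      for await (const chunk of res.textStream) {
        accumulated += chunk;
        setReply(accumulated); // re-render with the text received so far
      }
    } finally {
      setLoading(false); // hide the cursor even if the stream fails
    }
  }, [ai]);

  return (
    <div>
      <input value={input} onChange={(e) => setInput(e.target.value)} />
      <button disabled={loading} onClick={() => sendMessage(input)}>Send</button>
      <div className="reply">
        {reply}
        {loading && <span className="cursor">▌</span>}
      </div>
    </div>
  );
}
```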
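Streaming Markdown rendering
Model output is usually Markdown. Rendering incomplete Markdown can make the formatting flicker, for example while a code block has been opened but not yet closed. A recommended approach is to keep the full accumulated text in state and pass the whole string to a Markdown renderer such as `react-markdown` on every update: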
```jsx
import { useState } from "react";
import ReactMarkdown from "react-markdown";

function StreamMarkdown({ ai, input }) {
  const [content, setContent] = useState("");

  async function generate() {
    const model = ai.createModel("cloudbase");
    const res = await model.streamText({
      model: "deepseek-v4-flash",
      messages: [{ role: "user", content: input }]
    });
    let text = "";
    for await (const chunk of res.textStream) {
      text += chunk;
      setContent(text); // re-parse the full accumulated Markdown each time
    }
  }

  return (
    <div>
      <button onClick={generate}>Generate</button>
      <ReactMarkdown>{content}</ReactMarkdown>
    </div>
  );
}
```
Tip
If rendering flickers, add a small buffer, for example by updating the DOM only once every 3 to 5 chunks, to balance immediacy against rendering stability.
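The buffering idea in the tip above can be written as a small async generator that passes on the accumulated text only every `n` chunks, plus once more at the end so the tail is never lost. This is a sketch: `batchText` is a hypothetical helper, and `n = 4` is simply one value from the suggested 3 to 5 range.

```javascript
// Sketch: yield the accumulated text every `n` chunks, and always once at the end.
async function* batchText(textStream, n = 4) {
  let text = "";
  let pending = 0; // chunks received since the last yield
  for await (const chunk of textStream) {
    text += chunk;
    pending += 1;
    if (pending >= n) {
      pending = 0;
      yield text;
    }
  }
  if (pending > 0) yield text; // flush the remaining chunks
}

// Stand-in stream of 10 one-character chunks.
async function* fakeStream() {
  for (let i = 0; i < 10; i++) yield String(i);
}

(async () => {
  for await (const snapshot of batchText(fakeStream(), 4)) {
    console.log(snapshot); // logs 0123, then 01234567, then 0123456789
  }
})();
```

In the Markdown example, the loop would become `for await (const snapshot of batchText(res.textStream)) setContent(snapshot);`, giving three renders instead of ten for a ten-chunk reply.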
Cancelling a request
When the user clicks "Stop generating", abort the streaming request:
Using AbortController
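```javascript
// Assumes `ai` and `outputEl` are set up as in the web example above
const controller = new AbortController();

// Start the streaming request
const model = ai.createModel("cloudbase");
const res = await model.streamText({
  model: "deepseek-v4-flash",
  messages: [{ role: "user", content: "Write a long article" }],
  signal: controller.signal // pass the abort signal
});

// The user clicks "Stop"
document.getElementById("stop-btn").onclick = () => {
  controller.abort();
};

// Catch the abort while consuming the stream
try {
  for await (const text of res.textStream) {
    outputEl.textContent += text;
  }
} catch (err) {
  if (err.name === "AbortError") {
    console.log("User cancelled generation");
  } else {
    throw err; // surface real failures instead of swallowing them
  }
}
```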
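If a client library does not accept a `signal` option, a similar effect on the consuming side can be obtained by wrapping the stream so that the loop ends with an `AbortError` as soon as the signal fires. This is a sketch that relies only on the standard `AbortController`/`AbortSignal` available in modern browsers and Node.js; `abortable` is a hypothetical helper. It stops reading locally and asks the source iterator to close, but whether that also cancels the underlying network request depends on the library.

```javascript
// Sketch: end iteration with an AbortError as soon as `signal` is aborted.
async function* abortable(source, signal) {
  const iterator = source[Symbol.asyncIterator]();
  let onAbort;
  // Rejects once the signal fires, so a slow pending next() cannot delay cancellation.
  const aborted = new Promise((_, reject) => {
    onAbort = () => {
      const err = new Error("The stream was aborted");
      err.name = "AbortError"; // same name the example above checks for
      reject(err);
    };
  });
  aborted.catch(() => {}); // keep an unused rejection from being reported as unhandled
  if (signal.aborted) onAbort();
  else signal.addEventListener("abort", onAbort, { once: true });
  try {
    while (true) {
      const { value, done } = await Promise.race([iterator.next(), aborted]);
      if (done) return;
      yield value;
    }
  } finally {
    signal.removeEventListener("abort", onAbort);
    // Ask the source to close; not awaited, since a pending next() may still be in flight.
    Promise.resolve().then(() => iterator.return?.()).catch(() => {});
  }
}
```

With the example above, the loop would read `for await (const text of abortable(res.textStream, controller.signal))`, and the existing `AbortError` check keeps working unchanged.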
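Cancelling with the OpenAI SDK
```javascript
// Assumes `client` is the OpenAI client from the quick-start example
const stream = await client.chat.completions.create({
  model: "deepseek-v4-flash",
  messages: [{ role: "user", content: "Write a long article" }],
  stream: true
});

// Stop early; `shouldStop` is a placeholder for your own stop condition
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || "";
  process.stdout.write(content);
  if (shouldStop) {
    stream.controller.abort(); // cancel the underlying HTTP request
    break;
  }
}
```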
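Error handling
A streaming request can break off mid-transfer, so handle failures explicitly. One option is to retry the whole request: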
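```javascript
// Assumes `ai` was created as in the web example above.
// maxRetries is the total number of attempts. Partial text from a failed
// attempt is discarded, and the next attempt starts from scratch.
async function streamWithRetry(messages, maxRetries = 3) {
  const model = ai.createModel("cloudbase");
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const res = await model.streamText({
        model: "deepseek-v4-flash",
        messages
      });
      let fullText = "";
      for await (const text of res.textStream) {
        fullText += text;
      }
      return fullText;
    } catch (err) {
      if (attempt === maxRetries - 1) throw err; // out of attempts
      // Wait before retrying: 1 s, then 2 s, ... (linear backoff)
      await new Promise(r => setTimeout(r, 1000 * (attempt + 1)));
    }
  }
}
```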
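The function above waits a linearly growing interval (1 s, 2 s, ...) between attempts. For 429 rate-limit responses, exponential backoff with random jitter is commonly used instead, so that many clients do not all retry at the same moment. Below is a sketch of the delay calculation using the "full jitter" variant; `backoffDelay` and its `baseMs`/`capMs` defaults are illustrative assumptions, not values required by the service.

```javascript
// Sketch: exponential backoff with "full jitter".
// The upper bound doubles per attempt (500 ms, 1 s, 2 s, ...) and is capped at capMs;
// the actual delay is a random value in [0, bound).
function backoffDelay(attempt, { baseMs = 500, capMs = 8000, random = Math.random } = {}) {
  const bound = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * bound);
}

// Deterministic demo: with random() fixed at 0.5 the delay is half the bound.
for (let attempt = 0; attempt < 5; attempt++) {
  console.log(attempt, backoffDelay(attempt, { random: () => 0.5 }));
}
// 0 250, 1 500, 2 1000, 3 2000, 4 4000
```

In `streamWithRetry`, the wait line would become `await new Promise(r => setTimeout(r, backoffDelay(attempt)));`. The `random` parameter exists only so the delays can be made deterministic in tests.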
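Common errors
| Error type | Cause | Suggested handling |
|---|---|---|
| Network interruption | Unstable network | Retry, or ask the user to check their connection |
| Timeout | Generation takes too long | Increase the timeout setting |
| Token limit exceeded | Input + output exceed the model's context window | Truncate the message history |
| 429 rate limit | Requests are sent too frequently | Retry with exponential backoff |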
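For the token-limit case, truncating the message history can be as simple as keeping the system prompt plus the most recent messages that fit a budget. The sketch below approximates size by character count because tokenization is model-specific; `trimHistory` and the `maxChars` budget are assumptions for illustration, and a real implementation would count tokens with the model's tokenizer or use the usage figures returned by the API.

```javascript
// Sketch: keep system messages plus the newest messages that fit within maxChars.
// Character count is only a rough stand-in for tokens (assumption).
// System messages are always kept, even if they alone exceed the budget.
function trimHistory(messages, maxChars) {
  const system = messages.filter((m) => m.role === "system");
  const rest = messages.filter((m) => m.role !== "system");
  let used = system.reduce((n, m) => n + m.content.length, 0);
  const kept = [];
  // Walk backwards so the most recent turns are kept first.
  for (let i = rest.length - 1; i >= 0; i--) {
    const size = rest[i].content.length;
    if (used + size > maxChars) break;
    used += size;
    kept.unshift(rest[i]);
  }
  return [...system, ...kept];
}

const history = [
  { role: "system", content: "You are a helpful assistant." }, // 28 chars
  { role: "user", content: "a".repeat(50) },
  { role: "assistant", content: "b".repeat(50) },
  { role: "user", content: "Latest question?" }, // 16 chars
];

console.log(trimHistory(history, 100).map((m) => m.role)); // [ 'system', 'assistant', 'user' ]
```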
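Server-side stream forwarding
To receive a streaming response on your server and forward it to the front end (for example with Express or Koa; the example below uses Express):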
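```javascript
const express = require("express");
const OpenAI = require("openai");

const app = express();
app.use(express.json()); // parse JSON bodies; without this, req.body is undefined

const client = new OpenAI({
  apiKey: "<YOUR_API_KEY>",
  baseURL: "https://<ENV_ID>.api.tcloudbasegateway.com/v1/ai/cloudbase"
});

app.post("/api/chat", async (req, res) => {
  const { messages } = req.body;

  // Set SSE response headers and send them right away
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders();

  try {
    const stream = await client.chat.completions.create({
      model: "deepseek-v4-flash",
      messages,
      stream: true
    });

    // If the client disconnects early, stop the upstream generation as well
    res.on("close", () => {
      if (!res.writableEnded) stream.controller.abort();
    });

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        res.write(`data: ${JSON.stringify({ content })}\n\n`);
      }
    }
    res.write("data: [DONE]\n\n");
  } catch (err) {
    // Upstream failure, or the abort triggered by a client disconnect
    console.error(err);
  } finally {
    res.end();
  }
});

app.listen(3000); // port chosen for illustration
```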
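On the front end, the browser's built-in `EventSource` only issues GET requests, so it cannot call the POST `/api/chat` route above directly. One alternative is to read the response body with `fetch` and split it into SSE events by hand. The sketch below assumes the event format produced by the server code above (`data: {"content": ...}` events, each ending in a blank line, followed by `data: [DONE]`); `readChatStream` and `chat` are hypothetical helpers, and the code relies on `fetch`, `ReadableStream`, and `TextDecoder`, which are globals in browsers and Node.js 18+.

```javascript
// Sketch: consume the SSE stream sent by the /api/chat route.
// `body` is a ReadableStream<Uint8Array>, e.g. the `body` of a fetch() Response.
async function readChatStream(body, onText) {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = ""; // holds an incomplete event until its "\n\n" terminator arrives
  while (true) {
    const { value, done } = await reader.read();
    if (done) return;
    // stream: true keeps multi-byte characters split across two reads intact
    buffer += decoder.decode(value, { stream: true });
    let sep;
    while ((sep = buffer.indexOf("\n\n")) !== -1) {
      const event = buffer.slice(0, sep);
      buffer = buffer.slice(sep + 2);
      if (!event.startsWith("data: ")) continue;
      const payload = event.slice("data: ".length);
      if (payload === "[DONE]") {
        await reader.cancel(); // the server has finished; release the connection
        return;
      }
      onText(JSON.parse(payload).content);
    }
  }
}

// Usage in the browser against the Express route above
async function chat(messages) {
  const res = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });
  let fullText = "";
  await readChatStream(res.body, (text) => {
    fullText += text;
  });
  return fullText;
}
```

Buffering until the blank line matters because a network read can end in the middle of an event, or even in the middle of a multi-byte character.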