SSE Protocol Support
This document describes how to implement SSE (Server-Sent Events) server push in Web Cloud Functions, enabling unidirectional, real-time data streaming from server to client.
What is SSE
SSE (Server-Sent Events) is an HTTP-based technology that lets a server push real-time data to clients. Unlike WebSocket, SSE is unidirectional: it only supports server-to-client data push.
Key Features:
- Unidirectional Push: Server actively pushes data to client, client can only receive
- HTTP-Based: Uses standard HTTP protocol, good compatibility, easy to implement
- Auto Reconnection: Client automatically reconnects after disconnection
- Text Format: Transmitted data is in text format (usually JSON)
- Lightweight Implementation: Compared to WebSocket, simpler implementation with less resource usage
Typical Use Cases:
- AI conversation streaming output
- Real-time log push
- Progress update notifications
- Server status monitoring
- Real-time data dashboards
How It Works
Protocol Enablement
The SSE protocol is supported by default in Web Cloud Functions; no additional configuration is needed in the console.
Establishing Connection
- Client establishes SSE connection through standard HTTP request
- Server returns the Content-Type: text/event-stream response header
- Connection remains open, and the server continuously pushes data (see the example exchange below)
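At the wire level, a typical exchange looks roughly like the following (the URL and payload are illustrative, matching the example service later in this document):

GET /stream HTTP/1.1
Accept: text/event-stream

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"id": 0, "content": "SSE"}

data: {"id": 1, "content": "empowering"}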
Connection Lifecycle
- Invocation Correspondence: One SSE connection lifecycle equals one function invocation request
- Connection established = Request initiated
- Connection disconnected = Request ended
- Instance Mapping: Function instances and SSE connections map one-to-one; an instance handles only one SSE connection at a time, and new connections start new instances
- Connection Persistence: After connection is established, instance continues running, pushing data through streaming response
- Connection Termination: When the SSE connection is disconnected or the server calls end(), the corresponding function instance stops running
Usage Limitations
When using SSE, note the following limitations:
| Limitation Item | Description |
|---|---|
| Execution Timeout | Connection duration is limited by function maximum runtime |
| Concurrent Connections | Each connection starts independent instance, limited by account concurrency quota |
| Browser Connection Limit | Browsers have connection limits per domain for SSE (usually 6) |
| Data Format | Can only transmit text data, complex data needs JSON serialization |
Operation Steps
Step 1: Write Server Code
The SSE protocol is supported by default and does not need to be enabled in the console. Write the SSE server code for your programming language and framework.
- Node.js (Express)
- Node.js (Koa)
- Python (Flask)
- Python (FastAPI)
Install Dependencies
Install Express framework:
npm install express
Add dependency in package.json:
{
"dependencies": {
"express": "^4.18.0"
}
}
Write Server Code
Implement SSE streaming response using Express:
const express = require('express');
const app = express();
// SSE route
app.get('/stream', (req, res) => {
// Set SSE response headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
console.log('New SSE connection established');
// Stream push data
const msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!'];
let index = 0;
const intervalId = setInterval(() => {
if (index < msg.length) {
// Send SSE message
const data = {
id: index,
content: msg[index],
};
res.write(`data: ${JSON.stringify(data)}\n\n`);
index++;
} else {
// Complete push, close connection
clearInterval(intervalId);
res.end();
}
}, 1000);
// Cleanup when client disconnects
req.on('close', () => {
clearInterval(intervalId);
console.log('SSE connection closed');
});
});
// Start service, listen on port 9000
app.listen(9000, () => {
console.log('SSE service started, listening on port 9000');
});
Install Dependencies
Install Koa framework:
npm install koa koa-router
Write Server Code
Implement SSE streaming response using Koa:
const Koa = require('koa');
const Router = require('koa-router');
const app = new Koa();
const router = new Router();
router.get('/stream', async (ctx) => {
// Set SSE response headers
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
console.log('New SSE connection established');
const msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!'];
// Create a pass-through stream to write SSE messages into
const { PassThrough } = require('stream');
const stream = new PassThrough();
ctx.body = stream;
let index = 0;
const intervalId = setInterval(() => {
if (index < msg.length) {
const data = {
id: index,
content: msg[index],
};
stream.write(`data: ${JSON.stringify(data)}\n\n`);
index++;
} else {
clearInterval(intervalId);
stream.end();
}
}, 1000);
// Cleanup when client disconnects
ctx.req.on('close', () => {
clearInterval(intervalId);
console.log('SSE connection closed');
});
});
app.use(router.routes()).use(router.allowedMethods());
app.listen(9000, () => {
console.log('SSE service started, listening on port 9000');
});
Install Dependencies
Add Flask in requirements.txt:
Flask
Write Server Code
Implement SSE streaming response using Flask:
import json
import time
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/stream')
def stream_data():
"""SSE streaming push"""
msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!']
def generate_response_data():
for i, word in enumerate(msg):
json_data = json.dumps({'id': i, 'content': word})
yield f"data: {json_data}\n\n"
time.sleep(1)
return Response(
stream_with_context(generate_response_data()),
mimetype="text/event-stream",
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
Install Dependencies
Add FastAPI and uvicorn in requirements.txt:
fastapi
uvicorn
Write Server Code
Implement SSE streaming response using FastAPI:
import json
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get('/stream')
async def stream_data():
"""SSE streaming push"""
async def generate_response_data():
msg = ['SSE', 'empowering', 'GPT', 'applications', '!', 'Happy', 'chatting', '!']
for i, word in enumerate(msg):
json_data = json.dumps({'id': i, 'content': word})
yield f"data: {json_data}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
generate_response_data(),
media_type="text/event-stream",
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=9000)
Step 2: Deploy Cloud Function
After completing code writing, deploy the cloud function:
- Click "Deploy" button in the console
- Or use CloudBase CLI:
tcb fn deploy --httpFn
Step 3: Client Connection Test
Connect to SSE service using browser or command-line tools:
Test with curl
curl -v -H 'Accept:text/event-stream' https://your-function.run.tcloudbase.com/stream
Expected Response
The server will return data in chunks as a stream, with each data item spaced about 1 second apart:
data: {"id": 0, "content": "SSE"}
data: {"id": 1, "content": "empowering"}
data: {"id": 2, "content": "GPT"}
...
Test with Browser JavaScript
// Create SSE connection
const eventSource = new EventSource('https://your-function.run.tcloudbase.com/stream');
eventSource.onmessage = (event) => {
console.log('Received message:', event.data);
try {
const data = JSON.parse(event.data);
console.log('Parsed data:', data);
} catch (e) {
// Handle non-JSON data
}
};
eventSource.onerror = (error) => {
console.error('SSE error:', error);
};
Advanced Usage
1. AI Conversation Streaming Output
Implement streaming conversation similar to ChatGPT:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
app.use(express.json());
// AI conversation streaming output
app.post('/chat', async (req, res) => {
// Set SSE response headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const { message } = req.body;
// Send start event
res.write(`event: start\ndata: ${JSON.stringify({ status: 'Start generating reply' })}\n\n`);
// Simulate AI generating reply (should call AI model API in practice)
const reply = 'This is AI-generated reply content, will be pushed to users word by word.';
for (let i = 0; i < reply.length; i++) {
const data = {
content: reply[i],
index: i,
};
res.write(`event: message\ndata: ${JSON.stringify(data)}\n\n`);
// Simulate generation delay
await new Promise((resolve) => setTimeout(resolve, 50));
}
// Send completion event
res.write(
`event: done\ndata: ${JSON.stringify({ status: 'Generation complete', totalLength: reply.length })}\n\n`
);
res.end();
});
app.listen(9000);
import json
import time
from flask import Flask, Response, request, stream_with_context
app = Flask(__name__)
@app.route('/chat', methods=['POST'])
def chat():
"""AI conversation streaming output"""
data = request.get_json()
message = data.get('message', '')
def generate_chat_response():
# Send start event
yield f"event: start\ndata: {json.dumps({'status': 'Start generating reply'})}\n\n"
# Simulate AI generating reply
reply = "This is AI-generated reply content, will be pushed to users word by word."
for i, char in enumerate(reply):
data = {'content': char, 'index': i}
yield f"event: message\ndata: {json.dumps(data)}\n\n"
time.sleep(0.05)
# Send completion event
yield f"event: done\ndata: {json.dumps({'status': 'Generation complete', 'totalLength': len(reply)})}\n\n"
return Response(
stream_with_context(generate_chat_response()),
mimetype="text/event-stream"
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
2. Real-time Progress Push
Push task execution progress:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
app.get('/progress', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const totalSteps = 10;
for (let step = 1; step <= totalSteps; step++) {
const data = {
step,
total: totalSteps,
percent: Math.round((step / totalSteps) * 100),
message: `Processing step ${step}...`,
};
res.write(`event: progress\ndata: ${JSON.stringify(data)}\n\n`);
// Simulate task execution
await new Promise((resolve) => setTimeout(resolve, 1000));
}
// Task complete
res.write(
`event: complete\ndata: ${JSON.stringify({ status: 'Task completed', timestamp: Date.now() })}\n\n`
);
res.end();
});
app.listen(9000);
import json
import time
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/progress')
def progress():
"""Real-time progress push"""
def generate_progress():
total_steps = 10
for step in range(1, total_steps + 1):
data = {
'step': step,
'total': total_steps,
'percent': round((step / total_steps) * 100),
'message': f'Processing step {step}...'
}
yield f"event: progress\ndata: {json.dumps(data)}\n\n"
time.sleep(1)
# Task complete
yield f"event: complete\ndata: {json.dumps({'status': 'Task completed', 'timestamp': int(time.time() * 1000)})}\n\n"
return Response(
stream_with_context(generate_progress()),
mimetype="text/event-stream"
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
3. Real-time Log Push
Push server logs to client:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
// Simulate log queue
const logQueue = [];
// Simulate log generation
setInterval(() => {
logQueue.push({
level: ['info', 'debug', 'warn'][Math.floor(Math.random() * 3)],
message: `Log message ${Date.now()}`,
timestamp: Date.now(),
});
// Limit queue size
if (logQueue.length > 100) {
logQueue.shift();
}
}, 2000);
app.get('/logs', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
console.log('Start pushing logs');
// Periodically push logs
const intervalId = setInterval(() => {
if (logQueue.length > 0) {
const logs = logQueue.splice(0, 5); // Push max 5 logs each time
logs.forEach((log) => {
res.write(`event: log\ndata: ${JSON.stringify(log)}\n\n`);
});
}
}, 1000);
// Cleanup when client disconnects
req.on('close', () => {
clearInterval(intervalId);
console.log('Stop pushing logs');
});
});
app.listen(9000);
import json
import time
import random
from flask import Flask, Response, stream_with_context
from collections import deque
app = Flask(__name__)
# Simulate log queue
log_queue = deque(maxlen=100)
def generate_logs():
"""Simulate log generation"""
while True:
log_queue.append({
'level': random.choice(['info', 'debug', 'warn']),
'message': f'Log message {int(time.time() * 1000)}',
'timestamp': int(time.time() * 1000)
})
time.sleep(2)
# Generate logs in background thread
import threading
threading.Thread(target=generate_logs, daemon=True).start()
@app.route('/logs')
def logs():
"""Real-time log push"""
def generate_log_stream():
while True:
if len(log_queue) > 0:
# Push max 5 logs each time
logs_to_send = [log_queue.popleft() for _ in range(min(5, len(log_queue)))]
for log in logs_to_send:
yield f"event: log\ndata: {json.dumps(log)}\n\n"
time.sleep(1)
return Response(
stream_with_context(generate_log_stream()),
mimetype="text/event-stream"
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
4. Server Status Monitoring
Push server status metrics in real-time:
- Node.js (Express)
- Python (Flask)
const express = require('express');
const app = express();
app.get('/metrics', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Monitoring-Session', `session-${Date.now()}`);
console.log('Start pushing monitoring data');
// Periodically push monitoring data
const intervalId = setInterval(() => {
const metrics = {
cpu: Math.random() * 100,
memory: Math.random() * 100,
activeConnections: Math.floor(Math.random() * 1000),
timestamp: Date.now(),
};
res.write(`event: metrics\ndata: ${JSON.stringify(metrics)}\n\n`);
}, 2000);
// Timeout after 5 minutes
const timeoutId = setTimeout(() => {
clearInterval(intervalId);
res.write(`event: timeout\ndata: ${JSON.stringify({ message: 'Monitoring session timeout' })}\n\n`);
res.end();
}, 300000);
// Cleanup when client disconnects
req.on('close', () => {
clearInterval(intervalId);
clearTimeout(timeoutId);
console.log('Stop pushing monitoring data');
});
});
app.listen(9000);
import json
import time
import random
from flask import Flask, Response, stream_with_context
app = Flask(__name__)
@app.route('/metrics')
def metrics():
"""Server status monitoring"""
def generate_metrics():
start_time = time.time()
while True:
# Timeout after 5 minutes
if time.time() - start_time > 300:
yield f"event: timeout\ndata: {json.dumps({'message': 'Monitoring session timeout'})}\n\n"
break
metrics_data = {
'cpu': random.random() * 100,
'memory': random.random() * 100,
'activeConnections': random.randint(0, 1000),
'timestamp': int(time.time() * 1000)
}
yield f"event: metrics\ndata: {json.dumps(metrics_data)}\n\n"
time.sleep(2)
return Response(
stream_with_context(generate_metrics()),
mimetype="text/event-stream",
headers={
'X-Monitoring-Session': f'session-{int(time.time() * 1000)}'
}
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=9000)
Client Connection Examples
- Browser
- React
- Node.js Client
// Create SSE connection
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');
// Listen for default message event
eventSource.onmessage = (event) => {
console.log('Received message:', event.data);
try {
const data = JSON.parse(event.data);
console.log('Parsed data:', data);
} catch (e) {
// Handle non-JSON data
}
};
// Listen for custom events
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`Progress: ${data.percent}%`);
});
eventSource.addEventListener('done', (event) => {
console.log('Task completed');
eventSource.close(); // Close connection
});
// Connection opened
eventSource.onopen = () => {
console.log('SSE connection established');
};
// Error handling
eventSource.onerror = (error) => {
console.error('SSE error:', error);
// Optionally close connection
// eventSource.close();
};
// Manually close connection
function closeConnection() {
eventSource.close();
console.log('SSE connection closed');
}
import { useEffect, useRef, useState } from 'react';
function StreamingChat() {
const [messages, setMessages] = useState([]);
const [currentMessage, setCurrentMessage] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const eventSourceRef = useRef(null);
// Close any open connection when the component unmounts
useEffect(() => {
return () => {
eventSourceRef.current && eventSourceRef.current.close();
};
}, []);
const startChat = () => {
setIsStreaming(true);
setCurrentMessage('');
// Note: EventSource always issues GET requests; a POST-based endpoint needs fetch() streaming instead
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/chat');
eventSourceRef.current = eventSource;
// Accumulate the full reply locally so the 'done' handler does not read stale state
let fullReply = '';
eventSource.addEventListener('start', () => {
console.log('Start receiving messages');
});
eventSource.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
fullReply += data.content;
setCurrentMessage((prev) => prev + data.content);
});
eventSource.addEventListener('done', () => {
setMessages((prev) => [...prev, fullReply]);
setCurrentMessage('');
setIsStreaming(false);
eventSource.close();
});
eventSource.onerror = (error) => {
console.error('Connection error:', error);
setIsStreaming(false);
eventSource.close();
};
};
return (
<div>
<button onClick={startChat} disabled={isStreaming}>
Start Chat
</button>
<div>
{messages.map((msg, index) => (
<div key={index}>{msg}</div>
))}
{currentMessage && <div>{currentMessage}</div>}
</div>
</div>
);
}
// Requires the eventsource npm package: npm install eventsource
const EventSource = require('eventsource');
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');
eventSource.onmessage = (event) => {
console.log('Received message:', event.data);
try {
const data = JSON.parse(event.data);
console.log('Parsed data:', data);
} catch (e) {
// Handle non-JSON data
}
};
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`Progress: ${data.percent}%`);
});
eventSource.onerror = (error) => {
console.error('Error:', error);
};
// Close connection after 10 seconds
setTimeout(() => {
eventSource.close();
console.log('Connection closed');
}, 10000);
SSE Message Format
The SSE protocol uses a plain-text message format; each message consists of one or more fields:
event: message
data: {"content": "Hello"}
id: 123
retry: 1000
Standard Fields
| Field | Description | Required |
|---|---|---|
| data | Message data content | Yes |
| event | Event type (defaults to message) | No |
| id | Event ID, used for client reconnection recovery | No |
| retry | Reconnection interval (milliseconds) | No |
| : comment | Comment line, ignored by client (keeps connection alive) | No |
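These fields map directly to code. As a minimal, illustrative sketch (the formatSseMessage helper below is hypothetical, not part of any platform API), a message can be assembled from the standard fields before writing it to the response:

// Hypothetical helper: builds an SSE message string from the standard fields.
// Fields left undefined are omitted; data is required.
function formatSseMessage({ event, id, retry, data }) {
  let message = '';
  if (event !== undefined) message += `event: ${event}\n`;
  if (id !== undefined) message += `id: ${id}\n`;
  if (retry !== undefined) message += `retry: ${retry}\n`;
  // Each line of data gets its own "data:" prefix (multi-line support)
  const payload = typeof data === 'string' ? data : JSON.stringify(data);
  payload.split('\n').forEach((line) => {
    message += `data: ${line}\n`;
  });
  return message + '\n'; // blank line terminates the message
}

// Usage: res.write(formatSseMessage({ event: 'update', id: 123, data: { status: 'success' } }));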
Message Examples
Basic Message
data: Hello World
JSON Data
data: {"message": "Hello", "timestamp": 1234567890}
With Event Type
event: notification
data: {"type": "info", "content": "New message"}
Multi-line Data
data: First line
data: Second line
data: Third line
Complete Message
event: update
id: 123
retry: 1000
data: {"status": "success"}
FAQ
1. What if connection cannot be established?
- Confirm cloud function is correctly deployed and running
- Check whether the response sets the Content-Type: text/event-stream header
- Confirm you are using the correct function access address
- Check cloud function logs for error messages
- Check if client browser supports SSE (IE not supported)
2. What if connection frequently disconnects?
- Increase function execution timeout configuration
- Implement a heartbeat mechanism: periodically send comment lines such as res.write(': keepalive\n\n') to keep the connection alive
- Check if the client network is stable
- Check cloud function logs, confirm if there are errors causing disconnection
- Implement auto-reconnection mechanism on client (EventSource supports by default)
3. How to implement reconnection?
The EventSource client reconnects automatically; the server can support recovery by sending the id and retry fields:
Server sends ID and reconnection interval
res.write(`id: ${messageId}\nretry: 3000\ndata: ${JSON.stringify(data)}\n\n`);
On reconnection, the browser automatically resends the last received event ID in the Last-Event-ID request header:
const eventSource = new EventSource('https://your-service.run.tcloudbase.com/stream');
eventSource.onopen = () => {
console.log('Connection established');
};
eventSource.onerror = () => {
console.log('Connection disconnected, reconnecting...');
};
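To resume from where the client left off, the server can read the Last-Event-ID request header when the connection is re-established. A minimal Express sketch (the actual resumption logic is up to your application):

const express = require('express');
const app = express();

app.get('/stream', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // On automatic reconnection, EventSource sends the last received id in this header
  const lastEventId = req.headers['last-event-id'];
  let nextId = lastEventId ? Number(lastEventId) + 1 : 0;

  const intervalId = setInterval(() => {
    // Include id and retry so the client can resume and knows how long to wait before reconnecting
    res.write(`id: ${nextId}\nretry: 3000\ndata: ${JSON.stringify({ seq: nextId })}\n\n`);
    nextId++;
  }, 1000);

  req.on('close', () => clearInterval(intervalId));
});

app.listen(9000);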
4. How to handle large concurrent connections?
- Instance Scaling: Cloud functions automatically create independent instances for each SSE connection
- Connection Limit: Note browser same-domain connection limit (usually 6)
- State Management: Use external storage like Redis to share connection state
- Message Queue: Use a message queue or pub/sub channel for broadcast and asynchronous messages (see the sketch after this list)
- Resource Monitoring: Monitor function instance count and resource usage
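For broadcast scenarios where every connected client should receive the same messages, one common pattern is for each SSE handler to subscribe to a shared channel and forward messages to its own client. The sketch below assumes the ioredis package and a reachable Redis instance (neither is provided by the platform):

const express = require('express');
const Redis = require('ioredis'); // assumption: ioredis installed, REDIS_URL points to your Redis instance

const app = express();

app.get('/broadcast', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Each SSE connection gets its own subscriber so broadcast messages fan out to all instances
  const subscriber = new Redis(process.env.REDIS_URL);
  subscriber.subscribe('sse-broadcast');
  subscriber.on('message', (channel, message) => {
    res.write(`data: ${message}\n\n`);
  });

  // Release the Redis connection when the client disconnects
  req.on('close', () => {
    subscriber.quit();
  });
});

app.listen(9000);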
5. How to choose between SSE and WebSocket?
Choose SSE when:
- Only need server-to-client data push
- Pursuing simple implementation and rapid development
- Need auto-reconnection feature
- High compatibility requirements (HTTP protocol)
Choose WebSocket when:
- Need bidirectional real-time communication
- Need to transmit binary data
- Need lower latency and higher efficiency
- Client needs to frequently send data to server
6. How to send heartbeat to keep connection alive?
Periodically send comment lines (starting with :) to keep connection active:
const express = require('express');
const app = express();
app.get('/stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Send heartbeat every 30 seconds
const heartbeatInterval = setInterval(() => {
res.write(': heartbeat\n\n');
}, 30000);
// Send actual data
const dataInterval = setInterval(() => {
res.write(`data: ${JSON.stringify({ timestamp: Date.now() })}\n\n`);
}, 5000);
req.on('close', () => {
clearInterval(heartbeatInterval);
clearInterval(dataInterval);
});
});
app.listen(9000);
SSE vs WebSocket
| Feature | SSE | WebSocket |
|---|---|---|
| Communication | Unidirectional (Server→Client) | Bidirectional (Server↔Client) |
| Protocol | HTTP | WebSocket protocol |
| Data Format | Text (usually JSON) | Text or binary |
| Auto Reconnection | Yes | No (needs manual implementation) |
| Browser Compatibility | Good (except IE) | Good |
| Implementation Complexity | Simple | Relatively complex |
| Resource Usage | Lower | Higher |
| Configuration | No configuration needed | Needs console enablement |
| Use Cases | Unidirectional data push | Real-time bidirectional communication |
Selection Recommendations:
- Only need server data push: Choose SSE
- Need bidirectional real-time communication: Choose WebSocket
- Pursuing simple implementation: Choose SSE
- Need to transmit binary data: Choose WebSocket