用 Cloudbase 数据库 watch 做实时通知
一句话定义:在已经接入 add-database-wechat-miniprogram 的小程序里,用
db.collection().watch()订阅一个查询的结果集,把订单状态推送、消息提醒、在线状态同步这类「服务端变了前端立刻看到」的场景做出来,核心是把onChange的docChanges用对、把closer.close()放对地方。预计耗时:35 分钟 | 难度:进阶
适用场景
11 篇里的 add-database-wechat-miniprogram 教了 watch 的基础姿势,这篇接着讲怎么把它用到具体场景里。
- 适用:订单状态实时刷新、单聊 / 群聊消息推送、协同编辑同步、在线状态展示
- 适用:小程序前端 + 独立 Cloudbase 环境(
@cloudbase/js-sdk+@cloudbase/adapter-wx_mp) - 不适用:跨设备千万级并发的强实时(IM 这个量级建议上专门的 IM 服务,本文方案在中小规模 OK)
- 不适用:延迟必须严格小于 100ms 的场景。
watch协议层基于长连接,典型延迟在百毫秒到秒级 - 不适用:频繁触发数据库写入的瞬时累加(高频写会拖累
watch推送,极端场景把累加挪到云函数里聚合)
环境要求
| 依赖 | 版本 |
|---|---|
@cloudbase/js-sdk | 2.27.3(已在 add-auth-wechat-miniprogram 装好) |
@cloudbase/adapter-wx_mp | 1.3.1 |
| 微信开发者工具 | ≥ 1.06.x |
另外需要:
- 已完成 add-auth-wechat-miniprogram 和 add-database-wechat-miniprogram,
auth.hasLoginState()返回 true,db.collection('xxx')能正常读写 - 控制台「数据库」里有目标集合,权限模式至少允许当前用户读
第一步:把 watch 的回调结构吃透
watch 的回调里两个字段含义不同,容易混:
| 字段 | 含义 | 典型用途 |
|---|---|---|
snapshot.docs | 当前查询匹配的全部文档(变更后的完整结果集) | 直接 setData 渲染整个列表 |
snapshot.docChanges | 本次变化的几条记录,带 dataType | 想做局部更新 / 弹通知 / 增量动画 |
docChanges[].dataType 取值:
init:首次推送,推过来的就是初始快照,数量等于where当前匹配的所有文档add:有一条新文档进入了查询结果(可能是新插入,也可能是别的字段变了让它进入了 where 范围)update:已有文档的内容被改了remove:文档被删了或字段变了让它脱离 where 范围
写应用层代码时记一条原则:列表全量重渲染用 snapshot.docs,弹通知 / 局部刷新 / 拆增量用 snapshot.docChanges,两者按需混用。
第二步:订单状态实时推送
场景:用户下单后,这条订单从「待支付 → 支付中 → 待发货 → 已发货」一路前进,前端等待页要随状态变化即时显示。
pages/order-detail/order-detail.js:
import { db } from '../../libs/cloudbase';
import { ensureLogin } from '../../libs/login';
Page({
data: {
order: null,
},
async onLoad({ orderId }) {
await ensureLogin();
this.orderId = orderId;
this.startWatchOrder();
},
startWatchOrder() {
this.watcher = db
.collection('orders')
.doc(this.orderId)
.watch({
onChange: (snapshot) => {
// 单文档监听,docs 长度只会是 0 或 1
const order = snapshot.docs[0] || null;
this.setData({ order });
// 每次变化都看一下增量,弹一下提示
for (const change of snapshot.docChanges) {
if (change.dataType === 'update') {
this.notifyStatusChange(change.doc.status);
}
}
},
onError: (err) => {
console.error('[watch order] error', err);
},
});
},
notifyStatusChange(status) {
const map = {
paid: '支付成功',
shipped: '商家已发货',
delivered: '订单已送达',
};
if (map[status]) {
wx.showToast({ title: map[status], icon: 'success' });
}
},
onUnload() {
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
},
});
要点:
- 单文档监听用
db.collection().doc(id).watch(),语义比where({_id: id})更直接 - 服务端
update后,前端这边经长连接收到推送,典型在百毫秒到秒级。比起前端轮询省接口次数,也少了「刚刷新数据其实没变」的尴尬 - 业务事件提示只看
update,不要在init里也弹通知,否则用户进页面那一瞬间会被弹一次「订单已支付」
第三步:简单消息推送
场景:对话页订阅当前 conversationId 下的消息集合,新消息 add 触发 UI 渲染。
pages/chat/chat.js:
Page({
data: {
conversationId: '',
messages: [],
},
async onLoad({ conversationId }) {
await ensureLogin();
this.setData({ conversationId });
this.watcher = db
.collection('messages')
.where({ conversationId })
.orderBy('createdAt', 'asc')
.watch({
onChange: (snapshot) => {
// 新消息列表全量推过来,直接刷新
this.setData({ messages: snapshot.docs });
// 看 docChanges 拿到本次新增的几条,做提示音 / 滚到底
const newOnes = snapshot.docChanges.filter(
(c) => c.dataType === 'add'
);
if (newOnes.length && this.data.messages.length > 0) {
// init 阶段不算「新消息」,只在已经有消息后再算
wx.vibrateShort({ type: 'light' });
this.scrollToBottom();
}
},
onError: (err) => {
console.error('[watch messages] error', err);
},
});
},
scrollToBottom() {
// ScrollView 滚到底部的实现,略
},
onUnload() {
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
},
});
发消息侧只是普通 add,不在这里讲:
await db.collection('messages').add({
conversationId,
content: '你好',
createdAt: db.serverDate(),
});
第四步:在线状态(简单版)
场景:展示「这个用户最近在线」。原理是客户端定时心跳 update 自己的 lastActive,他人 watch 这个字段判断时间差。
心跳侧(在 app.js 启动后开一个定时器):
// miniprogram/app.js
import { db } from './libs/cloudbase';
import { ensureLogin } from './libs/login';
App({
async onLaunch() {
await ensureLogin();
this.startHeartbeat();
},
startHeartbeat() {
const beat = async () => {
try {
await db.collection('user_status').doc(this.globalData.user.uid).set({
lastActive: db.serverDate(),
});
} catch (e) {
console.warn('[heartbeat] failed', e);
}
};
beat();
this.heartbeatTimer = setInterval(beat, 30 * 1000); // 30s 一次
},
onHide() {
// 进后台不发心跳,省点资源
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
},
onShow() {
if (!this.heartbeatTimer && this.globalData.user) {
this.startHeartbeat();
}
},
globalData: { user: null },
});
观察侧 watch 一组用户的 lastActive,前端比较时间差:
const ONLINE_THRESHOLD = 90 * 1000; // 90s 内算在线
this.watcher = db
.collection('user_status')
.where({ uid: db.command.in(this.peerUids) })
.watch({
onChange: (snapshot) => {
const now = Date.now();
const onlineMap = {};
for (const doc of snapshot.docs) {
const last = doc.lastActive?.getTime?.() || 0;
onlineMap[doc.uid] = now - last < ONLINE_THRESHOLD;
}
this.setData({ onlineMap });
},
});
注意点:
- 心跳间隔(30s)要小于「在线阈值」(90s),给一点容错。如果两者完全相等,心跳错过一次就会被判离线
lastActive用db.serverDate(),不要用客户端new Date()。客户端时钟可能不准,服务端时间统一- 这个方案是最简单版,在线人数到几百以上时换专门的 presence 系统,这里不展开
第五步:页面销毁一定要 close
watch 的连接是长连接,服务端会维护订阅状态,占用配额。前端如果忘了 close(),常见三种坑:
- 同一页面反复
onLoad(从详情返回再进),每次都建一个新 watcher,旧的还在,推送会重复 - 用户跳来跳去最终退出 App,大量 watcher 没释放,服务端连接数占用,触发限流
- 后台进程被微信回收,watcher 实际已断开,但前端
this.watcher引用还在,以为还能用
固定写法:
Page({
onLoad() {
this.startWatch();
},
startWatch() {
// 进来先关掉旧的(如果有)
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
this.watcher = db.collection('xxx').where({...}).watch({
onChange: (snapshot) => {
// ...
},
});
},
onUnload() {
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
},
});
组件用 attached / detached 同理。
第六步:已知限制
写代码前先把限制刻在脑子里,真出问题时少走弯路。
- 单连接订阅文档数有上限。控制台「环境配置 → 配额」可以看具体数字。一个
watch最多订阅多少文档,以当前文档为准,超过了onChange不会触发 - wifi 切到 4G 等网络切换会触发短暂断连,SDK 内部会自动重连,业务侧基本无感,但偶发会有 1-2 秒推送延迟
- 高频写入(每秒几十次以上)的集合,
watch推送会被 合并 / 节流,不要把它当事件总线用 watch不支持聚合查询(aggregate)。如果要监听聚合结果,把聚合放到云函数里跑,前端watch一个「结果缓存集合」,云函数算完写进去
运行验证
- 微信开发者工具编译,打开 Console
- 跑「订单状态实时推送」例子,先在控制台手工把
orders里某条status字段改一次,小程序页面应该立刻看到新状态 + 弹 toast - 跑「消息推送」例子,在控制台 add 一条
messages记录,前端应该立刻看到新消息出现 - 退出页面 → Console 不应再有 watch 推送日志(没关掉 close 的话会继续推)
- 多次进出同一页面,只应该看到一份 watcher 在工作,不会有重复推送
常见错误
| 错误现象 | 原因 | 修复 |
|---|---|---|
onChange 完全不触发 | 集合权限不允许当前用户读 | 控制台「数据库 → 集合 → 权限」检查模式;watch 受权限约束 |
onChange 一直推全量,前端列表抖动 | 没区分 init 和 add,把所有 docChanges 都当新事件 | 见第三步,用 messages.length > 0 之类的状态过滤 |
| 同一页面进出 N 次后推送累积 N 倍 | 旧 watcher 没 close() | 见第五步固定写法,在 startWatch 开头先关一次 |
onError 频繁触发,提示订阅数超限 | 单连接订阅文档数超上限 | 减少 watch 范围,或者把列表分页 watch(只监听当前可见的几条) |
| 网络切换后短暂掉线 | wifi → 4G 切换 SDK 会重连 | SDK 内部自动恢复,业务侧不需要处理 |
偶发收到一条 dataType: 'remove' 但其实没删 | 文档字段变了,脱离了 where 条件 | 业务上把它当作「不再关心的文档」处理就行,不要假设是真删除 |
错误码定义参考 error-code。
相关文档
- 实时推送 / watch 协议层 —
watch的回调结构和 close 语义 - Web SDK 数据库 API —
collection / where / watch完整签名 - add-database-wechat-miniprogram — 前置:数据库基础读写
- add-auth-wechat-miniprogram — 前置:登录接入
下一步
- 让消息推送同时触达「不在小程序内」的用户:add-subscribe-message-cloud-function
- 在线状态写入收口到云函数:secure-database-multi-tenant-rules
- 群聊场景的多租户隔离:secure-database-multi-tenant-rules