跳到主要内容

用 Cloudbase 数据库 watch 做实时通知

一句话定义:在已经接入 add-database-wechat-miniprogram 的小程序里,用 db.collection().watch() 订阅一个查询的结果集,把订单状态推送、消息提醒、在线状态同步这类「服务端变了前端立刻看到」的场景做出来,核心是把 onChangedocChanges 用对、把 closer.close() 放对地方。

预计耗时:35 分钟 | 难度:进阶

适用场景

11 篇里的 add-database-wechat-miniprogram 教了 watch 的基础姿势,这篇接着讲怎么把它用到具体场景里。

  • 适用:订单状态实时刷新、单聊 / 群聊消息推送、协同编辑同步、在线状态展示
  • 适用:小程序前端 + 独立 Cloudbase 环境(@cloudbase/js-sdk + @cloudbase/adapter-wx_mp)
  • 不适用:跨设备千万级并发的强实时(IM 这个量级建议上专门的 IM 服务,本文方案在中小规模 OK)
  • 不适用:延迟必须严格小于 100ms 的场景。watch 协议层基于长连接,典型延迟在百毫秒到秒级
  • 不适用:频繁触发数据库写入的瞬时累加(高频写会拖累 watch 推送,极端场景把累加挪到云函数里聚合)

环境要求

依赖版本
@cloudbase/js-sdk2.27.3(已在 add-auth-wechat-miniprogram 装好)
@cloudbase/adapter-wx_mp1.3.1
微信开发者工具1.06.x

另外需要:

第一步:把 watch 的回调结构吃透

watch 的回调里两个字段含义不同,容易混:

字段含义典型用途
snapshot.docs当前查询匹配的全部文档(变更后的完整结果集)直接 setData 渲染整个列表
snapshot.docChanges本次变化的几条记录,带 dataType想做局部更新 / 弹通知 / 增量动画

docChanges[].dataType 取值:

  • init:首次推送,推过来的就是初始快照,数量等于 where 当前匹配的所有文档
  • add:有一条新文档进入了查询结果(可能是新插入,也可能是别的字段变了让它进入了 where 范围)
  • update:已有文档的内容被改了
  • remove:文档被删了或字段变了让它脱离 where 范围

写应用层代码时记一条原则:列表全量重渲染用 snapshot.docs,弹通知 / 局部刷新 / 拆增量用 snapshot.docChanges,两者按需混用。

第二步:订单状态实时推送

场景:用户下单后,这条订单从「待支付 → 支付中 → 待发货 → 已发货」一路前进,前端等待页要随状态变化即时显示。

pages/order-detail/order-detail.js:

import { db } from '../../libs/cloudbase';
import { ensureLogin } from '../../libs/login';

Page({
data: {
order: null,
},

async onLoad({ orderId }) {
await ensureLogin();
this.orderId = orderId;
this.startWatchOrder();
},

startWatchOrder() {
this.watcher = db
.collection('orders')
.doc(this.orderId)
.watch({
onChange: (snapshot) => {
// 单文档监听,docs 长度只会是 0 或 1
const order = snapshot.docs[0] || null;
this.setData({ order });

// 每次变化都看一下增量,弹一下提示
for (const change of snapshot.docChanges) {
if (change.dataType === 'update') {
this.notifyStatusChange(change.doc.status);
}
}
},
onError: (err) => {
console.error('[watch order] error', err);
},
});
},

notifyStatusChange(status) {
const map = {
paid: '支付成功',
shipped: '商家已发货',
delivered: '订单已送达',
};
if (map[status]) {
wx.showToast({ title: map[status], icon: 'success' });
}
},

onUnload() {
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
},
});

要点:

  • 单文档监听用 db.collection().doc(id).watch(),语义比 where({_id: id}) 更直接
  • 服务端 update 后,前端这边经长连接收到推送,典型在百毫秒到秒级。比起前端轮询省接口次数,也少了「刚刷新数据其实没变」的尴尬
  • 业务事件提示只看 update,不要在 init 里也弹通知,否则用户进页面那一瞬间会被弹一次「订单已支付」

第三步:简单消息推送

场景:对话页订阅当前 conversationId 下的消息集合,新消息 add 触发 UI 渲染。

pages/chat/chat.js:

Page({
data: {
conversationId: '',
messages: [],
},

async onLoad({ conversationId }) {
await ensureLogin();
this.setData({ conversationId });

this.watcher = db
.collection('messages')
.where({ conversationId })
.orderBy('createdAt', 'asc')
.watch({
onChange: (snapshot) => {
// 新消息列表全量推过来,直接刷新
this.setData({ messages: snapshot.docs });

// 看 docChanges 拿到本次新增的几条,做提示音 / 滚到底
const newOnes = snapshot.docChanges.filter(
(c) => c.dataType === 'add'
);
if (newOnes.length && this.data.messages.length > 0) {
// init 阶段不算「新消息」,只在已经有消息后再算
wx.vibrateShort({ type: 'light' });
this.scrollToBottom();
}
},
onError: (err) => {
console.error('[watch messages] error', err);
},
});
},

scrollToBottom() {
// ScrollView 滚到底部的实现,略
},

onUnload() {
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
},
});

发消息侧只是普通 add,不在这里讲:

await db.collection('messages').add({
conversationId,
content: '你好',
createdAt: db.serverDate(),
});

第四步:在线状态(简单版)

场景:展示「这个用户最近在线」。原理是客户端定时心跳 update 自己的 lastActive,他人 watch 这个字段判断时间差。

心跳侧(在 app.js 启动后开一个定时器):

// miniprogram/app.js
import { db } from './libs/cloudbase';
import { ensureLogin } from './libs/login';

App({
async onLaunch() {
await ensureLogin();
this.startHeartbeat();
},

startHeartbeat() {
const beat = async () => {
try {
await db.collection('user_status').doc(this.globalData.user.uid).set({
lastActive: db.serverDate(),
});
} catch (e) {
console.warn('[heartbeat] failed', e);
}
};

beat();
this.heartbeatTimer = setInterval(beat, 30 * 1000); // 30s 一次
},

onHide() {
// 进后台不发心跳,省点资源
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
},

onShow() {
if (!this.heartbeatTimer && this.globalData.user) {
this.startHeartbeat();
}
},

globalData: { user: null },
});

观察侧 watch 一组用户的 lastActive,前端比较时间差:

const ONLINE_THRESHOLD = 90 * 1000; // 90s 内算在线

this.watcher = db
.collection('user_status')
.where({ uid: db.command.in(this.peerUids) })
.watch({
onChange: (snapshot) => {
const now = Date.now();
const onlineMap = {};
for (const doc of snapshot.docs) {
const last = doc.lastActive?.getTime?.() || 0;
onlineMap[doc.uid] = now - last < ONLINE_THRESHOLD;
}
this.setData({ onlineMap });
},
});

注意点:

  • 心跳间隔(30s)要小于「在线阈值」(90s),给一点容错。如果两者完全相等,心跳错过一次就会被判离线
  • lastActivedb.serverDate(),不要用客户端 new Date()。客户端时钟可能不准,服务端时间统一
  • 这个方案是最简单版,在线人数到几百以上时换专门的 presence 系统,这里不展开

第五步:页面销毁一定要 close

watch 的连接是长连接,服务端会维护订阅状态,占用配额。前端如果忘了 close(),常见三种坑:

  • 同一页面反复 onLoad(从详情返回再进),每次都建一个新 watcher,旧的还在,推送会重复
  • 用户跳来跳去最终退出 App,大量 watcher 没释放,服务端连接数占用,触发限流
  • 后台进程被微信回收,watcher 实际已断开,但前端 this.watcher 引用还在,以为还能用

固定写法:

Page({
onLoad() {
this.startWatch();
},

startWatch() {
// 进来先关掉旧的(如果有)
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}

this.watcher = db.collection('xxx').where({...}).watch({
onChange: (snapshot) => {
// ...
},
});
},

onUnload() {
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
},
});

组件用 attached / detached 同理。

第六步:已知限制

写代码前先把限制刻在脑子里,真出问题时少走弯路。

  • 单连接订阅文档数有上限。控制台「环境配置 → 配额」可以看具体数字。一个 watch 最多订阅多少文档,以当前文档为准,超过了 onChange 不会触发
  • wifi 切到 4G 等网络切换会触发短暂断连,SDK 内部会自动重连,业务侧基本无感,但偶发会有 1-2 秒推送延迟
  • 高频写入(每秒几十次以上)的集合,watch 推送会被合并 / 节流,不要把它当事件总线用
  • watch 不支持聚合查询(aggregate)。如果要监听聚合结果,把聚合放到云函数里跑,前端 watch 一个「结果缓存集合」,云函数算完写进去

运行验证

  1. 微信开发者工具编译,打开 Console
  2. 跑「订单状态实时推送」例子,先在控制台手工把 orders 里某条 status 字段改一次,小程序页面应该立刻看到新状态 + 弹 toast
  3. 跑「消息推送」例子,在控制台 add 一条 messages 记录,前端应该立刻看到新消息出现
  4. 退出页面 → Console 不应再有 watch 推送日志(没关掉 close 的话会继续推)
  5. 多次进出同一页面,只应该看到一份 watcher 在工作,不会有重复推送

常见错误

错误现象原因修复
onChange 完全不触发集合权限不允许当前用户读控制台「数据库 → 集合 → 权限」检查模式;watch 受权限约束
onChange 一直推全量,前端列表抖动没区分 initadd,把所有 docChanges 都当新事件见第三步,用 messages.length > 0 之类的状态过滤
同一页面进出 N 次后推送累积 N 倍旧 watcher 没 close()见第五步固定写法,在 startWatch 开头先关一次
onError 频繁触发,提示订阅数超限单连接订阅文档数超上限减少 watch 范围,或者把列表分页 watch(只监听当前可见的几条)
网络切换后短暂掉线wifi → 4G 切换 SDK 会重连SDK 内部自动恢复,业务侧不需要处理
偶发收到一条 dataType: 'remove' 但其实没删文档字段变了,脱离了 where 条件业务上把它当作「不再关心的文档」处理就行,不要假设是真删除

错误码定义参考 error-code

相关文档

下一步