Pub/Sub 与 Stream 的区别:为什么说 Stream 是 Pub/Sub 的升级版
简单认识两者
Pub/Sub(发布订阅):简单的消息广播系统,发布者向频道发消息,所有订阅者实时接收。
Stream:支持持久化的消息队列,可以回溯历史消息,支持消费者组。
核心区别一览
| 对比维度 | Pub/Sub | Stream |
|---|---|---|
| 消息持久化 | ❌ 不持久化 | ✅ 持久化到内存 |
| 消息确认 | ❌ 无 | ✅ ACK 机制 |
| 消费模式 | 广播(所有订阅者都收到) | 负载均衡/广播 |
| 消费者组 | ❌ 不支持 | ✅ 支持 |
| 消息回溯 | ❌ 不能回溯 | ✅ 可以回溯 |
| 离线消费 | ❌ 离线丢失 | ✅ 上线后能继续消费 |
| 消息排序 | 不保证 | ✅ 按 ID 严格排序 |
| 内存占用 | 极低 | 较高 |
| 适用场景 | 实时通知 | 消息队列、事件溯源 |
| 引入版本 | 2.0 | 5.0 |
详细对比
1. 消息持久化
Pub/Sub:消息即发即失。如果发布消息时没有订阅者,消息直接丢弃。
# Pub/Sub - 消息丢失
# 订阅者离线期间,所有发布的消息都丢了
PUBLISH chat "Hello" # 没有订阅者,消息直接丢弃
Stream:消息保存在内存中,直到被明确删除或达到最大长度。
# Stream - 消息持久化
XADD mystream * name "Alice" age 30
# "1700000000-0" -- 消息 ID,基于时间戳+序号
# 即使没有消费者,消息也会保留
XLEN mystream
# (integer) 1
2. 消费者组
Pub/Sub:每个订阅者收到所有消息。
# 两个客户端都订阅同一个频道
SUBSCRIBE channel1
# 发布一条消息,两个客户端都会收到
Stream:支持消费者组(Consumer Group),组内消息负载均衡。
# 创建消费者组
XGROUP CREATE mystream mygroup 0
# 消费者 A 读取
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
# 消费者 B 读取同一条时,不会收到(已经被 A 消费了)
3. 消息确认机制
Pub/Sub:没有确认机制,消息发送后就认为已送达。
Stream:支持 ACK 确认,可保证至少一次投递。
# 确认消息已处理
XACK mystream mygroup "1700000000-0"
# 查看未确认消息
XPENDING mystream mygroup
# 显示所有待确认的消息
4. 消息回溯
Pub/Sub:无法获取历史消息。
Stream:可以根据 ID 或时间范围读取历史消息。
# 获取所有历史消息
XRANGE mystream - +
# 获取某个时间点后的消息
XRANGE mystream 1700000000-0 +
# 获取最近 N 条消息
XREVRANGE mystream + - COUNT 100
场景选择指南
选 Pub/Sub 的场景
# ✅ Pub/Sub 适合的:
# 1. 实时通知,丢失没关系
pubsub.publish('notifications', 'Server health check complete')
# 2. 广播事件,需要所有服务都收到
pubsub.publish('config_changed', json.dumps(new_config))
# 3. 轻量级聊天,不在历史
pubsub.publish('chat:room1', user_message)
选 Stream 的场景
# ✅ Stream 适合的:
# 1. 任务队列,需要保证不丢
stream.xadd('task_queue', {'type': 'email', 'to': 'user@example.com'})
# 2. 事件溯源,需要回溯
stream.xadd('order_events', {'order_id': '123', 'status': 'paid'})
# 3. 多消费者负载均衡
# 多个 worker 从同一个 Stream 消费,互相不冲突
# 4. 离线消费者恢复
# 消费者宕机重启后,可以从上次中断处继续消费
Stream 的额外特性
1. 消息 ID 可自定义
# 自动生成:基于毫秒时间戳
XADD mystream * name "Alice"
# "1716000000000-0"
# 自定义 ID:尾部是递增序号
XADD mystream 0-1 name "Bob"
XADD mystream 0-2 name "Charlie"
# ID 比 0-2 小,会被拒绝(必须单调递增)
XADD mystream 0-1 name "David"
# (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
2. 消息修剪
# 限制 Stream 最大长度
XADD mystream MAXLEN 10000 * name "Alice"
# 只保留最新的 10000 条消息
# 近似修剪(更高效,使用 ~ 符号)
XADD mystream MAXLEN ~ 10000 * name "Alice"
# 效率更高,但长度可能略多于 10000
3. 阻塞读取
# 阻塞等待新消息
XREAD BLOCK 5000 STREAMS mystream 0
# 没有新消息时阻塞 5 秒
# 只读最新消息(使用 $)
XREAD BLOCK 0 STREAMS mystream $
# 只等待新消息,$ 表示"从现在开始"
性能对比
Pub/Sub 延迟: < 0.1ms (纯内存转发)
Stream 延迟: < 0.2ms (需要写入数据结构)
内存效率:
Pub/Sub: 极低(不保存消息)
Stream: 与数据量成正比(每条消息约 100+ 字节)
吞吐量(单机):
Pub/Sub: ~1,000,000 条/秒
Stream: ~500,000 条/秒(写入+读取)
结论
def choose_message_system(requirements):
"""根据需求选择 Pub/Sub 还是 Stream"""
if requirements.get('persistence', False):
return "Stream" # 需要持久化
if requirements.get('consumer_group', False):
return "Stream" # 需要消费者组
if requirements.get('message_ack', False):
return "Stream" # 需要消息确认
if requirements.get('history_replay', False):
return "Stream" # 需要回溯历史
if requirements.get('minimal_delay', False):
return "Pub/Sub" # 极致延迟
if requirements.get('simple_broadcast', False):
return "Pub/Sub" # 简单广播
return "建议优先使用 Stream(功能更全面)"
面试要点
- Pub/Sub 是轻量广播,Stream 是完整消息队列
- Pub/Sub 消息不持久化,Stream 持久化
- Stream 的消费者组支持负载均衡和消息确认
- Stream 支持回溯历史消息,Pub/Sub 不行
- Redis 5.0+ 推荐使用 Stream 替代 Pub/Sub
- Pub/Sub 的输出缓冲区可能导致订阅者被断开
- Stream 支持阻塞读取(BLOCK),Pub/Sub 没有读取控制
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END


暂无评论内容