Pub/Sub 与 Stream 的区别:为什么说 Stream 是 Pub/Sub 的升级版

Pub/Sub 与 Stream 的区别:为什么说 Stream 是 Pub/Sub 的升级版

简单认识两者

Pub/Sub(发布订阅):简单的消息广播系统,发布者向频道发消息,所有订阅者实时接收。

Stream:支持持久化的消息队列,可以回溯历史消息,支持消费者组。

核心区别一览

对比维度 Pub/Sub Stream
消息持久化 ❌ 不持久化 ✅ 持久化到内存
消息确认 ❌ 无 ✅ ACK 机制
消费模式 广播(所有订阅者都收到) 负载均衡/广播
消费者组 ❌ 不支持 ✅ 支持
消息回溯 ❌ 不能回溯 ✅ 可以回溯
离线消费 ❌ 离线丢失 ✅ 上线后能继续消费
消息排序 不保证 ✅ 按 ID 严格排序
内存占用 极低 较高
适用场景 实时通知 消息队列、事件溯源
引入版本 2.0 5.0

详细对比

1. 消息持久化

Pub/Sub:消息即发即失。如果发布消息时没有订阅者,消息直接丢弃。

# Pub/Sub - 消息丢失
# 订阅者离线期间,所有发布的消息都丢了
PUBLISH chat "Hello"   # 没有订阅者,消息直接丢弃

Stream:消息保存在内存中,直到被明确删除或达到最大长度。

# Stream - 消息持久化
XADD mystream * name "Alice" age 30
# "1700000000-0"  -- 消息 ID,基于时间戳+序号

# 即使没有消费者,消息也会保留
XLEN mystream
# (integer) 1

2. 消费者组

Pub/Sub:每个订阅者收到所有消息。

# 两个客户端都订阅同一个频道
SUBSCRIBE channel1
# 发布一条消息,两个客户端都会收到

Stream:支持消费者组(Consumer Group),组内消息负载均衡

# 创建消费者组
XGROUP CREATE mystream mygroup 0

# 消费者 A 读取
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
# 消费者 B 读取同一条时,不会收到(已经被 A 消费了)

3. 消息确认机制

Pub/Sub:没有确认机制,消息发送后就认为已送达。

Stream:支持 ACK 确认,可保证至少一次投递。

# 确认消息已处理
XACK mystream mygroup "1700000000-0"

# 查看未确认消息
XPENDING mystream mygroup
# 显示所有待确认的消息

4. 消息回溯

Pub/Sub:无法获取历史消息。

Stream:可以根据 ID 或时间范围读取历史消息。

# 获取所有历史消息
XRANGE mystream - +

# 获取某个时间点后的消息
XRANGE mystream 1700000000-0 +

# 获取最近 N 条消息
XREVRANGE mystream + - COUNT 100

场景选择指南

选 Pub/Sub 的场景

# ✅ Pub/Sub 适合的:
# 1. 实时通知,丢失没关系
pubsub.publish('notifications', 'Server health check complete')

# 2. 广播事件,需要所有服务都收到
pubsub.publish('config_changed', json.dumps(new_config))

# 3. 轻量级聊天,不在历史
pubsub.publish('chat:room1', user_message)

选 Stream 的场景

# ✅ Stream 适合的:
# 1. 任务队列,需要保证不丢
stream.xadd('task_queue', {'type': 'email', 'to': 'user@example.com'})

# 2. 事件溯源,需要回溯
stream.xadd('order_events', {'order_id': '123', 'status': 'paid'})

# 3. 多消费者负载均衡
# 多个 worker 从同一个 Stream 消费,互相不冲突

# 4. 离线消费者恢复
# 消费者宕机重启后,可以从上次中断处继续消费

Stream 的额外特性

1. 消息 ID 可自定义

# 自动生成:基于毫秒时间戳
XADD mystream * name "Alice"
# "1716000000000-0"

# 自定义 ID:尾部是递增序号
XADD mystream 0-1 name "Bob"
XADD mystream 0-2 name "Charlie"

# ID 比 0-2 小,会被拒绝(必须单调递增)
XADD mystream 0-1 name "David"
# (error) ERR The ID specified in XADD is equal or smaller than the target stream top item

2. 消息修剪

# 限制 Stream 最大长度
XADD mystream MAXLEN 10000 * name "Alice"
# 只保留最新的 10000 条消息

# 近似修剪(更高效,使用 ~ 符号)
XADD mystream MAXLEN ~ 10000 * name "Alice"
# 效率更高,但长度可能略多于 10000

3. 阻塞读取

# 阻塞等待新消息
XREAD BLOCK 5000 STREAMS mystream 0
# 没有新消息时阻塞 5 秒

# 只读最新消息(使用 $)
XREAD BLOCK 0 STREAMS mystream $
# 只等待新消息,$ 表示"从现在开始"

性能对比

Pub/Sub 延迟:      < 0.1ms (纯内存转发)
Stream 延迟:       < 0.2ms (需要写入数据结构)

内存效率:
Pub/Sub:   极低(不保存消息)
Stream:    与数据量成正比(每条消息约 100+ 字节)

吞吐量(单机):
Pub/Sub:   ~1,000,000 条/秒
Stream:    ~500,000 条/秒(写入+读取)

结论

def choose_message_system(requirements):
    """根据需求选择 Pub/Sub 还是 Stream"""

    if requirements.get('persistence', False):
        return "Stream"  # 需要持久化
    if requirements.get('consumer_group', False):
        return "Stream"  # 需要消费者组
    if requirements.get('message_ack', False):
        return "Stream"  # 需要消息确认
    if requirements.get('history_replay', False):
        return "Stream"  # 需要回溯历史

    if requirements.get('minimal_delay', False):
        return "Pub/Sub"  # 极致延迟

    if requirements.get('simple_broadcast', False):
        return "Pub/Sub"  # 简单广播

    return "建议优先使用 Stream(功能更全面)"

面试要点

  • Pub/Sub 是轻量广播,Stream 是完整消息队列
  • Pub/Sub 消息不持久化,Stream 持久化
  • Stream 的消费者组支持负载均衡消息确认
  • Stream 支持回溯历史消息,Pub/Sub 不行
  • Redis 5.0+ 推荐使用 Stream 替代 Pub/Sub
  • Pub/Sub 的输出缓冲区可能导致订阅者被断开
  • Stream 支持阻塞读取(BLOCK),Pub/Sub 没有读取控制
© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容