发布订阅 Pub/Sub 原理:Redis 的消息广播机制

发布订阅 Pub/Sub 原理:Redis 的消息广播机制

什么是 Pub/Sub

Pub/Sub(发布/订阅)是 Redis 内置的消息通信模式,允许消息的发布者(Publisher)向频道(Channel)发送消息,所有订阅(Subscriber)该频道的客户端都能收到消息。

发布者 → [频道] → 订阅者1
                  → 订阅者2
                  → 订阅者3

核心命令

# 订阅端
SUBSCRIBE channel          # 订阅一个或多个频道
PSUBSCRIBE pattern*        # 订阅模式匹配的频道(支持 glob 风格)
UNSUBSCRIBE channel        # 取消订阅
PUNSUBSCRIBE pattern       # 取消模式订阅

# 发布端
PUBLISH channel message    # 向频道发布消息

# 管理
PUBSUB CHANNELS [pattern]  # 查看活跃频道
PUBSUB NUMSUB [channel]    # 查看频道订阅数
PUBSUB NUMPAT              # 查看模式订阅数

工作示例

终端 1:订阅者

127.0.0.1:6379> SUBSCRIBE news.sports news.tech
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.sports"
3) (integer) 1
1) "subscribe"
2) "news.tech"
3) (integer) 2

终端 2:发布者

127.0.0.1:6379> PUBLISH news.sports "NBA Finals Game 7 starts tonight"
(integer) 1
127.0.0.1:6379> PUBLISH news.tech "OpenAI releases new model"
(integer) 1

订阅者收到消息

1) "message"              # 消息类型
2) "news.sports"          # 频道名
3) "NBA Finals Game 7 starts tonight"  # 消息内容

1) "message"
2) "news.tech"
3) "OpenAI releases new model"

模式订阅

# 订阅端:匹配所有 news. 开头的频道
127.0.0.1:6379> PSUBSCRIBE news.*

# 发布端:匹配 news.sports 和 news.tech
127.0.0.1:6379> PUBLISH news.sports "Update"
127.0.0.1:6379> PUBLISH news.music "New album"

Pub/Sub 的消息格式

每条消息都是一个包含 3 个元素的数组:

1) "message" / "subscribe" / "unsubscribe"
2) 频道名
3) 消息内容 / 订阅数
  • message:正常的消息
  • subscribe/unsubscribe:确认订阅/取消订阅
  • pmessage:模式匹配的消息

Pub/Sub 的工作机制

1. 订阅关系

Redis 在服务端维护一个字典:

"news.sports"  [client_A, client_B, ...]
"news.tech"   [client_C]
"news.*"      [client_D]  (模式订阅)

2. 消息发布流程

int pubsubPublishMessage(redisClient *c, robj *channel, robj *message) {
    // 1. 查找订阅该频道的所有客户端
    list *subscribers = dictFetchValue(server.pubsub_channels, channel);

    // 2. 向每个订阅者发送消息
    if (listLength(subscribers)) {
        listIter li = listGetIterator(subscribers, AL_START_HEAD);
        while ((ln = listNext(&li))) {
            client = ln->value;
            addReply(client, ...);  // 发送消息
        }
    }

    // 3. 匹配模式订阅
    list *patterns = dictFetchValue(server.pubsub_patterns, pattern);
    // 向匹配的客户端发送消息

    return receivers;
}

3. 消息传递机制

Pub/Sub 是即发即忘模式:
– 消息不会持久化
– 如果订阅者不在线,消息会丢失
– 没有确认机制
– 没有重试机制

Pub/Sub 的适用场景

场景 说明 实例
实时通知 即时推送,可接受丢失 系统告警通知
消息广播 同时通知多个服务 配置变更通知
实时聊天 群组消息 简单的聊天室
事件驱动 解耦生产者消费者 缓存失效通知

Pub/Sub 的局限性

1. 消息不持久化

# 如果消息发布时订阅者不在线
PUBLISH notifications "Server restarted"
# 没有任何订阅者,消息丢失

2. 没有消息确认

# 订阅者收到消息后宕机
# 消息不会被重新投递

3. 缓冲有限

Redis 为每个订阅者分配输出缓冲区。如果订阅者消费慢,缓冲区满了 Redis 会断开连接:

# 配置客户端输出缓冲区限制
client-output-buffer-limit pubsub 8mb 2mb 60
# 硬限制 8MB,软限制 2MB/60 秒

4. 不支持消息回溯

Pub/Sub 不能像消息队列一样回溯历史消息。

Python 示例

import redis
import threading
import time

class PubSubExample:
    def __init__(self, host='localhost', port=6379):
        self.r = redis.Redis(host=host, port=port)

    def publisher(self, channel, interval=1):
        """发布消息"""
        while True:
            msg = f"Message at {time.time():.0f}"
            count = self.r.publish(channel, msg)
            print(f"Published: '{msg}' -> {count} subscribers")
            time.sleep(interval)

    def subscriber(self, channel):
        """订阅消息"""
        pubsub = self.r.pubsub()
        pubsub.subscribe(channel)
        print(f"Subscribed to {channel}")

        for message in pubsub.listen():
            if message['type'] == 'message':
                print(f"Received: {message['data']}")

# 使用
if __name__ == '__main__':
    example = PubSubExample()

    # 启动订阅者线程
    t = threading.Thread(target=example.subscriber, args=('alerts',), daemon=True)
    t.start()

    time.sleep(1)
    example.publisher('alerts', 2)

Pub/Sub 与 Redis 事务的关系

重要:在订阅模式下,Redis 客户端不能执行除了 SUBSCRIBE/UNSUBSCRIBE/PSUBSCRIBE/PUNSUBSCRIBE/PING/QUIT 之外的命令。如果要执行其他命令,需要建立新的连接

面试要点

  • Pub/Sub 是即发即忘模式,消息不持久化
  • 离线订阅者会丢失消息
  • 没有消息确认和重试机制
  • Redis 6.0+ 开始不推荐使用 Pub/Sub(推荐 Stream)
  • 输出缓冲区限制可能导致连接被关闭
  • 一个连接在订阅模式下不能执行其他命令
  • PUBLISH 命令返回值是被多少客户端收到
© 版权声明
THE END
喜欢就支持一下吧
点赞11 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容