发布订阅 Pub/Sub 原理:Redis 的消息广播机制
什么是 Pub/Sub
Pub/Sub(发布/订阅)是 Redis 内置的消息通信模式,允许消息的发布者(Publisher)向频道(Channel)发送消息,所有订阅(Subscriber)该频道的客户端都能收到消息。
发布者 → [频道] → 订阅者1
→ 订阅者2
→ 订阅者3
核心命令
# 订阅端
SUBSCRIBE channel # 订阅一个或多个频道
PSUBSCRIBE pattern* # 订阅模式匹配的频道(支持 glob 风格)
UNSUBSCRIBE channel # 取消订阅
PUNSUBSCRIBE pattern # 取消模式订阅
# 发布端
PUBLISH channel message # 向频道发布消息
# 管理
PUBSUB CHANNELS [pattern] # 查看活跃频道
PUBSUB NUMSUB [channel] # 查看频道订阅数
PUBSUB NUMPAT # 查看模式订阅数
工作示例
终端 1:订阅者
127.0.0.1:6379> SUBSCRIBE news.sports news.tech
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.sports"
3) (integer) 1
1) "subscribe"
2) "news.tech"
3) (integer) 2
终端 2:发布者
127.0.0.1:6379> PUBLISH news.sports "NBA Finals Game 7 starts tonight"
(integer) 1
127.0.0.1:6379> PUBLISH news.tech "OpenAI releases new model"
(integer) 1
订阅者收到消息
1) "message" # 消息类型
2) "news.sports" # 频道名
3) "NBA Finals Game 7 starts tonight" # 消息内容
1) "message"
2) "news.tech"
3) "OpenAI releases new model"
模式订阅
# 订阅端:匹配所有 news. 开头的频道
127.0.0.1:6379> PSUBSCRIBE news.*
# 发布端:匹配 news.sports 和 news.tech
127.0.0.1:6379> PUBLISH news.sports "Update"
127.0.0.1:6379> PUBLISH news.music "New album"
Pub/Sub 的消息格式
每条消息都是一个包含 3 个元素的数组:
1) "message" / "subscribe" / "unsubscribe"
2) 频道名
3) 消息内容 / 订阅数
message:正常的消息subscribe/unsubscribe:确认订阅/取消订阅pmessage:模式匹配的消息
Pub/Sub 的工作机制
1. 订阅关系
Redis 在服务端维护一个字典:
"news.sports" → [client_A, client_B, ...]
"news.tech" → [client_C]
"news.*" → [client_D] (模式订阅)
2. 消息发布流程
int pubsubPublishMessage(redisClient *c, robj *channel, robj *message) {
// 1. 查找订阅该频道的所有客户端
list *subscribers = dictFetchValue(server.pubsub_channels, channel);
// 2. 向每个订阅者发送消息
if (listLength(subscribers)) {
listIter li = listGetIterator(subscribers, AL_START_HEAD);
while ((ln = listNext(&li))) {
client = ln->value;
addReply(client, ...); // 发送消息
}
}
// 3. 匹配模式订阅
list *patterns = dictFetchValue(server.pubsub_patterns, pattern);
// 向匹配的客户端发送消息
return receivers;
}
3. 消息传递机制
Pub/Sub 是即发即忘模式:
– 消息不会持久化
– 如果订阅者不在线,消息会丢失
– 没有确认机制
– 没有重试机制
Pub/Sub 的适用场景
| 场景 | 说明 | 实例 |
|---|---|---|
| 实时通知 | 即时推送,可接受丢失 | 系统告警通知 |
| 消息广播 | 同时通知多个服务 | 配置变更通知 |
| 实时聊天 | 群组消息 | 简单的聊天室 |
| 事件驱动 | 解耦生产者消费者 | 缓存失效通知 |
Pub/Sub 的局限性
1. 消息不持久化
# 如果消息发布时订阅者不在线
PUBLISH notifications "Server restarted"
# 没有任何订阅者,消息丢失
2. 没有消息确认
# 订阅者收到消息后宕机
# 消息不会被重新投递
3. 缓冲有限
Redis 为每个订阅者分配输出缓冲区。如果订阅者消费慢,缓冲区满了 Redis 会断开连接:
# 配置客户端输出缓冲区限制
client-output-buffer-limit pubsub 8mb 2mb 60
# 硬限制 8MB,软限制 2MB/60 秒
4. 不支持消息回溯
Pub/Sub 不能像消息队列一样回溯历史消息。
Python 示例
import redis
import threading
import time
class PubSubExample:
def __init__(self, host='localhost', port=6379):
self.r = redis.Redis(host=host, port=port)
def publisher(self, channel, interval=1):
"""发布消息"""
while True:
msg = f"Message at {time.time():.0f}"
count = self.r.publish(channel, msg)
print(f"Published: '{msg}' -> {count} subscribers")
time.sleep(interval)
def subscriber(self, channel):
"""订阅消息"""
pubsub = self.r.pubsub()
pubsub.subscribe(channel)
print(f"Subscribed to {channel}")
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data']}")
# 使用
if __name__ == '__main__':
example = PubSubExample()
# 启动订阅者线程
t = threading.Thread(target=example.subscriber, args=('alerts',), daemon=True)
t.start()
time.sleep(1)
example.publisher('alerts', 2)
Pub/Sub 与 Redis 事务的关系
重要:在订阅模式下,Redis 客户端不能执行除了 SUBSCRIBE/UNSUBSCRIBE/PSUBSCRIBE/PUNSUBSCRIBE/PING/QUIT 之外的命令。如果要执行其他命令,需要建立新的连接。
面试要点
- Pub/Sub 是即发即忘模式,消息不持久化
- 离线订阅者会丢失消息
- 没有消息确认和重试机制
- Redis 6.0+ 开始不推荐使用 Pub/Sub(推荐 Stream)
- 输出缓冲区限制可能导致连接被关闭
- 一个连接在订阅模式下不能执行其他命令
- PUBLISH 命令返回值是被多少客户端收到
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END


暂无评论内容