Stream 数据结构详解:Redis 的消息队列利器
什么是 Stream
Stream 是 Redis 5.0 引入的一种全新的数据结构。它可以看作一个追加写日志(Append-Only Log),每条消息都有一个唯一的 ID,新消息只能追加到末尾,不能修改已存在的消息。
如果说 List 是最简单的消息队列,那么 Stream 就是专业的消息队列解决方案。
数据结构模型
Stream 在 Redis 内部使用基数树(Rax) 实现,这是一种压缩字典树(compressed trie),特别适合有序的消息 ID 存储。
Stream 结构:
┌─────────────────────────────┐
│ Stream (rax tree) │
│ ├─ ID: 1700000000000-0 │
│ │ └─ {name: "Alice", age: 30} │
│ ├─ ID: 1700000000100-0 │
│ │ └─ {name: "Bob", age: 25} │
│ ├─ ID: 1700000000200-0 │
│ │ └─ {name: "Charlie", age: 35}│
│ └─ ... │
│ │
│ Consumer Groups: │
│ ├─ "group1" │
│ │ └─ consumers: [c1, c2] │
│ └─ "group2" │
│ └─ consumers: [c3] │
└─────────────────────────────┘
核心命令集
写入和读取
# 添加消息(自动生成 ID)
XADD mystream * name Alice age 30
# "1716000000000-0"
# 添加消息(指定 ID)
XADD mystream 0-1 name Bob
# 获取消息数量
XLEN mystream
# 范围查询
XRANGE mystream - + # 所有消息
XRANGE mystream 1716000000000-0 + # 从指定 ID 开始
XRANGE mystream - + COUNT 10 # 只取前 10 条
# 反向查询(最新消息在前)
XREVRANGE mystream + - COUNT 5
# 按时间范围查询
XRANGE mystream 1716000000000-0 1716000000100-0
消费者组操作
# 创建消费者组
XGROUP CREATE mystream mygroup 0
# 0 表示从 Stream 开头开始消费
# $ 表示只消费后续的新消息
# 消费者读取消息
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
# > 表示读取未被其他消费者读取的消息
# 确认消息已处理
XACK mystream mygroup "1716000000000-0"
# 查看待处理消息
XPENDING mystream mygroup
# 查看消费者组信息
XINFO GROUPS mystream
# 查看消费者信息
XINFO CONSUMERS mystream mygroup
消息管理
# 修剪 Stream 长度
XTRIM mystream MAXLEN 10000 # 只保留 10000 条
XTRIM mystream MAXLEN ~ 10000 # 近似修剪(更高效)
# 删除指定消息
XDEL mystream "1716000000000-0"
# 设置消息过期(通过 MAXLEN/MINID)
XADD mystream MAXLEN 1000 * data value
# 自动删除最旧的消息,保持在 1000 条以内
# 按最小 ID 修剪
XTRIM mystream MINID 1716000000100-0
# 删除所有 ID 小于指定值的消息
消息 ID 详解
Stream 的消息 ID 由两部分组成:<毫秒级时间戳>-<序号>
1716000000000-0
^ ^
| └─ 同一毫秒内的序号(从 0 开始)
└─ 消息创建时的毫秒时间戳
ID 的特性:
– 单调递增:新 ID 必须大于已有 ID
– 可排序:以 ID 做字典序排序就是时间序
– 基于时间:可以按时间范围查询
– 自动生成或自定义:可以用 * 自动生成,也可以手动指定
消费者组机制
消费者组是 Stream 最强大的特性:
Stream (消息列表)
│
├─ 消费者组 "order_group"
│ ├─ 消费者 "worker_1" → 处理消息 A, D, G
│ ├─ 消费者 "worker_2" → 处理消息 B, E, H
│ └─ 消费者 "worker_3" → 处理消息 C, F, I
│
└─ 消费者组 "log_group"
├─ 消费者 "logger_1" → 处理所有消息
└─ 消费者 "logger_2" → 处理所有消息
↑ 这里是广播模式:每个消费者都处理所有消息
import redis
r = redis.Redis()
# 创建消费者组
r.xgroup_create('orders', 'shipping_group', id='0', mkstream=True)
# 消费者 - 读取消息
def process_orders(consumer_name):
while True:
# 阻塞读取,等待新消息
messages = r.xreadgroup(
'shipping_group', consumer_name,
{'orders': '>'}, # > 表示读取新消息
count=1, block=1000
)
if messages:
for stream, msgs in messages:
for msg_id, data in msgs:
print(f"处理订单: {data}")
# 处理完成后确认
r.xack('orders', 'shipping_group', msg_id)
Stream vs 其他数据结构
| 特性 | Stream | List (BRPOP) | Pub/Sub | Sorted Set |
|---|---|---|---|---|
| 消息持久化 | ✅ | ✅ | ❌ | ✅ |
| 消费者组 | ✅ | ❌ | ❌ | ❌ |
| 消息确认 | ✅ | ❌ | ❌ | ❌ |
| 阻塞读取 | ✅ | ✅ | ✅ | ❌ |
| 消息回溯 | ✅ | ❌ | ❌ | ✅ (按score) |
| 自动 ID | ✅ | ❌ | ❌ | ❌ |
使用场景示例
任务队列
def task_queue_example():
"""Stream 实现任务队列"""
r = redis.Redis()
# 创建队列
stream = 'task_queue'
group = 'workers'
try:
r.xgroup_create(stream, group, id='0')
except:
pass # 组已存在
# 生产者:添加任务
def add_task(task_type, task_data):
r.xadd(stream, '*', {
'type': task_type,
'data': task_data,
'created_at': time.time()
})
# 消费者:处理任务
def worker(worker_id):
while True:
# 阻塞等待任务
results = r.xreadgroup(
group, worker_id,
{stream: '>'},
count=1, block=5000
)
for _, msgs in results:
for msg_id, data in msgs:
try:
process_task(data) # 业务处理
r.xack(stream, group, msg_id) # 确认完成
except Exception as e:
print(f"处理失败: {e}")
# 失败消息留在待处理列表中
事件溯源
# Stream 作为事件存储,完整记录所有状态变更
event_stream = 'order_events'
# 记录每个事件
def record_event(order_id, event_type, data):
r.xadd(event_stream, '*', {
'order_id': order_id,
'event_type': event_type,
'data': json.dumps(data),
'timestamp': time.time()
})
# 重放订单历史
order_events = r.xrange(event_stream, '-', '+')
for event_id, event in order_events:
event_data = json.loads(event[b'data'].decode())
replay(event[b'event_type'].decode(), event_data)
常见问题
Q: Stream 内存占用大怎么办?
# 使用 MAXLEN 限制大小
XADD mystream MAXLEN 100000 * name value
# 只保留最近的 10 万条
# 使用 ~ 符号提高性能(允许略微超过限制)
XADD mystream MAXLEN ~ 100000 * name value
Q: 消费者处理失败怎么重试?
# 查看待处理列表(可能包括多次失败的消息)
XPENDING mystream mygroup - + 10
# 将失败消息重新分配给其他消费者
XCLAIM mystream mygroup new_consumer 60000 "msg-id-1"
# 60 秒后未确认的消息转移给新的消费者
面试要点
- Stream 是基于基数树(Rax) 的有序消息队列
- 消息 ID 是毫秒时间戳-序号,单调递增且按时间可排序
- 消费者组是 Stream 最大的优势:负载均衡 + 消息确认
- Stream 支持 ACK 确认机制,可做至少一次投递
- 对比 Pub/Sub:Stream 能回溯历史,Pub/Sub 不能
- 推荐 Stream 替代旧的 List/RPOP 消息队列模式
- MAXLEN ~ 可以实现高效的消息修剪
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END


暂无评论内容