Stream 数据结构详解:Redis 的消息队列利器

Stream 数据结构详解:Redis 的消息队列利器

什么是 Stream

Stream 是 Redis 5.0 引入的一种全新的数据结构。它可以看作一个追加写日志(Append-Only Log),每条消息都有一个唯一的 ID,新消息只能追加到末尾,不能修改已存在的消息。

如果说 List 是最简单的消息队列,那么 Stream 就是专业的消息队列解决方案。

数据结构模型

Stream 在 Redis 内部使用基数树(Rax) 实现,这是一种压缩字典树(compressed trie),特别适合有序的消息 ID 存储。

Stream 结构
┌─────────────────────────────┐
  Stream (rax tree)          
  ├─ ID: 1700000000000-0     
    └─ {name: "Alice", age: 30}  
  ├─ ID: 1700000000100-0     
    └─ {name: "Bob", age: 25}    
  ├─ ID: 1700000000200-0     
    └─ {name: "Charlie", age: 35}│
  └─ ...                      
                              
  Consumer Groups:            
  ├─ "group1"                 
    └─ consumers: [c1, c2]  
  └─ "group2"                 
     └─ consumers: [c3]      
└─────────────────────────────┘

核心命令集

写入和读取

# 添加消息(自动生成 ID)
XADD mystream * name Alice age 30
# "1716000000000-0"

# 添加消息(指定 ID)
XADD mystream 0-1 name Bob

# 获取消息数量
XLEN mystream

# 范围查询
XRANGE mystream - +                 # 所有消息
XRANGE mystream 1716000000000-0 +   # 从指定 ID 开始
XRANGE mystream - + COUNT 10       # 只取前 10 条

# 反向查询(最新消息在前)
XREVRANGE mystream + - COUNT 5

# 按时间范围查询
XRANGE mystream 1716000000000-0 1716000000100-0

消费者组操作

# 创建消费者组
XGROUP CREATE mystream mygroup 0
# 0 表示从 Stream 开头开始消费
# $ 表示只消费后续的新消息

# 消费者读取消息
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >

# > 表示读取未被其他消费者读取的消息

# 确认消息已处理
XACK mystream mygroup "1716000000000-0"

# 查看待处理消息
XPENDING mystream mygroup

# 查看消费者组信息
XINFO GROUPS mystream

# 查看消费者信息
XINFO CONSUMERS mystream mygroup

消息管理

# 修剪 Stream 长度
XTRIM mystream MAXLEN 10000        # 只保留 10000 条
XTRIM mystream MAXLEN ~ 10000     # 近似修剪(更高效)

# 删除指定消息
XDEL mystream "1716000000000-0"

# 设置消息过期(通过 MAXLEN/MINID)
XADD mystream MAXLEN 1000 * data value
# 自动删除最旧的消息,保持在 1000 条以内

# 按最小 ID 修剪
XTRIM mystream MINID 1716000000100-0
# 删除所有 ID 小于指定值的消息

消息 ID 详解

Stream 的消息 ID 由两部分组成:<毫秒级时间戳>-<序号>

1716000000000-0
^            ^
|            └─ 同一毫秒内的序号 0 开始
└─ 消息创建时的毫秒时间戳

ID 的特性
单调递增:新 ID 必须大于已有 ID
可排序:以 ID 做字典序排序就是时间序
基于时间:可以按时间范围查询
自动生成或自定义:可以用 * 自动生成,也可以手动指定

消费者组机制

消费者组是 Stream 最强大的特性:

Stream (消息列表)
    │
    ├─ 消费者组 "order_group"
    │   ├─ 消费者 "worker_1" → 处理消息 A, D, G
    │   ├─ 消费者 "worker_2" → 处理消息 B, E, H
    │   └─ 消费者 "worker_3" → 处理消息 C, F, I
    │
    └─ 消费者组 "log_group"
        ├─ 消费者 "logger_1" → 处理所有消息
        └─ 消费者 "logger_2" → 处理所有消息
        ↑ 这里是广播模式:每个消费者都处理所有消息
import redis

r = redis.Redis()

# 创建消费者组
r.xgroup_create('orders', 'shipping_group', id='0', mkstream=True)

# 消费者 - 读取消息
def process_orders(consumer_name):
    while True:
        # 阻塞读取,等待新消息
        messages = r.xreadgroup(
            'shipping_group', consumer_name,
            {'orders': '>'},   # > 表示读取新消息
            count=1, block=1000
        )
        if messages:
            for stream, msgs in messages:
                for msg_id, data in msgs:
                    print(f"处理订单: {data}")
                    # 处理完成后确认
                    r.xack('orders', 'shipping_group', msg_id)

Stream vs 其他数据结构

特性 Stream List (BRPOP) Pub/Sub Sorted Set
消息持久化
消费者组
消息确认
阻塞读取
消息回溯 ✅ (按score)
自动 ID

使用场景示例

任务队列

def task_queue_example():
    """Stream 实现任务队列"""
    r = redis.Redis()

    # 创建队列
    stream = 'task_queue'
    group = 'workers'

    try:
        r.xgroup_create(stream, group, id='0')
    except:
        pass  # 组已存在

    # 生产者:添加任务
    def add_task(task_type, task_data):
        r.xadd(stream, '*', {
            'type': task_type,
            'data': task_data,
            'created_at': time.time()
        })

    # 消费者:处理任务
    def worker(worker_id):
        while True:
            # 阻塞等待任务
            results = r.xreadgroup(
                group, worker_id,
                {stream: '>'},
                count=1, block=5000
            )
            for _, msgs in results:
                for msg_id, data in msgs:
                    try:
                        process_task(data)  # 业务处理
                        r.xack(stream, group, msg_id)  # 确认完成
                    except Exception as e:
                        print(f"处理失败: {e}")
                        # 失败消息留在待处理列表中

事件溯源

# Stream 作为事件存储,完整记录所有状态变更
event_stream = 'order_events'

# 记录每个事件
def record_event(order_id, event_type, data):
    r.xadd(event_stream, '*', {
        'order_id': order_id,
        'event_type': event_type,
        'data': json.dumps(data),
        'timestamp': time.time()
    })

# 重放订单历史
order_events = r.xrange(event_stream, '-', '+')
for event_id, event in order_events:
    event_data = json.loads(event[b'data'].decode())
    replay(event[b'event_type'].decode(), event_data)

常见问题

Q: Stream 内存占用大怎么办?

# 使用 MAXLEN 限制大小
XADD mystream MAXLEN 100000 * name value
# 只保留最近的 10 万条

# 使用 ~ 符号提高性能(允许略微超过限制)
XADD mystream MAXLEN ~ 100000 * name value

Q: 消费者处理失败怎么重试?

# 查看待处理列表(可能包括多次失败的消息)
XPENDING mystream mygroup - + 10

# 将失败消息重新分配给其他消费者
XCLAIM mystream mygroup new_consumer 60000 "msg-id-1"
# 60 秒后未确认的消息转移给新的消费者

面试要点

  • Stream 是基于基数树(Rax) 的有序消息队列
  • 消息 ID 是毫秒时间戳-序号,单调递增且按时间可排序
  • 消费者组是 Stream 最大的优势:负载均衡 + 消息确认
  • Stream 支持 ACK 确认机制,可做至少一次投递
  • 对比 Pub/Sub:Stream 能回溯历史,Pub/Sub 不能
  • 推荐 Stream 替代旧的 List/RPOP 消息队列模式
  • MAXLEN ~ 可以实现高效的消息修剪
© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容