Redis Stream 核心命令:XADD、XREAD、XACK

Redis Stream 核心命令:XADD、XREAD、XACK

概述

这三个命令是 Redis Stream 消息队列的核心操作,分别对应消息的生产消费确认。理解它们的用法和背后的机制,是掌握 Stream 的关键。

XADD:消息生产者

基本用法

# 添加一条消息到 Stream
XADD mystream * name "张三" age 25

字段格式:XADD key [NOMKSTREAM] [MAXLEN|MINID [=~] threshold] *|id field value [field value ...]

重要参数

NOMKSTREAM(Redis 6.2+)

# 如果 Stream 不存在,不自动创建
XADD mystream NOMKSTREAM * event "test"   # 若 Stream 不存在则返回 nil

MAXLEN 修剪

# 限制 Stream 长度为 1000 条(精确修剪)
XADD mystream MAXLEN 1000 * field value

# 近似修剪(性能更好,推荐用于生产)
XADD mystream MAXLEN ~ 1000 * field value

MINID 修剪(Redis 6.2+)

# 删除所有 ID 小于指定值的消息
XADD mystream MINID 1700000000000-0 * field value

返回值

XADD mystream * name "张三"
# 返回: "1700000000000-0"  ← 新消息的 ID

性能特点

  • 单条消息添加复杂度 O(log N),N 为 Stream 中消息数量
  • 使用 MAXLEN 时,整体复杂度 O(log N)~O(N)
  • 近似修剪(~)的性能更好

XREAD:消息消费者

基本用法

# 读取所有 Stream 的新消息
XREAD COUNT 10 BLOCK 0 STREAMS mystream $

参数说明

参数 说明
COUNT 10 每次最多返回 10 条消息
BLOCK 0 阻塞等待,0 表示无限等待
STREAMS 指定一个或多个 Stream
$ 读取之后的新消息
0 从 Stream 开头读取
ID 从指定 ID 之后读取

三种读取模式

1. 实时监听模式($)

# 只读取之后的新消息,类似 tail -f
XREAD COUNT 10 BLOCK 5000 STREAMS mystream $

2. 历史回溯模式

# 从 Stream 开头读取所有消息
XREAD COUNT 100 STREAMS mystream 0

3. 指定位置开始

# 从特定消息 ID 之后开始读取
XREAD COUNT 10 STREAMS mystream 1700000000000-0

多 Stream 读取

# 同时读取多个 Stream
XREAD COUNT 5 BLOCK 0 STREAMS stream1 stream2 $ $

与 XREADGROUP 的区别

XREAD                    vs     XREADGROUP
  ↓                              ↓
独立消费者                    消费者组成员
每个消费者独立消费            消息在组内负载均衡
没有消息确认机制             需要 XACK 确认
不追踪消费进度               追踪消费进度(PEL)

XACK:消息确认

基本用法

XACK mystream mygroup 1700000000000-0

确认单个消息或多个消息:

# 确认多条消息
XACK mystream mygroup 1700000000000-0 1700000000001-0 1700000000002-0

确认机制的原理

消息被投递 → 加入 PEL
    ↓
消费者处理完成 → 调用 XACK
    ↓
消息从 PEL 移除 → 确认完成
  • 不 ACK 的消息会一直留在 PEL 中
  • XACK 可以保证”至少一次”投递
  • 重复 ACK 同一消息是安全的(幂等)

查看 PEL 状态

# 查看待确认消息数量
XPENDING mystream mygroup
# 返回: (integer) 5    5 条消息待确认

# 查看详情
XPENDING mystream mygroup - + 10

三命令配合的完整流程

基础消息队列实现

import redis

r = redis.Redis(decode_responses=True)
STREAM = "order_queue"
GROUP = "order_processors"
CONSUMER = "worker1"

# 创建消费者组(首次运行)
try:
    r.xgroup_create(STREAM, GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" not in str(e):
        raise

# 生产者
def produce_order(order_data):
    msg_id = r.xadd(STREAM, order_data, maxlen=10000)
    return msg_id

# 消费者
def consume_orders():
    while True:
        try:
            # XREAD:读取消息
            results = r.xreadgroup(GROUP, CONSUMER, 
                                   {STREAM: '>'}, 
                                   count=10, 
                                   block=2000)

            if not results:
                continue

            for stream_data in results:
                for msg_id, fields in stream_data[1]:
                    try:
                        # 处理消息
                        process_order(fields)

                        # XACK:确认消息
                        r.xack(STREAM, GROUP, msg_id)
                    except Exception as e:
                        log_error(f"处理失败: {msg_id}, {e}")
                        # 不 ACK,消息保留在 PEL 中

        except Exception as e:
            log_error(f"消费循环异常: {e}")
            time.sleep(1)

性能考量

操作 时间复杂度 说明
XADD O(log N) N 为消息数量
XREAD O(log N + M) M 为返回的消息数
XACK O(log N) 从基数树中移除
MAXLEN 修剪 O(log N)~O(N) 精确修剪较慢

面试要点

  • XADD:消息添加是 O(log N),近似修剪用 ~ 提高性能
  • XREAD$ 实时监听 vs 0 历史回溯 vs > 消费者组新消息
  • XACK:消息确认的核心,影响 PEL 内存占用
  • 三者关系:生产 → XADD,消费 → XREAD,确认 → XACK
  • 阻塞模式:BLOCK 参数实现长轮询,避免空轮询浪费
  • 常见陷阱:忘记 XACK 导致 PEL 膨胀、MAXLEN 过大导致内存溢出
© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容