Redis Stream 核心命令:XADD、XREAD、XACK
概述
这三个命令是 Redis Stream 消息队列的核心操作,分别对应消息的生产、消费和确认。理解它们的用法和背后的机制,是掌握 Stream 的关键。
XADD:消息生产者
基本用法
# 添加一条消息到 Stream
XADD mystream * name "张三" age 25
字段格式:XADD key [NOMKSTREAM] [MAXLEN|MINID [=~] threshold] *|id field value [field value ...]
重要参数
NOMKSTREAM(Redis 6.2+)
# 如果 Stream 不存在,不自动创建
XADD mystream NOMKSTREAM * event "test" # 若 Stream 不存在则返回 nil
MAXLEN 修剪
# 限制 Stream 长度为 1000 条(精确修剪)
XADD mystream MAXLEN 1000 * field value
# 近似修剪(性能更好,推荐用于生产)
XADD mystream MAXLEN ~ 1000 * field value
MINID 修剪(Redis 6.2+)
# 删除所有 ID 小于指定值的消息
XADD mystream MINID 1700000000000-0 * field value
返回值
XADD mystream * name "张三"
# 返回: "1700000000000-0" ← 新消息的 ID
性能特点
- 单条消息添加复杂度 O(log N),N 为 Stream 中消息数量
- 使用 MAXLEN 时,整体复杂度 O(log N)~O(N)
- 近似修剪(
~)的性能更好
XREAD:消息消费者
基本用法
# 读取所有 Stream 的新消息
XREAD COUNT 10 BLOCK 0 STREAMS mystream $
参数说明
| 参数 | 说明 |
|---|---|
COUNT 10 |
每次最多返回 10 条消息 |
BLOCK 0 |
阻塞等待,0 表示无限等待 |
STREAMS |
指定一个或多个 Stream |
$ |
读取之后的新消息 |
0 |
从 Stream 开头读取 |
| ID | 从指定 ID 之后读取 |
三种读取模式
1. 实时监听模式($)
# 只读取之后的新消息,类似 tail -f
XREAD COUNT 10 BLOCK 5000 STREAMS mystream $
2. 历史回溯模式
# 从 Stream 开头读取所有消息
XREAD COUNT 100 STREAMS mystream 0
3. 指定位置开始
# 从特定消息 ID 之后开始读取
XREAD COUNT 10 STREAMS mystream 1700000000000-0
多 Stream 读取
# 同时读取多个 Stream
XREAD COUNT 5 BLOCK 0 STREAMS stream1 stream2 $ $
与 XREADGROUP 的区别
XREAD vs XREADGROUP
↓ ↓
独立消费者 消费者组成员
每个消费者独立消费 消息在组内负载均衡
没有消息确认机制 需要 XACK 确认
不追踪消费进度 追踪消费进度(PEL)
XACK:消息确认
基本用法
XACK mystream mygroup 1700000000000-0
确认单个消息或多个消息:
# 确认多条消息
XACK mystream mygroup 1700000000000-0 1700000000001-0 1700000000002-0
确认机制的原理
消息被投递 → 加入 PEL
↓
消费者处理完成 → 调用 XACK
↓
消息从 PEL 移除 → 确认完成
- 不 ACK 的消息会一直留在 PEL 中
- XACK 可以保证”至少一次”投递
- 重复 ACK 同一消息是安全的(幂等)
查看 PEL 状态
# 查看待确认消息数量
XPENDING mystream mygroup
# 返回: (integer) 5 ← 有 5 条消息待确认
# 查看详情
XPENDING mystream mygroup - + 10
三命令配合的完整流程
基础消息队列实现
import redis
r = redis.Redis(decode_responses=True)
STREAM = "order_queue"
GROUP = "order_processors"
CONSUMER = "worker1"
# 创建消费者组(首次运行)
try:
r.xgroup_create(STREAM, GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
# 生产者
def produce_order(order_data):
msg_id = r.xadd(STREAM, order_data, maxlen=10000)
return msg_id
# 消费者
def consume_orders():
while True:
try:
# XREAD:读取消息
results = r.xreadgroup(GROUP, CONSUMER,
{STREAM: '>'},
count=10,
block=2000)
if not results:
continue
for stream_data in results:
for msg_id, fields in stream_data[1]:
try:
# 处理消息
process_order(fields)
# XACK:确认消息
r.xack(STREAM, GROUP, msg_id)
except Exception as e:
log_error(f"处理失败: {msg_id}, {e}")
# 不 ACK,消息保留在 PEL 中
except Exception as e:
log_error(f"消费循环异常: {e}")
time.sleep(1)
性能考量
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
| XADD | O(log N) | N 为消息数量 |
| XREAD | O(log N + M) | M 为返回的消息数 |
| XACK | O(log N) | 从基数树中移除 |
| MAXLEN 修剪 | O(log N)~O(N) | 精确修剪较慢 |
面试要点
- XADD:消息添加是 O(log N),近似修剪用
~提高性能 - XREAD:
$实时监听 vs0历史回溯 vs>消费者组新消息 - XACK:消息确认的核心,影响 PEL 内存占用
- 三者关系:生产 → XADD,消费 → XREAD,确认 → XACK
- 阻塞模式:BLOCK 参数实现长轮询,避免空轮询浪费
- 常见陷阱:忘记 XACK 导致 PEL 膨胀、MAXLEN 过大导致内存溢出
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END


暂无评论内容