Pipeline 批量操作:减少网络往返的”打包神器”

Pipeline 批量操作:减少网络往返的”打包神器”

什么是 Pipeline

Pipeline(流水线/管道)是 Redis 客户端提供的一种批量操作模式,允许客户端将多条命令一次性发送到服务端,然后一次性接收所有返回结果。

为什么需要 Pipeline

没有 Pipeline 的问题

# 普通模式:每条命令一次网络往返
for i in range(1000):
    r.set(f"key:{i}", i)

在没有 Pipeline 的情况下,每个 SET 命令都需要经历:

客户端 → 发送命令 → 服务端处理 → 返回结果 → 客户端

1000 次 SET = 1000 次网络往返(每往返约 0.5ms 内网延迟)= 约 500ms 网络开销

使用 Pipeline

# Pipeline 模式:打包发送
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", i)
pipe.execute()  # 一次网络往返

1000 次 SET = 1 次网络往返 = 约 0.5ms 网络开销

Pipeline 的工作原理

普通模式
客户端: [SET a 1]───→[SET b 2]───→[GET c]────→
服务端:      [OK]──    [OK]──    [3]──

Pipeline 模式
客户端: [SET a 1][SET b 2][GET c]──────────────────────→
服务端:                              [OK][OK][3]───

核心思想:将多个命令打包,减少网络往返次数

使用示例

Python Redis 客户端

import redis

r = redis.Redis(host='localhost', port=6379)

# 批量写入
pipe = r.pipeline()
pipe.set('name', 'Alice')
pipe.set('age', '30')
pipe.set('city', 'Beijing')
results = pipe.execute()
# results = [True, True, True]

# 批量读取
pipe = r.pipeline()
pipe.get('name')
pipe.get('age')
pipe.get('city')
results = pipe.execute()
# results = [b'Alice', b'30', b'Beijing']

Jedis (Java)

Jedis jedis = new Jedis("localhost");
Pipeline pipeline = jedis.pipelined();

// 批量设置
for (int i = 0; i < 1000; i++) {
    pipeline.set("key:" + i, String.valueOf(i));
}
List<Object> results = pipeline.syncAndReturnAll();

Redis CLI 模拟

# 使用 --pipe 模式
echo -e '*3\r\n$3\r\nSET\r\n$5\r\nkey1\r\n$5\r\nvalue\r\n' | redis-cli --pipe

Pipeline 的适用场景

场景 适合 Pipeline 原因
大量写入 减少网络开销,提升 TPS
批量读取 单次读取多个 key
批量删除 一次性清理大量 key
数据初始化 导入大量数据
强依赖顺序 ⚠️ Pipeline 保证命令顺序执行
依赖中间结果的命令 Pipeline 命令间不能依赖

Pipeline 的限制

1. 命令间不能有依赖关系

# ❌ 错误:后续命令依赖前面命令的结果
pipe = r.pipeline()
pipe.set('counter', 0)
pipe.incr('counter')     # 没问题,INCER 不依赖 GET 结果

# ❌ 更典型的问题:无法实现"读-改-写"
pipe.get('balance')      # 需要这个结果来做判断
pipe.decrby('balance', amount)  # 但这里不确定余额是否足够
pipe.execute()

对于这种”读-改-写”的场景,需要使用 Lua 脚本或 WATCH 事务。

2. Pipeline 不是事务

# Pipeline 中的命令执行是顺序的
# 但如果中间某条命令失败,其他命令仍然会执行
# Pipeline 不保证事务隔离

3. 内存消耗

Pipeline 一次性发送大量命令时,客户端和服务端都需要缓冲这些命令。过大的 Pipeline 会消耗内存。

# 分批 Pipeline
for i in range(0, 100000, 1000):
    pipe = r.pipeline()
    for j in range(i, i + 1000):
        pipe.set(f"key:{j}", j)
    pipe.execute()

Pipeline 与普通模式的性能对比

import time

# 普通模式
start = time.time()
for i in range(10000):
    r.set(f"seq:{i}", i)
print(f"普通模式: {time.time() - start:.2f}s")

# Pipeline 模式
start = time.time()
pipe = r.pipeline()
for i in range(10000):
    pipe.set(f"pipe:{i}", i)
pipe.execute()
print(f"Pipeline: {time.time() - start:.2f}s")

典型测试结果(内网环境)

普通模式: 2.85s
Pipeline: 0.08s

Pipeline 速度提升 30-50 倍,网络延迟越高,提升越明显。

面试要点

  • Pipeline 的核心价值:减少网络往返(RTT)
  • Pipeline 的局限:命令间不能互相依赖
  • Pipeline 不是事务(但可以保证命令顺序)
  • Pipeline 过大可能导致内存问题,建议分批
  • Windows/Linux 下都能用,取决于客户端实现
© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容