Pipeline 批量操作:减少网络往返的”打包神器”
什么是 Pipeline
Pipeline(流水线/管道)是 Redis 客户端提供的一种批量操作模式,允许客户端将多条命令一次性发送到服务端,然后一次性接收所有返回结果。
为什么需要 Pipeline
没有 Pipeline 的问题
# 普通模式:每条命令一次网络往返
for i in range(1000):
r.set(f"key:{i}", i)
在没有 Pipeline 的情况下,每个 SET 命令都需要经历:
客户端 → 发送命令 → 服务端处理 → 返回结果 → 客户端
1000 次 SET = 1000 次网络往返(每往返约 0.5ms 内网延迟)= 约 500ms 网络开销
使用 Pipeline
# Pipeline 模式:打包发送
pipe = r.pipeline()
for i in range(1000):
pipe.set(f"key:{i}", i)
pipe.execute() # 一次网络往返
1000 次 SET = 1 次网络往返 = 约 0.5ms 网络开销
Pipeline 的工作原理
普通模式:
客户端: [SET a 1]───→[SET b 2]───→[GET c]────→
服务端: ←[OK]── ←[OK]── ←[3]──
Pipeline 模式:
客户端: [SET a 1][SET b 2][GET c]──────────────────────→
服务端: ←[OK][OK][3]───
核心思想:将多个命令打包,减少网络往返次数。
使用示例
Python Redis 客户端
import redis
r = redis.Redis(host='localhost', port=6379)
# 批量写入
pipe = r.pipeline()
pipe.set('name', 'Alice')
pipe.set('age', '30')
pipe.set('city', 'Beijing')
results = pipe.execute()
# results = [True, True, True]
# 批量读取
pipe = r.pipeline()
pipe.get('name')
pipe.get('age')
pipe.get('city')
results = pipe.execute()
# results = [b'Alice', b'30', b'Beijing']
Jedis (Java)
Jedis jedis = new Jedis("localhost");
Pipeline pipeline = jedis.pipelined();
// 批量设置
for (int i = 0; i < 1000; i++) {
pipeline.set("key:" + i, String.valueOf(i));
}
List<Object> results = pipeline.syncAndReturnAll();
Redis CLI 模拟
# 使用 --pipe 模式
echo -e '*3\r\n$3\r\nSET\r\n$5\r\nkey1\r\n$5\r\nvalue\r\n' | redis-cli --pipe
Pipeline 的适用场景
| 场景 | 适合 Pipeline | 原因 |
|---|---|---|
| 大量写入 | ✅ | 减少网络开销,提升 TPS |
| 批量读取 | ✅ | 单次读取多个 key |
| 批量删除 | ✅ | 一次性清理大量 key |
| 数据初始化 | ✅ | 导入大量数据 |
| 强依赖顺序 | ⚠️ | Pipeline 保证命令顺序执行 |
| 依赖中间结果的命令 | ❌ | Pipeline 命令间不能依赖 |
Pipeline 的限制
1. 命令间不能有依赖关系
# ❌ 错误:后续命令依赖前面命令的结果
pipe = r.pipeline()
pipe.set('counter', 0)
pipe.incr('counter') # 没问题,INCER 不依赖 GET 结果
# ❌ 更典型的问题:无法实现"读-改-写"
pipe.get('balance') # 需要这个结果来做判断
pipe.decrby('balance', amount) # 但这里不确定余额是否足够
pipe.execute()
对于这种”读-改-写”的场景,需要使用 Lua 脚本或 WATCH 事务。
2. Pipeline 不是事务
# Pipeline 中的命令执行是顺序的
# 但如果中间某条命令失败,其他命令仍然会执行
# Pipeline 不保证事务隔离
3. 内存消耗
Pipeline 一次性发送大量命令时,客户端和服务端都需要缓冲这些命令。过大的 Pipeline 会消耗内存。
# 分批 Pipeline
for i in range(0, 100000, 1000):
pipe = r.pipeline()
for j in range(i, i + 1000):
pipe.set(f"key:{j}", j)
pipe.execute()
Pipeline 与普通模式的性能对比
import time
# 普通模式
start = time.time()
for i in range(10000):
r.set(f"seq:{i}", i)
print(f"普通模式: {time.time() - start:.2f}s")
# Pipeline 模式
start = time.time()
pipe = r.pipeline()
for i in range(10000):
pipe.set(f"pipe:{i}", i)
pipe.execute()
print(f"Pipeline: {time.time() - start:.2f}s")
典型测试结果(内网环境):
普通模式: 2.85s
Pipeline: 0.08s
Pipeline 速度提升 30-50 倍,网络延迟越高,提升越明显。
面试要点
- Pipeline 的核心价值:减少网络往返(RTT)
- Pipeline 的局限:命令间不能互相依赖
- Pipeline 不是事务(但可以保证命令顺序)
- Pipeline 过大可能导致内存问题,建议分批
- Windows/Linux 下都能用,取决于客户端实现
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END


暂无评论内容