一、引言
Python 的异步编程经历了从生成器协程到原生 async/await 的演进。asyncio 作为 Python 标准库提供的异步 I/O 框架,背后涉及事件循环、协程调度、任务管理和 IO 多路复用等多个核心组件。本文将从零开始,完整拆解 Python 异步编程的底层实现机制。
二、事件循环(Event Loop)机制
2.1 什么是事件循环
事件循环是实现异步 I/O 的核心调度器。它是一个无限循环,不断检查是否有待处理的 I/O 事件、定时器或回调函数,并按照优先级依次执行:
# 事件循环的伪代码
def event_loop():
while True:
# 1. 检查定时器(TimerHandle)
fire_due_timers()
# 2. 执行 I/O 事件(通过 epoll/select)
for fd, event in poll(events, timeout):
if event & READABLE:
schedule_callback(read_handlers[fd])
if event & WRITABLE:
schedule_callback(write_handlers[fd])
# 3. 执行准备就绪的回调
while ready_queue:
callback = ready_queue.pop(0)
callback()
# 4. 执行空闲任务(idle callbacks)
2.2 不同平台的事件驱动实现
| 平台 | I/O 复用 API | Python 封装 |
|---|---|---|
| Linux | epoll | selector.EpollSelector |
| macOS/FreeBSD | kqueue | selector.KqueueSelector |
| Windows | IOCP | proactor.IocpProactor |
| 跨平台兼容 | select(效率低) | selector.SelectSelector |
flowchart TD
subgraph "事件循环核心流程"
A[开始新迭代] --> B[计算最小超时时间]
B --> C[调用 select/epoll 等待事件]
C --> D{有事件就绪?}
D -->|是| E[读取事件列表]
D -->|否| F[超时返回]
E --> G[执行 I/O 回调]
F --> H[执行到期定时器]
G --> H
H --> I[执行回调队列]
I --> J{还有待处理事件?}
J -->|是| C
J -->|否| A
end
三、async/await 关键字原理
3.1 协程对象
当一个函数被 async def 定义,它不再返回一个普通值,而是返回一个 协程对象:
async def hello():
return "Hello World"
print(hello()) #
print(type(hello())) #
这个协程对象本质上是一个 状态机,其内部通过 __await__ 方法返回一个迭代器:
# 协程的简化实现
class Coroutine:
def __await__(self):
return self._iter_impl().__await__()
def send(self, value):
# 恢复执行,直到下一个 await 或 return
pass
def throw(self, exception):
# 在协程内部抛出异常
pass
3.2 await 关键字的行为
await 等价于 yield from 的语义升级。执行 await 时,当前协程会 挂起自身,将执行权交还给事件循环:
async def fetch_data():
# 1. 创建协程,但不执行
result = await async_io_operation()
# 2. 挂起当前协程,yield 控制权到事件循环
# 3. 当 async_io_operation() 完成时,事件循环 send 结果回来
# 4. 恢复执行
return result
sequenceDiagram
participant EL as 事件循环
participant C1 as 协程A
participant C2 as 协程B
participant IO as IO 操作
EL->>C1: send(None) 启动
C1->>C1: 执行到第一个 await
C1->>IO: 发起 IO 操作
C1->>EL: 挂起(yield 控制权)
EL->>C2: send(None) 启动
C2->>C2: 执行代码
C2->>IO: 发起 IO 操作
C2->>EL: 挂起
Note over EL: 等待 IO 事件...
IO-->>EL: 数据就绪
EL->>C1: send(data) 恢复
C1->>C1: 处理结果
C1->>EL: 完成
四、协程调度与任务管理
4.1 Task 与 Future
asyncio.Task 是 asyncio.Future 的子类,是对协程的封装,使其能被事件循环调度:
class Task(Future):
def __init__(self, coro, *, loop=None, name=None):
super().__init__(loop=loop)
self._coro = coro # 包装的协程
self._name = name
self._loop.call_soon(self.__step)
def __step(self, exc=None):
coro = self._coro
try:
if exc is None:
result = coro.send(None) # 推进协程
else:
result = coro.throw(exc)
except StopIteration as exc:
# 协程完成
self.set_result(exc.value)
except Exception as exc:
self.set_exception(exc)
else:
# 协程挂起,返回了一个 Future
if isinstance(result, Future):
result.add_done_callback(self.__wakeup)
4.2 调度机制
flowchart TD
subgraph "任务调度"
T1[Task 提交] --> Q[就绪队列]
T2[IO 完成] --> Q
T3[定时器到期] --> Q
Q --> S[__step 依次执行]
S --> R{协程返回?}
R -->|await Future| F[Future 未完成]
R -->|StopIteration| D[任务完成]
F --> W[注册回调: 完成后重新入队]
end
4.3 gather 的实现原理
asyncio.gather() 是最常用的并发 API。其实现本质是创建多个 Task 并等待它们全部完成:
async def gather(*coros, return_exceptions=False):
# 为每个协程创建 Task(隐式调度)
tasks = [ensure_future(c) for c in coros]
if not tasks:
return []
# 内部使用 _GatheringFuture 等待所有任务
outer = _GatheringFuture(tasks, loop=loop)
for i, t in enumerate(tasks):
t.add_done_callback(partial(outer._done_callback, i))
return await outer # 挂起直到所有 task 完成
关键点对比:
| 方式 | 是否并发 | 异常处理 | 使用场景 |
|---|---|---|---|
await coro() |
否(顺序) | 自然传播 | 依赖链式调用 |
await asyncio.gather(c1,c2) |
是 | 第一个异常即抛出 | 并行独立任务 |
asyncio.create_task(c()) |
是 | 需单独 try/except | 后台任务 |
asyncio.wait(tasks) |
是 | 任务级控制 | 超时/任意完成 |
五、asyncio 库核心组件剖析
5.1 传输层与协议层(Transport/Protocol)
asyncio 的数据流抽象分为两层:
- Transport(传输层): 负责 socket 的创建、连接、数据读写等底层 I/O 操作
- Protocol(协议层): 定义数据到达、连接建立、连接关闭时的回调
class EchoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
# TCP 数据到达回调
self.transport.write(data) # 回写
def connection_lost(self, exc):
# 连接关闭
pass
5.2 Stream 模式(高级 API)
StreamReader 和 StreamWriter 是基于 Protocol 的更高层封装,提供类似文件读写的 API:
async def tcp_echo_client():
reader, writer = await asyncio.open_connection(
'localhost', 8888)
writer.write(b'Hello')
await writer.drain() # 等待发送缓冲区清空
data = await reader.read(100)
writer.close()
await writer.wait_closed()
注意: 常见的错误是忘记 await writer.drain(),这会导致数据可能没有完全发送就被关闭连接。
5.3 同步原语
asyncio 提供了线程版对应的异步同步原语:
async def worker(lock, name):
async with lock: # 异步上下文管理器
print(f"{name} 获取了锁")
await asyncio.sleep(1)
| 同步原语 | 线程版 | 异步版 | 区别 |
|---|---|---|---|
| 锁 | threading.Lock | asyncio.Lock | await 不阻塞事件循环 |
| 事件 | threading.Event | asyncio.Event | await wait 挂起协程 |
| 条件变量 | threading.Condition | asyncio.Condition | await notify 异步通知 |
| 信号量 | threading.Semaphore | asyncio.Semaphore | 支持 async with |
重要区别: 异步锁在等待时不会阻塞事件循环,其他协程可以继续执行。而线程锁在等待时整个线程都被阻塞。
5.4 运行引擎对比
| 执行方式 | 版本 | 特点 | 适用场景 |
|---|---|---|---|
asyncio.run() |
3.7+ | 创建新事件循环,运行完毕关闭 | 主入口,推荐使用 |
loop.run_until_complete() |
3.5+ | 需手动管理循环 | 旧代码兼容 |
loop.run_forever() |
3.5+ | 永久运行 | 长期服务 |
六、同步 vs 异步性能对比
6.1 阻塞调用对比
# 同步版本
def sync_fetch_all(urls):
results = []
for url in urls:
resp = requests.get(url) # 阻塞 100ms
results.append(resp.text)
return results
# 100 个 URL → 10 秒
# 异步版本
async def async_fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)
# 100 个 URL → ~100ms(并发时)
6.2 性能数据模型
flowchart LR
subgraph "同步模型"
S1[请求1 100ms] --> S2[请求2 100ms] --> S3[请求3 100ms]
end
subgraph "异步模型"
A1[请求1 100ms] -.- A2[请求2 100ms]
A2 -.- A3[请求3 100ms]
A1 -.- T[总时长 ≈100ms]
A3 -.- T
end
6.3 何时用同步,何时用异步
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| CPU 密集型 | 同步 + 多进程 | async 解决不了 CPU 阻塞 |
| I/O 密集型(网络/文件) | 异步 | 并发等待时间大幅减少 |
| 简单脚本 | 同步 | 代码更简单直观 |
| Web 服务 | 异步 | 高并发连接处理 |
| 计算密集型混合 | 异步 + 线程池 | loop.run_in_executor() |
误区澄清: 异步编程不会让代码执行更快,但能让等待并发化。如果有 10 次网络请求各 100ms,同步需要 1000ms,异步 ≈ 100ms(并发)。但如果是 10 次 CPU 密集型计算各 100ms,异步也是 1000ms。
七、总结
Python 异步编程的核心在于事件循环驱动的协作式多任务。与操作系统抢占式调度不同,协程主动在 I/O 等待点 await 挂起自己,将执行权交还给事件循环。这种模型在 I/O 密集型场景下达到了极高的吞吐效率。
关键要点总结:
- 事件循环是调度核心,使用
epoll/kqueue实现高效的 I/O 事件监听 - 协程本质是状态机,
await是挂起点也是恢复点 - Task 包装协程使其可被调度,内部通过
__step方法驱动 - 异步 ≠ 并行,适合 I/O 密集型而非 CPU 密集型
- 正确管理资源:异步上下文管理器(
async with)和 Task 的生命周期管理


暂无评论内容