Python 异步迭代器的使用场景

17次阅读

async for 仅适用于 i / o 密集型流式场景,如数据库游标、消息队列、流式下载;须配合真正异步的__aiter__和__anext__实现,禁用于内存迭代或伪异步;并发处理需限流,避免任务堆积;注意 python 版本与库兼容性。

Python 异步迭代器的使用场景

async for 用在哪些地方才不白写

异步迭代器不是为了炫技,只在真正需要「边拉数据边处理」且 I/O 占主导的场景里才有意义。比如从数据库游标逐行读大结果集、消费消息队列(Kafka / Redis Stream)、流式下载文件并实时解压——这些操作本身耗时、不可跳过,又不能等全部加载完再处理。

常见错误是把纯内存循环(如 range(1000))包进 async for:它不会变快,反而因协程调度开销更慢;或者误以为加了 async 就自动并发,其实 async for 是串行的,除非你显式并发启动多个任务。

  • 必须配合实现了 __aiter____anext__ 的对象,比如 aiofiles 的文件句柄、aiomysql.Cursorasyncpg.SaConnection
  • 不能用在普通生成器或列表上;async for x in [1,2,3]: 直接报 TypeError: 'list' object is not an async iterator
  • 如果底层 I/O 没有真正异步(比如用 threading 模拟的“假异步”),那整个链路还是阻塞的

怎么写一个靠谱的异步迭代器

自己实现 __aiter____anext__ 很容易漏掉边界逻辑。最稳妥的方式是用 async def __aiter__(self) 返回 self,再在 __anext__ 里做实际的 await 工作,并手动抛出 StopAsyncIteration

别直接在 __aiter__await 初始化动作——这会导致 async for 启动前就卡住;初始化应放在 __anext__ 首次调用时,或提前在构造函数里完成。

立即学习 Python 免费学习笔记(深入)”;

  • __anext__ 必须返回一个 awaitable,不能直接 return 值;也不能忘了 raise StopAsyncIteration 结束信号
  • 如果迭代过程中可能抛异常(如网络断开),要确保异常能透出,不要静默吞掉;否则 async for 会卡死或无限重试
  • 示例片段:
    async def __anext__(self):     if self._done:         raise StopAsyncIteration     try:         data = await self._fetch_one()         if not data:             self._done = True             raise StopAsyncIteration         return data     except ConnectionError:         raise

和普通 for、asyncio.gather 混用时的坑

很多人想“一边异步迭代,一边并发处理每项”,于是写出 async for item in it: await asyncio.create_task(handle(item)) ——这其实没并发,只是不断新建 task 而不 await 它们,最后可能堆积成千上万个 pending task,OOM 或被 event loop 拖垮。

真要并发处理,得控制并发数:asyncio.Semaphore 或用 asyncio.gather(*tasks, return_exceptions=True) 批量提交,但注意:后者要求你先把所有待处理项攒出来,违背了流式迭代的初衷。

  • 别在 async for 循环体里无节制地 create_task;要用 asyncio.Semaphore(5) 限流
  • gather 适合已知长度、可批量获取的场景(如并发请求 10 个 URL),不适合无限流或大结果集
  • 混合使用时,记得处理 return_exceptions=True 下的异常:isinstance(x, Exception) 判断,而不是直接 print(x)

async for 在不同 Python 版本里的行为差异

Python 3.5 引入 async for,但直到 3.7 才默认启用 asyncio.run(),而 3.8 开始 asyncio.create_task() 成为推荐方式。低版本下容易踩兼容性雷。

最隐蔽的问题是:某些第三方库(尤其老版本 aiohttpaiomysql)在 3.6 下返回的异步迭代器,可能在 3.9+ 因 event loop 策略变更而提前关闭连接,报 RuntimeError: socket is closed

  • 检查库文档是否明确支持你用的 Python 版本;优先选标注了 py37+py38+ 的版本
  • 避免在 __aexit__ 未执行完时就退出 event loop;用 asyncio.run(main()) 而非手动 loop.run_until_complete()
  • 如果遇到 DeprecationWarning: There is no current event loop,说明你在非 async 上下文里用了 async 迭代器,比如在普通函数里直接写 async for
事情说清了就结束

text=ZqhQzanResources