Python 任务队列模型的基本实现

11次阅读

应使用队列而非直接 threading 是因队列能解耦任务提交与执行,实现可控并发、失败追溯及异步 / 延迟 / 重试语义;queue.queue 非持久、非跨进程、易卡死,需合理设 timeout 并避免误用;复杂场景应换 celery 或 rq。

Python 任务队列模型的基本实现

为什么不用 threading 直接跑任务,而要搞队列

因为并发控制和任务生命周期管理会失控。比如你用 threading.Thread 启一堆线程去处理 HTTP 请求,很快就会遇到连接池耗尽、CPU 抢占严重、异常任务无法重试、甚至主线程退出后子线程还在野跑的问题。

队列模型的核心价值不是“多线程”,而是把「任务提交」和「任务执行」解耦——你只管 put,有人(消费者)在另一端 get 并按规则处理。这带来三件事:可控的并发数、失败可追溯、支持异步 / 延迟 / 重试语义。

  • 别把 queue.Queue 当作“万能缓冲区”塞满就完事;它不持久、不跨进程、不抗崩溃
  • 如果任务需要重启后继续,queue.Queue 一断电就全丢,得换 redissqlite 做底层存储
  • queue.Queuejoin()task_done() 容易漏调用,导致主线程永远卡住

queue.Queue 的阻塞与超时怎么设才不卡死

默认 get() 是无限阻塞的,生产环境几乎不能用。必须配 timeout,但设太短会频繁抛 queue.Empty,设太长又拖慢响应。

典型做法是:消费者循环里用 get(timeout=1),配合 try/except queue.Empty 做轻量轮询,既避免空转耗 CPU,也不至于卡死。

立即学习 Python 免费学习笔记(深入)”;

  • put() 也建议加 timeout,否则生产者可能被堵死在满队列上(尤其 maxsize 设小了)
  • queue.Queue(maxsize=0) 不等于“无限”,而是系统限制下的最大值,实际仍受内存约束
  • 不要在多进程里共享同一个 queue.Queue 实例——它不是进程安全的,要用 multiprocessing.Queue

怎么让一个任务失败后自动重试三次再进死信队列

标准库 queue.Queue 不提供重试逻辑,得自己包一层。关键点不在“重试几次”,而在“怎么定义失败”和“怎么隔离失败任务”。

简单可靠的做法:每个任务封装成带元数据的字典,包含 funcargskwargsretry_countmax_retries。消费者执行时捕获异常,若未达上限就 put() 回队列(retry_count += 1),否则写入本地文件或发到 dead_letter_queue

  • 别在重试时直接 time.sleep(1) —— 这会阻塞整个消费者线程;应把延迟时间作为任务属性,由调度器统一处理
  • 重试任务和新任务混在同一个队列里,容易饿死新任务;建议拆成 main_queueretry_queue 两个实例
  • 如果用 functools.partial 封装函数,注意它不可序列化,跨进程或持久化时会报 PicklingError

什么时候该换掉 queue.Queue,上 celeryrq

当你的任务开始涉及远程调用、需要 Web 管理界面、要求定时触发、或者团队里有人改代码但不知道 task_done() 必须配对调用的时候,就该换了。

celery 重但全,rq 轻但只支持 redis。两者都帮你把序列化、连接管理、结果存储、重试策略、监控埋点这些脏活干完了。代价是:启动额外服务(redisrabbitmq)、学习新配置项、调试链路变长。

  • 本地开发快速验证用 rq 更顺手,redis-server 起一个就行;celery 默认还要配 brokerbackend 两套地址
  • rq 的任务函数必须在 PATH 可导入路径下,不能是临时定义的 lambda 或嵌套函数
  • 别在 celery 任务里直接操作全局变量或文件句柄——worker 是多进程的,状态不共享

真正难的从来不是“怎么实现队列”,而是决定哪些任务值得放进队列、失败后谁来兜底、以及日志里那条 Task xyz raised unexpected: KeyError('user_id') 到底该告警还是静默丢弃。

text=ZqhQzanResources