Celery 是什么?
Celery 是 Python 生态中最主流的分布式任务队列框架。核心思想:
Producer(生产者)→ Broker(消息中间件)→ Worker(消费者)→ Backend(结果存储)
你的代码调用 .delay() 或 .apply_async(),任务消息被序列化扔进 Broker(通常是 Redis/RabbitMQ),Worker 进程从队列里取出来执行,结果写入 Backend。
app/task.py —— 任务定义与重试/指数退避
基础任务定义
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@app.task 装饰器将函数包装成 Task 对象。调用方式:
add.delay(4, 6) # 最简调用
add.apply_async((4, 6), countdown=10) # 10秒后执行
Task 类的核心属性
@app.task(
bind=True, # self 指向 Task 实例本身
name='myapp.add', # 任务在 broker 中的唯一名称
max_retries=3, # 最大重试次数
default_retry_delay=60, # 默认重试间隔(秒)
acks_late=True, # 执行完再 ack,而非取出即 ack(更安全)
time_limit=300, # 硬超时:强杀进程
soft_time_limit=280, # 软超时:抛 SoftTimeLimitExceeded,可捕获
)
def add(self, x, y):
...
bind=True 是关键 —— 拿到 self 才能调用 self.retry()。
重试 + 指数退避(Exponential Backoff)
指数退避的核心公式:delay = base ^ attempt,加上 jitter(随机抖动)避免惊群。
import random
from celery import Task
from celery.exceptions import MaxRetriesExceededError
from requests.exceptions import RequestException
@app.task(bind=True, max_retries=5)
def fetch_data(self, url: str):
try:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
return resp.json()
except RequestException as exc:
# 指数退避:2^1=2s, 2^2=4s, 2^3=8s, 2^4=16s, 2^5=32s
# 加 jitter 避免多个 worker 同时重试打爆下游
backoff = (2 ** self.request.retries) + random.uniform(0, 1)
try:
raise self.retry(exc=exc, countdown=backoff)
except MaxRetriesExceededError:
# 超过最大重试次数,记录日志或走死信队列
logger.error(f"Task failed after {self.max_retries} retries: {url}")
raise
self.request 常用字段:
| 字段 | 含义 |
|---|---|
self.request.id | 任务 UUID |
self.request.retries | 当前是第几次重试(从 0 开始) |
self.request.args | 位置参数 |
self.request.kwargs | 关键字参数 |
self.request.hostname | 执行的 worker 节点 |
任务状态机
PENDING → STARTED → SUCCESS
↘ FAILURE → RETRY → STARTED → ...
自定义状态(适合长任务进度上报):
@app.task(bind=True)
def long_task(self, items):
total = len(items)
for i, item in enumerate(items):
process(item)
# 实时上报进度
self.update_state(
state='PROGRESS',
meta={'current': i + 1, 'total': total, 'percent': (i+1)/total*100}
)
return {'status': 'done', 'total': total}
worker/consumer.py —— 并发消费者模型
Worker 启动
celery -A myapp worker --loglevel=info --concurrency=8 -P prefork
| 参数 | 说明 |
|---|---|
--concurrency | 并发数(默认 = CPU 核数) |
-P prefork | 多进程(CPU 密集型,默认) |
-P eventlet | 协程(IO 密集型,需安装 eventlet) |
-P gevent | 协程(IO 密集型,需安装 gevent) |
-P solo | 单线程(调试用) |
-P threads | 多线程 |
Consumer 内部流程(源码层面)
Celery Worker 内部是一个事件循环 + 消费者池架构:
Worker 主进程
├── MainProcess(Bootstrap、信号处理)
├── Consumer(连接 Broker,拉取消息)
│ ├── Connection(AMQP/Redis 连接)
│ ├── Channel(逻辑信道)
│ └── QoS(prefetch_count 控流)
└── Pool(执行任务)
├── Worker-1(子进程/线程/协程)
├── Worker-2
└── Worker-N
关键源码路径(Celery 源码):
celery/
├── app/
│ └── task.py ← Task 基类,retry/apply_async 实现
├── worker/
│ ├── consumer/
│ │ ├── consumer.py ← Consumer 主循环
│ │ ├── tasks.py ← 任务接收、反序列化
│ │ └── mingle.py ← Worker 启动时互相握手
│ └── strategy.py ← 消息路由策略
└── concurrency/
├── prefork.py ← 多进程池实现
└── asynpool.py ← 异步进程池
Prefetch(预取)控制 —— 重要性能参数
# celeryconfig.py
worker_prefetch_multiplier = 1 # 每个 worker 每次只预取 1 条消息
# 默认值是 4,即每个 worker 进程预取 4 条
task_acks_late = True # 配合 prefetch=1 确保公平分发
为什么要设为 1?
默认 prefetch=4,worker-1 抢了 4 条任务后挂了
→ 这 4 条任务丢失或延迟重新入队
prefetch=1,每次只拿一条,执行完再拿
→ 负载均衡更公平,任务更安全
并发模型对比
# prefork(多进程)- 适合 CPU 密集
# 每个子进程独立内存空间,GIL 不影响
celery worker -P prefork --concurrency=4
# eventlet/gevent(协程)- 适合 IO 密集(HTTP请求、DB查询)
# 单进程内大量协程,上下文切换开销小
pip install eventlet
celery worker -P eventlet --concurrency=1000
# 实际场景建议
# - 调用外部 API / 发邮件 → gevent/eventlet,concurrency=200~1000
# - 图片处理 / 数据计算 → prefork,concurrency=CPU核数
backends/ —— 结果存储
常用 Backend
# Redis Backend(最常用)
app = Celery('myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1' # 用不同 DB 隔离
)
# Database Backend(结果持久化到 DB)
app = Celery('myapp',
backend='db+postgresql://user:pass@localhost/mydb'
)
# 不需要结果时,关掉 backend 节省资源
@app.task(ignore_result=True)
def fire_and_forget(data):
...
获取任务结果
# 异步提交
result = add.delay(4, 6)
task_id = result.id # 保存这个 ID
# 之后查询(不阻塞)
from celery.result import AsyncResult
res = AsyncResult(task_id, app=app)
print(res.state) # PENDING / STARTED / SUCCESS / FAILURE
print(res.ready()) # True/False
print(res.get()) # 阻塞等待结果(生产环境慎用!)
print(res.get(timeout=5)) # 最多等 5 秒
# 失败时 get() 会重新抛出异常
try:
result = res.get(propagate=True)
except Exception as e:
print(f"Task failed: {e}")
结果过期清理
# 结果默认保存 24 小时,可配置
result_expires = 3600 # 1小时后过期
# 定期清理(配合 Celery Beat)
from celery.schedules import crontab
beat_schedule = {
'cleanup-results': {
'task': 'celery.backend_cleanup',
'schedule': crontab(hour=4, minute=0), # 每天凌晨4点
}
}
Agent 场景
把 Agent 任务接进 Celery:
# agent_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
app = Celery('agent_tasks', broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1')
logger = get_task_logger(__name__)
@app.task(bind=True, max_retries=3)
def run_crew_task(self, topic: str) -> dict:
try:
# 这里放你的 CrewAI kickoff
result = crew.kickoff(inputs={"topic": topic})
return {"status": "success", "output": result.raw}
except Exception as exc:
# 指数退避重试:第1次等2s,第2次等4s,第3次等8s
countdown = 2 ** self.request.retries
logger.warning(f"任务失败,{countdown}s 后重试: {exc}")
raise self.retry(exc=exc, countdown=countdown)
完整生产级配置示例
# celeryconfig.py
# Broker & Backend
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
# 序列化
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# 可靠性
task_acks_late = True
worker_prefetch_multiplier = 1
task_reject_on_worker_lost = True # worker 挂掉时消息重新入队
# 超时
task_soft_time_limit = 300
task_time_limit = 360
# 结果
result_expires = 3600
task_ignore_result = False
# 路由(不同任务走不同队列)
task_routes = {
'myapp.tasks.send_email': {'queue': 'email'},
'myapp.tasks.process_image': {'queue': 'heavy'},
'myapp.tasks.*': {'queue': 'default'},
}
# 针对不同队列启动不同 worker
celery -A myapp worker -Q email -P eventlet --concurrency=500
celery -A myapp worker -Q heavy -P prefork --concurrency=4
celery -A myapp worker -Q default --concurrency=8
学习路径建议
基础 → @app.task 装饰器、delay/apply_async
重试 → self.retry + 指数退避 + acks_late
监控 → Flower(celery flower)可视化 Worker 状态
调度 → Celery Beat(定时任务)
进阶 → Canvas(chain/group/chord 任务编排)
源码 → worker/consumer.py 的 on_task_received 方法
Celery 解决什么问题
Agent 任务是典型的重 IO 操作,跑一次可能需要几十秒甚至几分钟。 HTTP 接口不能一直阻塞等待,用户体验差且容易超时。
Celery 的核心模型:
调用方 Broker(Redis) Worker
│ │ │
│── .delay(args) ──→ │ │
│← task_id 立即返回 │ │
│ │── 消费消息 ──→│
│ │ │── 执行任务
│ │ │
│ │ ← store_result ─│
│── .get(task_id) ──→─────────→ │
│← result ───────────────── │
三个角色:
- Producer(调用方):投任务,拿 task_id,立即返回
- Broker(Redis/RabbitMQ):消息队列,存储待执行的任务消息
- Worker:后台进程,消费队列里的任务,执行完把结果写进 Backend
app/task.py —— 任务定义与重试机制
基础任务定义
from celery import Celery
app = Celery(
'myapp',
broker='redis://localhost:6379/0', # 任务队列
backend='redis://localhost:6379/1' # 结果存储(不同 db)
)
@app.task
def run_agent(topic: str) -> str:
# 你的 Agent 逻辑
return f"结果:{topic}"
@app.task 装饰器把普通函数注册进 Celery 的任务注册表(app.tasks 字典)。
注册时用模块路径作为 key,比如 myapp.tasks.run_agent。
调用方式
# 方式一:delay(最常用)
result = run_agent.delay("MySQL 幻读")
# 等价于:
result = run_agent.apply_async(args=["MySQL 幻读"])
# result 是 AsyncResult 对象,此时任务还在队列里
print(result.id) # task_id,UUID
print(result.status) # PENDING / STARTED / SUCCESS / FAILURE / RETRY
print(result.get(timeout=60)) # 阻塞等待结果,最多等 60s
# 方式二:apply_async(支持更多参数)
result = run_agent.apply_async(
args=["MySQL 幻读"],
countdown=10, # 10 秒后执行
queue='high_priority' # 指定队列
)
重试机制(源码层分析)
Celery 提供两种重试方式:
方式一:手动重试(bind=True)
@app.task(bind=True, max_retries=3)
def call_llm_api(self, prompt: str) -> str:
try:
return llm_client.call(prompt)
except RateLimitError as exc:
# self.request.retries:当前是第几次重试(从0开始)
countdown = 2 ** self.request.retries # 1s, 2s, 4s
raise self.retry(exc=exc, countdown=countdown)
except Exception as exc:
raise self.retry(exc=exc, countdown=60) # 其他错误等 60s
bind=True 让 self 指向任务实例,可以访问 self.request.retries。
raise self.retry(...) 必须用 raise,否则函数会继续执行而不是真正重试。
方式二:autoretry_for(声明式,更简洁)
from requests.exceptions import RequestException
@app.task(
autoretry_for=(RequestException, TimeoutError), # 这些异常自动重试
max_retries=5, # 最多重试 5 次(总共执行 6 次)
retry_backoff=True, # 开启指数退避:1s, 2s, 4s, 8s, 16s
retry_backoff_max=300, # 退避上限 300s(防止等太久)
retry_jitter=True, # 加随机抖动(默认 True,防止雷群效应)
)
def call_external_service(url: str) -> dict:
return requests.get(url).json()
指数退避的计算公式(源自 celery/utils/time.py):
countdown = retry_backoff_factor * (2 ** retry_count)
如果 retry_jitter=True:
countdown = random.randrange(0, countdown + 1)
实测 countdowns(retry_backoff=True,即 factor=1,关闭 jitter):
retry 0 → 1s
retry 1 → 2s
retry 2 → 4s
retry 3 → 8s
retry 4 → 16s
单位测试里验证的序列:[1, 2, 4, 8, 16, 32](见 t/unit/tasks/test_tasks.py)
max_retries 耗尽后:抛出 MaxRetriesExceededError,任务状态变为 FAILURE。
bind=True 下的任务实例属性
@app.task(bind=True)
def my_task(self, arg):
self.request.id # 当前任务的 task_id
self.request.retries # 已重试次数(第一次执行时为 0)
self.request.args # 位置参数
self.request.kwargs # 关键字参数
self.request.hostname # 执行此任务的 worker 主机名
# 主动更新任务状态(用于进度上报)
self.update_state(
state='PROGRESS',
meta={'current': 50, 'total': 100}
)
worker/consumer/consumer.py —— 消费者模型
Worker 的整体架构
Worker 进程启动后,内部由一套 Blueprint(启动步骤链)控制初始化顺序:
# consumer.py 里的 Blueprint.default_steps
default_steps = [
'celery.worker.consumer.connection:Connection', # 1. 建立 Broker 连接
'celery.worker.consumer.mingle:Mingle', # 2. 和其他 Worker 同步状态
'celery.worker.consumer.events:Events', # 3. 启动事件收集
'celery.worker.consumer.gossip:Gossip', # 4. Worker 间 gossip 协议
'celery.worker.consumer.heart:Heart', # 5. 心跳发送
'celery.worker.consumer.control:Control', # 6. 远程控制命令
'celery.worker.consumer.tasks:Tasks', # 7. 开始消费任务队列 ← 核心
'celery.worker.consumer.consumer:Evloop', # 8. 事件循环
]
任务消费的完整链路
从 Broker 收到一条消息,到任务函数被执行,经历以下步骤:
Broker 发来消息
↓
consumer.on_task_received(message)
↓
1. 查 strategies 字典(task_type → 执行策略)
如果任务类型未注册 → 丢弃消息,打 UNKNOWN_TASK_ERROR 日志
↓
2. 限流检查(TokenBucket)
如果超过 rate_limit → 延迟放入定时器队列
↓
3. task_reserved(request) → 加入 reserved_requests 集合
↓
4. on_task_request(request) → 放进线程池执行
↓
5. 任务函数执行
↓
6. 执行完毕 → ack 消息 → store_result 写 Backend
prefetch_count 与并发控制
prefetch_count 控制 Worker 一次从 Broker 预取多少条消息:
# 默认计算方式
prefetch_count = num_processes * prefetch_multiplier
# 默认 prefetch_multiplier=4,4个进程 → prefetch_count=16
# 修改方式
celery -A myapp worker --prefetch-multiplier=1
# prefetch_multiplier=1 意味着每个进程同时只处理一个任务
# 对于耗时长的 Agent 任务,推荐设为 1 防止任务堆积
连接丢失时的处理(源码里的关键逻辑):
def on_connection_error_after_connected(self, exc):
# 连接丢失后,如果开启了 worker_cancel_long_running_tasks_on_connection_loss
for request in tuple(active_requests):
if request.task.acks_late and not request.acknowledged:
# 取消正在执行的任务,让它重新回队列
request.cancel(self.pool)
# 连接恢复后,临时降低 prefetch_count
# 防止把正在处理的任务再次预取导致重复执行
self.initial_prefetch_count = max(
self.prefetch_multiplier,
self.max_prefetch_count - len(tuple(active_requests)) * self.prefetch_multiplier
)
限流:TokenBucket(令牌桶)
# consumer.py 里的限流实现
def bucket_for_task(self, type):
limit = rate(getattr(type, 'rate_limit', None))
return TokenBucket(limit, capacity=1) if limit else None
# 使用方式(在任务定义上)
@app.task(rate_limit='10/m') # 每分钟最多 10 次
def my_task(): ...
@app.task(rate_limit='100/s') # 每秒最多 100 次
def my_fast_task(): ...
对应 Agent 场景里的 LLM API 限流:
@app.task(
rate_limit='60/m', # 对应 RPM=60 的 API 限制
autoretry_for=(RateLimitError,),
retry_backoff=True,
)
def call_llm(prompt: str) -> str: ...
backends/ —— 结果存储
Redis Backend(最常用)
写结果(worker 侧),源自 celery/backends/redis.py:
def _set(self, key, value):
with self.client.pipeline() as pipe:
if self.expires:
pipe.setex(key, self.expires, value) # 带 TTL
else:
pipe.set(key, value)
pipe.publish(key, value) # ← 关键:同时 Pub/Sub 发布
pipe.execute()
两件事同时做:
SET key value:持久化结果PUBLISH key value:通知所有订阅了这个 key 的客户端
读结果(调用方侧):
result = my_task.delay(args)
value = result.get(timeout=30) # 让客户端去“订阅并开始监听“
result.get() 内部不是轮询,而是用 Redis Pub/Sub 订阅 celery-task-meta-{task_id} 频道,收到消息就立刻返回。这是 Redis Backend 比数据库 Backend 响应更快的根本原因。
结果的 key 格式:
celery-task-meta-{task_id}
默认 TTL:result_expires = 86400(1 天),到期自动删除。
| 特性 | Redis Pub/Sub | 专业 MQ(Kafka/RabbitMQ) |
|---|---|---|
| 消息持久化 | ❌ 无 | ✅ 支持 |
| 消息确认 | ❌ 无 | ✅ 支持 |
| 负载均衡 | ❌ 不支持 | ✅ 支持 |
| 消息重试 | ❌ 无 | ✅ 支持 |
| 适用场景 | 轻量、非核心、实时广播 | 核心业务、可靠消息传递 |
| 性能 | 极高 | 高(略低于 Redis) |
任务状态流转
PENDING → 任务已投递,等待 Worker 消费
STARTED → Worker 开始执行(需要 task_track_started=True 才有此状态)
RETRY → 任务失败,正在等待重试
SUCCESS → 执行成功,结果可读
FAILURE → 执行失败,max_retries 耗尽
REVOKED → 任务被手动撤销
acks_late 与消息可靠性
@app.task(acks_late=True)
def reliable_agent_task(topic: str): ...
| acks_late=False(默认) | acks_late=True | |
|---|---|---|
| 何时 ack | 任务开始执行前 | 任务成功执行后 |
| Worker 崩溃时 | 任务丢失 | 任务重新入队 |
| 风险 | 任务丢失 | 任务可能重复执行 |
| 适用场景 | 幂等任务 | 不能丢失的任务 |
Agent 任务如果耗时很长,建议 acks_late=True,但要确保任务是幂等的(相同输入多次执行结果一样)。
注意:acks_late=True 配合 Redis Broker 有一个陷阱——如果任务执行时间超过 Redis 的 visibility_timeout(默认 1 小时),任务会被认为超时,重新投递,导致重复执行。对于长时间 Agent 任务,需要增大 visibility_timeout:
app.conf.broker_transport_options = {
'visibility_timeout': 43200 # 12小时
}
接入 Agent 任务的完整示例
# agent_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
app = Celery(
'agent_tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
app.conf.update(
task_serializer='json',
result_serializer='json',
task_track_started=True, # 记录 STARTED 状态
task_acks_late=True, # 执行完再 ack
worker_prefetch_multiplier=1, # 长任务用 1,防止堆积
result_expires=3600, # 结果保留 1 小时
)
logger = get_task_logger(__name__)
@app.task(
bind=True,
max_retries=3,
autoretry_for=(Exception,),
retry_backoff=True, # 指数退避:1s, 2s, 4s
retry_backoff_max=60, # 最大等待 60s
retry_jitter=True, # 加随机抖动防雷群效应
rate_limit='30/m', # RPM 限流
time_limit=300, # 任务最长执行 5 分钟,超时 kill
soft_time_limit=240, # 4 分钟时先发 SoftTimeLimitExceeded
)
def run_crew_task(self, topic: str) -> dict:
logger.info(f"开始执行 Agent 任务,topic={topic},重试次数={self.request.retries}")
# 上报进度(可选)
self.update_state(state='PROGRESS', meta={'stage': '初始化'})
try:
result = crew.kickoff(inputs={"topic": topic})
return {
"status": "success",
"output": result.raw,
"task_id": self.request.id,
}
except SoftTimeLimitExceeded:
logger.warning("任务即将超时,开始清理")
raise # 让 autoretry 处理
FastAPI 接入:
from fastapi import FastAPI
from celery.result import AsyncResult
api = FastAPI()
@api.post("/agent/run")
async def run_agent(topic: str):
task = run_crew_task.delay(topic)
return {"task_id": task.id, "status": "queued"}
@api.get("/agent/result/{task_id}")
async def get_result(task_id: str):
result = AsyncResult(task_id, app=app)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None,
}
生产环境配置清单
app.conf.update(
# Broker
broker_url='redis://localhost:6379/0',
broker_connection_retry_on_startup=True,
broker_transport_options={
'visibility_timeout': 43200, # 长任务必须增大
},
# Backend
result_backend='redis://localhost:6379/1',
result_expires=3600,
# Worker 行为
worker_prefetch_multiplier=1, # 长任务设 1
task_acks_late=True, # 可靠性优先
task_track_started=True, # 追踪 STARTED 状态
task_serializer='json',
result_serializer='json',
accept_content=['json'],
# 超时保护
task_time_limit=600, # 硬超时 10 分钟
task_soft_time_limit=540, # 软超时 9 分钟(先发信号)
)
需要注意的坑
坑一:Redis Broker + 指数退避 = 任务爆炸
当
retry_backoff的等待时间超过visibility_timeout,Redis 会认为任务超时,重新投递。结合acks_late=True,会出现指数级任务复制(16 → 32 → 64…)。解决方案:增大visibility_timeout到超过最大等待时间。
坑二:Worker 重启时的未完成任务
acks_late=True下,Worker 重启会把正在执行的任务重新投回队列。如果任务不是幂等的,会导致重复执行。对 Agent 任务:确保任务结果写入前做幂等检查(根据 task_id 去重)。
坑三:result.get() 阻塞主线程
result.get()在 FastAPI 异步环境里会阻塞事件循环。需要用asyncio.get_event_loop().run_in_executor包装,或者用轮询接口而不是直接 get。
先理解 FastAPI 的运行模型。FastAPI 基于 asyncio,整个服务跑在一个事件循环里。这个事件循环是单线程的,靠协程切换来处理并发——某个请求在等 IO 时,事件循环切去处理其他请求。
result.get() 是同步阻塞调用,内部是一个 while 循环不断轮询或等待 Redis Pub/Sub 响应。它不会释放事件循环,就是死等。
# 错误写法
@api.get("/result/{task_id}")
async def get_result(task_id: str):
result = AsyncResult(task_id)
value = result.get(timeout=30) # ← 在这里卡住 30 秒
# 这 30 秒内,整个服务无法处理任何其他请求
return value
这就像你开了一家餐厅,只有一个服务员,他去给一桌客人倒水,然后站在那里等客人喝完再说下一步,其他桌的客人全部没人管。
正确做法一:run_in_executor 把阻塞调用扔到线程池
import asyncio
@api.get("/result/{task_id}")
async def get_result(task_id: str):
result = AsyncResult(task_id)
loop = asyncio.get_event_loop()
# 把 result.get 扔到线程池,不占用事件循环
value = await loop.run_in_executor(None, result.get)
return value
线程池里的线程可以随便阻塞,不影响事件循环继续处理其他请求。
正确做法二:轮询接口(更推荐)
不让接口等结果,而是让客户端自己来问:
@api.post("/agent/run")
async def run_agent(topic: str):
task = run_crew_task.delay(topic)
return {"task_id": task.id} # 立刻返回,不等结果
@api.get("/agent/status/{task_id}")
async def get_status(task_id: str):
result = AsyncResult(task_id)
# ready() 和 status 是非阻塞的,只是读一次 Redis
if result.ready():
return {"status": "done", "result": result.result}
return {"status": result.status} # PENDING / STARTED / PROGRESS
客户端每隔几秒请求一次 /status/{task_id},直到拿到 done。这个模式在 Agent 场景里最合适,因为任务本来就是异步的,没必要在服务端等。
坑四:任务注册问题
Worker 需要能 import 到任务模块,否则收到消息时会找不到任务类型(
KeyError: 'task_name'),消息被丢弃。确保 Worker 启动时imports配置包含所有任务模块。
先理解 FastAPI 的运行模型。FastAPI 基于 asyncio,整个服务跑在一个事件循环里。这个事件循环是单线程的,靠协程切换来处理并发——某个请求在等 IO 时,事件循环切去处理其他请求。
result.get() 是同步阻塞调用,内部是一个 while 循环不断轮询或等待 Redis Pub/Sub 响应。它不会释放事件循环,就是死等。
# 错误写法
@api.get("/result/{task_id}")
async def get_result(task_id: str):
result = AsyncResult(task_id)
value = result.get(timeout=30) # ← 在这里卡住 30 秒
# 这 30 秒内,整个服务无法处理任何其他请求
return value
这就像你开了一家餐厅,只有一个服务员,他去给一桌客人倒水,然后站在那里等客人喝完再说下一步,其他桌的客人全部没人管。
正确做法一:run_in_executor 把阻塞调用扔到线程池
import asyncio
@api.get("/result/{task_id}")
async def get_result(task_id: str):
result = AsyncResult(task_id)
loop = asyncio.get_event_loop()
# 把 result.get 扔到线程池,不占用事件循环
value = await loop.run_in_executor(None, result.get)
return value
线程池里的线程可以随便阻塞,不影响事件循环继续处理其他请求。
正确做法二:轮询接口(更推荐)
不让接口等结果,而是让客户端自己来问:
@api.post("/agent/run")
async def run_agent(topic: str):
task = run_crew_task.delay(topic)
return {"task_id": task.id} # 立刻返回,不等结果
@api.get("/agent/status/{task_id}")
async def get_status(task_id: str):
result = AsyncResult(task_id)
# ready() 和 status 是非阻塞的,只是读一次 Redis
if result.ready():
return {"status": "done", "result": result.result}
return {"status": result.status} # PENDING / STARTED / PROGRESS
客户端每隔几秒请求一次 /status/{task_id},直到拿到 done。这个模式在 Agent 场景里最合适,因为任务本来就是异步的,没必要在服务端等。
坑四:任务注册问题
Celery 的任务注册是基于 Python import 的。@app.task 装饰器在模块被 import 时执行,把任务函数注册进 app.tasks 这个字典。
问题在于:Worker 进程和你的 Producer(FastAPI)是两个独立进程。Worker 启动时如果没有 import 到任务模块,它的 app.tasks 字典里就没有这个任务。
Producer 进程 Worker 进程
app.tasks = { app.tasks = {} ← 空的!
"myapp.tasks.run_crew_task"
}
.delay() 把消息投进 Redis →→→→ 收到消息:type="myapp.tasks.run_crew_task"
查 app.tasks["myapp.tasks.run_crew_task"]
KeyError!任务类型未注册
消息被丢弃,打日志:
"Received unregistered task of type"
为什么会出现这个情况:你可能在 FastAPI 的 main.py 里 import 了任务模块,但启动 Worker 时用的是另一个入口文件,没有 import 到任务模块:
# 这样启动 Worker,如果 celery_app.py 里没有 import tasks,就会出问题
celery -A celery_app worker
解决方式:在 Celery app 配置里明确声明要加载的模块:
# celery_app.py
app = Celery('myapp')
app.conf.imports = [
'myapp.tasks.agent_tasks', # Worker 启动时强制 import 这些模块
'myapp.tasks.other_tasks',
]
或者用 autodiscover_tasks,让 Celery 自动扫描所有 app 下的 tasks.py:
app.autodiscover_tasks(['myapp', 'otherapp'])
验证方法:Worker 启动后,日志里会打出已注册的任务列表:
[tasks]
. myapp.tasks.agent_tasks.run_crew_task ← 有这行说明注册成功
. celery.backend_cleanup
没有你的任务名就说明没注册上,去查启动入口的 import 路径。