公开笔记

Celery 源码解读

详解 Python 分布式任务队列 Celery 核心原理、源码模块(task/consumer/backends)、AI Agent 场景落地方案,涵盖任务定义、重试机制、并发模型、结果存储、生产级配置,及 FastAPI 集成、常见坑避坑指南,适配长耗时异步任务开发。

发布于 更新于

Celery 是什么?

Celery 是 Python 生态中最主流的分布式任务队列框架。核心思想:

Producer(生产者)→ Broker(消息中间件)→ Worker(消费者)→ Backend(结果存储)

你的代码调用 .delay().apply_async(),任务消息被序列化扔进 Broker(通常是 Redis/RabbitMQ),Worker 进程从队列里取出来执行,结果写入 Backend。


app/task.py —— 任务定义与重试/指数退避

基础任务定义

from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.task 装饰器将函数包装成 Task 对象。调用方式:

add.delay(4, 6)                        # 最简调用
add.apply_async((4, 6), countdown=10) # 10秒后执行

Task 类的核心属性

@app.task(
    bind=True,          # self 指向 Task 实例本身
    name='myapp.add',   # 任务在 broker 中的唯一名称
    max_retries=3,       # 最大重试次数
    default_retry_delay=60,  # 默认重试间隔(秒)
    acks_late=True,      # 执行完再 ack,而非取出即 ack(更安全)
    time_limit=300,      # 硬超时:强杀进程
    soft_time_limit=280, # 软超时:抛 SoftTimeLimitExceeded,可捕获
)
def add(self, x, y):
    ...

bind=True 是关键 —— 拿到 self 才能调用 self.retry()

重试 + 指数退避(Exponential Backoff)

指数退避的核心公式:delay = base ^ attempt,加上 jitter(随机抖动)避免惊群。

import random
from celery import Task
from celery.exceptions import MaxRetriesExceededError
from requests.exceptions import RequestException

@app.task(bind=True, max_retries=5)
def fetch_data(self, url: str):
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.json()

    except RequestException as exc:
        # 指数退避:2^1=2s, 2^2=4s, 2^3=8s, 2^4=16s, 2^5=32s
        # 加 jitter 避免多个 worker 同时重试打爆下游
        backoff = (2 ** self.request.retries) + random.uniform(0, 1)
        
        try:
            raise self.retry(exc=exc, countdown=backoff)
        except MaxRetriesExceededError:
            # 超过最大重试次数,记录日志或走死信队列
            logger.error(f"Task failed after {self.max_retries} retries: {url}")
            raise

self.request 常用字段:

字段含义
self.request.id任务 UUID
self.request.retries当前是第几次重试(从 0 开始)
self.request.args位置参数
self.request.kwargs关键字参数
self.request.hostname执行的 worker 节点

任务状态机

PENDING → STARTED → SUCCESS
                  ↘ FAILURE → RETRY → STARTED → ...

自定义状态(适合长任务进度上报):

@app.task(bind=True)
def long_task(self, items):
    total = len(items)
    for i, item in enumerate(items):
        process(item)
        # 实时上报进度
        self.update_state(
            state='PROGRESS',
            meta={'current': i + 1, 'total': total, 'percent': (i+1)/total*100}
        )
    return {'status': 'done', 'total': total}

worker/consumer.py —— 并发消费者模型

Worker 启动

celery -A myapp worker --loglevel=info --concurrency=8 -P prefork
参数说明
--concurrency并发数(默认 = CPU 核数)
-P prefork多进程(CPU 密集型,默认)
-P eventlet协程(IO 密集型,需安装 eventlet)
-P gevent协程(IO 密集型,需安装 gevent)
-P solo单线程(调试用)
-P threads多线程

Consumer 内部流程(源码层面)

Celery Worker 内部是一个事件循环 + 消费者池架构:

Worker 主进程
├── MainProcess(Bootstrap、信号处理)
├── Consumer(连接 Broker,拉取消息)
│   ├── Connection(AMQP/Redis 连接)
│   ├── Channel(逻辑信道)
│   └── QoS(prefetch_count 控流)
└── Pool(执行任务)
    ├── Worker-1(子进程/线程/协程)
    ├── Worker-2
    └── Worker-N

关键源码路径(Celery 源码):

celery/
├── app/
│   └── task.py        ← Task 基类,retry/apply_async 实现
├── worker/
│   ├── consumer/
│   │   ├── consumer.py   ← Consumer 主循环
│   │   ├── tasks.py      ← 任务接收、反序列化
│   │   └── mingle.py     ← Worker 启动时互相握手
│   └── strategy.py       ← 消息路由策略
└── concurrency/
    ├── prefork.py        ← 多进程池实现
    └── asynpool.py       ← 异步进程池

Prefetch(预取)控制 —— 重要性能参数

# celeryconfig.py
worker_prefetch_multiplier = 1   # 每个 worker 每次只预取 1 条消息
# 默认值是 4,即每个 worker 进程预取 4 条

task_acks_late = True            # 配合 prefetch=1 确保公平分发

为什么要设为 1?

默认 prefetch=4,worker-1 抢了 4 条任务后挂了
→ 这 4 条任务丢失或延迟重新入队

prefetch=1,每次只拿一条,执行完再拿
→ 负载均衡更公平,任务更安全

并发模型对比

# prefork(多进程)- 适合 CPU 密集
# 每个子进程独立内存空间,GIL 不影响
celery worker -P prefork --concurrency=4

# eventlet/gevent(协程)- 适合 IO 密集(HTTP请求、DB查询)
# 单进程内大量协程,上下文切换开销小
pip install eventlet
celery worker -P eventlet --concurrency=1000

# 实际场景建议
# - 调用外部 API / 发邮件 → gevent/eventlet,concurrency=200~1000
# - 图片处理 / 数据计算  → prefork,concurrency=CPU核数

backends/ —— 结果存储

常用 Backend

# Redis Backend(最常用)
app = Celery('myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'  # 用不同 DB 隔离
)

# Database Backend(结果持久化到 DB)
app = Celery('myapp',
    backend='db+postgresql://user:pass@localhost/mydb'
)

# 不需要结果时,关掉 backend 节省资源
@app.task(ignore_result=True)
def fire_and_forget(data):
    ...

获取任务结果

# 异步提交
result = add.delay(4, 6)
task_id = result.id   # 保存这个 ID

# 之后查询(不阻塞)
from celery.result import AsyncResult
res = AsyncResult(task_id, app=app)

print(res.state)    # PENDING / STARTED / SUCCESS / FAILURE
print(res.ready())  # True/False
print(res.get())    # 阻塞等待结果(生产环境慎用!)
print(res.get(timeout=5))  # 最多等 5 秒

# 失败时 get() 会重新抛出异常
try:
    result = res.get(propagate=True)
except Exception as e:
    print(f"Task failed: {e}")

结果过期清理

# 结果默认保存 24 小时,可配置
result_expires = 3600  # 1小时后过期

# 定期清理(配合 Celery Beat)
from celery.schedules import crontab
beat_schedule = {
    'cleanup-results': {
        'task': 'celery.backend_cleanup',
        'schedule': crontab(hour=4, minute=0),  # 每天凌晨4点
    }
}

Agent 场景

把 Agent 任务接进 Celery:

# agent_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger

app = Celery('agent_tasks', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')
logger = get_task_logger(__name__)

@app.task(bind=True, max_retries=3)
def run_crew_task(self, topic: str) -> dict:
    try:
        # 这里放你的 CrewAI kickoff
        result = crew.kickoff(inputs={"topic": topic})
        return {"status": "success", "output": result.raw}
    except Exception as exc:
        # 指数退避重试:第1次等2s,第2次等4s,第3次等8s
        countdown = 2 ** self.request.retries
        logger.warning(f"任务失败,{countdown}s 后重试: {exc}")
        raise self.retry(exc=exc, countdown=countdown)

完整生产级配置示例

# celeryconfig.py

# Broker & Backend
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# 序列化
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

# 可靠性
task_acks_late = True
worker_prefetch_multiplier = 1
task_reject_on_worker_lost = True  # worker 挂掉时消息重新入队

# 超时
task_soft_time_limit = 300
task_time_limit = 360

# 结果
result_expires = 3600
task_ignore_result = False

# 路由(不同任务走不同队列)
task_routes = {
    'myapp.tasks.send_email': {'queue': 'email'},
    'myapp.tasks.process_image': {'queue': 'heavy'},
    'myapp.tasks.*': {'queue': 'default'},
}
# 针对不同队列启动不同 worker
celery -A myapp worker -Q email -P eventlet --concurrency=500
celery -A myapp worker -Q heavy -P prefork --concurrency=4
celery -A myapp worker -Q default --concurrency=8

学习路径建议

基础  → @app.task 装饰器、delay/apply_async
重试  → self.retry + 指数退避 + acks_late
监控  → Flower(celery flower)可视化 Worker 状态
调度  → Celery Beat(定时任务)
进阶  → Canvas(chain/group/chord 任务编排)
源码  → worker/consumer.py 的 on_task_received 方法

Celery 解决什么问题

Agent 任务是典型的重 IO 操作,跑一次可能需要几十秒甚至几分钟。 HTTP 接口不能一直阻塞等待,用户体验差且容易超时。

Celery 的核心模型

调用方                    Broker(Redis)         Worker
  │                         │                    │
  │── .delay(args) ──→ │                    │
  │← task_id 立即返回      │                    │
  │                         │── 消费消息 ──→│
  │                         │                    │── 执行任务
  │                         │                    │
  │                         │ ←  store_result ─│
  │── .get(task_id) ──→─────────→  │
  │←  result ─────────────────  │

三个角色:

  • Producer(调用方):投任务,拿 task_id,立即返回
  • Broker(Redis/RabbitMQ):消息队列,存储待执行的任务消息
  • Worker:后台进程,消费队列里的任务,执行完把结果写进 Backend

app/task.py —— 任务定义与重试机制

基础任务定义

from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',   # 任务队列
    backend='redis://localhost:6379/1'   # 结果存储(不同 db)
)

@app.task
def run_agent(topic: str) -> str:
    # 你的 Agent 逻辑
    return f"结果:{topic}"

@app.task 装饰器把普通函数注册进 Celery 的任务注册表(app.tasks 字典)。
注册时用模块路径作为 key,比如 myapp.tasks.run_agent

调用方式

# 方式一:delay(最常用)
result = run_agent.delay("MySQL 幻读")
# 等价于:
result = run_agent.apply_async(args=["MySQL 幻读"])

# result 是 AsyncResult 对象,此时任务还在队列里
print(result.id)           # task_id,UUID
print(result.status)       # PENDING / STARTED / SUCCESS / FAILURE / RETRY
print(result.get(timeout=60))  # 阻塞等待结果,最多等 60s

# 方式二:apply_async(支持更多参数)
result = run_agent.apply_async(
    args=["MySQL 幻读"],
    countdown=10,        # 10 秒后执行
    queue='high_priority' # 指定队列
)

重试机制(源码层分析)

Celery 提供两种重试方式:

方式一:手动重试(bind=True)

@app.task(bind=True, max_retries=3)
def call_llm_api(self, prompt: str) -> str:
    try:
        return llm_client.call(prompt)
    except RateLimitError as exc:
        # self.request.retries:当前是第几次重试(从0开始)
        countdown = 2 ** self.request.retries  # 1s, 2s, 4s
        raise self.retry(exc=exc, countdown=countdown)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # 其他错误等 60s

bind=Trueself 指向任务实例,可以访问 self.request.retries
raise self.retry(...) 必须用 raise,否则函数会继续执行而不是真正重试。

方式二:autoretry_for(声明式,更简洁)

from requests.exceptions import RequestException

@app.task(
    autoretry_for=(RequestException, TimeoutError),  # 这些异常自动重试
    max_retries=5,           # 最多重试 5 次(总共执行 6 次)
    retry_backoff=True,      # 开启指数退避:1s, 2s, 4s, 8s, 16s
    retry_backoff_max=300,   # 退避上限 300s(防止等太久)
    retry_jitter=True,       # 加随机抖动(默认 True,防止雷群效应)
)
def call_external_service(url: str) -> dict:
    return requests.get(url).json()

指数退避的计算公式(源自 celery/utils/time.py):

countdown = retry_backoff_factor * (2 ** retry_count)

如果 retry_jitter=True
countdown = random.randrange(0, countdown + 1)

实测 countdowns(retry_backoff=True,即 factor=1,关闭 jitter):

retry 0 → 1s
retry 1 → 2s  
retry 2 → 4s
retry 3 → 8s
retry 4 → 16s

单位测试里验证的序列:[1, 2, 4, 8, 16, 32](见 t/unit/tasks/test_tasks.py

max_retries 耗尽后:抛出 MaxRetriesExceededError,任务状态变为 FAILURE

bind=True 下的任务实例属性

@app.task(bind=True)
def my_task(self, arg):
    self.request.id          # 当前任务的 task_id
    self.request.retries     # 已重试次数(第一次执行时为 0)
    self.request.args        # 位置参数
    self.request.kwargs      # 关键字参数
    self.request.hostname    # 执行此任务的 worker 主机名
    
    # 主动更新任务状态(用于进度上报)
    self.update_state(
        state='PROGRESS',
        meta={'current': 50, 'total': 100}
    )

worker/consumer/consumer.py —— 消费者模型

Worker 的整体架构

Worker 进程启动后,内部由一套 Blueprint(启动步骤链)控制初始化顺序:

# consumer.py 里的 Blueprint.default_steps
default_steps = [
    'celery.worker.consumer.connection:Connection',  # 1. 建立 Broker 连接
    'celery.worker.consumer.mingle:Mingle',          # 2. 和其他 Worker 同步状态
    'celery.worker.consumer.events:Events',          # 3. 启动事件收集
    'celery.worker.consumer.gossip:Gossip',          # 4. Worker 间 gossip 协议
    'celery.worker.consumer.heart:Heart',            # 5. 心跳发送
    'celery.worker.consumer.control:Control',        # 6. 远程控制命令
    'celery.worker.consumer.tasks:Tasks',            # 7. 开始消费任务队列 ← 核心
    'celery.worker.consumer.consumer:Evloop',        # 8. 事件循环
]

任务消费的完整链路

从 Broker 收到一条消息,到任务函数被执行,经历以下步骤:

Broker 发来消息

consumer.on_task_received(message)

1. 查 strategies 字典(task_type → 执行策略)
   如果任务类型未注册 → 丢弃消息,打 UNKNOWN_TASK_ERROR 日志

2. 限流检查(TokenBucket)
   如果超过 rate_limit → 延迟放入定时器队列

3. task_reserved(request) → 加入 reserved_requests 集合

4. on_task_request(request) → 放进线程池执行

5. 任务函数执行

6. 执行完毕 → ack 消息 → store_result 写 Backend

prefetch_count 与并发控制

prefetch_count 控制 Worker 一次从 Broker 预取多少条消息:

# 默认计算方式
prefetch_count = num_processes * prefetch_multiplier
# 默认 prefetch_multiplier=4,4个进程 → prefetch_count=16

# 修改方式
celery -A myapp worker --prefetch-multiplier=1
# prefetch_multiplier=1 意味着每个进程同时只处理一个任务
# 对于耗时长的 Agent 任务,推荐设为 1 防止任务堆积

连接丢失时的处理(源码里的关键逻辑):

def on_connection_error_after_connected(self, exc):
    # 连接丢失后,如果开启了 worker_cancel_long_running_tasks_on_connection_loss
    for request in tuple(active_requests):
        if request.task.acks_late and not request.acknowledged:
            # 取消正在执行的任务,让它重新回队列
            request.cancel(self.pool)
    
    # 连接恢复后,临时降低 prefetch_count
    # 防止把正在处理的任务再次预取导致重复执行
    self.initial_prefetch_count = max(
        self.prefetch_multiplier,
        self.max_prefetch_count - len(tuple(active_requests)) * self.prefetch_multiplier
    )

限流:TokenBucket(令牌桶)

# consumer.py 里的限流实现
def bucket_for_task(self, type):
    limit = rate(getattr(type, 'rate_limit', None))
    return TokenBucket(limit, capacity=1) if limit else None

# 使用方式(在任务定义上)
@app.task(rate_limit='10/m')  # 每分钟最多 10 次
def my_task(): ...

@app.task(rate_limit='100/s')  # 每秒最多 100 次
def my_fast_task(): ...

对应 Agent 场景里的 LLM API 限流:

@app.task(
    rate_limit='60/m',  # 对应 RPM=60 的 API 限制
    autoretry_for=(RateLimitError,),
    retry_backoff=True,
)
def call_llm(prompt: str) -> str: ...

backends/ —— 结果存储

Redis Backend(最常用)

写结果(worker 侧),源自 celery/backends/redis.py

def _set(self, key, value):
    with self.client.pipeline() as pipe:
        if self.expires:
            pipe.setex(key, self.expires, value)  # 带 TTL
        else:
            pipe.set(key, value)
        pipe.publish(key, value)  # ← 关键:同时 Pub/Sub 发布
        pipe.execute()

两件事同时做:

  1. SET key value:持久化结果
  2. PUBLISH key value:通知所有订阅了这个 key 的客户端

读结果(调用方侧)

result = my_task.delay(args)
value = result.get(timeout=30) # 让客户端去“订阅并开始监听“

result.get() 内部不是轮询,而是用 Redis Pub/Sub 订阅 celery-task-meta-{task_id} 频道,收到消息就立刻返回。这是 Redis Backend 比数据库 Backend 响应更快的根本原因。

结果的 key 格式

celery-task-meta-{task_id}

默认 TTLresult_expires = 86400(1 天),到期自动删除。

特性Redis Pub/Sub专业 MQ(Kafka/RabbitMQ)
消息持久化❌ 无✅ 支持
消息确认❌ 无✅ 支持
负载均衡❌ 不支持✅ 支持
消息重试❌ 无✅ 支持
适用场景轻量、非核心、实时广播核心业务、可靠消息传递
性能极高高(略低于 Redis)

任务状态流转

PENDING    → 任务已投递,等待 Worker 消费
STARTED    → Worker 开始执行(需要 task_track_started=True 才有此状态)
RETRY      → 任务失败,正在等待重试
SUCCESS    → 执行成功,结果可读
FAILURE    → 执行失败,max_retries 耗尽
REVOKED    → 任务被手动撤销

acks_late 与消息可靠性

@app.task(acks_late=True)
def reliable_agent_task(topic: str): ...
acks_late=False(默认)acks_late=True
何时 ack任务开始执行前任务成功执行后
Worker 崩溃时任务丢失任务重新入队
风险任务丢失任务可能重复执行
适用场景幂等任务不能丢失的任务

Agent 任务如果耗时很长,建议 acks_late=True,但要确保任务是幂等的(相同输入多次执行结果一样)。

注意acks_late=True 配合 Redis Broker 有一个陷阱——如果任务执行时间超过 Redis 的 visibility_timeout(默认 1 小时),任务会被认为超时,重新投递,导致重复执行。对于长时间 Agent 任务,需要增大 visibility_timeout

app.conf.broker_transport_options = {
    'visibility_timeout': 43200  # 12小时
}

接入 Agent 任务的完整示例

# agent_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger

app = Celery(
    'agent_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

app.conf.update(
    task_serializer='json',
    result_serializer='json',
    task_track_started=True,       # 记录 STARTED 状态
    task_acks_late=True,           # 执行完再 ack
    worker_prefetch_multiplier=1,  # 长任务用 1,防止堆积
    result_expires=3600,           # 结果保留 1 小时
)

logger = get_task_logger(__name__)

@app.task(
    bind=True,
    max_retries=3,
    autoretry_for=(Exception,),
    retry_backoff=True,      # 指数退避:1s, 2s, 4s
    retry_backoff_max=60,    # 最大等待 60s
    retry_jitter=True,       # 加随机抖动防雷群效应
    rate_limit='30/m',       # RPM 限流
    time_limit=300,          # 任务最长执行 5 分钟,超时 kill
    soft_time_limit=240,     # 4 分钟时先发 SoftTimeLimitExceeded
)
def run_crew_task(self, topic: str) -> dict:
    logger.info(f"开始执行 Agent 任务,topic={topic},重试次数={self.request.retries}")
    
    # 上报进度(可选)
    self.update_state(state='PROGRESS', meta={'stage': '初始化'})
    
    try:
        result = crew.kickoff(inputs={"topic": topic})
        return {
            "status": "success",
            "output": result.raw,
            "task_id": self.request.id,
        }
    except SoftTimeLimitExceeded:
        logger.warning("任务即将超时,开始清理")
        raise  # 让 autoretry 处理

FastAPI 接入

from fastapi import FastAPI
from celery.result import AsyncResult

api = FastAPI()

@api.post("/agent/run")
async def run_agent(topic: str):
    task = run_crew_task.delay(topic)
    return {"task_id": task.id, "status": "queued"}

@api.get("/agent/result/{task_id}")
async def get_result(task_id: str):
    result = AsyncResult(task_id, app=app)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
    }

生产环境配置清单

app.conf.update(
    # Broker
    broker_url='redis://localhost:6379/0',
    broker_connection_retry_on_startup=True,
    broker_transport_options={
        'visibility_timeout': 43200,  # 长任务必须增大
    },

    # Backend
    result_backend='redis://localhost:6379/1',
    result_expires=3600,

    # Worker 行为
    worker_prefetch_multiplier=1,   # 长任务设 1
    task_acks_late=True,            # 可靠性优先
    task_track_started=True,        # 追踪 STARTED 状态
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],

    # 超时保护
    task_time_limit=600,        # 硬超时 10 分钟
    task_soft_time_limit=540,   # 软超时 9 分钟(先发信号)
)

需要注意的坑

坑一:Redis Broker + 指数退避 = 任务爆炸

retry_backoff 的等待时间超过 visibility_timeout,Redis 会认为任务超时,重新投递。结合 acks_late=True,会出现指数级任务复制(16 → 32 → 64…)。解决方案:增大 visibility_timeout 到超过最大等待时间。

坑二:Worker 重启时的未完成任务

acks_late=True 下,Worker 重启会把正在执行的任务重新投回队列。如果任务不是幂等的,会导致重复执行。对 Agent 任务:确保任务结果写入前做幂等检查(根据 task_id 去重)。

坑三:result.get() 阻塞主线程

result.get() 在 FastAPI 异步环境里会阻塞事件循环。需要用 asyncio.get_event_loop().run_in_executor 包装,或者用轮询接口而不是直接 get。

先理解 FastAPI 的运行模型。FastAPI 基于 asyncio,整个服务跑在一个事件循环里。这个事件循环是单线程的,靠协程切换来处理并发——某个请求在等 IO 时,事件循环切去处理其他请求。

result.get() 是同步阻塞调用,内部是一个 while 循环不断轮询或等待 Redis Pub/Sub 响应。它不会释放事件循环,就是死等。

# 错误写法
@api.get("/result/{task_id}")
async def get_result(task_id: str):
    result = AsyncResult(task_id)
    value = result.get(timeout=30)  # ← 在这里卡住 30 秒
    # 这 30 秒内,整个服务无法处理任何其他请求
    return value

这就像你开了一家餐厅,只有一个服务员,他去给一桌客人倒水,然后站在那里等客人喝完再说下一步,其他桌的客人全部没人管。

正确做法一:run_in_executor 把阻塞调用扔到线程池

import asyncio

@api.get("/result/{task_id}")
async def get_result(task_id: str):
    result = AsyncResult(task_id)
    loop = asyncio.get_event_loop()
    # 把 result.get 扔到线程池,不占用事件循环
    value = await loop.run_in_executor(None, result.get)
    return value

线程池里的线程可以随便阻塞,不影响事件循环继续处理其他请求。

正确做法二:轮询接口(更推荐)

不让接口等结果,而是让客户端自己来问:

@api.post("/agent/run")
async def run_agent(topic: str):
    task = run_crew_task.delay(topic)
    return {"task_id": task.id}  # 立刻返回,不等结果

@api.get("/agent/status/{task_id}")
async def get_status(task_id: str):
    result = AsyncResult(task_id)
    # ready() 和 status 是非阻塞的,只是读一次 Redis
    if result.ready():
        return {"status": "done", "result": result.result}
    return {"status": result.status}  # PENDING / STARTED / PROGRESS

客户端每隔几秒请求一次 /status/{task_id},直到拿到 done。这个模式在 Agent 场景里最合适,因为任务本来就是异步的,没必要在服务端等。

坑四:任务注册问题

Worker 需要能 import 到任务模块,否则收到消息时会找不到任务类型(KeyError: 'task_name'),消息被丢弃。确保 Worker 启动时 imports 配置包含所有任务模块。

先理解 FastAPI 的运行模型。FastAPI 基于 asyncio,整个服务跑在一个事件循环里。这个事件循环是单线程的,靠协程切换来处理并发——某个请求在等 IO 时,事件循环切去处理其他请求。

result.get() 是同步阻塞调用,内部是一个 while 循环不断轮询或等待 Redis Pub/Sub 响应。它不会释放事件循环,就是死等。

# 错误写法
@api.get("/result/{task_id}")
async def get_result(task_id: str):
    result = AsyncResult(task_id)
    value = result.get(timeout=30)  # ← 在这里卡住 30 秒
    # 这 30 秒内,整个服务无法处理任何其他请求
    return value

这就像你开了一家餐厅,只有一个服务员,他去给一桌客人倒水,然后站在那里等客人喝完再说下一步,其他桌的客人全部没人管。

正确做法一:run_in_executor 把阻塞调用扔到线程池

import asyncio

@api.get("/result/{task_id}")
async def get_result(task_id: str):
    result = AsyncResult(task_id)
    loop = asyncio.get_event_loop()
    # 把 result.get 扔到线程池,不占用事件循环
    value = await loop.run_in_executor(None, result.get)
    return value

线程池里的线程可以随便阻塞,不影响事件循环继续处理其他请求。

正确做法二:轮询接口(更推荐)

不让接口等结果,而是让客户端自己来问:

@api.post("/agent/run")
async def run_agent(topic: str):
    task = run_crew_task.delay(topic)
    return {"task_id": task.id}  # 立刻返回,不等结果

@api.get("/agent/status/{task_id}")
async def get_status(task_id: str):
    result = AsyncResult(task_id)
    # ready() 和 status 是非阻塞的,只是读一次 Redis
    if result.ready():
        return {"status": "done", "result": result.result}
    return {"status": result.status}  # PENDING / STARTED / PROGRESS

客户端每隔几秒请求一次 /status/{task_id},直到拿到 done。这个模式在 Agent 场景里最合适,因为任务本来就是异步的,没必要在服务端等。


坑四:任务注册问题

Celery 的任务注册是基于 Python import 的。@app.task 装饰器在模块被 import 时执行,把任务函数注册进 app.tasks 这个字典。

问题在于:Worker 进程和你的 Producer(FastAPI)是两个独立进程。Worker 启动时如果没有 import 到任务模块,它的 app.tasks 字典里就没有这个任务。

Producer 进程                    Worker 进程
app.tasks = {                    app.tasks = {}  ← 空的!
  "myapp.tasks.run_crew_task"    
}

.delay() 把消息投进 Redis →→→→  收到消息:type="myapp.tasks.run_crew_task"
                                  查 app.tasks["myapp.tasks.run_crew_task"]
                                  KeyError!任务类型未注册
                                  消息被丢弃,打日志:
                                  "Received unregistered task of type"

为什么会出现这个情况:你可能在 FastAPI 的 main.py 里 import 了任务模块,但启动 Worker 时用的是另一个入口文件,没有 import 到任务模块:

# 这样启动 Worker,如果 celery_app.py 里没有 import tasks,就会出问题
celery -A celery_app worker

解决方式:在 Celery app 配置里明确声明要加载的模块:

# celery_app.py
app = Celery('myapp')
app.conf.imports = [
    'myapp.tasks.agent_tasks',   # Worker 启动时强制 import 这些模块
    'myapp.tasks.other_tasks',
]

或者用 autodiscover_tasks,让 Celery 自动扫描所有 app 下的 tasks.py

app.autodiscover_tasks(['myapp', 'otherapp'])

验证方法:Worker 启动后,日志里会打出已注册的任务列表:

[tasks]
  . myapp.tasks.agent_tasks.run_crew_task  ← 有这行说明注册成功
  . celery.backend_cleanup

没有你的任务名就说明没注册上,去查启动入口的 import 路径。

← 返回 Notes

Contact

Contact Me

Leave a message here. The form sends directly from the browser to a form delivery service and then to my email.

Messages are delivered to lzx744008464@gmail.com.