如果你第一次听说 Celery,可以去看下之前的 Celery 介绍 了解下 Celery 的基本功能,然后再来看这篇文章。
尽量不要使用数据库作为 AMQP Broker
随着 worker 的不断增多可能给数据库 IO 和连接造成很大压力。更具体来说不要把 Celery 的 task 数据和应用数据放到同一个数据库中。 Docker 上很多 相关的镜像。
使用多个队列
对于不同的 task ,尽量使用不同的队列来处理。
@app.task()
def my_taskA(a, b, c):
print("doing something here...")
@app.task()
def my_taskB(x, y):
print("doing something here...")
在 celery_config.py
中定义
task_queues=(
Queue('default', routing_key='default'),
Queue('other', routing_key='other'),
在 task 上定义
@app.task(queue='other')
def parse_something():
pass
定义具有优先级的 workers
假如有一个 taskA 去处理一个队列 A 中的信息,一个 taskB 去处理队列 B 中的数据,然后起了 x 个 worker 去处理队列 A ,其他的 worker 去处理队列 B。而这时也可能会出现队列 B 中一些 task 急需处理,而此时堆积在队列 B 中的 tasks 很多,需要耗费很长时间来处理队列 B 中的 task。此时就需要定义优先队列来处理紧急的 task。
celery 中可以在定义 Queue 时,指定 routing_key
Queue('other', routing_key='other_high'),
Queue('other', routing_key='other_low'),
然后定义
task_routes={
# see
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_routes
# http://docs.celeryproject.org/en/latest/userguide/routing.html#routing-basics
'path.to.task' : {
'queue': 'other',
'routing_key': 'other_high'
},
'path.to.task' : {
'queue': 'other',
'routing_key': 'other_low'
},
}
在启动 worker 时指定 routing_key
celery worker -E -l INFO -n workerA -Q other_high
celery worker -E -l INFO -n workerB -Q other_low
使用 celery 的错误处理机制
一般情况下可能因为网络问题,或者第三方服务暂时性错误而导致 task 执行出错。这时可以使用 celery task 的重试机制。
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
try:
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)
一般添加 default_retry_delay
重试等待时间和 max_retries
重试次数来限定,防止任务无限重试。
使用 Flower
Flower 项目 为监控 celery tasks 和 workers 提供了一系列的便利。他使用 Web 界面提供 worker 当前状态, task 执行进度,各个 worker 详细信息,甚至可以在网页上动态更行执行速率。
只有在真正需要时才去追踪 celery 的 result
任务的状态存储任务在退出时成功或者失败的信息,这些信息有些时候很重要,尤其是在后期分析数据时,但是大部分情况下更加关心 task 执行过程中真正想要保存的数据,而不是任务的状态。
所以,可以使用 task_ignore_result = True
来忽略任务结果。
不要将 Database/ORM 对象传入 tasks
不应该讲 Database objects 比如一个 User Model 传入在后台执行的任务,因为这些 object 可能包含过期的数据。相反应该传入一个 user id ,让 task 在执行过程中向数据库请求全新的 User Object。
以上七条来自:https://denibertovic.com/posts/celery-best-practices/
尽量简化 tasks
task 应该简洁 (concise):
- 将主要 task 逻辑包含在对象方法或者方法中
- 确保方法抛出明确的异常 (identified exceptions)
- 只有在切当的时机再实现重试机制
假设需要实现一个发送邮件的 task
import requests
from myproject.tasks import app # app is your celery application
from myproject.exceptions import InvalidUserInput
from utils.mail import api_send_mail
@app.task(bind=True, max_retries=3)
def send_mail(self, recipients, sender_email, subject, body):
"""Send a plaintext email with argument subject, sender and body to a list of recipients."""
try:
data = api_send_mail(recipients, sender_email, subject, body)
except InvalidUserInput:
# No need to retry as the user provided an invalid input
raise
except Exception as exc:
# Any other exception. Log the exception to sentry and retry in 10s.
sentrycli.captureException()
self.retry(countdown=10, exc=exc)
return data
通常任务真实的实现只有一层,而剩余的其他部分都是错误处理。而通常这么处理会更加容易维护。
设置 task 超时
设置一个全局的任务超时时间
task_soft_time_limit = 600 # 600 seconds
超时之后会抛出 SoftTimeLimitExceeded 异常
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
同样,定义任务时也能够指定超时时间,如果任务 block 尽快让其失败,尽量配置 task 的超时时间。不让长时间 block task 的进程。
@app.task(
bind=True,
max_retries=3,
soft_time_limit=5 # time limit is in seconds.
)
def send_mail(self, recipients, sender_email, subject, body):
...
将 task 重复部分抽象出来
使用 task 的基类来复用部分 task 逻辑
from myproject.tasks import app
class BaseTask(app.Task):
"""Abstract base class for all tasks in my app."""
abstract = True
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""Log the exceptions to sentry at retry."""
sentrycli.captureException(exc)
super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo)
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Log the exceptions to sentry."""
sentrycli.captureException(exc)
super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@app.task(
bind=True,
max_retries=3,
soft_time_limit=5,
base=BaseTask)
def send_mail(self, recipients, sender_email, subject, body):
"""Send a plaintext email with argument subject, sender and body to a list of recipients."""
try:
data = api_send_mail(recipients, sender_email, subject, body)
except InvalidUserInput:
raise
except Exception as exc:
self.retry(countdown=backoff(self.request.retries), exc=exc)
return data
将大型 task 作为类
一般情况下将使用方法作为 task 就已经足够,如果遇到大型 task ,可以将其写成类
class handle_event(BaseTask): # BaseTask inherits from app.Task
def validate_input(self, event):
...
def get_or_create_model(self, event):
...
def stream_event(self, event):
...
def run(self, event):
if not self.validate_intput(event):
raise InvalidInput(event)
try:
model = self.get_or_create_model(event)
self.call_hooks(event)
self.persist_model(event)
except Exception as exc:
self.retry(countdown=backoff(self.request.retries), exc=exc)
else:
self.stream_event(event)
单元测试
直接调用 worker task 中的方法,不要使用 task.delay()
。 或者使用 Eager Mode,使用 task_always_eager
设置来启用,当启用该选项之后,task 会立即被调用。而 这两种方式都只能测试 task worker 中的内容,官方 1 并不建议这么做。
对于执行时间长短不一的任务建议开启 -Ofair
celery 中默认 都会有 prefork pool 会异步将尽量多的任务发送给 worker 执行,这也意味着 worker 会预加载一些任务。这对于通常的任务会有性能提升,但这也容易导致因为某一个长任务处理时间长,而导致其他任务处于长时间等待状态。
对于执行时间长短不一的任务可以开启 -Ofair
celery -A proj worker -l info -Ofair
设置 worker 的数量
Celery 默认会开启和 CPU core 一样数量的 worker,如果想要不想开启多个 worker ,可以通过启动时指定 --concurrency
选项
--concurrency=1
在 Celery 中使用多线程
上面提到使用 --concurrency=1
或者 -c 1
来设置 worker 的数量,Celery 同样支持 Eventlet 协程方式,如果你的 worker 有大量的 IO 操作,网络请求,那么此时使用 Eventlet 协程来提高 worker 的执行效率。确保在使用 Eventlet 之前对 Eventlet 非常了解,否则不要轻易使用
celery -A proj worker -P eventlet -c 10