不要使用数据库作为 AMQP Broker

随着worker 的不断增多可能给数据库IO和连接造成很大压力。 Docker 上很多 相关的镜像。

使用多个队列

对于不同的 task ,尽量使用不同的队列来处理。

@app.task()
def my_taskA(a, b, c):
	print("doing something here...")
@app.task()
def my_taskB(x, y):
	print("doing something here...")

celery_config.py 中定义

task_queues=(
    Queue('default', routing_key='default'),
    Queue('other', routing_key='other'),

在 task 上定义

@app.task(queue='other')
def parse_something():
	pass

定义具有优先级的 workers

假如有一个 taskA 去处理一个队列 A 中的信息,一个 taskB 去处理队列 B 中的数据,然后起了 x 个 worker 去处理队列 A ,其他的 worker 去处理队列 B。而这时也可能会出现队列 B 中一些 task 急需处理,而此时堆积在队列 B 中的 tasks 很多,需要耗费很长时间来处理队列 B 中的 task。此时就需要定义优先队列来处理紧急的task。

celery 中可以在定义 Queue 时,指定 routing_key

Queue('other', routing_key='other_high'),
Queue('other', routing_key='other_low'),

然后定义

task_routes={
	# see
	# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_routes
	# http://docs.celeryproject.org/en/latest/userguide/routing.html#routing-basics
	'path.to.task' : {
		'queue': 'other',
		'routing_key': 'other_high'
	},
	'path.to.task' : {
		'queue': 'other',
		'routing_key': 'other_low'
	},
}

在启动 worker 时指定 routing_key

celery worker -E -l INFO -n workerA -Q other_high
celery worker -E -l INFO -n workerB -Q other_low

使用 celery 的错误处理机制

一般情况下可能因为网络问题,或者第三方服务暂时性错误而导致 task 执行出错。这时可以使用 celery task 的重试机制。

@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
	try:
		print("doing stuff here...")
	except SomeNetworkException as e:
		print("maybe do some clenup here....")
		self.retry(e)

一般添加 default_retry_delay 重试等待时间和 max_retries 重试次数来限定,防止任务无限重试。

使用Flower

Flower 项目 为监控 celery tasks 和 workers 提供了一系列的便利。他使用 Web 界面提供 worker 当前状态, task 执行进度,各个worker 详细信息,甚至可以在网页上动态更行执行速率。

只有在真正需要时才去追踪 celery 的 result

任务的状态存储任务在退出时成功或者失败的信息,这些信息有些时候很重要,尤其是在后期分析数据时,但是大部分情况下更加关心task执行过程中真正想要保存的数据,而不是任务的状态。

所以,可以使用 task_ignore_result = True 来忽略任务结果。

不要将 Database/ORM 对象传入 tasks

不应该讲 Database objects 比如一个 User Model 传入在后台执行的任务,因为这些 object 可能包含过期的数据。相反应该传入一个 user id ,让 task 在执行过程中向数据库请求全新的 User Object。

以上七条来自:https://denibertovic.com/posts/celery-best-practices/

尽量简化 tasks

task 应该简洁(concise):

  • 将主要task逻辑包含在对象方法或者方法中
  • 确保方法抛出明确的异常 (identified exceptions)
  • 只有在切当的时机再实现重试机制

假设需要实现一个发送邮件的task

import requests
from myproject.tasks import app  # app is your celery application
from myproject.exceptions import InvalidUserInput
from utils.mail import api_send_mail
@app.task(bind=True, max_retries=3)
def send_mail(self, recipients, sender_email, subject, body):
	"""Send a plaintext email with argument subject, sender and body to a list of recipients."""
	try:
		data = api_send_mail(recipients, sender_email, subject, body)
	except InvalidUserInput:
		# No need to retry as the user provided an invalid input
		raise
	except Exception as exc:
		# Any other exception. Log the exception to sentry and retry in 10s.
		sentrycli.captureException()
		self.retry(countdown=10, exc=exc)
	return data

通常任务真实的实现只有一层,而剩余的其他部分都是错误处理。而通常这么处理会更加容易维护。

设置task 超时

设置一个全局的任务超时时间

task_soft_time_limit = 600   # 600 seconds

超时之后会抛出 SoftTimeLimitExceeded 异常

from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
	try:
		return do_work()
	except SoftTimeLimitExceeded:
		cleanup_in_a_hurry()

同样,定义任务时也能够指定超时时间,如果任务block尽快让其失败,尽量配置 task 的超时时间。不让长时间 block task 的进程。

@app.task(
	bind=True,
	max_retries=3,
	soft_time_limit=5 # time limit is in seconds.
)
def send_mail(self, recipients, sender_email, subject, body):
	...

将task重复部分抽象出来

使用task的基类来复用部分 task 逻辑

from myproject.tasks import app
class BaseTask(app.Task):
	"""Abstract base class for all tasks in my app."""
	abstract = True
	def on_retry(self, exc, task_id, args, kwargs, einfo):
		"""Log the exceptions to sentry at retry."""
		sentrycli.captureException(exc)
		super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo)
	def on_failure(self, exc, task_id, args, kwargs, einfo):
		"""Log the exceptions to sentry."""
		sentrycli.captureException(exc)
		super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(
	bind=True,
	max_retries=3,
	soft_time_limit=5,
	base=BaseTask)
def send_mail(self, recipients, sender_email, subject, body):
	"""Send a plaintext email with argument subject, sender and body to a list of recipients."""
	try:
		data = api_send_mail(recipients, sender_email, subject, body)
	except InvalidUserInput:
		raise
	except Exception as exc:
		self.retry(countdown=backoff(self.request.retries), exc=exc)
	return data

将大型task作为类

一般情况下将使用方法作为 task 就已经足够,如果遇到大型 task ,可以将其写成

class handle_event(BaseTask):   # BaseTask inherits from app.Task
	def validate_input(self, event):
		...
	def get_or_create_model(self, event):
		...
	def stream_event(self, event):
		...
	def run(self, event):
		if not self.validate_intput(event):
			raise InvalidInput(event)
		try:
			model = self.get_or_create_model(event)
			self.call_hooks(event)
			self.persist_model(event)
		except Exception as exc:
			self.retry(countdown=backoff(self.request.retries), exc=exc)
		else:
			self.stream_event(event)

单元测试

直接调用 worker task 中的方法,不要使用 task.delay() 。 或者使用 Eager Mode,使用 task_always_eager 设置来启用,当启用该选项之后,task会立即被调用。而 这两种方式都只能测试task worker 中的内容,官方1并不建议这么做。

对于执行时间长短不一的任务建议开启 -Ofair

celery 中默认都会有 prefork pool 会异步将尽量多的任务发送给 worker 执行,这也意味着 worker 会预加载一些任务。这对于通常的任务会有性能提升,但这也容易导致因为某一个长任务处理时间长二导致其他任务处于长时间等待状态。

对于执行时间长短不一的任务可以开启 -Ofair

celery -A proj worker -l info -Ofair

reference