Django中使用Celery执行异步和定时任务的注意事项
WEB前端开发社区 昨天
pip install celery==3.1.25
pip install gevent# 启动workercelery -A <module> worker -l info -P gevent
app = Celery('myproject')@app.task(bind=True)def debug_task(self):print('Request: {0!r}'.format(self.request))
from __future__ import absolute_importfrom celery import shared_task@shared_taskdef add(x, y):return x + y
connect_timeout, read_timeout = 5.0, 30.0response = requests.get(URL, timeout=(connect_timeout, read_timeout))
@app.task@decorator2@decorator1def add(x, y): return x + y
self),就像 Python 绑定方法类似,如下例所示:from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)@task(bind=True)def add(self, x, y):logger.info(self.request.id)
app.Task.retry() ),访问当前任务请求的信息,以及你添加到自定义任务基类的附加功能。ignore_result 选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result 设置全局忽略任务结果。@app.task(ignore_result=True)def mytask(): something()
@app.taskdef update_page_info(url):page = fetch_page.delay(url).get()info = parse_page.delay(url, page).get()store_page_info.delay(url, info)@app.taskdef fetch_page(url):return myhttplib.get(url)@app.taskdef parse_page(url, page):return myparser.parse_document(page)@app.taskdef store_page_info(url, info):return PageInfo.objects.create(url, info)
def update_page_info(url):# fetch_page -> parse_page -> store_pagechain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)chain()@app.task()def fetch_page(url):return myhttplib.get(url)@app.task()def parse_page(page):return myparser.parse_document(page)@app.task(ignore_result=True)def store_page_info(info, url):PageInfo.objects.create(url=url, info=info)
class Article(models.Model):title = models.CharField()body = models.TextField()@app.taskdef expand_abbreviations(article):article.body.replace('MyCorp', 'My Corporation')article.save()
>>> article = Article.objects.get(id=102)>>> expand_abbreviations.delay(article)
@app.taskdef expand_abbreviations(article_id): article = Article.objects.get(id=article_id) article.body.replace('MyCorp', 'My Corporation') article.save()commit_on_success 装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。from django.db import transaction@transaction.commit_on_successdef create_article(request):article = Article.objects.create()expand_abbreviations.delay(article.pk)
on_commit 回调函数来在所有事务提交成功后启动任务。from django.db.transaction import on_commitdef create_article(request): article = Article.objects.create() on_commit(lambda: expand_abbreviations.delay(article.pk))
app.Task.retry() 函数可以用来重新执行任务。当一个任务被重试,它在重试前会等待给定的时间,并且默认的由 default_retry_delay 属性定义。默认设置为 3 分钟。注意延迟设置的单位是秒(int 或者 float)。你可以通过提供 countdown 参数覆盖这个默认值。# retry in 30 minutes.@app.task(bind=True, default_retry_delay=30 * 60) def add(self, x, y):try: something_raising()except Exception as exc:# overrides the default delay to retry after 1 minuteraise self.retry(exc=exc, countdown=60)
赞 (0)
