django2定时任务,分布式任务队列学习笔记

一、前言

  Celery是三个依照python开采的遍及式职责队列,如若不精晓请阅读小编上一篇博文Celery入门与晋级,而做python WEB开荒最为盛行的框架莫属Django,然而Django的呼吁管理进程都以一路的一点计策也施展不出贯彻异步职分,若要实现异步职务管理须要通过其他方法(前端的日常技术方案是ajax操作),而后台Celery就是没有错的选料。倘诺一个客户在实施有个别操作须求等待非常久才回去,那大大裁减了网址的吞吐量。上面将汇报Django的乞请管理大约流程(图片来源网络):

图片 1

恳请进度大致表达:浏览器发起呼吁-->诉求管理-->央浼经过中间件-->路由映射-->视图管理专门的职业逻辑-->响应必要(template或response)

英特网有不菲celery django完结定时任务的学科,不过它们大部分是依赖djcelery celery3的;
要么是行使django_celery_beat配置较为麻烦的。

富有演示均依照Django2.0

Celery 是多个简练、灵活且保险的,管理多量音信的遍及式系统,况兼提供保养这么一个系统的必需工具。
它是贰个瞩目于实时管理的天职队列,同不常候也帮助职务调整。
如上是celery本身官方网址的牵线

二、配置利用

  celery很轻巧集成到Django框架中,当然假若想要达成按时任务的话还索要设置django-celery-beta插件,前面会表达。供给小心的是Celery4.0只扶持Django版本>=1.8的,假诺是低于1.8版本需求运用Celery3.1。

明明简洁而火速才是大家最终的言情,而celery4已经无需分外插件就能够与django结合达成定时义务了,原生的celery beat就能够很好的达成定期使命功能。

celery是一个基于python开辟的简易、灵活且保证的布满式职分队列框架,援救选取职务队列的点子在分布式的机器/过程/线程上进行职分调解。采取独立的生产者-花费者模型,首要由三局地构成:

celery的应用场景很广阔

配置

  新确立项目taskproj,目录结构(每一个app下多了个tasks文件,用于定义任务):

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

在类型目录taskproj/taskproj/目录下新建celery.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境

app = Celery('taskproj')

app.config_from_object('django.conf:settings', namespace='CELERY') #  使用CELERY_ 作为前缀,在settings中写配置

app.autodiscover_tasks()  # 发现任务文件每个app下的task.py

taskproj/taskproj/__init__.py:

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ['celery_app']

taskproj/taskproj/settings.py

CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

跻身项目标taskproj目录运营worker:

celery worker -A taskproj -l debug

自然使用原生方案的同有时候有几点插件所带来的功利被大家抛弃了:

  • 音讯队列broker:broker实际上正是一个MQ队列服务,能够选择redis、rabbitmq等作为broker
  • 管理职分的花费者workers:broker布告worker队列中有职务,worker去队列中收取任务奉行,每八个worker就是叁个进度
  • 积累结果的backend:实行结果存款和储蓄在backend,暗中认可也会积存在broker使用的MQ队列服务中,也足以独立陈设用何种服务做backend
  • 拍卖异步职分
  • 任务调整
  • 管理定期任务
  • 布满式调整

概念与触发职分

  职务定义在每一个tasks文件中,app01/tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x   y


@shared_task
def mul(x, y):
    return x * y

视图中触发任务

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request,*args,**kwargs):
    res=tasks.add.delay(1,3)
    #任务逻辑
    return JsonResponse({'status':'successful','task_id':res.task_id})

访问

图片 2

 若想赢得任务结果,能够通过task_id使用AsyncResult获取结果,还足以平昔通过backend获取:

图片 3

 

  • 插件提供的定期职分管理将不在可用,当大家只须要职分定时实行而无需人工资调度度的时候这一点忽略不计。
  • 力不能支飞速的管住或追踪按时任务,定期任务的追踪其实交给日志更客观,但是对职务的修改就从不那么方便人民群众了,但是假若无需日常转移/增减职分的话这一点也在可接受范围内。

图片 4

平价也相当多,越发在接纳python构建的采取种类中,无缝对接,使用十三分便利。

扩展

  除了redis、rabbitmq能做结果存款和储蓄外,还能动用Django的orm作为结果存款和储蓄,当然须要设置正视插件,那样的好处在于大家能够直接通过django的数据查见到任务状态,同一时候为能够拟定更多的操作,上面介绍如何行使orm作为结果存款和储蓄。

1.安装

pip install django-celery-results

2.配置settings.py,注册app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

4.修改backend配置,将redis改为django-db

#CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_BACKEND = 'django-db'  #使用django orm 作为结果存储

5.修改数据库

python3 manage.py migrate django_celery_results

此时会见到数据库会多成立:

图片 5 当然你有的时候候供给对task表进行操作,以下源码的表结构定义:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)

 

Celery定时职务布署

在开展示公布署前先来拜会项目布局:

.├── linux_news│   ├── celery.py│   ├── __init__.py│   ├── settings.py│   ├── urls.py│   └── wsgi.py├── manage.py├── news│   ├── admin.py│   ├── apps.py│   ├── __init__.py│   ├── migrations│   ├── models│   ├── tasks.py│   ├── tests.py│   └── views└── start-celery.sh

个中news是大家的app,用于从部分rss订阅源获取情报新闻,linux_news则是大家的project。我们供给关怀的注重是celery.py,settings.py,tasks.py和start-celery.sh。

率先是celery.py,想让celery执行职务就必得实例化一个celery app,并把settings.py里的计划传入app:

import osfrom celery import Celery# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'linux_news.settings')app = Celery('linux_news')# 'django.conf:settings'表示django,conf.settings也就是django项目的配置,celery会根据前面设置的环境变量自动查找并导入# - namespace表示在settings.py中celery配置项的名字的统一前缀,这里是'CELERY_',配置项的名字也需要大写app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django app configs.app.autodiscover_tasks()

配置正是如此轻巧,为了能在django里使用那几个app,大家须求在__init__.py中程导弹入它:

from .celery import app as celery_app

然后大家来看tasks.py,它应该献身你的app目录中,前面我们布署了电动发掘,所以celery会自动找到那几个tasks,大家的tasks将写在此一模块中,代码涉及了部分orm的行使,为了符合焦点作者做了些简洁明了:

from linux_news.celery import celery_app as appfrom .models import *import timeimport feedparserimport pytzimport html@app.task(ignore_result=True)def fetch_news(origin_name):    """    fetch all news from origin_name    """    origin = get_feeds_origin(origin_name)    feeds = feedparser.parse(origin.feed_link)    for item in feeds['entries']:        entry = NewsEntry()        entry.title = item.title        entry.origin = origin        entry.author = item.author        entry.link = item.link        # add timezone        entry.publish_time = item.time.replace(tzinfo=pytz.utc)        entry.summary = html.escape(item.summary)        entry.save()@app.task(ignore_result=True)def fetch_all_news():    """    这是我们的定时任务    fetch all origins' news to db    """    origins = NewsOrigin.objects.all()    for origin in origins:        fetch_news.delay(origin.origin_name)

tasks里是一对耗费时间操作,比方网络IO或然数据库读写,因为我们不关切职务的重临值,所以采纳@app.task(ignore_result=True)将其屏蔽了。

职务布署完毕后大家就要铺排celery了,大家挑选redis作为职分队列,小编刚毅提议在生养条件中央银行使rabbitmq也许redis作为天职队列或结果缓存后端,而不应该运用关系型数据库:

# redisREDIS_PORT = 6379REDIS_DB = 0# 从环境变量中取得redis服务器地址REDIS_HOST = os.environ.get('REDIS_ADDR', 'redis')# celery settings# 这两项必须设置,否则不能正常启动celery beatCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = TIME_ZONE# 任务队列配置CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_ACCEPT_CONTENT = ['application/json', ]CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_TASK_SERIALIZER = 'json'

接下来是我们的定期职务设置:

from celery.schedules import crontabCELERY_BEAT_SCHEDULE={        'fetch_news_every-1-hour': {            'task': 'news.tasks.fetch_all_news',            'schedule': crontab(minute=0, hour='*/1'),        }}

定期任务布署对象是一个dict,由任务名和计划项构成,首要配备想如下:

  • task:义务函数所在的模块,模块路线得写全,不然找不到将不恐怕运转该义务
  • schedule:按期宗旨,日常选择celery.schedules.crontab,上面例子为每小时的0分施行一遍职责,具体写法与linux的crontab类似能够参照文书档案表明
  • args:是个元组,给出职务急需的参数,如若无需参数也能够不写进配置,就如例子中的同样
  • 任何配置项相当少用,能够参照文书档案
    迄今甘休,配置celery beat的一对就与世长辞了。

异步职分

Celery

三、Django中使用定期职责

  如若想要在django中使用定期义务功用雷同是靠beat实现义务发送功用,当在Django中使用定期职分时,必要安装django-celery-beat插件。以下将介绍使用进程。

启动celery beat

布置实现后只需要运转celery了。

启航此前安顿一下遭遇。不要用root运转celery!不要用root运转celery!不要用root运营celery!首要的事情说壹遍。

start-celery.sh:

export REDIS_ADDR=127.0.0.1celery -A linux_news worker -l info -B -f /path/to/log

-A 表示app所在的目录,-B表示运行celery beat运维定期职责。
celery常常运营后就可以透过日记来查阅职责是不是正规运作了:

[2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d]  [2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b]  [2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None[2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25]  [2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None[2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None

如上便是celery4运行定时职分的源委,如有错误和脱漏,接待指正。

自家的异步使用意况为品种上线:前端web上有个上线开关,点击开关后发央求给后端,后端施行上线进程要5秒钟,后端在接到到伏乞后把义务放入队列异步实践,相同的时候马上赶回给前端二个职责试行中的结果。若果未有异步施行会什么啊?同步的景况就是推行进度中前端一贯在等后端重临结果,页面转呀转的就转超时了。

安装

安装配备

1.beat插件装置

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS = [
    ....   
    'django_celery_beat',
]

3.数据库更动

python3 manage.py migrate django_celery_beat

4.分别运行woker和beta

celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  #启动beta 调度器使用数据库

celery worker -A taskproj -l info #启动woker

5.配置admin

urls.py

# urls.py
from django.conf.urls import url
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
]

6.创立顾客

python3 manage.py createsuperuser 

7.登陆admin进行政管理制(地址

图片 6

 

 使用示例:

图片 7

 

 

 

 

图片 8

 

 

 查看结果:

图片 9

 

异步义务布署

安装Celery

引入使用pip安装,假如你采纳的是虚构情况,请在虚构情状里设置

$ pip install celery

贰遍开垦

  django-celery-beat插件本质上是对数据库表变化检查,一旦有多少库表改变,调整器重新读取任务拓宽调节,所以只要想和睦定制的职务页面,只供给操作beat插件的四张表就足以了。当然你还是可以够团结定义调节器,django-celery-beat插件已经嵌入了model,只供给举办导入便可进展orm操作,以下小编用django reset api实行言传身教:

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]

urls.py

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index$', views.index),
    url(r'^res$', views.get_res),
    url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]

views.py

from django_celery_beat.models import PeriodicTask  #倒入插件model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination

访问

图片 10

 

1.安装rabbitmq,这里大家选拔rabbitmq作为broker,安装完成后默许运营了,也无需任何任何配置

设置音讯中间件

Celery 支持 RabbitMQ、Redis 以至别的数据库系统作为其消息代理中间件

你希望用怎么样中间件和后端就请自行设置,日常都采纳redis大概RabbitMQ

# apt-get install rabbitmq-server

安装Redis

在Ubuntu系统下利用apt-get命令就足以

$ sudo apt-get install redis-server

假使您利用redis作为中间件,还索要安装redis协助包,一样采用pip安装就可以

$ pip install redis

能冒出以下结果即为成功

redis 127.0.0.1:6379>

任何的redis知识这里不左介绍,假诺有意思味,能够自动了然

若果你接纳RabbitMQ,也请安装RabbitMQ

2.安装celery

安装RabbitMQ

$ sudo apt-get install rabbitmq-server
# pip3 install celery

使用Celery

3.celery用在django项目中,django项目目录结构(简化)如下

回顾间接选择

可以在急需的地点平素引进Celery,直接使用就能够。最简易的不二秘诀只须要配备一个职分和中间人就可以

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x   y

本身那边运用了redis作为中间件,那是足以按自身的习于旧贯替换的

鉴于默许的配置不是最切合大家的花色实际需求,平日的话大家都亟待按我们和好的要求陈设部分,
而是出于供给将品种解耦,也好维护,我们最棒应用单独的叁个文本编制配置。

website/
|-- deploy
|  |-- admin.py
|  |-- apps.py
|  |-- __init__.py
|  |-- models.py
|  |-- tasks.py
|  |-- tests.py
|  |-- urls.py
|  `-- views.py
|-- manage.py
|-- README
`-- website
  |-- celery.py
  |-- __init__.py
  |-- settings.py
  |-- urls.py
  `-- wsgi.py

单身铺排配置文件

比上边的有些复杂一点,大家必要创设五个公文,二个为config.py的celery配置文件,在里边填写适合大家项目标配备,在开创一个tasks.py文件来编排我们的天职。文件的名字能够按你的喜好和煦取名。

config.py内容为:

# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

安插好之后能够用以下命令检查陈设文件是或不是精确(config为布局文件名)

$ python -m config

tasks.py内容为:

# coding=utf-8
from celery import Celery

app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')

@app.task
def add(x, y):
    return x   y

再有一种同等设置配置的主意,不是很推荐

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

在app使用前先要求用以上办法批量创新配备文件。

4.创建 website/celery.py 主文件

在利用上接纳

工程目录结构为

proj/
    __init__.py
    # 存放配置和启动celery代码
    celery.py
    # 存放任务
    tasks.py

celery.py为:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x   y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启航celery只须要在proj同级目录下:

$ celery -A proj worker -l info
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

在django中使用celery

作者们的django的花色的目录结构相似如下

proj/
    manage.py
    myapp/
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

想要在django项目中动用celery,我们第一须求在django中配置celery

我们须要在与工程名同名的子文件夹中加多celery.py文件
在本例中也正是proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# 括号里的参数为工程名
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

下一场大家须求在同级目录下的**init.py文件中配备如下内容 proj/proj/init.py**

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

接下来大家就足以把须求的职责放到需求的app下的tasks.py中,将来项目目录结构如下

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

或是的三个tasks.py文件内容如下:
myapp1/tasks.py为:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
    time.sleep(5)
    print(x y)
    return x   y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task修饰器能够令你制造task没有须求app实体

在急需的地点调用相关任务就能够,举例在myapp1/views.py中调用

from django.shortcuts import render
from .tasks import add


def index(request):
    # 测试celery任务
    add.delay(4,5)
    return render(request,'index.html')

接下来就能够运行项目,celery必要单独运行,所以供给开两个极端,分别

开行web应用服务器

$ python manage.py runserver

启动celery

$ celery -A proj worker -l info

接下来访谈浏览器就足以在运行celery的极端中观察输出

图片 11

测试结果

5.在 website/__init__.py 文件中追加如下内容,确认保证django运转的时候那几个app能够被加载到

扩展

  • 倘诺你的品类要求在admin中管理调节,请使用django-celery-beat
  1. 使用pip安装django-celery-beat
$ pip install django-celery-beat

决不在采纳django-celery,那个连串已经甘休更新好好多年。。。。

  1. 在settings.py中加多这一个app
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
  1. 手拉手一下数据库
$ python manage.py migrate
  1. 设置celery beat服务应用django_celery_beat.schedulers:DatabaseScheduler scheduler
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

下一场在就足以admin界面看见了。

  • 即使您想使用Django-ORM可能Django Cache作为后端,需求设置django-celery-results强大(作者不提出)
  1. 使用pip安装django-celery-results
$ pip install django-celery-results

决不在应用django-celery,那个连串早已截止更新好许多年。。。。

  1. 在settings.py中丰硕这些app
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
  1. 协助进行一下数据库
$ python manage.py migrate django_celery_results
  1. 布局后端,在settings.py中配置
# 使用数据库作为结果后端
CELERY_RESULT_BACKEND = 'django-db'

# 使用缓存作为结果后端
CELERY_RESULT_BACKEND = 'django-cache'

主导使用大致就是上述那一个,别的实际布置和动用还需协应用研讨读官方文档

注:

  • 上述条件在ubuntu16.04 lts django1.9中搭建测量试验成功
  • 上述文字皆为私家意见,如有错误或建议请及时交流自己
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

6.各使用成立tasks.py文件,这里为 deploy/tasks.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
  return x   y

只顾tasks.py必需建在各app的根目录下,且只好叫tasks.py,不能够随便命名

7.views.py中援用使用那几个tasks异步管理

from deploy.tasks import add

def post(request):
  result = add.delay(2, 3)


result.ready()
result.get(timeout=1)
result.traceback

8.启动celery

# celery -A website worker -l info

9.那样在调用post那些情势时,里边的add就能够异步管理了

定时任务

按时任务的运用处境就很宽泛了,比方本人急需定期发送报告给老总娘~

定期职务布置

  1. website/celery.py 文件增加如下配置以支撑定期任务crontab
from celery.schedules import crontab

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sum-task': {
      'task': 'deploy.tasks.add',
      'schedule': timedelta(seconds=20),
      'args': (5, 6)
    }
    'send-report': {
      'task': 'deploy.tasks.report',
      'schedule': crontab(hour=4, minute=30, day_of_week=1),
    }
  }
)

概念了八个task:

  • 名字为'sum-task'的task,每20秒实践贰次add函数,并传了三个参数5和6
  • 名为'send-report'的task,周周五上午4:30实行report函数

timedelta是datetime中的贰个对象,必要 from datetime import timedelta 引入,有如下多少个参数

  • days
  • seconds
  • microseconds
  • milliseconds
  • minutes
  • hours

crontab的参数有:

month_of_year
day_of_month
day_of_week
hour
minute

  1. deploy/tasks.py 文件增加report方法:
@shared_task
def report():
  return 5

3.开发银行celery beat,celery运转了贰个beat进度一向在不断的决断是不是有义务急需推行

# celery -A website beat -l info

Tips

1.一旦你同有时间采用了异步职分和布署任务,有一种更简单的开发银行方式 celery -A website worker -b -l info ,可同一时候开动worker和beat

2.万一使用的不是rabbitmq做队列那么需求在主配置文件中 website/celery.py 配置broker和backend,如下:

# redis做MQ配置
app = Celery('website', backend='redis', broker='redis://localhost')
# rabbitmq做MQ配置
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')

3.celery不可能用root客户运维的话必要在主配置文件中增添 platforms.C_FORCE_ROOT = True

4.celery在长日子运作后只怕出现内存泄漏,必要增添配置 CELERYD_MAX_TASKS_PER_CHILD = 10 ,表示各样worker试行了不怎么个任务就死掉

上述正是本文的全部内容,希望对大家的就学抱有利于,也冀望我们多都赐教脚本之家。

您大概感兴趣的稿子:

  • 异步职分队列Celery在Django中的施用办法
  • Django使用Celery异步任务队列的采纳
  • Django中使用celery实现异步职务的躬行实践代码

本文由星彩网app下载发布于计算机编程,转载请注明出处:django2定时任务,分布式任务队列学习笔记

TAG标签: 星彩网app下载
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。