Django使用Celery异步任务队列的使用,分布式任务

一、简介

  Celery是由Python开辟、不难、灵活、可信赖的布满式职分队列,其本质是劳动者消费者模型,生产者发送任务到音讯队列,花费者肩负管理任务。Celery侧重于实时操作,但对调解扶助也很好,其每一天能够管理数以百万计的职分。特点:

  • 简单易行:熟习celery的工作流程后,配置使用轻便
  • 高可用:当任务实践停业或实施进度中发出延续中断,celery会自动尝试重新实践职务
  • 快快:一个单进度的celery每秒钟可管理上百万个任务
  • 利落:差非常的少celery的种种零部件都得以被扩充及自定制

利用场景举个例子:

  1.web利用:当顾客在网址开展有些操作必要不短日子完毕时,大家得以将这种操作交给Celery施行,直接回到给客商,等到Celery实施到位现在公告客商,大大提好网站的出现乃至客户的体验感。

  2.职责场景:比方在运营场景下要求批量在几百台机械实施某个命令只怕义务,此时Celery能够轻易解决。

  3.定时任务:向按期导数据报表、定期发送文告类似境况,固然Linux的布置任务能够帮自个儿达成,可是这些不便于管理,而Celery能够提供管理接口和拉长的API。

1 Celery简介

Celery是异步职分队列,能够单独于主进度运行,在主进度退出后,也不影响队列中的职责试行。

义务实践非凡退出,重新起动后,会继续施行队列中的其余职责,同一时间能够缓存截止时期接收的工作职责,那个功效信赖于音讯队列(MQ、Redis)。

二、架构&专门的学问规律

  Celery由以下三片段构成:音讯中间件(Broker)、任务奉行单元Worker、结果存款和储蓄(Backend),如下图:

  图片 1

干活规律:

  1. 职责模块Task包罗异步职务和定期职务。个中,异步职分平时在专门的学业逻辑中被触发并发往新闻队列,而定期职务由Celery Beat进度周期性地将职务发往音信队列;
  2. 任务推行单元Worker实时监视新闻队列获取队列中的职务试行;
  3. Woker推行完职责后将结果保存在Backend中;

1.1 Celery原理

图片 2

Celery的 架构 由三有些构成,信息中间件(message broker),职务实施单元(worker)和天职实施结果存款和储蓄(task result store)组成。

音信中间件:Celery本身不提供新闻服务,可是可以方便的和第三方提供的音讯中间件集成。包涵, RabbitMQRedis ,  MongoDB  (experimental), 亚马逊(Amazon) SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。推荐使用:RabbitMQ、Redis作为音讯队列。

任务试行单元:Worker是Celery提供的天职实施的单元,worker并发的周转在布满式的体系节点中。

任务结果存款和储蓄:Task result store用来囤积Worker实行的天职的结果,Celery辅助以分歧方式存款和储蓄职分的结果,包蕴AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

音信中间件Broker

  音讯中间件Broker官方提供了大多筹划方案,援救RabbitMQ、Redis、AmazonSQS、MongoDB、Memcached 等,官方推荐RabbitMQ。

1.2Celery适用场景

异步任务管理:比方给登记顾客发送短音讯或许确认邮件职分。 大型职责:实行时间较长的职分,举个例子摄像和图表管理,增多水印和转码等,须要奉行职务时间长。 定期履行的天职:扶植义务的按期实施和设定时期实践。举例品质压测定期奉行。

义务试行单元Worker

  Worker是天职推行单元,负担从消息队列中抽取任务实施,它可以运营四个只怕五个,也能够运转在不一致的机械节点,那就是其促元素布式的主干。

 2Celery开拓条件计划

结果存款和储蓄Backend

  Backend结果存储官方也提供了相当多的囤积格局帮衬:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。

 2.1 景况筹算

软件名称

版本号

说明

Linux

Centos 6.5(64bit)

操作系统

Python

3.5.2

Django

1.10

Web框架

Celery

4.0.2

异步任务队列

Redis

2.4

消息队列

三、安装使用 

  此处自个儿使用的redis作为音讯中间件,redis安装能够参照

Celery安装: 

pip3 install celery

2.2     Celery安装

应用办法介绍:

Celery的运营信任新闻队列,使用时索要设置redis或然rabbit。

此间大家运用Redis。安装redis库:

sudo yum install redis

启动redis:

sudo service redis start

安装celery库

sudo pip install celery==4.0.2

简轻松单利用

  目录结构:

project/
├── __init__.py  
├── config.py
└── tasks.py

各目录文件表明:

__init__.py:最早化Celery以致加载配置文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from celery import Celery
app = Celery('project')                                # 创建 Celery 实例
app.config_from_object('project.config')               # 加载配置模块

config.py:  Celery相关配置文件,越多安排参谋:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
)

tasks.py :任务定义文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
@app.task
def show_name(name):
    return name

启动Worker:

celery worker -A project -l debug

逐一参数含义:

  worker: 代表第运维的剧中人物是work当然还应该有beat等任何剧中人物;

  -A :项目路线,这里自个儿的目录是project

  -l:运转的日记品级,越来越多参数使用celery --help查看

查阅日志输出,会发觉大家定义的职分,以致相关配置:

图片 3

 

  尽管起步了worker,不过大家还索要经过delay或apply_async来将任务增多到worker中,这里大家经过交互式方法加多职务,并重返AsyncResult对象,通过AsyncResult对象获得结果:

图片 4

AsyncResult除了get方法用于常用获取结果方法外还提以下常用艺术或质量:

  • state: 再次回到职分情形;
  • task_id: 再次回到职务id;
  • result: 重回任务结果,同get()方法;
  • ready(): 推断义务是或不是以致有结果,有结果为True,不然False;
  • info(): 获取职责新闻,默以为结果;
  • wait(t): 等待t秒后获得结果,若职责试行完成,则不等待直接拿走结果,若职责在实行中,则wait时期平素不通,直到超时报错;
  • successfu(): 判别职分是不是中标,成功为True,不然为False;

3Celery单独执行任务

四、进级使用

  对于普通的天职的话大概餍足不断我们的职分急需,所以还亟需精晓部分进级用法,Celery提供了广大调治措施,例如任务编排、依据任务状态执行不一样的操作、重试机制等,以下会对常用高阶用法举行描述。

 3.1编写职务

创建task.py文件

注解:这里开头Celery实例时就加载了配备,使用的redis作为音讯队列和仓库储存义务结果。

图片 5

运行celery:

$ celery -A task worker --loglevel=info

看样子上面的打字与印刷,表达celery成功运营。

图片 6

定期义务&陈设义务

  Celery的提供的按时职分器重靠schedules来成功,通过beat组件周期性将职务发送给woker执行。在演示中,新建文件period_task.py,并丰硕职责到布置文件中:

period_task.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, add.s(1,3), name='1 3=') # 每10秒执行add
    sender.add_periodic_task(
        crontab(hour=16, minute=56, day_of_week=1),      #每周一下午四点五十六执行sayhai
        sayhi.s('wd'),name='say_hi'
    )



@app.task
def add(x,y):
    print(x y)
    return x y


@app.task
def sayhi(name):
    return 'hello %s' % name

config.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task', #定时任务
)

启动worker和beat:

celery worker -A project -l debug #启动work
celery beat -A  project.period_task -l  debug #启动beat,注意此时对应的文件路径

我们能够观测worker日志:

图片 7

还足以经过布署文件措施钦赐定期和安顿职务,此时的布局文件如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
    'period_add_task': {    # 计划任务
        'task': 'project.period_task.add',  #任务路径
        'schedule': crontab(hour=18, minute=16, day_of_week=1),
        'args': (3, 4),
    },
'add-every-30-seconds': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',)
    },
}

此时的period_task.py只要求登记到woker中就行了,如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    print(x y)
    return x y


@app.task
def sayhi(name):
    return 'hello %s' % name

同样运行worker和beat结果和第一种办法同样。愈来愈多详细的原委请参照他事他说加以考察:

3.2 调用职分

一向张开python交互命令行

推行上面代码:

图片 8

能够celery的窗口看看职责的推行消息

图片 9

职分履市价况监察和控制和收获结果:

图片 10

职务绑定

  Celery可由此义务绑定到实例获取到职分的上下文,那样我们得以在职分局转时候取获得职分的景色,记录相关日志等。

修改职责中的period_task.py,如下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)  # 绑定任务
def add(self,x,y):
    logger.info(self.request.__dict__)  #打印日志
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=3) # 出错每5秒尝试一次,总共尝试3次
    return x y

在以上代码中,通过bind参数将任务绑定,self指职责的上下文,通过self获取职分情形,同一时候在职分出错开上下班时间开展任务重试,大家重点日志:

图片 11

3.3任务调用方法总括

有两种办法:

delay和apply_async ,delay方法是apply_async简化版。

add.delay(2, 2)
add.apply_async((2, 2))
add.apply_async((2, 2), queue='lopri')

delay方法是apply_async简化版本。

apply_async方法是足以带相当多的陈设参数,满含内定队列等

Queue 钦点队列名称,能够把不一致任务分配到区别的队列 3.4     职分状态

每种职责有二种景况:PENDING -> STARTED -> SUCCESS

任务查询状态:res.state

来查询职务的情事

图片 12

内置钩子函数

  Celery在进行义务时候,提供了钩子方法用于在职责试行到位时候实行相应的操作,在Task源码中提供了过多情状钩子函数如:on_success(成功后实行)、on_failure(战败时候实行)、on_retry(职责重试时候推行)、after_return(任务重返时候实行),在拓宽利用是我们只要求重写那一个主意,完毕相应的操作就可以。

在偏下示例中,大家后续修改period_task.py,分别定义三个职分来演示职务战败、重试、职责成功后进行的操作:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger
from celery import Task

logger = get_task_logger(__name__)

class demotask(Task):

    def on_success(self, retval, task_id, args, kwargs):   # 任务成功执行
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))



    def on_failure(self, exc, task_id, args, kwargs, einfo):  #任务失败执行
        logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc))


    def on_retry(self, exc, task_id, args, kwargs, einfo):    #任务重试执行
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

@app.task(base=demotask,bind=True)
def add(self,x,y):
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=1) # 出错每5秒尝试一次,总共尝试1次
    return x y

@app.task(base=demotask)
def sayhi(name):
    a=[]
    a[10]==1
    return 'hi {}'.format(name)

@app.task(base=demotask)
def sum(a,b):
    return 'a b={} '.format(a b)

此时的铺排文件config.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置

CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
'add': {          # 每10秒执行
        'task': 'project.period_task.add',  #任务路径
        'schedule': 10.0,
        'args': (10,12),
    },
'sayhi': {          # 每10秒执行
        'task': 'project.period_task.sayhi',  #任务路径
        'schedule': 10.0,
        'args': ('wd',),
    },
'sum': {          # 每10秒执行
        'task': 'project.period_task.sum',  #任务路径
        'schedule': 10.0,
        'args': (1,3),
    },
}

接下来重启worker和beat,查看日志:

图片 13

 

4与Django集成

下面简单介绍了celery异步职分的宗旨办法,结合大家实际的利用,我们须求与Django一同使用,上边介绍如何与Django结合。

任务编排

  在无数境况下,二个职分急需由多少个子职责照旧三个任务须要广大手续技能到位,Celery同样也能促成那样的任务,完结那类型的职分通过以下模块形成:

  • group: 并行调整任务

  • chain: 链式职分调整

  • chord: 类似group,但分header和body2个部分,header能够是三个group职务,执行到位后调用body的义务

  • map: 映射调节,通过输入八个入参来很多次调解同八个职分

  • starmap: 类似map,入参类似*args

  • chunks: 将职务根据一定数量实行分组

 

修改tasks.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    return x y


@app.task
def mul(x,y):
    return x*y


@app.task
def sum(data_list):
    res=0
    for i in data_list:
        res =i
    return res

 

group: 组职务,组内每一个职分并行施行

和project同级目录新建consumer.py如下:

from celery import group
from project.tasks import add,mul,sum
res = group(add.s(1,2),add.s(1,2))()  # 任务 [1 2,1 2] 
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

结果:

图片 14

 

chain:链式职责

链式职分中,默许上三个职务的回到结果作为参数字传送递给子职责

from celery import chain
from project.tasks import add,mul,sum
res = chain(add.s(1,2),add.s(3),mul.s(3))()  # 任务((1 2) 3)*3
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break
#结果
#res:18

仍然是能够使用|表示链式职务,上边职务也能够表示为:

res = (add.s(1,2) | add.s(3) | (mul.s(3)))()
res.get()

 

chord:职责分割,分为header和body两部分,hearder任务施行完在奉行body,当中hearder重临结果作为参数字传送递给body

from celery import chord
from project.tasks import add,mul,sum
res = chord(header=[add.s(1,2),mul.s(3,4)],body=sum.s())()  # 任务(1 2) (3*4)
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

#结果:
#res:15

 

chunks:职分分组,依据义务的个数分组

from project.tasks import add,mul,sum
res = add.chunks(zip(range(5),range(5)),4)()  # 4 代表每组的任务的个数
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

结果:

图片 15

 

4.1与Django集成方法

与Django集成有三种艺术:

  1. Django 1.8 以上版本:与Celery 4.0本子集成
  2. Django 1.8 以下版本:与Celery3.1版本集成,使用django-celery库

明天咱们介绍celery4.0 和django 1.8上述版本集成方法。

delay &apply_async

  对于delay和apply_async都足以用来开展职务的调整,本质上是delay对apply_async实行了再二遍封装(大概能够说是火速模式),两个都回去AsyncResult对象,以下是三个办法源码。

图片 16图片 17

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)

delay源码

图片 18图片 19

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """Apply tasks asynchronously by sending a message.

        Arguments:
            args (Tuple): The positional arguments to pass on to the task.

            kwargs (Dict): The keyword arguments to pass on to the task.

            countdown (float): Number of seconds into the future that the
                task should execute.  Defaults to immediate execution.

            eta (~datetime.datetime): Absolute time and date of when the task
                should be executed.  May not be specified if `countdown`
                is also supplied.

            expires (float, ~datetime.datetime): Datetime or
                seconds in the future for the task should expire.
                The task won't be executed after the expiration time.

            shadow (str): Override task name used in logs/monitoring.
                Default is retrieved from :meth:`shadow_name`.

            connection (kombu.Connection): Re-use existing broker connection
                instead of acquiring one from the connection pool.

            retry (bool): If enabled sending of the task message will be
                retried in the event of connection loss or failure.
                Default is taken from the :setting:`task_publish_retry`
                setting.  Note that you need to handle the
                producer/connection manually for this to work.

            retry_policy (Mapping): Override the retry policy used.
                See the :setting:`task_publish_retry_policy` setting.

            queue (str, kombu.Queue): The queue to route the task to.
                This must be a key present in :setting:`task_queues`, or
                :setting:`task_create_missing_queues` must be
                enabled.  See :ref:`guide-routing` for more
                information.

            exchange (str, kombu.Exchange): Named custom exchange to send the
                task to.  Usually not used in combination with the ``queue``
                argument.

            routing_key (str): Custom routing key used to route the task to a
                worker server.  If in combination with a ``queue`` argument
                only used to specify custom routing keys to topic exchanges.

            priority (int): The task priority, a number between 0 and 9.
                Defaults to the :attr:`priority` attribute.

            serializer (str): Serialization method to use.
                Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
                serialization method that's been registered
                with :mod:`kombu.serialization.registry`.
                Defaults to the :attr:`serializer` attribute.

            compression (str): Optional compression method
                to use.  Can be one of ``zlib``, ``bzip2``,
                or any custom compression methods registered with
                :func:`kombu.compression.register`.
                Defaults to the :setting:`task_compression` setting.

            link (Signature): A single, or a list of tasks signatures
                to apply if the task returns successfully.

            link_error (Signature): A single, or a list of task signatures
                to apply if an error occurs while executing the task.

            producer (kombu.Producer): custom producer to use when publishing
                the task.

            add_to_parent (bool): If set to True (default) and the task
                is applied while executing another task, then the result
                will be appended to the parent tasks ``request.children``
                attribute.  Trailing can also be disabled by default using the
                :attr:`trail` attribute

            publisher (kombu.Producer): Deprecated alias to ``producer``.

            headers (Dict): Message headers to be included in the message.

        Returns:
            celery.result.AsyncResult: Promise of future evaluation.

        Raises:
            TypeError: If not enough arguments are passed, or too many
                arguments are passed.  Note that signature checks may
                be disabled by specifying ``@task(typing=False)``.
            kombu.exceptions.OperationalError: If a connection to the
               transport cannot be made, or if the connection is lost.

        Note:
            Also supports all keyword arguments supported by
            :meth:`kombu.Producer.publish`.
        """
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)

        if self.__v2_compat__:
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts

        options.setdefault('ignore_result', self.ignore_result)

        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )

apply_async源码

对此其利用,apply_async补助常用参数:

  • eta:钦定职分实施时间,类型为datetime时间档次;
  • countdown:倒计时,单位秒,浮点类型;
  • expires:职务过期时间,如果任务在跨超过期时刻还未推行则回收职务,浮点类型获取datetime类型;
  • retry:任务实践停业时候是否尝试,布尔类型。;
  • serializer:种类化方案,补助pickle、json、yaml、msgpack;
  • priority:职责优先级,有0~9优先级可安装,int类型;
  • retry_policy:职责重试机制,个中含有多少个重试参数,类型是dict如下:

图片 20图片 21

max_retries:最大重试次数

interval_start:重试等待时间

interval_step:每次重试叠加时长,假设第一重试等待1s,第二次等待1+n秒

interval_max:最大等待时间

####示例
 add.apply_async((1, 3), retry=True, retry_policy={
        'max_retries': 1,
        'interval_start': 0,
        'interval_step': 0.8,
        'interval_max': 5,
    })

View Code

越来越多参数参考:

 

  

4.2 创设项目文件

创办三个项目:名字称为proj

- proj/
 - proj/__init__.py
 - proj/settings.py
 - proj/urls.py
 - proj/wsgi.py
- manage.py

创立多个新的文件: proj/proj/mycelery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

在proj/proj/__init__.py:添加

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .mycelery import app as celery_app

__all__ = ['celery_app']

 五、管理与监督检查

  Celery管理和监察功能是透过flower组件完成的,flower组件不仅提供监控功效,还提供HTTP API可达成对woker和task的保管。

4.3 配置Celery

咱俩在mycelery.py文件中表达celery的布局文件在settings.py中,并且是以CELEEvoqueY早先。

app.config_from_object('django.conf:settings', namespace='CELERY')

在settings.py文件中增添celery配置:

图片 22

大家的安排是选用redis作为新闻队列,音讯的代理和结果都以用redis,职责的体系化使用json格式。

首要:redis://127.0.0.1:6379/0那几个注脚使用的redis的0号队列,如果有多少个celery任务都施用同一个队列,则会促成任务混乱。最佳是celery实例单独接纳贰个行列。

安装使用

pip3 install flower

启动

 flower -A project --port=5555   
# -A :项目目录
#--port 指定端口

访问http:ip:5555

图片 23

api使用,例如获取woker音讯:

curl http://127.0.0.1:5555/api/workers

结果:

图片 24

更多api参考:

 

4.4创建APP

创建Django的App,名称为celery_task,在app目录下创办tasks.py文件。

完了后目录结构为:

├── celery_task
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ └── views.py
├── db.sqlite3
├── manage.py
├── proj
│ ├── celery.py
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── templates

4.5编写task任务

编纂任务文件

tasks.py

在tasks.py文件中加多下边代码

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
 return x   y

@shared_task
def mul(x, y):
 return x * y

@shared_task
def xsum(numbers):
 return sum(numbers)

启动celery:celery -A proj.mycelery worker -l info

说明:proj 为模块名称,mycelery 为celery 的实例所在的文本。

启航成功打字与印刷:

图片 25

4.6在views中调用职责

在views中编辑接口,完毕七个功用:

  1. 接触职责,然后再次回到职务的结果和天职ID
  2. 基于职分ID查询职责情状

代码如下:

图片 26

启动django。

新开二个对话运转celery;运维命令为:celery –A proj.mycelery worker –l info

访问 ,能够看出重回的结果。

图片 27

在celery运营的页面,能够看看上边输出:

图片 28

4.7在views中查询职责状态

一部分时候职责执行时间较长,需求查询任务是还是不是施行到位,能够依赖任务的id来查询任务意况,根据情形举办下一步操作。

能够看来职分的情形为:SUCCESS

图片 29

5Celery按期职责

Celery作为异步职务队列,大家能够根据大家设置的日子,按时的实行一些任务,比如每一天数据库备份,日志转存等。

Celery的定期任务安排特别轻巧:

定时职责的布署照旧在setting.py文件中。

证实:要是感觉celery 的数目配置文件和Django 的都在setting.py 三个文件中不便于,能够分拆出来,只需求在mycelery.py 的文本中指明就可以。

app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')

5.1任务间距运营

#每30秒调用task.add
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
 'add-every-30-seconds': {
  'task': 'tasks.add',
  'schedule': timedelta(seconds=30),
  'args': (16, 16)
 },
}

5.2定期推行

定时每日午夜7:30分运作。

小心:设置职分时间时在乎时间格式,UTC时间恐怕地面时间。

#crontab任务
#每天7:30调用task.add
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
 # Executes every Monday morning at 7:30 A.M
 'add-every-monday-morning': {
  'task': 'tasks.add',
  'schedule': crontab(hour=7, minute=30),
  'args': (16, 16),
 },
}

5.3定期职责运转

配置了按时职分,除了worker进度外,还索要运营三个beat进度。

Beat进度的意义就一定于四个定期职务,依照布置来施行相应的职务。

5.3.1  启动beat进程

一声令下如下:celery -A proj.mycelery beat -l info

图片 30

5.3.2  启动worker进程

Worker进度运营和眼前运营命令同样。celery –A proj.mycelery worker –l info

图片 31

6 Celery深入

Celery职务帮忙多元的周转格局:

  1. 支撑动态内定并发数 --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
  2. 支持链式任务
  3. 支持Group任务
  4. 支撑职务区别优先级
  5. 支撑钦点职务队列
  6. 支撑采纳eventlet方式运转worker

举个例子:钦定并发数为一千

celery -A proj.mycelery worker -c 1000

那么些可以依附使用的中肯机关精通和学习。

上述就是本文的全体内容,希望对大家的学习抱有助于,也盼望我们多多指教脚本之家。

你只怕感兴趣的稿子:

  • Django中使用celery完结异步职责的亲自过问代码
  • 异步任务队列Celery在Django中的选拔办法

本文由星彩网app下载发布于计算机编程,转载请注明出处:Django使用Celery异步任务队列的使用,分布式任务

TAG标签: 星彩网app下载
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。