重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

Celery-一个会做异步任务,定时任务的芹菜

Celery 分布式任务队列

同步与异步

比如说你要去一个餐厅吃饭,你点完菜以后假设服务员告诉你,你点的菜,要两个小时才能做完,这个时候你可以有两个选择

目前成都创新互联公司已为上1000家的企业提供了网站建设、域名、虚拟空间、网站托管运营、企业网站设计、乌拉特中网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

  • 一直在餐厅等着饭菜上桌
  • 你可以回家等着,这个时候你就可以把你的电话留给服务员,告诉服务员等什么时候你的饭菜上桌了,在给你打电话

​ 所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

​ 所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。

阻塞与非阻塞

继续上面的例子

  • 不管你的在餐厅等着还是回家等着,这个期间你的都不能干别的事,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。
  • 你回家以后就可以去做别的事了,一遍做别的事,一般去等待服务员的电话,这样的状态就是非阻塞的,因为你(等待者)没有阻塞在这个消息通知上,而是一边做自己的事情一边等待。

​ 阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

同步/异步与阻塞/非阻塞

同步阻塞形式

效率最低。拿上面的例子来说,就是你专心的在餐馆等着,什么别的事都不做。

异步阻塞形式

在家里等待的过程中,你一直盯着手机,不去做其它的事情,那么很显然,你被阻塞在了这个等待的操作上面;

异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

同步非阻塞形式

实际上是效率低下的。

想象一下你如果害怕服务员忘记给你打电话通知你,你过一会就要去餐厅看一下你的饭菜好了没有,没好 ,在回家等待,过一会再去看一眼,没好再回家等着,那么效率可想而知是低下的。

异步非阻塞形式

​ 比如说你回家以后就直接看电视了,把手机放在一边,等什么时候电话响了,你在去接电话.这就是异步非阻塞形式,大家想一下这样是不是效率是最高的

​ 那么同步一定是阻塞的吗?异步一定是非阻塞的吗?

生产者消费者模型

在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。

单单抽象出生产者和消费者,还够不上是生产者消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:

Celery-一个会做异步任务,定时任务的芹菜

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过消息队列(缓冲区)来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给消息队列,消费者不找生产者要数据,而是直接从消息队列里取,消息队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个消息队列就是用来给生产者和消费者解耦的。------------->这里又有一个问题,什么叫做解耦?

解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

因为太抽象,看过网上的说明之后,通过我的理解,我举了个例子:吃包子。

假如你非常喜欢吃包子(吃起来根本停不下来),今天,你妈妈(生产者)在蒸包子,厨房有张桌子(缓冲区),你妈妈将蒸熟的包子盛在盘子(消息)里,然后放到桌子上,你正在看巴西奥运会,看到蒸熟的包子放在厨房桌子上的盘子里,你就把盘子取走,一边吃包子一边看奥运。在这个过程中,你和你妈妈使用同一个桌子放置盘子和取走盘子,这里桌子就是一个共享对象。生产者添加食物,消费者取走食物。桌子的好处是,你妈妈不用直接把盘子给你,只是负责把包子装在盘子里放到桌子上,如果桌子满了,就不再放了,等待。而且生产者还有其他事情要做,消费者吃包子比较慢,生产者不能一直等消费者吃完包子把盘子放回去再去生产,因为吃包子的人有很多,如果这期间你好朋友来了,和你一起吃包子,生产者不用关注是哪个消费者去桌子上拿盘子,而消费者只去关注桌子上有没有放盘子,如果有,就端过来吃盘子中的包子,没有的话就等待。对应关系如下图:

Celery-一个会做异步任务,定时任务的芹菜

celery

生产者消费者模型

消费者
from  celery import Celery

task=Celery('task',broker="redis://10.211.55.19:6379") #task可以是任何名称,后面跟的是队列的缓存者,celery中一般称为中间人,如果要是密码访问的话,需要是redis://:{pass}@IP地址:端口

@task.task
def add(a,b):
    return a+b

启动 celery从4.0版本以后就不在支持windows了,如果想在windows环境下使用的话,需要安装eventlet这个包,启动的时候需要指定-P eventlet

celery worker -A c -l info 

生产者

from c import add
for i in range(10):
    add.delay(1,2)

模拟两个消费者

在不同的位置在启动一个worker既可以了
celery worker -A c -l info 

生产者消费者模型升级

消费者
from  celery import Celery

task=Celery('task',broker="redis://10.211.55.19:6379/0",backend="redis://10.211.55.19:6379/2")#broker和backend可以是不同的队列,这里使用redis不同的库来模拟不同的队列,当然也可以一样

@task.task
def add(a,b):

    return a+b

启动过程跟上面一样

生产者
from c import add

for i in range(10):
    t=add.delay(i,2)
    print(t.get()) #获取结果
登录redis查看信息
redis-cli
127.0.0.1:6379[1]> SELECT 2
127.0.0.1:6379[2]> KEYS *
127.0.0.1:6379[2]> get celery-task-meta-6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42
"{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": [], \"task_id\": \"6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42\"}"
## 解析数据
d="{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": [], \"task_id\": \"6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42\"}"
import json
print(json.loads(d))

获取执行状态

倘若任务抛出了一个异常, get() 会重新抛出异常, 但你可以指定 propagate 参数来覆盖这一行为:

result.get(propagate=False)

如果任务抛出了一个异常,你也可以获取原始的回溯信息:

result.traceback…
print(t)
print(t.ready())
print(t.get())
print(t.ready())

定时任务

apply_async

t=add.apply_async((1,2),countdown=5) #表示延迟5秒钟执行任务
print(t)
print(t.get())
问题:是延迟5秒发送还是立即发送,消费者延迟5秒在执行那?

支持的参数 :

  • countdown : 等待一段时间再执行.

    add.apply_async((2,3), countdown=5)
  • eta : 定义任务的开始时间.这里的时间是UTC时间,这里有坑

    add.apply_async((2,3), eta=now+tiedelta(second=10))
  • expires : 设置超时时间.

    add.apply_async((2,3), expires=60)
  • retry : 定时如果任务失败后, 是否重试.

    add.apply_async((2,3), retry=False)
  • retry_policy : 重试策略.

    • max_retries : 最大重试次数, 默认为 3 次.
    • interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
    • interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2
    • interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .

周期任务

from c import task
task.conf.beat_schedule={
    timezone='Asia/Shanghai',
    "each20s_task":{
        "task":"c.add",
        "schedule":3, # 每3秒钟执行一次
        "args":(10,10)
    },

}

其实celery也支持linux里面的crontab格式的书写的

from celery.schedules import crontab
task.conf.beat_schedule={
     timezone='Asia/Shanghai',
    "each4m_task":{
        "task":"c.add",
        "schedule":crontab(minute=3), #每小时的第3分钟执行
        "args":(10,10)
    },
     "each4m_task":{
        "task":"c.add",
        "schedule":crontab(minute=*/3), #每小时的第3分钟执行
        "args":(10,10)
    },
}

后台启动

worker:
    celery multi start worker1 \
    -A c \
    --pidfile="$HOME/run/celery/%n.pid" \
    --logfile="$HOME/log/celery/%n%I.log"

    celery multi restart worker1 \
    -A proj \
    --logfile="$HOME/log/celery/%n%I.log" \
    --pidfile="$HOME/run/celery/%n.pid

    celery multi stopwait worker1 --pidfile="$HOME/run/celery/%n.pid"

beat:
    celery -A d beat --detach -l info -f beat.log

与django结合

1.执行异步任务

1.1 在生成的目录文件中添加celery文件,内容如下
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tests.settings') #与项目关联

app = Celery('tests',backend='redis://10.211.55.19/3',broker='redis://10.211.55.19/4')
#创建celery对象
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
#在django中创建celery的命名空间
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
#自动加载任务
1.2编辑settings.py同级目录的init.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']
1.3 在项目中添加tasks文件,用来保存tasks的文件
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)
1.4添加views文件内容
from .tasks import add

def index(request):
    result = add.delay(2, 3)
    return HttpResponse('返回数据{}'.format(result.get()))
1.5 启动worker
celery -A tests  worker -l info
1.6添加url并调用

2.执行周期性任务

2.1需要安装一个django的组件来完成这个事情
pip install django-celery-beat
2.2将django-celery-beat添加到INSTALLED_APPS里面
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
2.3刷新到数据库
python3 manage.py makemigrations #不执行这个会有问题
python3 manage.py migrate
2.4 admin配置
2.5启动beat
celery -A tests beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
2.6 启动worker
celery -A tests worker -l info 

网站标题:Celery-一个会做异步任务,定时任务的芹菜
链接URL:http://cqcxhl.cn/article/jdjheh.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP