重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。
成都创新互联专注为客户提供全方位的互联网综合服务,包含不限于成都网站制作、网站设计、海湖新网络推广、微信小程序开发、海湖新网络营销、海湖新企业策划、海湖新品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们大的嘉奖;成都创新互联为所有大学生创业者提供海湖新建站搭建服务,24小时服务热线:13518219792,官方网址:www.cdcxhl.com0. 安装
0.1 install
1. celery 基础命令
1.1. 启动 celery
1.2. 后台进程启动celery worker
1.3. 重启 celery worker 后台进程
1.4. 停止 celery worker 后台进程
1.5. 等待 停止 celery worker 后台进程
1.6. 指定 pidfile & logfile
1.7. 启动 多个 worker 并且指定不同参数
1.8. 手动杀死所有worker进程
1.9. 较完整的celery的启动命令
2. 管理相关命令
2.1. 启动 flower
2.2. 需要 目录
2.3. 使用 librabbitmq
0.1 install
sudoapt-getinstallbuild-essentialpython-dev sudopipinstallcelery sudopipinstalllibrabbitmq sudopipinstallflower sudoapt-getinstallrabbitmq-server sudorabbitmq-pluginsenablerabbitmq_management sudoservicerabbitmq-serverrestart sudorabbitmq-pluginsdisablerabbitmq_management
访问 http://host:15672 即可进入管理界面。
默认用户名,密码都是 guest
1.1. 启动 celery
celery-Ataskworker-linfo
启动时指定要使用的 queue
celery-Ataskworker-linfo-Qbrand_queue,new_queue-E
1.2. 后台进程启动celery worker
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E
可通过命令查看后台启动的进程:
ps-aux|grepcelery [celeryd:w1@x:MainProcess]-active-(worker-E-Atask-linfo-Qbrand_queue,new_queue--logfile=w1.log--pidfile=w1.pid--hostname=w1@x)
可以看到默认添加了几个参数:
--logfile=w1.log默认在当前文件夹新建w1.log文件 --pidfile=w1.pid默认在当前文件夹新建w1.pid文件 --hostname=w1@x默认实例名woker_name/机器名
1.3. 重启 celery worker 后台进程
celerymultirestartw1-Atask-linfo-Qbrand_queue,new_queue-E
1.4. 停止 celery worker 后台进程
celerymultistopw1-Atask-linfo-Qbrand_queue,new_queue-E
stop 命令是异步的,worker 会立即停止,即使当时仍然有任务在执行,
并且不会写停止worker相关的日志
1.5. 等待 停止 celery worker 后台进程
celerymultistopwaitw1-Atask-linfo-Qbrand_queue,new_queue-E
这个停止命令,会等待正在运行的所有任务都完成再停止。
1.6. 指定 pidfile & logfile
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log
1.7. 启动 多个 worker 并且指定不同参数
celerymultistart10-Atask-linfo-E-Q:1-3images,video-Q:4,5data-Qdefault-L:4,5debug
启动了10个worker:
worker 1,2,3 使用了队列 images, video
worker 4,5 使用了队列 data
worker 其他 使用了队列 default
-L 是什么参数?
1.8. 手动杀死所有worker进程
psauxww|grep\'celeryworker\'|awk\'{print$2}\'|xargskill-9
1.9. 较完整的celery的启动命令
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue,time_queue,cron_queue-E-B-s/var/www/api/space/run/celerybeat-schedule--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log
2.1 启动 flower
celeryflower--broker=amqp://tiger:tiger@192.168.0.6:5672/vr_tiger--address=192.168.0.4--port=5555--broker_api=http://tiger:tiger@192.168.0.6:15672/api/--basic_auth=tiger:tiger rabbitmq主机地址:192.168.0.6 本机地址:192.168.0.4 本地监听端口:5555
2.2 需要目录
run/ log/
2.3 使用 librabbitmq
Ifyou’reusingRabbitMQ(AMQP)asthebrokerthenyoucaninstallthelibrabbitmqmoduletouseanoptimizedclientwritteninC: $pipinstalllibrabbitmq
注意
1. 后台运行 celery 的是否,worker 信息不会保存。所以,每次对 worker 操作时都需要加上相同的参数。特别是 pidfile 和 logfile 需要相同。
2. 我没有使用 celery 提供的任务结果存储。我在业务中自己处理 过程及结果。
代码示例
配置文件:
#-*-coding=utf-8-*- #FileName:task_conf.py from__future__importabsolute_import fromceleryimportCelery fromdatetimeimporttimedelta fromcelery.schedulesimportcrontab \'\'\' task_random:是任务的名称 broker:通过amqp://用户名:密码@ip/虚拟主机连接amqp include:任务程序 \'\'\' #消息队列配置 mq_host=\'192.168.0.6\' mq_name=\'tiger\' mq_pass=\'tiger\' mq_vr=\'vr_tiger\' broker=\'amqp://%s:%s@%s/%s\'%(mq_name,mq_pass,mq_host,mq_vr) #初始化app app=Celery(\'name_wash\',broker=broker,include=[\'task\']) #指定任务存储队列 app.conf.update( CELERY_ROUTES={ \'task.exe_task\':{\'queue\':\'brand_queue\'}, \'task.task_sms_send\':{\'queue\':\'new_queue\'}, \'task.task_sec\':{\'queue\':\'time_queue\'}, \'task.task_cron\':{\'queue\':\'cron_queue\'} }, CELERYBEAT_SCHEDULE={ \'exe-every-10-seconds\':{ \'task\':\'task.task_sec\', \'schedule\':timedelta(seconds=30), \'args\':[1], }, \'add-every-monday-morning\':{ \'task\':\'task.task_cron\', \'schedule\':crontab(hour=15,minute=47,day_of_week=5), \'args\':(15232897835,), }, }, #CELERY_TASK_SERIALIZER=\'json\', #CELERY_ACCEPT_CONTENT=[\'json\'],#Ignoreothercontent #CELERY_RESULT_SERIALIZER=\'json\', CELERY_EVENT_QUEUE_TTL=5, CELERY_TIMEZONE=\'Asia/Shanghai\', CELERY_ENABLE_UTC=True, CELERY_DISABLE_RATE_LIMITS=True, CELERY_IGNORE_RESULT=True ) if__name__==\'__main__\': app.start()
任务文件:
#-*-coding=utf-8-*- #FileName:task.py \'\'\' task \'\'\' from__future__importabsolute_import importtime importtraceback fromjob.task_confimportapp @app.task(ignore_result=True) defexe_task(task_id,number): \'\'\'根据参数执行任务\'\'\' try: print\'exetask:\',task_id time.sleep(number) except: traceback.print_exc() return(task_id,number,-1) return\'true:)\' @app.task deftask_sms_send(mobile,content): \'\'\'任务-发送短信\'\'\' try: print\'sendsms:mobile->%s,content->%s\'%(mobile,content) except: traceback.print_exc() return\'Fail:(\' return\'Success:)\' @app.task deftask_sec(mobile): \'\'\'测试任务时间定制\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F\' return\'S\' @app.task deftask_cron(mobile): \'\'\'测试任务时间定制Cron\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F-cron\' return\'S-cron\' defmain(): res=exe_task(2,2) print\'res:\',res if__name__==\'__main__\': main()
添加任务代码:
#-*-coding=utf-8-*- #FileName:add_task.py importtime importtraceback importrandom fromtaskimportexe_task,task_sms_send defaction(): tries=0 while1: try: tries+=1 iftries>=20: break task_id=tries number=random.randint(1,5) exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') print\'addedonetask\' time.sleep(1) except: traceback.print_exc() pass print\'addtaskdone\' defadd_task_by_django(task_id,number): \'\'\'测试从django添加任务\'\'\' exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') defadd_task_sms(mobile,content): \'\'\'添加发送短信任务\'\'\' #列表参数或者字典参数 #task.apply_async(args=[arg1,arg2],kwargs={\'kwarg1\':\'x\',\'kwarg2\':\'y\'}) task_sms_send.apply_async(args=[mobile,content],queue=\'new_queue\') print\'taskadded:sms\' defmain(): action() if__name__==\'__main__\': main()