博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
3-django进阶之celery
阅读量:6031 次
发布时间:2019-06-20

本文共 9137 字,大约阅读时间需要 30 分钟。

Django集成Celery到项目

​ 本节将celery集成到Django项目中,实现异步任务处理和定时任务处理

Celery工作流程

celery流程图

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

1 Celery安装与配置

在虚拟环境中安装:

pip install django-celery==3.2.2

pip install django-redis

pip install flower # celery 的web管理平台(异步任务可视化)

查看集成到Django中的celery版本, pip freeze

celery==3.1.26.post2 django-celery==3.2.2 flower==0.9.2

启动redis服务, 端口假设为6379

发现pip安装比较慢的情况

pip install pillow -i

2 Django中配置

(1)在主工程的配置文件settings.py 中应用注册表INSTALLED_APPS中加入 djcelery

INSTALLED_APPS = [    'django.contrib.admin',    'django.contrib.auth',    'django.contrib.contenttypes',    'django.contrib.sessions',    'django.contrib.messages',    'django.contrib.staticfiles',    'art',    'xadmin',    'crispy_forms',    'DjangoUeditor',    'djcelery',       #加入djcelery]

(2) 在settings.py 中加入celery配置信息

############################## celery 配置信息 start#############################import djcelerydjcelery.setup_loader()BROKER_URL = 'redis://127.0.0.1:6379/1'CELERY_IMPORTS = ('art.tasks')CELERY_TIMEZONE = 'Asia/Shanghai'CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' from celery.schedules import crontabfrom celery.schedules import timedeltaCELERYBEAT_SCHEDULE = {    #定时器策略    #定时任务一: 每隔30s运行一次    u'测试定时器1': {        "task": "art.tasks.tsend_email",        #"schedule": crontab(minute='*/2'),  # or 'schedule':   timedelta(seconds=3),        "schedule":timedelta(seconds=30),        "args": (),    },}############################## celery 配置信息 end#############################

​ 当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task

​ BROKER_URL:broker是代理人,它负责分发任务给worker去执行。我使用的是Redis作为broker

​ 没有设置 CELERY_RESULT_BACKEND,默认没有配置,此时Django会使用默认的数据库(也是你指定的orm数据库)。

CELERY_IMPORTS:是导入目标任务文件

CELERYBEAT_SCHEDULER:使用了django-celery默认的数据库调度模型,任务执行周期都被存在默认指定的orm数据库中.

CELERYBEAT_SCHEDULE:设置定时的时间配置, 可以精确到秒,分钟,小时,天,周等。

(3)创建应用实例

​ 在主工程目录添加celery.py, 添加自动检索django工程tasks任务

​ vim artproject/celery.py

#!/usr/bin/env python  # encoding: utf-8  #目的是拒绝隐式引入,celery.py和celery冲突。from __future__ import absolute_import,unicode_literals import osfrom celery import Celeryfrom django.conf import settings# 设置环境变量os.environ.setdefault("DJANGO_SETTINGS_MODULE", "artproject.settings")#创建celery应用app = Celery('art_project')app.config_from_object('django.conf:settings')#如果在工程的应用中创建了tasks.py模块,那么Celery应用就会自动去检索创建的任务。比如你添加了一个任#务,在django中会实时地检索出来。app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

(4) 创建任务 tasks

每个任务本质上就是一个函数,在tasks.py中,写入你想要执行的函数即可。

在应用art中添加我们需要提供的异步服务和定时服务

vim art/tasks.py

#!/usr/bin/env python  # encoding: utf-8  from __future__ import absolute_importimport timefrom django.core.mail import send_mailfrom celery.utils.log import get_task_loggerfrom artproject.celery import appfrom art.utils.send_mail import pack_html, send_email@app.taskdef tsend_email():   url = "http://1000phone.com"   receiver = 'diyuhuan@1000phone.com'   content = pack_html(receiver, url)   # content = 'this is email content.'   send_email(receiver, content)   print('send email ok!')@app.taskdef add(x, y):   return x+y

上述我们把异步处理任务add和定时器任务tsend_email都放在了tasks.py 中

(5)迁移生成celery需要的数据表

python manage.py migrate

此时数据库表结构多出了几个

celery_taskmeta            || celery_tasksetmeta         || djcelery_crontabschedule   || djcelery_intervalschedule  || djcelery_periodictask      || djcelery_periodictasks     || djcelery_taskstate         || djcelery_workerstate

3 启动服务,测试

我们可以采用 python manage.py help 发现多出了 celery 相关选项。

(1)启动django celery 服务

启动服务:

python manage.py celery worker --loglevel=info

此时异步处理和定时处理服务都已经启动了

(2)web端接口触发异步任务处理

我们在web端加入一个入口,触发异步任务处理add函数

在应用art的urls.py 中加入如下对应关系

from art.views import add_handlerurl(r'^add', add_handler),

art/views.py 中加入处理逻辑

def add_handler(request):   x = request.GET.get('x', '1')   y = request.GET.get('y', '1')   from .tasks import add   add.delay(int(x), int(y))   res = {'code':200, 'message':'ok', 'data':[{'x':x, 'y':y}]}   return HttpResponse(json.dumps(res))

启动web服务,通过url传入的参数,通过handler的add.delay(x, y)计算并存入mysql

:8000/art/add?x=188&y=22

(4) 测试定时器,发送邮件

在终端输入 python manage.py celerybeat -l info

会自动触发每隔30s执行一次tsend_email定时器函数,发送邮件:

CELERYBEAT_SCHEDULE = {    #定时器策略    #定时任务一: 每隔30s运行一次    u'测试定时器1': {        "task": "art.tasks.tsend_email",        #"schedule": crontab(minute='*/2'),  # or 'schedule': timedelta(seconds=3),        "schedule":timedelta(seconds=30),        "args": (),    },}

具体发送邮件服务程序见下面的第4节

4 邮件发送服务

项目中经常会有定时发送邮件的情形,比如发送数据报告,发送异常服务报告等。

可以编辑文件 art/utils/send_mail.py, 内容编辑如下:

#!/usr/bin/env python#-*- coding:utf-8 -*-#written by diyuhuan#发送邮件(wd_email_check123账号用于内部测试使用,不要用于其他用途)import smtplib  from email.mime.multipart import MIMEMultipart  from email.mime.text import MIMEText  from email.mime.image import MIMEImage from email.header import Headerimport timesender = 'wd_email_check123@163.com'  subject = u'api开放平台邮箱验证'smtpserver = 'smtp.163.com'username = 'wd_email_check123'password = 'wandacheck1234'mail_postfix="163.com"def send_email(receiver, content):    try:        me = username+"<"+username+"@"+mail_postfix+">"        msg = MIMEText(content, 'html', 'utf-8')        msg['Subject'] = subject        msg['From'] = sender        msg['To'] = receiver        smtp = smtplib.SMTP()          smtp.connect(smtpserver)          smtp.login(username, password)        smtp.sendmail(sender, receiver, msg.as_string())          smtp.quit()        return True    except Exception as e:        print('send_email has error with : ' + str(e))        return Falsedef pack_html(receiver, url):    html_content = u"
尊敬的用户
%s 您好!
" \ "
感谢您关注我们的平台 ,我们将为您提供最贴心的服务,祝您购物愉快。
" \ "
点击以下链接,即可完成邮箱安全验证:
" \ "
%s
" \ "
为保障您的帐号安全,请在24小时内点击该链接;
" \ "
若您没有申请过验证邮箱 ,请您忽略此邮件,由此给您带来的不便请谅解。
" \ "" % (receiver, url, url) html_content = html_content return html_contentif __name__ == "__main__": url = "http://1000phone.com" receiver = 'diyuhuan@1000phone.com' #content = pack_html(receiver, url) content = 'this is email content. at %s.'%int(time.time()) send_email(receiver, content)

至此,在celery ui界面可以看到两类,定时器处理和异步处理。

5 启动flower服务

​ python manager celery flower

案例

clipboard.png

读书网站实现抢读功能

qd(request, id) :抢读视图函数

quereyQD(request,id) :查询抢读的视图函数

settings.py

INSTALLED_APPS = [   'djcelery',]...import djcelery# 装载djcelery对象djcelery.setup_loader()# 配置消息中间件的位置BROKER_URL = 'redis://127.0.0.1:6379/12'CELERY_TIMEZONE = 'Asia/Shanghai'# 配置批量调试器CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

在主工程目录添加celery.py, 添加自动检索django工程tasks任务

celery.py

from __future__ import absolute_importimport osfrom celery import Celery# 设置环境变量from HArtPro import settingsos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HArtPro.settings')# 创建Celery对象app = Celery('hart')# 加载配置app.config_from_object('django.conf:settings')# 自动发现task的异步任务app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

当前app目录下建立tasks.py

from MArtPro.celery import appfrom utils import redis_cache@app.taskdef advanceArt(artId, userId):    # 抢读文章(artId 文章id, userId 当前用户登录的Id)    print('用户', userId, '正在抢读', artId)    # 判断当前抢读的hash对象AdvanceArt长度是否达到5个    if redis_cache.hlen('AdvanceArt') >= 5:        return artId + '抢读失败'    redis_cache.hset('AdvanceArt', userId, artId)    return artId + '抢读成功!'

views.py

from redis_ import rd  # rd 对象from art import tasksdef (request, artId):    # 抢读    login_user = request.session.get('login_user')    if not login_user:        return JsonResponse({'status': 101,                             'msg': '亲,请先登录,再抢读,谢谢!'})    # 判断当前用户是否已抢过    user_id =json.loads(login_user).get('id')    if redis_cache.hexists('AdvanceArt', user_id):        return JsonResponse({'status': 205,                             'msg': '亲,你只能抢一本'})    # 任务延迟执行    tasks.advanceArt.delay(artId, user_id)    return JsonResponse({'status': 201,                         'msg': '正在抢读...'})def queryAdvance(request, artId):    # 查询抢读是否成功    login_user = request.session.get('login_user')    if not login_user:        return JsonResponse({'status': 101,                             'msg': '亲,请先登录,再查看抢读,谢谢!'})    user_id = json.loads(login_user).get('id')    artId = redis_cache.hget('AdvanceArt', user_id)    if artId:        art = Art.objects.get(id=artId.decode())        return JsonResponse({'status': 200,                             'msg': '恭喜您,抢读%s 成功'%art.title})    else:        if redis_cache.hlen('AdvanceArt')< 5:            return JsonResponse({'status': 202,                                 'msg': '正在抢读...'})        else:            return JsonResponse({'status': 203,                                 'msg': '抢读失败, 请下次碰碰运气!'})

前端通过定时器,每秒执行查询函数

转载地址:http://sedhx.baihongyu.com/

你可能感兴趣的文章
Docker入门实战 (二) - Docker环境的搭建方法
查看>>
Elasticsearch通关教程(一): 基础入门
查看>>
阿里云联合8家芯片商推“全平台通信模组”,加速物联网生态建设
查看>>
阿里云服务器怎么升级配置?升级有哪些限制?
查看>>
企业分布式微服务云SpringCloud SpringBoot mybatis (十二)断路器监控(Hystrix Dashboard)...
查看>>
看懂架构设计中的服务隔离
查看>>
Win10-MySQL-zip安装方法
查看>>
Linux-权限管理(ACL权限)
查看>>
星舆科技:打造下一代定位技术 以高精度位置感知构筑AI+时代基础力量
查看>>
Redis命令——哈希(Hash)
查看>>
进程与线程 thread (二)——线程概念
查看>>
用户体验为什么重要?如何提升产品的用户体验?(写给产品小白)
查看>>
python人工智能机器人工具书籍: Python Robotics Projects - 2018
查看>>
k8s环境部署及使用方式
查看>>
Unity 之 Pure版Entity Component System (ECS) 官方Rotation示例解析
查看>>
会话管理(session)
查看>>
Python爬取新浪微博用户信息及微博内容
查看>>
微信小程序开发:canvas 多行文字换行
查看>>
解决Mysql数据库提示innodb表不存在的问题!
查看>>
malloc函数及用法
查看>>