为什么要集成celery
当集成了celery之后,我们可以利用celery的异步任务和定时任务来处理我们django项目逻辑。例如:异步发送邮件和定时清理缓存等,在我的博客我就利用celery等配置完成定时对豆瓣影单书单的爬取,并且可控计划(循环、定时、时钟)类型。
django集成celery—demo
如果把celery集成到django,利用celery的异步处理任务就很方便的解决站点需要异步处理的逻辑,需要注意的是最新的celery4.x已经不能再使用django-celery了
- 通过pycharm新建django工程名为celeryproject
在celeryproject/celeryproject目录下新建celery.py文件,内容如下:
from __future__ import absolute_import, unicode_literals from celery import Celery import os # 配置默认的django settings模块配置给celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryproject.settings') app = Celery('celeryproject') # 命名空间 namespace='CELERY'#定义所有与celery相关的配置的键名要以'CELERY_'为前缀。 app.config_from_object('django.conf:settings', namespace='CELERY') # 如果你把所有的task都定义在单独的tasks.py模块中,加上这句话celery会自动发现这些模块中的task, app.autodiscover_tasks()
添加celeryproject/celeryproject目录的init.py文件内容如下:
from __future__ import absolute_import, unicode_literals from .cerely import app as celery_app __all__ = ['celery_app']
- 在celeryproject/celeryproject目录下的settings.py文件添加以下内容:
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
- 执行
python manage.py startapp myapp
新建一个app - 在celeryproject/myapp目录下新建tasks.py文件内容如下:
from celery import shared_task import time @shared_task def send_email(): print("模拟发送邮件") import time #添加延时是为了模拟发送邮件的耗时 time.sleep(10) return "邮件发送成功"
在myapp目录下的view.py里面添加内容如下:
from django.http import HttpResponse from myapp.tasks import send_email def index(request): result=send_email.delay() return HttpResponse("ok!")
- 在
celeryproject/celeryproject
目录下urls.py文件配置路由如下from django.contrib import admin from django.urls import path from myapp import views urlpatterns = [ path('admin/', admin.site.urls), path('index/',views.index), ]
- 打开terminal窗口,在根目录下执行命令
celery worker -A celeryproject -l info -P eventlet
,出现界面如下,代表celery启动正常
- 运行django工程,浏览器访问
http://127.0.0.1:8000/index/
由于在模拟发送邮件的代码添加了时间等待,可以很明显的感觉到浏览器内容ok显示出来之后,回到terminal窗口,过了一会儿才提示邮件发送成功,说明任务的确是异步处理的
django集成django_celery_reuslts
- 安装
pip install celery django-celery-results
- 添加应用到django
INSTALLED_APPS = [ ... 'django_celery_results.apps.CeleryResultConfig', ]
- 在项目的settings.py更改存储结果后台为数据库存储
CELERY_RESULT_BACKEND = 'django-db' #存储任务结果
- 执行数据库迁移命令
当迁移完毕,django_celery_reuslts会生成python manage.py makemigrations python manage.py migrate
django_celery_results_taskresult
表,如果有任务执行完毕,django_celery_results会将数据结果存储到该表,大致表内容如下:django集成django_celery_beat
- 安装
pip install django-celery-beat
- 添加应用到django
INSTALLED_APPS = [ ... 'django_celery_results.apps.CeleryResultConfig', 'django_celery_beat.apps.BeatConfig', ]
- 修改项目的settings.py里面的内容为如下
CELERY_TIMEZONE = TIME_ZONE #保持时区跟当前项目时区一致 # CELERY_ENABLE_UTC=False DJANGO_CELERY_BEAT_TZ_AWARE = False #解决时区问题 CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' #redis存储任务结果 CELERY_RESULT_BACKEND = 'django-db' #存储任务结果 CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle' CELERY_ACCEPT_CONTENT = ['pickle', 'json'] CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
- 执行数据库迁移命令
当迁移完毕,django_celery_reuslts会生成五张表(前面3张表比较重要)python manage.py makemigrations python manage.py migrate
表名 | 描述 |
---|---|
django_celery_beat_clockedschedule | 周期性任务表 |
django_celery_beat_intervalschedule | 间隔时间计划表(可循环) |
django_celery_beat_crontabschedule | 定时计划表 |
django_celery_beat_clockedschedule | 时钟计划表(单次执行) |
django_celery_beat_solarschedule | 经纬度表计划表 |
django_celery_beat_periodictask | 计划何时更改记录表 |
而且由于我们已经集成django_celery_reuslts,如果我们通过django_celery_beat后台配置任务时,当任务执行完毕,django_celery_results也会将数据结果存储到表里面
测试django-celery-beat任务
在django的应用目录下新建tasks.py文件
from __future__ import absolute_import from celery import shared_task @shared_task def test1(): print("测试间歇性任务") return "间歇性任务ok" @shared_task def test2(): print("测试定时任务") return "定时任务ok" @shared_task def test3(): print("测试时钟任务") return "时钟任务ok"
- 启动django访问后台
我这里用的是xamdin,我这里将django-celery-beat的模型注册到下xadmin了 本地开启redis服务(一定要记得开启)
执行命令
celery worker -A tasks -l info -P eventlet
可以看到任务已经被celery识别到
- 在后台新建intervalschedule、crontabschedule、clockedschedule三个任务
重点
1.每个任务任务只能选取一种schedule类型,否则会报错的
2.taskname要填写完整的任务路径
3.由于clockedschedule存在时区差8小时问题,故设置任务执行的时间必须要减去8小时才会准时执行。 - 任务创建后如下图
- 执行命令
celery beat -A blog -l info
可以看到间歇任务已经开始发送了,切回到worker的terminal界面可以看到如下界面- 间歇性任务执行结果
- 开始发送定时任务
- 定时任务执行结果
- 开始发送时钟任务
- 时钟任务执行结果(时钟任务只会执行一次,执行完毕计划表就会过期)
def test2(): print("测试django应用下tests.py里面的任务是否可以识别") 用执行下celery命令看下任务列表关于celery任务模块识别探讨
下面让我们来测试下,我在django的任意的应用目录下新建tasks.py文件,内容如下
def test3(): print("测试django应用下的tasks.py里面任务是否可以识别出来") 然后我在应用目录下的tests.py下新建任务如下:from celery import shared_task @shared_task
from celery import shared_task @shared_task
结果如下:celery worker -A blog -l info -P eventlet
从上面可以知道celery会自动发现django应用目录下tasks.py里面的任务,不是tasks.py里面的任务则无法识别[tasks] . local_apps.util.tasks.test3
不过我又发现了一个很奇怪的问题,比方说在任意应用的views.py里面这样
再执行下celery命令看下任务列表from local_apps.util.tests import test2
结果如下:celery worker -A blog -l info -P eventlet
发现celery又能识别出任务了。我猜测这样的操作应该是变相的完成了celery的[tasks] . local_apps.util.tasks.test3 . local_apps.util.tests.test2
CELERY_IMPORTS
配置,我们取消上面刚刚的导入,在项目的settings.py里面增加配置如下:
再执行下celery命令看下任务列表CELERY_IMPORTS=["local_apps.util.test"]
结果如下:celery worker -A blog -l info -P eventlet
也就是说任务不一定要放在tasks.py文件内,并且任务模块不一定要在配置文件里面注明,项目里面完成了该模块导入都行。[tasks] . local_apps.util.tasks.test3 . local_apps.util.tests.test2
- 间歇性任务执行结果
- 在后台新建intervalschedule、crontabschedule、clockedschedule三个任务
hhh
回复欢迎你的到来
回复有没有按个django,博客系统视频教学
回复搞这个博客的时候,过程比较混乱,没有成体系的记录笔记。再加上工作忙事情也多,看后期自己的计划了。
回复