我的项目目录结构如下所示, begin是我的应用
这里需要更改的是上图标记的三个文件以及settings.py文件,tasks.py文件是创建在django的应用下面,顾名思义里面放的就是我需要调用的耗时的任务。
1.创建celery.py文件
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery# 设置django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "test_project.settings")
app = Celery("test_project")app.config_from_object('django.conf:settings', namespace='CELERY')
# 发现任务文件每个app下的task.py
app.autodiscover_tasks()
2.修改__init__.py文件
from __future__ import absolute_import, unicode_literals# 这将确保在Django启动时始终导入应用程序,以便shared_task使用该应用程序。
from .celery import app as celery_app__all__ = ["celery_app"]
3.修改settings.py文件
在最后添加下面内容,这里我使用redis做celery的消息队列
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/3' # Broker配置,使用Redis作为消息中间件# CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3' # BACKEND配置,这里使用redis# CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
我项目执行的结果是写入文件中,所以不需要将结果序列化到redis中,就将对应代码注释了。
4.在应用下面创建tasks.py文件,该文件名好像是固定的,忘记了
做为演示,我创建一个函数如下所示
from celery.task import task
import time@task
def execute(key):print('开始执行耗时间的任务: {}'.format(key))time.sleep(30)
在 views.py 中测试调用该函数
from .tasks import execute...def test_celery(request):key = '***'result = execute.delay(key)print(result)return render(request, "celery.html")
我的函数那边需要接收参数,所以调用这边将参数传入到 delay 中即可,不穿参数的话就置空
测试前要先开启celery
celery -A test_project worker -P eventlet --loglevel=INFO --concurrency=10
这里我指定了10个worker, 默认worker的数量是你的CPU核心数
PS:celery4.x 在win10上使用 需要安装 eventlet包
pip install eventlet