celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。采用典型的生产者-消费者模型,主要由三部分组成:
- 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis、rabbitmq等作为broker
- 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程
- 存储结果的backend:执行结果存储在backend,默认也会存储在broker使用的MQ队列服务中,也可以单独配置用何种服务做backend
pip install eventlet(加入协程支持),如果不安装,celery -A project worker -l info运行会出现下面问题,详情见:这里
安装后使用 celery - A project_name worker -l info -P eventlet来运行
安装的RabbitMQ服务器(不多介绍,忘记了),运行的RabbitMQ服务,没有运行的RabbitMQ的话会出现下图所示问题:
配置settings.py文件
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672'
在项目的根路径下,也就是与settings.py文件同级目录下创建celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os# 为celery程序设置默认的Django设置模块。
os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Demo1.settings')# 注册Celery的APP
app = Celery('Demo1')
# 绑定配置文件
app.config_from_object('django.conf.settings', namespace='CELERY')# 自动发现各个app下的tasks.py文件
app.autodiscover_tasks()
上面的demo1的是我的项目名,
在设置的同级目录中的init.py文件中添加下面内容
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app__all__ = ['celery_app']
在你的Django app下创建tasks.py文件
注意tasks.py必须建在各应用程序的根目录下,且只能叫tasks.py,不能随意命名
# tasks.py
from celery.task import task
import time@task
def test_celery():print("开始")time.sleep(6)return "test celery"
然后在你的views.py中调用这个函数
# views.py
from .tasks import test_celerydef index(request):result = test_celery.delay()print(result)return Httpresponse("test")
如果test_celery()中有参数,例如有参数A,那么在views.py中这样使用----> test_celery.delay(A)
启动Django的项目
启动celery celery -A Demo1 worker -l info -P eventlet(在pycharm的终端中启动)
会出现下面内容
(Django_env2) E:\Work\Django-test\Django2_1\Demo1>celery -A Demo1 worker -l info -P eventlet-------------- celery@DESKTOP-6FUDOED v4.2.1 (windowlicker)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0 2018-12-27 20:18:24
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: Demo1:0x19fa742d630
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. model.tasks.test_celery[2018-12-27 20:18:24,201: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-12-27 20:18:24,211: INFO/MainProcess] mingle: searching for neighbors
[2018-12-27 20:18:25,241: INFO/MainProcess] mingle: all alone
[2018-12-27 20:18:25,257: INFO/MainProcess] pidbox: Connected to amqp://guest:**@127.0.0.1:5672//.
[2018-12-27 20:18:25,260: WARNING/MainProcess] e:\work\django-test\django_env2\lib\site-packages\celery\fixups\django.py:200: UserWarning: Using setting
s.DEBUG leads to a memory leak, never use this setting in production environments!warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-12-27 20:18:25,261: INFO/MainProcess] celery@DESKTOP-6FUDOED ready.
当我们进行到views.py中的index函数中,刷新页面不会有任何延迟。
终端中
参考:https://www.jianshu.com/p/3de08b043467
待续...