当前位置: 代码迷 >> 综合 >> Django==2.0.2 Celery==4.4.2 Redis==3.0.504
  详细解决方案

Django==2.0.2 Celery==4.4.2 Redis==3.0.504

热度:45   发布时间:2024-02-19 16:24:04.0

配置环境

windows10 Django2.0.2 Celery4.4.2 Redis==3.0.504 eventlet(windows环境下安装,linux环境下不需要,版本随意)

settings 同级目录下创建 celery.py

# -*- coding:utf-8 -*-from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings# 指定Django默认配置文件模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xrtProject.settings')# 为我们的项目myproject创建一个Celery实例。这里不指定broker backend 容易出现错误。
# 如果有密码 使用 'redis://:password@127.0.0.1:6379/0'app = Celery('xrt', broker='redis://localhost:6379/10',backend='redis://localhost:6379/9')
# 这里指定从django的settings.py里读取celery配置# app.config_from_object('django.conf:settings')from kombu import serialization
serialization.registry._decoders.pop("application/x-python-serialize")# celery 的配置项目,有其他的要求,可以按需求添加
app.conf.update(CELERY_ACCEPT_CONTENT = ['json'],CELERY_TASK_SERIALIZER = 'json',CELERY_RESULT_SERIALIZER = 'json',
)# 自动从所有已注册的django app中加载任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

应用app 中创建 tasks.py

# -*- coding:utf-8 -*-
from __future__ import absolute_import
import json
from celery import shared_task
from django_redis import get_redis_connectionfrom src.remote.models import OperationLog
from tools.add_message import SendMessageconn = get_redis_connection('default')# # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
# 定义的两个异步函数@shared_task(name="save_message", ignore_result=True)
def save_message(phone, recipient, remark, content, sup_links, links, name, remind):"""存储全局消息信息"""print("异步任务开始---------------》")send_ = SendMessage()send_.send_duan_xin(phone, content)send_.send_web(content, recipient, sup_links, links, remark, name, remind)openid = ''send_.send_we_chat(openid, content)print("异步任务完成---------------》")return "save_message"@shared_task(name='save_log', ignore_result=True)
def save_log(create_id, create_user, mod_f, mod_s, order, detail, feed_b, created_time):print(create_id, create_user, mod_f, mod_s, order, detail, feed_b, created_time)dic = {"create_id": create_id,"create_user": create_user,"mod_f": mod_f,"mod_s": mod_s,"order": order,"detail": detail,"feed_b": feed_b,"created_time": created_time,}OperationLog.objects.create(**dic)print("异步任务完成---------------》")return "save_message"

在views.py中进行调用

class Log(MyBaseView):"""日志管理"""model = OperationLogdef post(self, request):phone = "138****0342"recipient = "1111111111111111111111"remark = ''content = "您收到一条*********"sup_links = "立即查看"links = 'https://www.baidu.com/{}'.format("111111111")name = '申请'remind = ['短信', '小铃铛', '公众号']tasks.save_message.apply_async((phone, recipient, remark, content, sup_links, links, name, remind))  # 异步调用发送消息return JsonResponse({'code': 200, 'total': total, 'data': rows,  'currentPage': page_number},json_dumps_params={'default': mydefault})

启动

启动Django项目
python manage.py runserver
启动Celery
异步任务启动windows 10 环境下:(pip install eventlet)>celery -A devices_back worker -l info -P eventletlinux 下:>celery - A devices_back  worker - l info