当前位置: 代码迷 >> 综合 >> 14.flask+celery
  详细解决方案

14.flask+celery

热度:56   发布时间:2023-09-12 06:11:29.0

Flask+Celery

安装

  • 模块
pip install celery
pip install eventlet
pip install -U "celery[redis]"
pip install redis

在Windows操作系统上,还需要安装另外一个东西,eventlet

  • redis数据库
  • redis desktop manager

14.flask+celery

相关文档

celery文档

关系

14.flask+celery

  • task,任务
  • broker(中间人),存储任务的队列(借助redis实现)
  • worker:真正执行任务的工作者
  • backend:用来存储任务执行后的结果

配置

想要在flask中使用celery,可以配合redis数据库,redis配置如下:

app.conf.broker_url = 'redis://localhost:6379/0'

其中redis数据库链接格式为:

redis://:password@hostname:port/db_number

若没有密码,则为:

redis://hostname:port/db_number

多个redis链接

app.conf.broker_url = 'redis://localhost:6379/0;redis://localhost:6378/0'

URL 的所有配置都可以自定义配置的,默认使用的是 localhost 的 6379 端口中 0 数据库。( Redis 默认有 16 个数据库)

可见性超时

可见性超时为将消息重新下发给另外一个程序之前等待确认的任务秒数。请注意查看下面的。

可以通过 broker_transport_options 选项进行修改:

app.conf.broker_transport_options = {'visibility_timeout': 3600} # 一个小时

默认的可见性超时时间为1个小时。

结果

如果想保存任务执行返回结果保存到Redis,需要进行以下配置:

app.conf.result_backend = 'redis://localhost:6379/0'

简单测试

  • 用法
celery = Celery("tasks",broker="redis://118.24.128.18:6379/0",backend="redis://118.24.128.18:6379/0")
存储结果的位置:backend
  1. 第一个参数为当前模块的名称
  2. 第二个参数为中间人(Broker)的链接 URL
  3. 第三个参数backend为存储结果的位置
  • 创建task.py
from celery import Celery
import timecelery = Celery("tasks",broker="redis://localhost:6379/0",backend="redis://localhost:6379/0")@celery.task
def send_mail(a,b):time.sleep(5000)return a+b

可以看见send_mail()中设置等待5000秒,通常这个函数会运行很久,毫无体验感,但是为了获取体验感使用celery进行加速。

运行 Celery 职程(Worker)服务

现在可以使用 worker 参数进行执行刚刚创建职程(Worker):

celery -A task worker --loglevel=info

格式:

celery -A 文件名称 worker --loglevel=info

14.flask+celery

14.flask+celery

调用任务

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

delay()apply_async() 的快捷方法,可以更好的控制任务的执行:

>>> from task import *
>>> r = add.delay(4,5)
>>> r
<AsyncResult: 1d616754-2447-43df-a1a4-1368ce01a86b>

如果出现以下报错:

ValueError: not enough values to unpack (expected 3, got 0)

解决

celery -A your_app_name worker --pool=solo -l info

其中mymodule为文件名称

ready() 可以检测是否已经处理完毕:

>>> r.ready()
True

整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:

>>> r.get()
9

14.flask+celery

>>> r.get(timeout=1)

如果运行出现如下错误:

14.flask+celery

如果任务出现异常,get() 会再次引发异常,可以通过 propagate 参数进行覆盖:

>>> r.get(propagate=False)

如果任务出现异常,可以通过以下命令进行回溯:

>>> r.traceback

flask-celery

flask celery文档

最简示例

通过上面的步骤,下面即是在 Flask 中使用 Celery 的最简示例:

from flask import Flask
from celery import Celeryapp = Flask(__name__)
app.config.update(CELERY_BROKER_URL='redis://localhost:6379',CELERY_RESULT_BACKEND='redis://localhost:6379'
)def make_celery(app):celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])celery.conf.update(app.config)TaskBase = celery.Taskclass ContextTask(TaskBase):abstract = Truedef __call__(self, *args, **kwargs):with app.app_context():return TaskBase.__call__(self, *args, **kwargs)celery.Task = ContextTaskreturn celerycelery = make_celery()@celery.task()
def add(a, b):return a + bif __name__ == "__main__":app.run()

在后台调用:

>>> from app import *
>>> result = add.delay(4,5)
>>> result.wait()

运行 Celery 职程

通过动手得知如上.wait() 永远不会实际地返回。这是因为需要运行 Celery。可以这样把 Celery 以职程运行:

celery -A your_app_name.celery worker --pool=solo --loglevel=info

实例:

celery -A app.celery worker --pool=solo --loglevel=info

而在linux中,这样使用

celery -A app.celery worker --pool=eventlet --loglevel=info

这次结果就有了。

>>> from app import *
>>> result = add.delay(4,5)
>>> result.wait()
9

常见用途

  • 邮件发送,由于可能在发送邮件时,由于网络等其他原因,导致发送时间过长,从而时用户体验感低,因此配合celery使用,就能够让等待时间降低,从而提高用户体验感。