当前位置: 代码迷 >> 综合 >> Celery 快速入门(二)
  详细解决方案

Celery 快速入门(二)

热度:12   发布时间:2024-01-25 12:56:57.0

Redis 本地安装并启动

  1. Redis 安装此处不做过多讲述;请参考:Redis 安装传送门
  2. 启动本地 Redis 服务
redis-server /usr/local/etc/redis.conf
  1. 可以查看 Redis 动态日志
# 切换到安装 Redis 时日志配置文件路径cd /Users/wawa/Logsls -l
# -rw-r--r--  1 wawa  staff  151232 Jan 17 15:37 redis_logs
tail -f redis_logs

Redis 动态日志

创建 Celery 实例

将下面的代码保存为文件 tasks.py

import time
from celery import Celerybroker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'app = Celery('my_task', broker=broker, backend=backend)@app.task
def add(x, y):time.sleep(5)     # 模拟耗时操作return x + y

上面的代码做了几件事:

  • 创建了一个 Celery 实例 app,名称为 my_task自己任意定义);
  • 指定消息中间件用 redis,URL 为 redis://127.0.0.1:6379
  • 指定存储用 redis,URL 为 redis://127.0.0.1:6379/0
  • 创建了一个 Celery 任务 add,当函数被 @app.task 装饰后,就成为可被 Celery 调度的任务;

启动 Celery Worker

在当前目录,使用如下方式启动 Celery Worker:

$ celery worker -A tasks --loglevel=info

其中:

  • 参数 -A 指定了 Celery 实例的位置,本例是在 tasks.py 中,Celery 会自动在该文件中寻找 Celery 对象实例,当然,我们也可以自己指定,在本例,使用 -A tasks.app
  • 参数 --loglevel 指定了日志级别,默认为 warning,也可以使用 -l info 来表示;

在生产环境中,我们通常会使用 Supervisor 来控制 Celery Worker 进程。(笔者暂时还没明白,后续学习)
启动成功后,控制台会显示如下输出:
在这里插入图片描述

调用任务

现在,我们可以在应用程序中使用 delay()apply_async() 方法来调用任务。
当前目录打开 Python 控制台,输入以下代码:

>>> from tasks import add
>>> add.delay(100, 200)
<AsyncResult: 94a52b9f-9607-4f2a-a0dd-62004a34561a>

在上面,我们从 tasks.py 文件中导入了 add 任务对象,然后使用 delay() 方法将任务发送到消息中间件(Broker),Celery Worker 进程监控到该任务后,就会进行执行。我们将窗口切换到 Worker 的启动窗口,会看到多了两条日志:

[2020-01-17 16:00:21,135: INFO/MainProcess] Received task: tasks.add[94a52b9f-9607-4f2a-a0dd-62004a34561a]  
[2020-01-17 16:00:26,153: INFO/ForkPoolWorker-2] Task tasks.add[94a52b9f-9607-4f2a-a0dd-62004a34561a] succeeded in 5.01387233499554s: 300

这说明任务已经被调度并执行成功。
另外,我们如果想获取执行后的结果,可以这样做:

>>> result = add.delay(111, 222)
>>> result.ready()     # 使用 ready() 判断任务是否执行完毕
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()     # 使用 get() 获取任务结果
333

在上面,我们是在 Python 的环境中调用任务。事实上,我们通常在应用程序中调用任务。比如,将下面的代码保存为 client.py:

print("程序执行开始".center(50, "="))
time_before = time.time()
print(f"执行前时间戳为:{time_before}")
# 异步任务
add.delay(2, 8)
time_after = time.time()
print(f"执行后时间戳为:{time_after}")
print("程序执行结束".center(50, "="))

运行命令 $ python client.py,可以看到,虽然任务函数 add 需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行 print 语句,打印出结果。