Redis 本地安装并启动
- Redis 安装此处不做过多讲述;请参考:Redis 安装传送门
- 启动本地 Redis 服务
redis-server /usr/local/etc/redis.conf
- 可以查看 Redis 动态日志
# 切换到安装 Redis 时日志配置文件路径cd /Users/wawa/Logsls -l
# -rw-r--r-- 1 wawa staff 151232 Jan 17 15:37 redis_logs
tail -f redis_logs
创建 Celery 实例
将下面的代码保存为文件 tasks.py
:
import time
from celery import Celerybroker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'app = Celery('my_task', broker=broker, backend=backend)@app.task
def add(x, y):time.sleep(5) # 模拟耗时操作return x + y
上面的代码做了几件事:
- 创建了一个 Celery 实例 app,名称为
my_task
(自己任意定义); - 指定消息中间件用 redis,URL 为
redis://127.0.0.1:6379
; - 指定存储用 redis,URL 为
redis://127.0.0.1:6379/0
; - 创建了一个 Celery 任务
add
,当函数被@app.task
装饰后,就成为可被 Celery 调度的任务;
启动 Celery Worker
在当前目录,使用如下方式启动 Celery Worker:
$ celery worker -A tasks --loglevel=info
其中:
- 参数
-A
指定了 Celery 实例的位置,本例是在tasks.py
中,Celery 会自动在该文件中寻找 Celery 对象实例,当然,我们也可以自己指定,在本例,使用-A tasks.app
; - 参数
--loglevel
指定了日志级别,默认为 warning,也可以使用-l info
来表示;
在生产环境中,我们通常会使用 Supervisor 来控制 Celery Worker 进程。(笔者暂时还没明白,后续学习)
启动成功后,控制台会显示如下输出:
调用任务
现在,我们可以在应用程序中使用 delay()
或 apply_async()
方法来调用任务。
在当前目录打开 Python 控制台
,输入以下代码:
>>> from tasks import add
>>> add.delay(100, 200)
<AsyncResult: 94a52b9f-9607-4f2a-a0dd-62004a34561a>
在上面,我们从 tasks.py
文件中导入了 add
任务对象,然后使用 delay()
方法将任务发送到消息中间件(Broker),Celery Worker 进程监控到该任务后,就会进行执行。我们将窗口切换到 Worker 的启动窗口,会看到多了两条日志:
[2020-01-17 16:00:21,135: INFO/MainProcess] Received task: tasks.add[94a52b9f-9607-4f2a-a0dd-62004a34561a]
[2020-01-17 16:00:26,153: INFO/ForkPoolWorker-2] Task tasks.add[94a52b9f-9607-4f2a-a0dd-62004a34561a] succeeded in 5.01387233499554s: 300
这说明任务已经被调度并执行成功。
另外,我们如果想获取执行后的结果,可以这样做:
>>> result = add.delay(111, 222)
>>> result.ready() # 使用 ready() 判断任务是否执行完毕
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get() # 使用 get() 获取任务结果
333
在上面,我们是在 Python 的环境中调用任务。事实上,我们通常在应用程序中调用任务。比如,将下面的代码保存为 client.py
:
print("程序执行开始".center(50, "="))
time_before = time.time()
print(f"执行前时间戳为:{time_before}")
# 异步任务
add.delay(2, 8)
time_after = time.time()
print(f"执行后时间戳为:{time_after}")
print("程序执行结束".center(50, "="))
运行命令 $ python client.py
,可以看到,虽然任务函数 add
需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行 print 语句,打印出结果。