当前位置: 代码迷 >> 综合 >> ThinkphpP5中 redis消息队列 结合think-queue 插件
  详细解决方案

ThinkphpP5中 redis消息队列 结合think-queue 插件

热度:91   发布时间:2023-11-18 11:31:46.0

原文链接:https://blog.csdn.net/dabao87/article/details/82414839

Linux上安装 think-queue ,请先进入到项目框架的根目录再运行

composer require topthink/think-queue';

安装成功后 项目进行配置

配置

配置目录:config/queue.php

<?php
use think\facade\Env;return ['connector' => 'Redis','expire'     => 60,				// 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default'    => 'default',		// 默认的队列名称'host'       => Env::get('REDIS_HOST'),	    // redis 主机ip'port'       => 6379,			// redis 端口'password'   => Env::get('REDIS_PASSWORD'),				// redis 密码'select'     => 0,				// 使用哪一个 db,默认为 db0'timeout'    => 0,				// redis连接的超时时间'persistent' => false,			// 是否是长连接
];

消息的创建与推送

我们在业务控制器中创建一个新的消息,并推送到 helloJobQueue 队列

新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob 方法

/
/*** [actionWithHelloJob 消息队列创建与推送]* @param [type] $jobHandlerClassName [当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法]* @param [type] $jobQueueName [当前任务归属的队列名称,如果为新队列,会自动创建]* @param [type] $jobData [.当前任务所需的业务数据 . 最终将转化为json形式的字符串]* @return [type] [description]*/public function actionWithHelloJob($jobHandlerClassName, $jobQueueName, $jobData ){
    $isPushed = Queue::push( $jobHandlerClassName,$jobData,$jobQueueName ); //database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|falseif( $isPushed !== false ){
      echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";}else{
    echo 'Oops, something went wrong.';}}

注意: 在这个例子当中,我们是手动指定的 $jobHandlerClassName ,更合理的做法是先定义好消息名称与消费者类名的映射关系,然后由某个可以获取该映射关系的类来推送这个消息。这样,生产者只需要知道消息的名称,而无需指定哪个消费者类来处理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName
);这种方式之外,还可以直接传入 Queue::push( $jobHandlerObject ,null , $jobQueueName
); 这时,需要在 $jobHandlerObject 中定义一个 handle()
方法,消息队列在执行到该任务时会自动反序列化该对象,并调用其 handle()方法。 该方式的缺点是无法传入自定义数据。

消息的消费与删除

编写 Hello 消费者类,用于处理 helloJobQueue 队列中的任务

新增 \application\job\job1.php 消费者类,并编写其 fire() 方法

<?phpnamespace app\job;use think\Controller;
use think\queue\Job;
use think\cache\driver\Redis;class Job1 
{
    /*** fire方法是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据*/public function fire(Job $job,$data){
    // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);if(!$isJobStillNeedToBeDone){
    $job->delete();return true;}$isJobDone = $this->doHelloJob($data);if ($isJobDone) {
    //如果任务执行成功, 记得删除任务$job->delete();print("<info>Hello Job has been done and deleted"."</info>\n");}else{
    if ($job->attempts() > 3) {
    //通过这个方法可以检查这个任务已经重试了几次了print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");$job->delete();// 也可以重新发布这个任务//print("<info>Hello Job will be availabe again after 2s."."</info>\n");//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行}}}/*** 有些消息在到达消费者时,可能已经不再需要执行了* @param array|mixed $data 发布任务时自定义的数据* @return boolean 任务执行的结果*/private function checkDatabaseToSeeIfJobNeedToBeDone($data){
    return true;}/*** 根据消息中的数据进行实际的业务处理* @param array|mixed $data 发布任务时自定义的数据* @return boolean 任务执行的结果*/private function doHelloJob($data) {
    // 根据消息中的数据进行实际的业务处理...if ($info['errcode'] == 0) {
    print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");print("<info>Hello Job is Done!"."</info> \n");return true;}else{
    return false;}}}

定时任务

首先我们写两个shell脚本

1.monitorHandleQueue.sh,作用是检查队列的进程是否在运行


pid=$(ps -ef| grep handleQueue |grep -v grep | awk ' NR==1 {print $2}')if [ -z $pid ]thensh /项目绝对路径/handleQueue.sh &>/dev/null 2>&1
fi

检查进程是否存在,如果不存在启动handleQueue.sh脚本,注意:monitorHandleQueue.sh脚本中的启动handleQueue.sh的路径写自己的,NR==1表示只取第一个进程,|grep -v grep 过滤掉自己的进程

handleQueue.sh 脚本

cd /项目绝对路径
while [ 2 > 0 ]dolen=`/usr/local/redis/bin/redis-cli -h 1.1.1.1 -p 6379 Llen queues:helloJobQueue`echo lenif [ $((len + 0 )) -gt 0 ];then/usr/local/php/bin/php think queue:work --queue  helloJobQueueelsesleep 3/usr/local/php/bin/php think queue:work --queue  helloJobQueue    fi
done

解释一下handleQueue.sh脚本的逻辑:先切换到框架的根目录,while判断2大于0为真,所以会一直执行,连接到redis,获取队列的长度,if判断,如果队列的长度大于0直接执行队列,否则就停3秒再执行队列,很简单,写了很长时间,还有一点要注意,shell脚本最好不要在编辑器编辑,直接在Linux上编辑,因为如果在编辑器上编辑上传到Linux上会产生意想不到的问题(我在这里耽误了很长时间),找不到问题所在就直接在Linux上编写好了,省的麻烦

1.[ 2 > 0 ]格式为[空格判断表达式空格]

2.len=/usr/local/redis/bin/redis-cli -h 47.101.54.26 -p 6379 Llen queues:helloJobQueue,等于号的左右两边不能有空格,反引号 `` 不知道怎么输出,在1是我左边的那个键
3. if 的格式,if [ $((len + 0 )) -gt 0 ];then 变量大于0,大于号用 -gt 表示

Linux发布定时任务

* * * * * /项目绝对路径/monitorHandleQueue.sh 2>  /tmp/queue.log & 

记录日志

/tmp/queue.log 

当然也可以不写shell脚本

启动队列监听服务

Work 模式

php think queue:work \
--daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
--queue  helloJobQueue  //要处理的队列的名称
--delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
--memory 128 \      //该进程允许使用的内存上限,以 M 为单位
--sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
--tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0

Listen 模式

php think queue:listen \
--queue  helloJobQueue \   //监听的队列的名称
--delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128 \       //该进程允许使用的内存上限,以 M 为单位
--sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
--tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位