原文链接:https://blog.csdn.net/dabao87/article/details/82414839
Linux上安装 think-queue ,请先进入到项目框架的根目录再运行
composer require topthink/think-queue';
安装成功后 项目进行配置
配置
配置目录:config/queue.php
<?php
use think\facade\Env;return ['connector' => 'Redis','expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称'host' => Env::get('REDIS_HOST'), // redis 主机ip'port' => 6379, // redis 端口'password' => Env::get('REDIS_PASSWORD'), // redis 密码'select' => 0, // 使用哪一个 db,默认为 db0'timeout' => 0, // redis连接的超时时间'persistent' => false, // 是否是长连接
];
消息的创建与推送
我们在业务控制器中创建一个新的消息,并推送到 helloJobQueue 队列
新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob 方法
/
/*** [actionWithHelloJob 消息队列创建与推送]* @param [type] $jobHandlerClassName [当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法]* @param [type] $jobQueueName [当前任务归属的队列名称,如果为新队列,会自动创建]* @param [type] $jobData [.当前任务所需的业务数据 . 最终将转化为json形式的字符串]* @return [type] [description]*/public function actionWithHelloJob($jobHandlerClassName, $jobQueueName, $jobData ){
$isPushed = Queue::push( $jobHandlerClassName,$jobData,$jobQueueName ); //database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|falseif( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";}else{
echo 'Oops, something went wrong.';}}
注意: 在这个例子当中,我们是手动指定的 $jobHandlerClassName ,更合理的做法是先定义好消息名称与消费者类名的映射关系,然后由某个可以获取该映射关系的类来推送这个消息。这样,生产者只需要知道消息的名称,而无需指定哪个消费者类来处理。
除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName
);这种方式之外,还可以直接传入 Queue::push( $jobHandlerObject ,null , $jobQueueName
); 这时,需要在 $jobHandlerObject 中定义一个 handle()
方法,消息队列在执行到该任务时会自动反序列化该对象,并调用其 handle()方法。 该方式的缺点是无法传入自定义数据。
消息的消费与删除
编写 Hello 消费者类,用于处理 helloJobQueue 队列中的任务
新增 \application\job\job1.php 消费者类,并编写其 fire() 方法
<?phpnamespace app\job;use think\Controller;
use think\queue\Job;
use think\cache\driver\Redis;class Job1
{
/*** fire方法是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据*/public function fire(Job $job,$data){
// 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);if(!$isJobStillNeedToBeDone){
$job->delete();return true;}$isJobDone = $this->doHelloJob($data);if ($isJobDone) {
//如果任务执行成功, 记得删除任务$job->delete();print("<info>Hello Job has been done and deleted"."</info>\n");}else{
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");$job->delete();// 也可以重新发布这个任务//print("<info>Hello Job will be availabe again after 2s."."</info>\n");//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行}}}/*** 有些消息在到达消费者时,可能已经不再需要执行了* @param array|mixed $data 发布任务时自定义的数据* @return boolean 任务执行的结果*/private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;}/*** 根据消息中的数据进行实际的业务处理* @param array|mixed $data 发布任务时自定义的数据* @return boolean 任务执行的结果*/private function doHelloJob($data) {
// 根据消息中的数据进行实际的业务处理...if ($info['errcode'] == 0) {
print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");print("<info>Hello Job is Done!"."</info> \n");return true;}else{
return false;}}}
定时任务
首先我们写两个shell脚本
1.monitorHandleQueue.sh,作用是检查队列的进程是否在运行
pid=$(ps -ef| grep handleQueue |grep -v grep | awk ' NR==1 {print $2}')if [ -z $pid ]thensh /项目绝对路径/handleQueue.sh &>/dev/null 2>&1
fi
检查进程是否存在,如果不存在启动handleQueue.sh脚本,注意:monitorHandleQueue.sh脚本中的启动handleQueue.sh的路径写自己的,NR==1表示只取第一个进程,|grep -v grep 过滤掉自己的进程
handleQueue.sh 脚本
cd /项目绝对路径
while [ 2 > 0 ]dolen=`/usr/local/redis/bin/redis-cli -h 1.1.1.1 -p 6379 Llen queues:helloJobQueue`echo lenif [ $((len + 0 )) -gt 0 ];then/usr/local/php/bin/php think queue:work --queue helloJobQueueelsesleep 3/usr/local/php/bin/php think queue:work --queue helloJobQueue fi
done
解释一下handleQueue.sh脚本的逻辑:先切换到框架的根目录,while判断2大于0为真,所以会一直执行,连接到redis,获取队列的长度,if判断,如果队列的长度大于0直接执行队列,否则就停3秒再执行队列,很简单,写了很长时间,还有一点要注意,shell脚本最好不要在编辑器编辑,直接在Linux上编辑,因为如果在编辑器上编辑上传到Linux上会产生意想不到的问题(我在这里耽误了很长时间),找不到问题所在就直接在Linux上编写好了,省的麻烦
1.[ 2 > 0 ]格式为[空格判断表达式空格]
2.len=/usr/local/redis/bin/redis-cli -h 47.101.54.26 -p 6379 Llen queues:helloJobQueue
,等于号的左右两边不能有空格,反引号 `` 不知道怎么输出,在1是我左边的那个键
3. if 的格式,if [ $((len + 0 )) -gt 0 ];then 变量大于0,大于号用 -gt 表示
Linux发布定时任务
* * * * * /项目绝对路径/monitorHandleQueue.sh 2> /tmp/queue.log &
记录日志
/tmp/queue.log
当然也可以不写shell脚本
启动队列监听服务
Work 模式
php think queue:work \
--daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
--queue helloJobQueue //要处理的队列的名称
--delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明
--memory 128 \ //该进程允许使用的内存上限,以 M 为单位
--sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
--tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
Listen 模式
php think queue:listen \
--queue helloJobQueue \ //监听的队列的名称
--delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128 \ //该进程允许使用的内存上限,以 M 为单位
--sleep 3 \ //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
--tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60 //创建的work子进程的允许执行的最长时间,以秒为单位