一、配置 dev.php
/*################ REDIS CONFIG ##################*/'REDIS' => ['host' => '127.0.0.1',//ip地址'port' => '6379',//端口'auth' => '123456',//密码'POOL_MAX_NUM' => '2','POOL_MIN_NUM' => '1','POOL_TIME_OUT' => '0.1',],
二、配置 EasySwooleEvent.php
本次使用了 Process与 redis 相结合,一定要有这两个配置
<?php
namespace EasySwoole\EasySwoole;use EasySwoole\EasySwoole\Crontab\Crontab;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;use EasySwoole\Socket\Dispatcher;
use App\WebSocket\WebSocketParser;
use App\WebSocket\WebSocketEvent;use EasySwoole\ORM\Db\Connection;
use EasySwoole\ORM\DbManager;
use Swoole\Coroutine\Scheduler;
use EasySwoole\Mysqli\QueryBuilder;use App\Process\TestProcess;class EasySwooleEvent implements Event
{public static function initialize(){date_default_timezone_set('Asia/Shanghai');$config = new \EasySwoole\ORM\Db\Config(Config::getInstance()->getConf('MYSQL'));$config->setMaxObjectNum(20);//配置连接池最大数量DbManager::getInstance()->addConnection(new Connection($config));//创建一个协程调度器$scheduler = new Scheduler();$scheduler->add(function () {$builder = new QueryBuilder();$builder->raw('select version()');DbManager::getInstance()->query($builder, true);//这边重置ORM连接池的pool,避免链接被克隆岛子进程,造成链接跨进程公用。//DbManager如果有注册多库链接,请记得一并getConnection($name)获取全部的pool去执行reset//其他的连接池请获取到对应的pool,然后执行reset()方法DbManager::getInstance()->getConnection()->getClientPool()->reset();});//执行调度器内注册的全部回调$scheduler->start();//清理调度器内可能注册的定时器,不要影响到swoole server 的event loop\Swoole\Timer::clearAll();}public static function mainServerCreate(EventRegister $register){$register->add($register::onWorkerStart,function (){//链接预热DbManager::getInstance()->getConnection()->getClientPool()->keepMin();});/*** **************** websocket控制器 ***********************/// 创建一个 Dispatcher 配置$conf = new \EasySwoole\Socket\Config();// 设置 Dispatcher 为 WebSocket 模式$conf->setType(\EasySwoole\Socket\Config::WEB_SOCKET);// 设置解析器对象$conf->setParser(new WebSocketParser());// 创建 Dispatcher 对象 并注入 config 对象$dispatch = new Dispatcher($conf);// 给server 注册相关事件 在 WebSocket 模式下 on message 事件必须注册 并且交给 Dispatcher 对象处理$register->set(EventRegister::onMessage, function (\swoole_websocket_server $server, \swoole_websocket_frame $frame) use ($dispatch) {$dispatch->dispatch($server, $frame->data, $frame);});// 注册服务事件$register->add(EventRegister::onOpen, [WebSocketEvent::class, 'onOpen']);$register->add(EventRegister::onClose, [WebSocketEvent::class, 'onClose']);/*** **************** redis ***********************/$config = new \EasySwoole\Pool\Config();$redisConfig = new \EasySwoole\Redis\Config\RedisConfig(Config::getInstance()->getConf('REDIS'));\EasySwoole\Pool\Manager::getInstance()->register(new \App\Pool\RedisPool($config,$redisConfig),'redis');/*** **************** Process 设置 ***********************/$processConfig = new \EasySwoole\Component\Process\Config();$processConfig->setProcessName('testProcess');//设置进程名称\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new TestProcess($processConfig));}public static function onRequest(Request $request, Response $response): bool{// TODO: Implement onRequest() method.return true;}public static function afterRequest(Request $request, Response $response): void{// TODO: Implement afterAction() method.}
}
三、新建 \App\Process\TestProcess.php 文件
<?php
namespace App\Process;
use EasySwoole\Component\Process\AbstractProcess;
use Swoole\Process;
use EasySwoole\EasySwoole\ServerManager;
use EasySwoole\Pool\Manager;
use EasySwoole\EasySwoole\Task\TaskManager;
use App\Models\User;
use EasySwoole\EasySwoole\Logger;class TestProcess extends AbstractProcess {protected function run($arg){//当进程启动后,会执行的回调// 每隔 10 秒执行一次//Logger::getInstance()->info('log level info');//记录info级别日志并输出到控制台\EasySwoole\Component\Timer::getInstance()->loop(10 * 1000, function () {$redis = Manager::getInstance()->get('redis')->getObj();$server = ServerManager::getInstance()->getSwooleServer();$res = $redis->exists('user_list');if(!empty($res)){$uid = $redis->lPop('user_list');$fd = $redis->get('uid_'.$uid);if($fd){//推送通知到用户$server->push($fd,'push in http at '. date('H:i:s'));}//添加异步操作 修改数据库数据状态$task = TaskManager::getInstance();$task->async(function () use($fd){$user = User::create()->get($fd);$user->update(['status' => 2]);echo "异步调用task1\n";});}//回收redisManager::getInstance()->get('redis')->recycleObj($redis);});echo 'Test Process start'."\n";}protected function onPipeReadable(Process $process){/** 该回调可选* 当有主进程对子进程发送消息的时候,会触发的回调,触发后,务必使用* $process->read()来读取消息*/}protected function onShutDown(){echo 'Test Process end'."\n";/** 该回调可选* 当该进程退出的时候,会执行该回调*/}protected function onException(\Throwable $throwable, ...$args){/** 该回调可选* 当该进程出现异常的时候,会执行该回调*/}}
四、运行试一下吧