场景:
- 多台服务器
- 同一用户,可拥有多个客户端,消息可共享
- uid1 的连接fd 在 S1 服务器上
- uid2 的连接fd 在 S2 服务器上
- uid1 想要和 uid2 通讯,推送消息
问题:uid1 和 uid2 用户 连接不同的服务,fd 是相对于当前连接的服务的文件描述符,作用域当然也仅限于当前服务器,跨服务想实现通讯,肯定要借助中间件,来传递消息。
解决:
方案一、
1、谁发起,谁负责,无法承担,甩锅给队列,大家共同负责
详解:uid1 在服务 S1 上,想要推送给 uid2,先查看uid2 是否在S1 上,在直接推送就好了;不在,则把消息事件推送到Queue,
每个服务都订阅消息,消费消息,(消息事件触发)
关键:两个消费群组,消费一个topic,通过队列,实现触发动作
缺点:相当于遍历所有服务,增加服务压力
技术:MQ 不限(RabbitMQ、kafka等),也可以redis 实现
方案二:
1、网关负责,分发连接那个服务,然后指定推送
详解:uid1 客户端发起连接,hash 取值到相应服务;uid2 客户端发起连接,hash 取值到相应服务;
uid1 想要和 uid2 通讯,则对uid2 进行hash 取值,是否和自己在同一台服务器,在则直接推送,不在则直接推送到相应的服务器
比较:
- 相比方案一:服务压力相对较小,减轻不必要的查询(优)
- 相比方案一:设计更复杂 (缺)
方案三:
1、协议叠加,不通过MQ
方法同方案二类似,用户连接,redis记录所在服务器,不在自己的服务器,则通过HTTP 或者其他协议,推送到目标服务器
回调函数 onRequest,可以接收 HTTP协议
官方文档:https://wiki.swoole.com/wiki/page/397.html
深入讨论:
关键点:1、用户在哪台服务器,怎么记录?
2、跨服务消息怎么通讯?
问题一:用户在哪台服务器,怎么记录?上面采用了两种方式
1、连接之后redis 记录, uid1:server1; uid2:server2;uid3:server1;
2、通过hash 分配 (扩容之后,节点会发生变化,会影响部分节点的客户端连接)
问题二:跨服务消息怎么通讯?
1、通过消息队列中间件,进行信息的传递
1.1 :一种为广播:创建一个topic,其他服务都订阅这个topic,会增加没必要的遍历查询
1.2 :一种为精准投递,需要维护多个topic,服务节点消费者,只需订阅自己的专属的消费队列,
2、通过协议叠加,传递消息。因为webSocket 本身就是 HTTP的升级版(upgrade),可以接收来自HTTP的请求(见上方:方案三)
总结:相比较而言,个人认为方案三,最简单,部署更方便快捷。
方案三、Server 伪代码:
<?php
/*** Created by PhpStorm.* User: 奔跑吧笨笨* Date: 2019/8/8* Time: 12:04 PM*/include_once './class/UsersBind.php';//创建websocket服务器对象,监听0.0.0.0:9876端口
$ws = new swoole_websocket_server("0.0.0.0", 9876);$ws->set(['heartbeat_check_interval' => 30, //心跳检测 每隔多少秒,遍历一遍所有的连接'heartbeat_idle_time' => 65, //心跳检测 最大闲置时间,超时触发close并关闭 默认为heartbeat_check_interval的2倍,两倍是容错机制,多一点是网络延迟的弥补
]);//监听WebSocket连接打开事件
$ws->on('open', function ($ws, $request) {var_dump($request->fd, $request->get, $request->server);$ws->push($request->fd, "hello, welcome\n");//file_put_contents('./a.log','open:'.json_encode($request).PHP_EOL,FILE_APPEND);//一、建立连接 uid、fd 关系绑定(双向绑定)$bindObj = new UsersBind();$bindObj->setBindId(1,$request->fd);
});//监听WebSocket消息事件
$ws->on('message', function ($ws, $frame) {//二、连接关闭 通过uid,获取fd$bindObj = new UsersBind();$fd = $bindObj->getBindFd(2);if($fd) {// push 消息} else {//分布式解决方案//在该服务中 不存在 则把消息 推送到 队列中,其他服务订阅消息,收到消息,执行请求(检查是否在自己的服务器上,在则执行相应的推送)}echo "Message: {$frame->data}\n";//心跳 回复if($frame->data == 'ping') {$ws->push($frame->fd, 'pong');} else {$ws->push($frame->fd, "server: {$frame->data}");}
});//监听WebSocket连接关闭事件
$ws->on('close', function ($ws, $fd) {$close_str = "client-{$fd} is closed\n";echo $close_str;file_put_contents('./a.log',$close_str,FILE_APPEND);//三、连接关闭 uid、fd 关系解除$bindObj = new UsersBind();$bindObj->unbindFd(1,$fd);
});//允许 HTTP 请求
$ws->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {// 终止响应站点图标if ($request->server['request_uri'] == '/favicon.ico' || $request->server['path_info'] == '/favicon.ico') {$response->end();}global $ws;//调用外部的server// $server->connections 遍历所有websocket连接用户的fd,给所有用户推送foreach ($ws->connections as $fd) {// 需要先判断是否是正确的websocket连接,否则有可能会push失败if ($ws->isEstablished($fd)) {$ws->push($fd, $request->get['message']);}}file_put_contents('./a.log',json_encode($request).PHP_EOL,FILE_APPEND);$response->end('success');
});$ws->start();
番外篇:
一、针对方案二,如果选择精准投递消费者,需要维护多个Queue,会不会 代码入侵?
可以,采用服务管理来实现,动态配置。
二、针对方案二,如果扩容怎么办?
加入服务管理注册,发现;(简易实现)
服务注册,添加到redis 当中 {s1:consumers_s1,s2:consumers_s2}
服务发现,读取redis ,获取相应服务节点订阅的topic
三、hash 会不会比求模不平均
一致性hash 均衡算法,解决Hash倾斜性的问题
我为人人,人人为我,美美与共,天下大同。