当前位置: 代码迷 >> 综合 >> Soul 网关源码阅读之 Websocket 同步机制
  详细解决方案

Soul 网关源码阅读之 Websocket 同步机制

热度:54   发布时间:2023-12-15 21:34:03.0

这篇文章开始,将介绍 Soul 网关的同步机制。
根据网关的角色定位,用户会在运行期间不断的修改配置比如流量策略,或者防火墙等等。这时,就需要实时的同步机制,将 admin 拿到的配置信息同步给网关的内存。
Soul 提供了四种同步机制,分别为 websocket,zookeeper,http长轮询以及nacos。在接下来的几篇文章中,我们将从 admin 模块入手,依次介绍几种同步机制。

通过 Websocket 发送数据

ApplicationEventPublisher 是 Spring 的一个组件,配合 ApplicationListener 使用能够快速的使程序具有发布订阅的功能,soul 网关在 admin 模块使用这个组件在每次用户改动配置信息时,进行事件发布。
具体实例来说,当我们修改了 divide 模块的一个选择器规则时,在 org.dromara.soul.admin.service.impl.RuleServiceImpl 中有一个 publishEvent 方法将被调用:

	private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
    SelectorDO selectorDO = selectorMapper.selectById(ruleDO.getSelectorId());PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());List<ConditionData> conditionDataList =ruleConditions.stream().map(ConditionTransfer.INSTANCE::mapToRuleDTO).collect(Collectors.toList());// publish change event.eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));}

其中,eventPublisher.publishEvent 即发布了一个事件。
接下来,soul 设计了 DataChangedEventDispatcher 类,它实现了 ApplicationListener 接口,每当有 org.dromara.soul.admin.listener.DataChangedEvent 类型的事件发布,都会调用 onApplicationEvent 方法:

	public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
    switch (event.getGroupKey()) {
    case APP_AUTH:listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());break;case PLUGIN:listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());break;case RULE:listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());break;case SELECTOR:listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());break;case META_DATA:listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());break;default:throw new IllegalStateException("Unexpected value: " + event.getGroupKey());}}}

这个方法将会根据具体的事件类型调用不同的 DataChangedListener 方法,所有的 listener 都会根据配置在程序启动时初始化好。
比如我们上述的例子,将会调用 org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener 的 onRuleChanged 方法:

	public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
    WebsocketData<RuleData> configData =new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);}

最终,改动将使用 WebsocketCollector.send 方法发送出去,WebsocketCollector 维护了 Session 实例,使用 Websocket 协议发送数据。

接收改动

在 soul-sync-data-center 模块下的 soul-sync-data-websocket 子模块中维护了 Websocket 协议的接收代码。
org.dromara.soul.plugin.sync.data.weboscket.client.SoulWebsocketClient 类继承了 WebSocketClient,其中后者是第三方库 java-websocket,封装了一些诸如心跳的实现细节。
handleResult 将会在每次收到 admin 发来的 websocket 协议数据时被调用:

	private void handleResult(final String result) {
    // 拆解数据WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());String eventType = websocketData.getEventType();String json = GsonUtils.getInstance().toJson(websocketData.getData());// 后续的数据操作websocketDataHandler.executor(groupEnum, json, eventType);}

WebsocketDataHandler 维护了所有种类消息处理器的实例:

	public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {
    ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));}

每一个 Handler 都继承了 AbstractDataHandler,AbstractDataHandler 类将会在 handle 方法中根据数据中携带的事件类型进行方法的分发:

	public void handle(final String json, final String eventType) {
    List<T> dataList = convert(json);if (CollectionUtils.isNotEmpty(dataList)) {
    DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);switch (eventTypeEnum) {
    case REFRESH:case MYSELF:doRefresh(dataList);break;case UPDATE:case CREATE:doUpdate(dataList);break;case DELETE:doDelete(dataList);break;default:break;}}} 

还是使用上面提到的例子,最终将会调用 RuleDataHandler 类的 doUpdate 方法并最终落到 Soul 网关的缓存组件中。

  相关解决方案