seata是一款开源的分布式事务解决方案,这里主要讲解决方案其中的一种AT模式中客服端启动的时候是怎么向服务端注册RM资源管理器的源码分析。
标题要使用AT模式,必须向spring ioc注入DataSourceProxy
@Beanpublic DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){return new DataSourceProxy(druidDataSource);}
- 如果使用了seata-spring-boot-start.jar这个包,就不需要手动向spring ioc注入DataSourceProxy。这个包里面配置了spring boot的自动装配
在spring boot启动的时候就会扫描到这个类:
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)@ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})public SpringApplicationContextProvider springApplicationContextProvider() {return new SpringApplicationContextProvider();}@Bean(BEAN_NAME_FAILURE_HANDLER)@ConditionalOnMissingBean(FailureHandler.class)public FailureHandler failureHandler() {return new DefaultFailureHandlerImpl();}@Bean@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Automatically configure Seata");}return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);}@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());}
}
-
是不是很奇怪这里并没有配置DataSourceProxy?看一下最后一个方法中返回了一个SeataAutoDataSourceProxyCreator,这个对象是用来干嘛的呢,它继承了AbstractAutoProxyCreator,AbstractAutoProxyCreator是aop里面一个把目标对象转换成代理对象的一个后置处理器。在spring中,只要把后置处理器的bean定义给到ioc容器,BeanFactory就调用后置处理器的各种方法参与到bean的生命周期的各个步骤中。
-
来看一下SeataAutoDataSourceProxyCreator,它的shouldSkip是说这个后置处理器只会对DataSource对象生成其代理对象,它用到的横切关注点逻辑SeataAutoDataSourceProxyAdvice。
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);private final String[] excludes;private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {this.excludes = excludes;setProxyTargetClass(!useJdkProxy);}@Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {if (LOGGER.isInfoEnabled()) {LOGGER.info("Auto proxy of [{}]", beanName);}return new Object[]{advisor};}// 这个方法里面确定对那些Bean不起作用,可以看到非DataSource的都会不起作用@Overrideprotected boolean shouldSkip(Class<?> beanClass, String beanName) {return SeataProxy.class.isAssignableFrom(beanClass) ||!DataSource.class.isAssignableFrom(beanClass) ||Arrays.asList(excludes).contains(beanClass.getName());}
}
- SeataAutoDataSourceProxyAdvice里面的invoke方法,一旦调用DataSource的方法,就会把它替换成DataSourceProxy对象。
@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());Method method = invocation.getMethod();Object[] args = invocation.getArguments();Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());if (m != null) {return m.invoke(dataSourceProxy, args);} else {return invocation.proceed();}}
DataSourceProxy初始化的时候向server注册RM资源管理器
- DataSourceProxy构造函数中调用了init方法
public DataSourceProxy(DataSource targetDataSource) {this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);}public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {super(targetDataSource);init(targetDataSource, resourceGroupId);}
- init方法里面调用了DefaultResourceManager的registerResource
private void init(DataSource dataSource, String resourceGroupId) {this.resourceGroupId = resourceGroupId;try (Connection connection = dataSource.getConnection()) {jdbcUrl = connection.getMetaData().getURL();dbType = JdbcUtils.getDbType(jdbcUrl);if (JdbcConstants.ORACLE.equals(dbType)) {userName = connection.getMetaData().getUserName();}} catch (SQLException e) {throw new IllegalStateException("can not init dataSource", e);}DefaultResourceManager.get().registerResource(this);if (ENABLE_TABLE_META_CHECKER_ENABLE) {tableMetaExcutor.scheduleAtFixedRate(() -> {try (Connection connection = dataSource.getConnection()) {TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());} catch (Exception ignore) {}}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);}}
- DefaultResourceManager的registerResource方法,首先根据resource的branchType选择一个ResourceManager,resource是DataSourceProxy,它的branchType是BranchType.AT,BranchType.AT对应的ResourceManager是DataSourceManager。
@Override
public void registerResource(Resource resource) {getResourceManager(resource.getBranchType()).registerResource(resource);
}
- DataSourceManager的registerResource方法,最终调用了父类的registerResource,父类就是AbstractResourceManager。
@Overridepublic void registerResource(Resource resource) {DataSourceProxy dataSourceProxy = (DataSourceProxy)resource;dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);super.registerResource(dataSourceProxy);}
- AbstractResourceManager的registerResource方法,调用RmRpcClient中的registerResource方法去了。
@Overridepublic void registerResource(Resource resource) {RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());}
- RmRpcClient的registerResource方法,配置的seata server可能是单机或者集群,集群的话需要向每个sever都注册一下。
public void registerResource(String resourceGroupId, String resourceId) {if (getClientChannelManager().getChannels().isEmpty()) {getClientChannelManager().reconnect(transactionServiceGroup);return;}synchronized (getClientChannelManager().getChannels()) {for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {String serverAddress = entry.getKey();Channel rmChannel = entry.getValue();if (LOGGER.isInfoEnabled()) {LOGGER.info("will register resourceId:{}", resourceId);}sendRegisterMessage(serverAddress, rmChannel, resourceId);}}}
- sendRegisterMessage里面生成了RegisterRMRequest对象,然后把RegisterRMRequest对象传给sendAsyncRequestWithoutResponse方法。
public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);message.setResourceIds(resourceId);try {super.sendAsyncRequestWithoutResponse(channel, message);} catch (FrameworkException e) {if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {getClientChannelManager().releaseChannel(channel, serverAddress);if (LOGGER.isInfoEnabled()) {LOGGER.info("remove not writable channel:{}", channel);}} else {LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);}} catch (TimeoutException e) {LOGGER.error(e.getMessage());}}
- sendAsyncRequestWithoutResponse里面调用了sendAsyncRequest
protected Object sendAsyncRequestWithoutResponse(Channel channel, Object msg) throwsTimeoutException {return sendAsyncRequest(null, channel, msg, 0);}
- sendAsyncRequest方法里面,第一步生成了RpcMessage 和MessageFuture 对象;第二步真正调用server;第三步,等待server返回结果。重要的是第二步,又分有没有开启多线程去处理发送消息,如果有,就把RpcMessage 直接放到阻塞队列里面,等待线程处理,没有的话直接调用sendSingleRequest方法。
private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)throws TimeoutException {if (channel == null) {LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");return null;}final RpcMessage rpcMessage = new RpcMessage();rpcMessage.setId(getNextMessageId());rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);rpcMessage.setBody(msg);final MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeout);futures.put(rpcMessage.getId(), messageFuture);if (address != null) {/*The batch send.Object From big to small: RpcMessage -> MergedWarpMessage -> AbstractMessage@see AbstractRpcRemotingClient.MergedSendRunnable*/if (NettyClientConfig.isEnableClientBatchSendRequest()) {ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;BlockingQueue<RpcMessage> basket = map.get(address);if (basket == null) {map.putIfAbsent(address, new LinkedBlockingQueue<>());basket = map.get(address);}basket.offer(rpcMessage);if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}if (!isSending) {synchronized (mergeLock) {mergeLock.notifyAll();}}} else {// the single send.sendSingleRequest(channel, msg, rpcMessage);if (LOGGER.isDebugEnabled()) {LOGGER.debug("send this msg[{}] by single send.", msg);}}} else {sendSingleRequest(channel, msg, rpcMessage);}if (timeout > 0) {try {return messageFuture.get(timeout, TimeUnit.MILLISECONDS);} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), address, msg);if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}} else {return null;}}
- sendSingleRequest,这里真正调用了 channel.writeAndFlush把数据发送出去。
private void sendSingleRequest(Channel channel, Object msg, RpcMessage rpcMessage) {ChannelFuture future;channelWritableCheck(channel, msg);future = channel.writeAndFlush(rpcMessage);future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {if (!future.isSuccess()) {MessageFuture messageFuture = futures.remove(rpcMessage.getId());if (messageFuture != null) {messageFuture.setResultMessage(future.cause());}destroyChannel(future.channel());}}});}