作者
胡盛生
推荐理由
通过seata @GlobalTranscation注解为入口,通过源码解读分析seata实现分布式事务的原理
seata简介
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
@GlobalTranscation注解实现
基于Spring的AOP和Springboot的自动装配实现
自动装配类SeataAutoConfiguration
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({
SeataProperties.class})
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)@ConditionalOnMissingBean(name = {
BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})public SpringApplicationContextProvider springApplicationContextProvider() {
return new SpringApplicationContextProvider();}@Bean@DependsOn({
BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");}return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup());}@Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enableAutoDataSourceProxy", havingValue = "true", matchIfMissing = true)@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
return new SeataDataSourceBeanPostProcessor(seataProperties.isUseJdkProxy());}
}
GlobalTranscationScanner
afterPropertiesSet()
实现了InitializingBean接口,在bean初始化的时候,初始化了TMClient和RMClient。
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");}return;}initClient();}
private void initClient() {
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));}//init TM(初始化事务管理器Client->和seata server交互)TMClient.init(applicationId, txServiceGroup);//init RM(初始化资源管理器Client->和seata server交互)RMClient.init(applicationId, txServiceGroup);registerSpringShutdownHook();}
wrapIfNecessary(Object bean, String beanName, Object cacheKey)
继承了AbstractAutoProxyCreator类(AOP的顶层抽象父类),创建了GlobalTransactionalInterceptor拦截器来拦截处理含有@GlobalTranscation注解的方法
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (disableGlobalTransaction) {
return bean;}try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;}interceptor = null;//check TCC proxyif (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);// 校验bean的方法上是否含有@GlobalTranscation注解// 不存在则表示该bean不需要被代理if (!existsAnnotation(new Class[]{
serviceInterface})&& !existsAnnotation(interfacesIfJdk)) {
return bean;}if (interceptor == null) {
// 创建GlobalTransactional拦截器// 拦截被@GlobalTranscation注解注释的方法interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener) interceptor);}}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());// 生成代理对象if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {
throw new RuntimeException(exx);}
}
GlobalTransactionalInterceptor
invoke(final MethodInvocation methodInvocation)
调用方法时,含有@GlobalTranscation注解的方法会被拦截,执行invoke方法
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()): null;Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);// 获取方法上的@GlobalTranscation注解类final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);if (!disable && globalTransactionalAnnotation != null) {
// 解析@GlobalTranscation注解中的属性,并进行全局事务处理return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);} else if (!disable && globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);} else {
return methodInvocation.proceed();}
}
handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno)
通过调用事务模板类transcationTemplate的execute方法来进行全局事务控制,传参通过解析@GlobalTranscation的属性来设置回滚规则
private Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Overridepublic Object execute() throws Throwable {
return methodInvocation.proceed();}// 获取name属性(全局事务名称)public String name() {
String name = globalTrxAnno.name();if (!StringUtils.isNullOrEmpty(name)) {
return name;}return formatMethod(methodInvocation.getMethod());}@Overridepublic TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();// 获取全局事务超时时间transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());transactionInfo.setName(name());// 设置异常回滚规则(Exception)// 使用LinkedHashSet集合,保证顺序性和唯一性Set<RollbackRule> rollbackRules = new LinkedHashSet<>();for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();switch (code) {
case RollbackDone:throw e.getOriginalException();case BeginFailure:failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure:failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());throw e.getCause();default:throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));}}
}
transcationTemplate.execute(TransactionalExecutor business)
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 获取或创建全局事务// getCurrent底层通过ThreadLocal实现GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();// 1.1 获取入参中的全局事务属性类TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");}try {
// 2. 开启事务beginTransaction(txInfo, tx);Object rs = null;try {
// 业务逻辑处理rs = business.execute();} catch (Throwable ex) {
// 3.根据事务属性类中的异常回滚规则进行异常处理completeTransactionAfterThrowing(txInfo,tx,ex);throw ex;}// 4. 提交事务commitTransaction(tx);return rs;} finally {
//5. cleartriggerAfterCompletion();cleanUp();}
}
beginTranscation 开启事务
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();// 事务开启tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}
}@Override
public void begin(int timeout, String name) throws TransactionException {
// 角色校验,只有TM才能开启全局事务if (role != GlobalTransactionRole.Launcher) {
// 非TM角色,全局事务id xid为空则抛出异常,该角色的事务不应受到全局事务管控check();if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}// 全局事务发起,此时xid应为null,不为null则异常if (xid != null) {
throw new IllegalStateException();}if (RootContext.getXID() != null) {
throw new IllegalStateException();}// 通过TMClient向seata server发起请求// server向数据库的global_table表中插入一条全局事务数据// 并通过后台定时任务来判断事务是否超时xid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;// 绑定全局事务xidRootContext.bind(xid);if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);}
}
completeTransactionAfterThrowing 事务异常处理private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {
// 判断异常类型是否属于需要回滚的异常Set集合之一if (txInfo != null && txInfo.rollbackOn(ex)) {
try {
// 回滚rollbackTransaction(tx, ex);} catch (TransactionException txe) {
// 回滚失败,抛出异常throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.RollbackFailure, ex);}} else {
// 不需要回滚的异常,直接提交事务commitTransaction(tx);}
}private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();tx.rollback();triggerAfterRollback();// 3.1 Successfully rolled backthrow new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
}@Override
public void rollback() throws TransactionException {
// 角色判断,事务加入者,直接returnif (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of rollbackif (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);}return;}// xid为空则抛出异常if (xid == null) {
throw new IllegalStateException();}// 回滚重试次数int retry = ROLLBACK_RETRY_COUNT;try {
while (retry > 0) {
try {
// TMClient发送回滚请求到server端status = transactionManager.rollback(xid);break;} catch (Throwable ex) {
LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());retry--;if (retry == 0) {
throw new TransactionException("Failed to report global rollback", ex);}}}} finally {
// 无论回滚是否成功,解除xid的绑定关系,本次全局事务结束if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
RootContext.unbind();}}if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] rollback status: {}", xid, status);}
}
commitTransaction事务提交private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();tx.commit();triggerAfterCommit();} catch (TransactionException txe) {
// 4.1 Failed to committhrow new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.CommitFailure);}
}public void commit() throws TransactionException {
// 非TM角色不允许提交全局事务if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);}return;}// xid为null则抛出异常if (xid == null) {
throw new IllegalStateException();}int retry = COMMIT_RETRY_COUNT;try {
while (retry > 0) {
try {
// TMClient发送事务提交请求到server端status = transactionManager.commit(xid);break;} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());retry--;if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);}}}} finally {
// 无论事务是否提交成功,本次全局事务结束,解除xid绑定if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
RootContext.unbind();}}if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);}}