一、Seata
处理分布式事务
1. Seata
简介
- Seata
- Seata 是一款开源的 分布式事务 解决方案,致力于在微服务架构下,提供 高性能 和 简单易用 的 分布式事务服务。
- 2019 年 1 月份蚂蚁金服 和 阿里巴巴共同开源的分布式事务解决方案。
2. 能干嘛
2.1 分布式事务处理过程(1
ID + 3
组件模型)
- Transaction ID(XID 全局唯一的事务 ID)。
- Transaction Coordinator(TC)。
事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的 提交 或 回滚。- Transaction Manager(TM)。
事务管理器,控制全局事务的边界,负责开启一个全局事务,并最终发起 全局提交 或 全局回滚 的决议。- Resource Manager(RM)。
资源管理器,控制分支事务,负责分支注册、状态汇报,并接收 事务协调器 的指令,驱动分支(本地)事务的 提交 和 回滚。
- TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
- XID 在微服务调用链路的上下文中传播。
- RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
- TM 向 TC 发起针对 XID 的全局 提交 或 回滚 决议。
- TC 调度 XID 下管辖的全部分支事务,完成 提交 或 回滚 请求。
3. Seata-Server
安装
3.1 下载
seata-server-0.9.0.zip
3.2 修改 file.conf
3.3 创建 store
库
seata-server-0.9.0\seata\conf\db_store.sql
。
3.4 修改 registry.conf
- 指明注册中心为 Nacos,及修改 Nacos 连接信息。
3.5 启动 seata-server.bat
4. 订单/库存/账户 业务数据库准备
- 创建回滚日志表
seata-server-0.9.0\seata\conf\db_undo_log.sql
4.1 订单库
# 创建数据库
CREATE DATABASE seata_order;
# 创建数据表
CREATE TABLE t_order (`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',`count` INT(11) DEFAULT NULL COMMENT '数量',`money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',`status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结'
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;SELECT * FROM t_order;
4.2 库存库
# 创建数据库
CREATE DATABASE seata_storage;
# 创建数据表
CREATE TABLE t_storage (`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',`total` INT(11) DEFAULT NULL COMMENT '总库存',`used` INT(11) DEFAULT NULL COMMENT '已用库存',`residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)
VALUES ('1', '1', '100', '0', '100');SELECT * FROM t_storage;
4.3 账户库
# 创建数据库
CREATE DATABASE seata_account;
# 创建数据表
CREATE TABLE t_account (`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',`total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',`used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',`residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`)
VALUES ('1', '1', '1000', '0', '1000');SELECT * FROM t_account;
5. 订单/库存/账户 业务微服务准备
5.1 新建 Module seata-order-service2001
5.2 POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>cloud-2020</artifactId><groupId>com.qs.springcloud</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>seata-order-service2001</artifactId><dependencies><!--nacos-discovery--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!--seata--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><artifactId>seata-all</artifactId><groupId>io.seata</groupId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>0.9.0</version></dependency><!--openfeign--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><!--web + actuator--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--mysql-druid--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.37</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.10</version></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.0.0</version></dependency><!--自定义API通用包--><dependency><groupId>com.qs.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies></project>
5.3 YML
server:port: 2001spring:application:name: seata-order-servicecloud:alibaba:seata:# 自定义事务组名称需要与seata-server中的对应tx-service-group: qs_tx_groupnacos:discovery:server-addr: localhost:8848datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://192.168.137.155:3306/seata_orderusername: rootpassword: 123456feign:hystrix:enabled: falselogging:level:io:seata: infomybatis:mapperLocations: classpath:mapper/*.xml
5.4 主启动
@EnableDiscoveryClient
@EnableFeignClients
// 取消数据源的自动创建
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class OrderMain2001 {
public static void main(String[] args) {
SpringApplication.run(OrderMain2001.class, args);}
}
5.5 MyBatisConfig
@Configuration
@MapperScan({
"com.qs.springcloud.dao"})
public class MyBatisConfig {
}
5.6 DataSourceProxyConfig
使用 Seata
对数据源进行代理
@Configuration
public class DataSourceProxyConfig {
@Value("${mybatis.mapperLocations}")private String mapperLocations;@Bean@ConfigurationProperties(prefix = "spring.datasource")public DataSource druidDataSource() {
return new DruidDataSource();}@Beanpublic DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);}@Beanpublic SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();sqlSessionFactoryBean.setDataSource(dataSourceProxy);sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());return sqlSessionFactoryBean.getObject();}
}
5.7 Order
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Long id;private Long userId;private Long productId;private Integer count;private BigDecimal money;/*** 订单状态:0:创建中;1:已完结*/private Integer status;
}
5.8 OrderDao
@Mapper
public interface OrderDao {
/*** 创建订单*/void create(Order order);/*** 修改订单金额*/void update(@Param("userId") Long userId, @Param("status") Integer status);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.qs.springcloud.dao.OrderDao"><resultMap id="BaseResultMap" type="com.qs.springcloud.domain.Order"><id column="id" property="id" jdbcType="BIGINT"/><result column="user_id" property="userId" jdbcType="BIGINT"/><result column="product_id" property="productId" jdbcType="BIGINT"/><result column="count" property="count" jdbcType="INTEGER"/><result column="money" property="money" jdbcType="DECIMAL"/><result column="status" property="status" jdbcType="INTEGER"/></resultMap><insert id="create">INSERT INTO `t_order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`)VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0)</insert><update id="update">UPDATE `t_order`SET status = 1WHERE user_id = #{userId} AND status = #{status}</update></mapper>
5.9 IStorageService
@FeignClient(value = "seata-storage-service")
public interface IStorageService {
/*** 扣减库存*/@PostMapping(value = "/storage/decrease")CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
5.10 IAccountService
@FeignClient(value = "seata-account-service")
public interface IAccountService {
/*** 扣减账户余额*/
// @RequestMapping(value = "/account/decrease", method = RequestMethod.POST, produces = "application/json; charset=UTF-8")@PostMapping("/account/decrease")CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
5.11 IOrderService
public interface IOrderService {
/*** 创建订单*/void create(Order order);
}
5.12 OrderServiceImpl
@GlobalTransactional
。
@Service
@Slf4j
public class OrderServiceImpl implements IOrderService {
@Resourceprivate OrderDao orderDao;@Resourceprivate IStorageService storageService;@Resourceprivate IAccountService accountService;/*** 创建订单 -> 调用库存服务扣减库存 -> 调用账户服务扣减账户余额 -> 修改订单状态* 下订单 -> 减库存 -> 减余额 -> 改状态*/@Override// 所有异常都回滚@GlobalTransactional(name = "qs-create-order", rollbackFor = Exception.class)public void create(Order order) {
log.info("下订单 Start");orderDao.create(order);log.info("下订单 End");log.info("减库存 Start");storageService.decrease(order.getProductId(), order.getCount());log.info("减库存 End");log.info("减余额 Start");accountService.decrease(order.getUserId(), order.getMoney());log.info("减余额 End");log.info("改状态 Start");orderDao.update(order.getId(), 1);log.info("改状态 End");}
}
5.13 OrderController
。
@RestController
public class OrderController {
@Autowiredprivate IOrderService orderService;@GetMapping("/order/create")public CommonResult create(Order order) {
orderService.create(order);return CommonResult.successOfData("订单创建成功!");}
}
5.14 测试
- http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
6. AT
模式如何做到对业务的无侵入
- AT 模式是 Seata 最主推的分布式事务解决方案,它是基于 XA 演进而来。
- AT 模式是一种 无侵入 的 分布式事务 解决方案。
- 在 AT 模式下,用户只需关注自己的 “业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段 提交 和 回滚 操作。
6.1 一阶段加载
- 在一阶段,Seata 会拦截 “业务 SQL”。
- 解析 SQL 语义,找到 “业务 SQL” 要更新的业务数据,在业务数据被更新前,将其保存成 “
before image
”。- 执行 “业务 SQL” 更新业务数据,在业务数据更新之后。
- 其保存成 “
after image
”,最后生成行锁。
- 以上操作全部在一个 数据库事务 内完成,这样保证了一阶段操作的 原子性。
6.2 二阶段提交
- 二阶段如是顺利提交的话,因为 “业务 SQL” 在一阶段已经提交至数据库,所以 Seata 框架只需将 一阶段保存的 快照数据 和 行锁删掉,完成数据清理即可。
6.3 二阶段回滚
- 二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的 “业务 SQL”,还原业务数据。
- 回滚方式便是用 “before image” 还原业务数据。
- 但在还原前要首先要校验脏写,对比 “数据库当前业务数据” 和 “after image”。
- 如果两份数据完全一致就说明 没有脏写,可以还原业务数据,如果不一致就说明 有脏写,出现脏写就需要转人工处理。