
Elastic-Job (4): Implementing Order Cancellation with Elastic-Job + Spring Boot

Popularity: 101   Published: 2023-11-17 12:43:21

1. Project setup

1.1 Project structure:

1.2 Dependencies

The project uses Spring Boot 2.3.1 together with the following dependencies:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1</version>
</dependency>

1.3 Required table

Order table:

CREATE TABLE `t_order` (
  `id` varchar(20) NOT NULL COMMENT 'Primary key',
  `code` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'Serial number',
  `amount` decimal(10,2) unsigned NOT NULL COMMENT 'Total amount',
  `payment_type` tinyint unsigned NOT NULL COMMENT 'Payment method: 1 debit card, 2 credit card, 3 WeChat, 4 Alipay, 5 cash',
  `status` tinyint unsigned NOT NULL COMMENT 'Status: 1 unpaid, 2 paid, 3 shipped, 4 received',
  `create_time` datetime NOT NULL COMMENT 'Creation time',
  `update_time` datetime DEFAULT NULL COMMENT 'Update time',
  `is_cancel` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Whether the order has been cancelled',
  `version` int DEFAULT NULL COMMENT 'Version number, used for optimistic locking',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='Order table';

2. Configuration

Configuration for the data source, Elastic-Job, and MyBatis-Plus:

application.yml:

# Elastic-Job configuration
elasticjob:
  zookeeper:
    name-space: springboot-elasticjob
    server-list: 192.168.24.140:2181
# Data source configuration
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.24.140:3306/test?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=GMT%2B8
    username: root
    password: 123456
# Print the SQL executed by MyBatis-Plus
mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

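2.1 MyBatis-Plus configuration

2.1.1 MyBatis-Plus auto-fill (declared as a component so that the Spring container manages it)

import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.reflection.MetaObject;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Slf4j
@Component
public class MyMetaObjectHandler implements MetaObjectHandler {

    @Override
    public void insertFill(MetaObject metaObject) {
        log.info("start insert fill ....");
        this.strictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now());
        this.strictInsertFill(metaObject, "status", Integer.class, 1);
        this.strictInsertFill(metaObject, "isCancel", Integer.class, 0);
        this.strictInsertFill(metaObject, "version", Integer.class, 0);
    }

    @Override
    public void updateFill(MetaObject metaObject) {
        log.info("start update fill ....");
        // updateTime is declared with FieldFill.UPDATE, so it must be filled with
        // strictUpdateFill; strictInsertFill skips fields that are not insert-filled.
        this.strictUpdateFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());
    }
}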

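2.1.2 Mapper scanning and optimistic locking

import com.baomidou.mybatisplus.extension.plugins.OptimisticLockerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@MapperScan(value = "com.lucifer.elastic.mapper")
public class MybatisPlusConfig {

    /**
     * Enables optimistic locking.
     *
     * @return the optimistic-lock interceptor
     */
    @Bean
    public OptimisticLockerInterceptor optimisticLockerInterceptor() {
        return new OptimisticLockerInterceptor();
    }
}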

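2.2 Elastic-Job and ZooKeeper configuration

2.2.1 Custom annotation for Elastic-Job simple jobs

import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {

    String jobName() default "";

    String cron() default "";

    int shardingTotalCount() default 1;

    boolean overwrite() default false;
}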

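2.2.2 Auto-configuration for simple jobs

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;

@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {

    @Resource
    private CoordinatorRegistryCenter zkCenter;

    @Resource
    private ApplicationContext applicationContext;

    @PostConstruct
    public void initSimpleJob() {
        // Collect every bean whose class carries @ElasticSimpleJob
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> anInterface : interfaces) {
                // Only classes that directly implement SimpleJob are registered
                if (anInterface == SimpleJob.class) {
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    boolean overwrite = annotation.overwrite();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    // Core job configuration
                    JobCoreConfiguration jobCoreConfiguration =
                            JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build();
                    // Job type configuration
                    JobTypeConfiguration jobTypeConfiguration =
                            new SimpleJobConfiguration(jobCoreConfiguration, instance.getClass().getCanonicalName());
                    // Root job configuration (LiteJobConfiguration)
                    LiteJobConfiguration liteJobConfiguration =
                            LiteJobConfiguration.newBuilder(jobTypeConfiguration).overwrite(overwrite).build();
                    new SpringJobScheduler((ElasticJob) instance, zkCenter, liteJobConfiguration).init();
                }
            }
        }
    }
}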
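The discovery step in this auto-configuration (find annotated objects, confirm they are jobs, read the annotation attributes) can be tried without Spring or Elastic-Job. The following is a minimal sketch using only JDK reflection. `DemoJob`, `DemoSimpleJob`, `HelloJob`, `NotAJob`, and `describeJobs` are hypothetical stand-ins invented for illustration, not part of either library. The sketch uses `instanceof` rather than comparing `getInterfaces()` entries, which also accepts a job interface inherited from a superclass.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

final class JobDiscoverySketch {

    /** Hypothetical stand-in for the article's @ElasticSimpleJob annotation. */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, otherwise reflection cannot see it
    @interface DemoJob {
        String jobName() default "";
        String cron() default "";
        int shardingTotalCount() default 1;
    }

    /** Hypothetical stand-in for Elastic-Job's SimpleJob interface. */
    interface DemoSimpleJob {
        void execute();
    }

    @DemoJob(jobName = "helloJob", cron = "0/15 * * * * ?", shardingTotalCount = 2)
    static class HelloJob implements DemoSimpleJob {
        @Override
        public void execute() {
            // no-op: only the metadata matters for this sketch
        }
    }

    /** Annotated but not a job, so discovery must skip it. */
    @DemoJob(jobName = "notAJob")
    static class NotAJob {
    }

    /** Returns "jobName|cron|shards" for each candidate that is annotated and implements DemoSimpleJob. */
    static List<String> describeJobs(List<?> candidates) {
        List<String> result = new ArrayList<>();
        for (Object candidate : candidates) {
            DemoJob meta = candidate.getClass().getAnnotation(DemoJob.class);
            if (meta != null && candidate instanceof DemoSimpleJob) {
                result.add(meta.jobName() + "|" + meta.cron() + "|" + meta.shardingTotalCount());
            }
        }
        return result;
    }
}
```

Passing a `HelloJob`, a `NotAJob`, and an unrelated object to `describeJobs` yields a single entry for `helloJob`; the other two are filtered out by the annotation check and the interface check respectively.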

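2.2.3 ZooKeeper auto-configuration

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
@EnableConfigurationProperties(ZookeeperProperties.class)
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
public class ZookeeperAutoConfig {

    @Resource
    private ZookeeperProperties zookeeperProperties;

    /**
     * ZooKeeper registry center.
     *
     * @return the registry center; Spring initializes it through initMethod
     */
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter() {
        String serverList = zookeeperProperties.getServerList();
        String nameSpace = zookeeperProperties.getNameSpace();
        ZookeeperConfiguration zookeeperConfiguration =
                new ZookeeperConfiguration(serverList, nameSpace);
        // initMethod = "init" already makes Spring call init() once after creation,
        // so init() is not called here; calling it twice would open a second ZooKeeper client.
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}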

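2.2.4 Custom ZooKeeper properties (read from the custom settings in application.yml)

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
public class ZookeeperProperties {

    /**
     * ZooKeeper server address list
     */
    private String serverList;

    /**
     * ZooKeeper namespace
     */
    private String nameSpace;
}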

2.2.5 Create the spring.factories file

Under the resources folder, create a META-INF directory and, inside it, a spring.factories file:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.lucifer.elastic.config.ZookeeperAutoConfig,\
com.lucifer.elastic.config.SimpleJobAutoConfig

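2.3 Scheduled jobs

2.3.1 Order-creation job: simulates orders being created

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;

@ElasticSimpleJob(
        jobName = "addOrderSimpleJob",
        cron = "0/15 * * * * ?",   // runs every 15 seconds
        shardingTotalCount = 2,
        overwrite = true
)
@Slf4j
public class AddOrderSimpleJob implements SimpleJob {

    @Resource
    private OrderService orderService;

    /**
     * Scheduled job that inserts orders to simulate order creation.
     *
     * @param shardingContext sharding context supplied by Elastic-Job
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("======[Order-creation job]===========");
        log.info("Sharding item: {}, total shard count: {}",
                shardingContext.getShardingItem(), shardingContext.getShardingTotalCount());
        for (int i = 0; i < 5; i++) {
            orderService.addOrder();
        }
    }
}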
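It can help to work out how many rows this job produces. The cron seconds field `0/15` fires at seconds 0, 15, 30, and 45 of every minute, and `execute` is invoked once per shard item on each trigger, so with two shards and five inserts per call the job should add roughly 4 × 2 × 5 = 40 orders per minute. The sketch below reproduces that arithmetic. It is not Elastic-Job's or Quartz's cron parser: `fireSeconds` and `ordersPerMinute` are hypothetical helpers that handle only the `start/step` form with a positive step, and the estimate assumes every trigger completes and every shard item is assigned to a running instance.

```java
import java.util.ArrayList;
import java.util.List;

final class JobRateSketch {

    /** Expands a "start/step" seconds field, e.g. "0/15", into the seconds of each minute at which it fires. */
    static List<Integer> fireSeconds(String secondsField) {
        String[] parts = secondsField.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    /** Orders per minute = triggers per minute x shard items x inserts per execute() call. */
    static int ordersPerMinute(String secondsField, int shardingTotalCount, int insertsPerExecution) {
        return fireSeconds(secondsField).size() * shardingTotalCount * insertsPerExecution;
    }
}
```

Reducing `shardingTotalCount` to 1, or making each shard insert only its own share, brings the rate back to the 20 orders per minute that a reader might expect from "5 orders every 15 seconds".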

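2.3.2 Order-cancellation job

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

@ElasticSimpleJob(
        jobName = "cancelOrderSimpleJob",
        cron = "0/30 * * * * ?",   // runs every 30 seconds
        shardingTotalCount = 2,
        overwrite = true
)
@Slf4j
public class CancelOrderSimpleJob implements SimpleJob {

    @Resource
    private OrderService orderService;

    /**
     * Cancels overdue orders.
     *
     * @param shardingContext sharding context supplied by Elastic-Job
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        // Find every unpaid order created more than 1 minute ago and cancel it;
        // the 1-minute timeout is used only for this demo.
        List<Order> needCancelOrders = orderService.findNeedCancelOrder(1);
        int size = needCancelOrders.size();
        log.info("======[Order-cancellation job] batch: {}, orders to cancel: {}===========",
                shardingContext.getTaskId(), size);
//        if (needCancelOrders != null && needCancelOrders.size() > 0) {
//            ExecutorService fixedThreadPool = newFixedThreadPool(4);
//
//            for (Order order : needCancelOrders) {
//                fixedThreadPool.execute(() -> orderService.cancelOrder(order.getId()));
//            }
//        }
        // The original parallelStream().collect(toList()).forEach(...) ran sequentially anyway,
        // because collecting to a list discards the parallel stream before forEach is called.
        needCancelOrders.forEach(order -> orderService.cancelOrder(order.getId()));
    }
}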
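Because the job is configured with two shard items and both run the same query, each overdue order may be picked up twice, and two concurrent updates to the same row can trip the optimistic lock. One common remedy is to let each shard item handle only its own slice of the ids. The following is a minimal sketch of that idea, assuming ids are assigned to shards by hash code modulo the shard count. `ShardFilterSketch`, `ownedBy`, and `idsForShard` are hypothetical names, not part of Elastic-Job.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardFilterSketch {

    /** True when the order id belongs to the given shard item (hash code modulo shard count). */
    static boolean ownedBy(String orderId, int shardingItem, int shardingTotalCount) {
        // floorMod keeps the result in [0, shardingTotalCount) even for negative hash codes
        return Math.floorMod(orderId.hashCode(), shardingTotalCount) == shardingItem;
    }

    /** Keeps only the ids that the given shard item is responsible for. */
    static List<String> idsForShard(List<String> orderIds, int shardingItem, int shardingTotalCount) {
        List<String> mine = new ArrayList<>();
        for (String id : orderIds) {
            if (ownedBy(id, shardingItem, shardingTotalCount)) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

Inside `execute`, the two arguments would come from `shardingContext.getShardingItem()` and `shardingContext.getShardingTotalCount()`. Every id lands in exactly one shard, so the slices are disjoint and together cover the whole list. Filtering in the SQL query instead of in memory would avoid loading rows that the shard then discards.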

2.4 Business code

2.4.1 Mapper interface

public interface OrderMapper extends BaseMapper<Order> {
}

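2.4.2 Service (for brevity, the service interface and its implementation are merged into a single service class)

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.List;

@Slf4j
@Service
public class OrderService {

    @Resource
    private OrderMapper orderMapper;

    /**
     * Finds the orders that need to be cancelled.
     *
     * @param minutes age threshold in minutes
     * @return unpaid, not-yet-cancelled orders created more than {@code minutes} minutes ago
     */
    public List<Order> findNeedCancelOrder(long minutes) {
        QueryWrapper<Order> queryWrapper = new QueryWrapper<>();
        queryWrapper.lt("create_time", LocalDateTime.now().minusMinutes(minutes));
        // Status 1 = unpaid; only unpaid orders are meant to be cancelled
        queryWrapper.eq("status", 1);
        queryWrapper.eq("is_cancel", 0);
        return orderMapper.selectList(queryWrapper);
    }

    /**
     * Creates an order.
     */
    public void addOrder() {
        Order order = new Order();
        order.setCode(IdWorker.getIdStr());
        order.setAmount(BigDecimal.valueOf(Math.random() * 10000 + 1).setScale(2, RoundingMode.HALF_UP));
        order.setPaymentType((int) (Math.random() * 5 + 1));
        // The remaining fields are populated by the MyBatis-Plus auto-fill handler
        orderMapper.insert(order);
    }

    /**
     * Cancels an order.
     *
     * @param orderId order id
     */
    public void cancelOrder(String orderId) {
        Order existing = orderMapper.selectById(orderId);
        if (existing == null) {
            throw new RuntimeException("Order not found: " + orderId);
        }
        Order order = new Order();
        order.setId(orderId);
        // Cancellation flag: 0 = not cancelled, 1 = cancelled
        order.setIsCancel(1);
        Integer oldVersion = existing.getVersion();
        log.info("Optimistic lock field, old version: {}", oldVersion);
        order.setVersion(oldVersion);
        int updateCount = orderMapper.updateById(order);
        log.info("Optimistic lock field, version after update: {}", order.getVersion());
        if (updateCount == 0) {
            // No row matched id + old version: another writer updated the order first
            throw new RuntimeException("Update failed");
        }
    }
}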
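The `updateCount == 0` check in `cancelOrder` relies on the usual optimistic-locking rule: the update only matches the row if its version is still the one that was read, and a successful update bumps the version. The following is a minimal in-memory sketch of that rule, assuming a single `version` counter per order. `OptimisticLockSketch` and its methods are hypothetical; a map stands in for the `t_order` table, and no MyBatis-Plus code is involved.

```java
import java.util.HashMap;
import java.util.Map;

final class OptimisticLockSketch {

    /** In-memory stand-in for the table: order id -> current version. */
    private final Map<String, Integer> versions = new HashMap<>();

    /** New rows start at version 0, mirroring the insert auto-fill in the article. */
    synchronized void insert(String orderId) {
        versions.put(orderId, 0);
    }

    /** Reads the current version, or null when the order does not exist. */
    synchronized Integer currentVersion(String orderId) {
        return versions.get(orderId);
    }

    /**
     * Behaves roughly like "UPDATE ... SET version = version + 1 WHERE id = ? AND version = ?".
     * Returns the number of affected rows: 1 on success, 0 when the version has moved on.
     */
    synchronized int updateIfVersionMatches(String orderId, int expectedVersion) {
        Integer current = versions.get(orderId);
        if (current == null || current != expectedVersion) {
            return 0;
        }
        versions.put(orderId, current + 1);
        return 1;
    }
}
```

If two workers both read version 0 for the same order, the first call to `updateIfVersionMatches` returns 1 and moves the row to version 1, while the second returns 0. In the article's service that second case is where the "update failed" exception is thrown; a caller could instead re-read the row and decide whether a retry is still needed.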

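2.4.3 Order entity class

import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.Version;
import lombok.Data;

import java.math.BigDecimal;
import java.time.LocalDateTime;

@Data
@TableName(value = "t_order")
public class Order {

    @TableId(type = IdType.ASSIGN_ID)
    private String id;

    /**
     * Serial number
     */
    @TableField(value = "code")
    private String code;

    /**
     * Total amount
     */
    @TableField(value = "amount")
    private BigDecimal amount;

    /**
     * Payment method
     */
    @TableField(value = "payment_type")
    private Integer paymentType;

    /**
     * Order status
     */
    @TableField(value = "status", fill = FieldFill.INSERT)
    private Integer status;

    /**
     * Creation time
     */
    @TableField(value = "create_time", fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    /**
     * Update time
     */
    @TableField(value = "update_time", fill = FieldFill.UPDATE)
    private LocalDateTime updateTime;

    /**
     * Whether the order has been cancelled
     */
    @TableField(value = "is_cancel", fill = FieldFill.INSERT)
    private Integer isCancel;

    /**
     * Version number, used for optimistic locking
     */
    @Version
    @TableField(value = "version", fill = FieldFill.INSERT)
    private Integer version;
}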

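2.4.4 Spring Boot application class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ElasticJobDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ElasticJobDemoApplication.class, args);
    }
}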

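2.5 Start the project

Once the application starts, the order-creation and order-cancellation jobs begin running.

Check the order table: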
