1、 项目搭建
1.1 项目结构:
1.2 使用jar包
项目中所使用的是springboot2.3.1以及如下jar包:
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.1</version>
</dependency>
1.3 所需的表
订单表:
CREATE TABLE `t_order` (`id` varchar(20) NOT NULL COMMENT '主键',`code` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '流水号',`amount` decimal(10,2) unsigned NOT NULL COMMENT '总金额',`payment_type` tinyint unsigned NOT NULL COMMENT '支付方式:1借记卡,2信用卡,3微信,4支付宝,5现金',`status` tinyint unsigned NOT NULL COMMENT '状态:1未付款,2已付款,3已发货,4已签收',`create_time` datetime NOT NULL COMMENT '创建时间',`update_time` datetime DEFAULT NULL COMMENT '更新时间',`is_cancel` tinyint(1) NOT NULL DEFAULT '0' COMMENT '订单是否被取消',`version` int DEFAULT NULL COMMENT '版本号,用于乐观锁',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='订单表';
2、配置
数据源+elastic-job+mybatis-plus的相关配置:
application.yml:
#elastic-job配置
elasticjob:zookeeper:name-space: springboot-elasticjobserver-list: 192.168.24.140:2181#数据源配置
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.24.140:3306/test?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=GMT%2B8username: rootpassword: 123456# mybatis-plus的sql打印
mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
2.1 mybatis-plus配置
2.1.1 mybatis-plus自动填充(声明为组件,纳入spring容器管理)
@Slf4j
@Component
public class MyMetaObjectHandler implements MetaObjectHandler {@Overridepublic void insertFill(MetaObject metaObject) {log.info("start insert fill ....");this.strictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now());this.strictInsertFill(metaObject, "status", Integer.class, 1);this.strictInsertFill(metaObject, "isCancel", Integer.class, 0);this.strictInsertFill(metaObject, "version", Integer.class, 0);}@Overridepublic void updateFill(MetaObject metaObject) {log.info("start update fill ....");this.strictInsertFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());}
}
2.1.2 配置mapper扫描以及乐观锁
@Configuration
@MapperScan(value = "com.lucifer.elastic.mapper")
public class MybatisPlusConfig {/*** 开启乐观锁* @return*/@Beanpublic OptimisticLockerInterceptor optimisticLockerInterceptor() {return new OptimisticLockerInterceptor();}}
2.2 elastic-job及zookeeper的配置
2.2.1 elastic job的simple的自定义注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {String jobName() default "";String cron() default "";int shardingTotalCount() default 1;boolean overwrite() default false;
}
2.2.2 简单定时任务自动配置
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {@Resourceprivate CoordinatorRegistryCenter zkCenter;@Resourceprivate ApplicationContext applicationContext;@PostConstructpublic void initSimpleJob() {Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);for (Map.Entry<String, Object> entry : beans.entrySet()) {Object instance = entry.getValue();Class<?>[] interfaces = instance.getClass().getInterfaces();for (Class<?> anInterface : interfaces) {if (anInterface == SimpleJob.class) {ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);String jobName = annotation.jobName();String cron = annotation.cron();boolean overwrite = annotation.overwrite();int shardingTotalCount = annotation.shardingTotalCount();//job核心配置JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build();//job类型配置JobTypeConfiguration jobTypeConfiguration =new SimpleJobConfiguration(jobCoreConfiguration, instance.getClass().getCanonicalName());//job根的配置(LiteJobConfiguration)LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobTypeConfiguration).overwrite(overwrite).build();new SpringJobScheduler((ElasticJob) instance, zkCenter, liteJobConfiguration).init();}}}}
}
2.2.3 Zookeeper的自动配置
@Configuration
@EnableConfigurationProperties(ZookeeperProperties.class)
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
public class ZookeeperAutoConfig {@Resourceprivate ZookeeperProperties zookeeperProperties;/*** zookeeper注册中心** @return*/@Bean(initMethod = "init")public CoordinatorRegistryCenter zkCenter() {String serverList = zookeeperProperties.getServerList();String nameSpace = zookeeperProperties.getNameSpace();ZookeeperConfiguration zookeeperConfiguration =new ZookeeperConfiguration(serverList, nameSpace);CoordinatorRegistryCenter coordinatorRegistryCenter =new ZookeeperRegistryCenter(zookeeperConfiguration);//注册中心初始化coordinatorRegistryCenter.init();return coordinatorRegistryCenter;}
}
2.2.4 Zookeeper的自定义配置(读取application.yml中的自定义配置)
@Data
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
public class ZookeeperProperties {/*** zookeeper地址列表*/private String serverList;/*** zookeeper命名空间*/private String nameSpace;
}
2.2.5 创建spring.factories文件
在resources文件夹下,创建META-INF,创建spring.factories文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.lucifer.elastic.config.ZookeeperAutoConfig,\
com.lucifer.elastic.config.SimpleJobAutoConfig
2.3 定时任务
2.3.1 创建订单的定时任务:模拟订单的创建
@ElasticSimpleJob(jobName = "addOrderSimpleJob",cron = "0/15 * * * * ?", //每15秒执行一次shardingTotalCount = 2,overwrite = true
)
@Slf4j
public class AddOrderSimpleJob implements SimpleJob {@Resourceprivate OrderService orderService;/*** 插入订单的定时任务,模拟订单创建* @param shardingContext*/@Overridepublic void execute(ShardingContext shardingContext) {log.info("======【订单新增的定时任务】===========");log.info("分片项:{},总分片数:{}", shardingContext.getShardingItem(), shardingContext.getShardingTotalCount());for (int i = 0; i < 5; i++) {orderService.addOrder();}}
}
2.3.2 订单取消的定时任务
@ElasticSimpleJob(jobName = "cancelOrderSimpleJob",cron = "0/30 * * * * ?", //每30秒执行一次shardingTotalCount = 2,overwrite = true
)
@Slf4j
public class CancelOrderSimpleJob implements SimpleJob {@Resourceprivate OrderService orderService;/*** 取消订单** @param shardingContext*/@Overridepublic void execute(ShardingContext shardingContext) {//查询所有未支付的比当前时间小于1分钟的取消掉,为了演示,此处就只设置为1分钟List<Order> needCancelOrders = orderService.findNeedCancelOrder(1);int size = needCancelOrders.size();log.info("======【取消订单新增的定时任务】,此批次:{},需要取消订单数:{}===========", shardingContext.getTaskId(), size);
// if (needCancelOrders != null && needCancelOrders.size() > 0) {
// ExecutorService fixedThreadPool = newFixedThreadPool(4);
//
// for (Order order : needCancelOrders) {
// fixedThreadPool.execute(() -> orderService.cancelOrder(order.getId()));
// }
// }needCancelOrders.parallelStream().collect(Collectors.toList()).forEach(order -> orderService.cancelOrder(order.getId()));}
}
2.4 业务代码
2.4.1 mapper 接口
public interface OrderMapper extends BaseMapper<Order> {
}
2.4.2 service (这里为了方便,将service接口与实现类就简写了,直接一个service类)
@Slf4j
@Service
public class OrderService {@Resourceprivate OrderMapper orderMapper;/*** 查询需要取消的订单** @return*/public List<Order> findNeedCancelOrder(long minutes) {QueryWrapper<Order> queryWrapper = new QueryWrapper<>();queryWrapper.lt("create_time", LocalDateTime.now().minusMinutes(minutes));queryWrapper.eq("is_cancel",0);List<Order> orders = orderMapper.selectList(queryWrapper);return orders;}/*** 新增订单*/public void addOrder() {Order order = new Order();order.setCode(IdWorker.getIdStr());order.setAmount(BigDecimal.valueOf(Math.random() * 10000 + 1).setScale(2, BigDecimal.ROUND_HALF_UP));order.setPaymentType((int) (Math.random() * 5 + 1));//其它字段使用mybatis-plus的自动填充插入orderMapper.insert(order);}/*** 取消订单** @param orderId 订单id*/public void cancelOrder(String orderId) {Order order = new Order();order.setId(orderId);//订单是否被取消,0:未取消、1:已取消order.setIsCancel(1);Integer oldVersion = orderMapper.selectById(orderId).getVersion();System.out.println("乐观锁字段:旧version:" + oldVersion);order.setVersion(oldVersion);int updateCount = orderMapper.updateById(order);System.out.println("乐观锁字段:更新后的version:" + order.getVersion());if (updateCount == 0) {throw new RuntimeException("更新失败");}}
}
2.4.3 订单表实体类
@Data
@TableName(value = "t_order")
public class Order {@TableId(type=IdType.ASSIGN_ID)private String id;/*** 流水号*/@TableField(value = "code")private String code;/*** 总金额*/@TableField(value = "amount")private BigDecimal amount;/*** 支付方式*/@TableField(value = "payment_type")private Integer paymentType;/*** 订单状态*/@TableField(value = "status",fill = FieldFill.INSERT)private Integer status;/*** 创建时间*/@TableField(value = "create_time",fill = FieldFill.INSERT)private LocalDateTime createTime;/*** 更新时间*/@TableField(value = "update_time",fill = FieldFill.UPDATE)private LocalDateTime updateTime;/*** 订单是否取消*/@TableField(value = "is_cancel",fill = FieldFill.INSERT)private Integer isCancel;/*** 版本号,用于乐观锁实现*/@Version@TableField(value = "version",fill = FieldFill.INSERT)private Integer version;
}
2.4.4 spring boot启动类
@SpringBootApplication
public class ElasticJobDemoApplication {public static void main(String[] args) {SpringApplication.run(ElasticJobDemoApplication.class, args);}
}
2.5 启动项目
创建订单以及取消定时任务启动。
查看订单表: