Java SPI机制
在上一篇博客中介绍了ElasticJob
的作业分片策略:
- ElasticJob‐Lite:作业分片策略介绍与源码分析
其中提到了ElasticJob
是通过Java
提供的SPI
机制(ServiceLoader
类)加载所有作业分片策略。
ServiceLoader
类就是Java
提供的SPI
,SPI
(Service Provider Interface
)是JDK
内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,不同厂商可以针对同一接口做出不同的实现,比如java.sql.Driver
接口,MySQL
和PostgreSQL
都提供了对应的实现给用户使用,而Java
的SPI
机制可以为某个接口寻找服务实现。Java
中SPI
机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。
ServiceLoader
类正常工作的唯一要求是服务提供类必须具有无参构造函数,以便它们可以在加载期间实例化。通过在资源目录的META-INF/services
中放置服务提供者配置文件来标识服务提供者,文件名是服务类型的完全限定名(比如ElasticJobListener
类的完全限定名),该文件包含具体的服务提供者类的完全限定名列表(ElasticJobListener
实现类的完全限定名列表),每行一个,每个名称周围的空格和制表符以及空行都将被忽略,该文件必须以UTF-8
编码。
自定义作业分片策略
所有可用的作业分片策略在JobShardingStrategyFactory
类的静态块中被加载(通过ElasticJobServiceLoader
类,该类是ElasticJob
基于Java SPI
机制实现的特定于作业的服务加载器)。
static {
ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);}
加载的类型是JobShardingStrategy.class
,因此自定义的作业分片策略需要实现该接口。
自定义作业分片策略ShuffleJobShardingStrategy
类:
package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;import java.util.*;public class ShuffleJobShardingStrategy implements JobShardingStrategy {
@Overridepublic Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();}// 先将作业分片项装入容器List<Integer> shuffleShardingList = new ArrayList<>(shardingTotalCount);for (int i = 0; i < shardingTotalCount; i++) {
shuffleShardingList.add(i);}// 将容器中的作业分片项顺序打乱(使用容器的shuffle方法)Collections.shuffle(shuffleShardingList);// 模仿AverageAllocationJobShardingStrategy作业分片策略进行分配Map<JobInstance, List<Integer>> result = shardingShuffle(jobInstances, shardingTotalCount, shuffleShardingList);addShuffle(jobInstances, shardingTotalCount, result, shuffleShardingList);return result;}private Map<JobInstance, List<Integer>> shardingShuffle(final List<JobInstance> shardingUnits,final int shardingTotalCount,final List<Integer> shuffleShardingList) {
Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);// 每个作业服务器最少应该分配的作业分片项数int itemCountPerSharding = shardingTotalCount / shardingUnits.size();int count = 0;for (JobInstance each : shardingUnits) {
// 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1)// itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
// 给作业分片项列表添加容器中的第i个作业分片项shardingItems.add(shuffleShardingList.get(i));}// 将作业服务器与它执行的作业分片项列表进行关联result.put(each, shardingItems);count++;}return result;}private void addShuffle(final List<JobInstance> shardingUnits, final int shardingTotalCount,final Map<JobInstance, List<Integer>> shardingResults,final List<Integer> shuffleShardingList) {
// 无法平均分配的分片项数int aliquant = shardingTotalCount % shardingUnits.size();// 已分配的无法平均分配的分片项数int count = 0;for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
// 是否还有无法平均分配的分片项if (count < aliquant) {
// 分配给序号较小的作业服务器entry.getValue().add(shuffleShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count));}// 已分配数更新count++;}}// 作业分片策略的标识符@Overridepublic String getType() {
return "Shuffle";}
}
博主自定义的ShuffleJobShardingStrategy
作业分片策略是模仿AverageAllocationJobShardingStrategy
作业分片策略(默认的作业分片策略),只是先将作业分片项装入容器,然后将容器中的作业分片项顺序打乱(使用容器的shuffle
方法),之后再基于该作业分片项容器使用AverageAllocationJobShardingStrategy
作业分片策略给作业服务器分配该容器中的作业分片项,如果不了解AverageAllocationJobShardingStrategy
作业分片策略,可以去看看最上面列出的博客。
添加服务实现
在resources
的META-INF/services
中放置服务提供者配置文件来标识服务提供者,如下图所示:
测试
作业定义(Simple
作业):
package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;import java.text.SimpleDateFormat;
import java.util.Date;public class MySimpleJob implements SimpleJob {
private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {
String job = shardingContext.getShardingParameter();if(job == null || job.trim().equals("")) {
System.out.println("请指定帮[Kaven]执行的任务名称!");throw new RuntimeException();}System.out.printf("%s 执行任务%d - [%s]!\n", formatter.format(new Date()),shardingContext.getShardingItem(), job);}
}
启动类:
package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;public class Application {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();}// 注册中心private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9000", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}// 作业配置private static JobConfiguration createJobConfiguration() {
String jobs = "0=看论文,1=做实验,2=打比赛,3=开组会,4=看书,5=写博客,6=看源码";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs)// 使用自定义的作业分片策略.jobShardingStrategyType("Shuffle").overwrite(true).build();}
}
启动三个作业服务器,输出如下图所示:
输出符合预期,因为自定义作业分片策略是模仿AverageAllocationJobShardingStrategy
作业分片策略,但自定义作业分片策略中将作业的分片项顺序打乱了,因此给每个作业服务器分配的作业分片项可能不是连续的。
修改作业配置(使用默认的作业分片策略):
private static JobConfiguration createJobConfiguration() {
String jobs = "0=看论文,1=做实验,2=打比赛,3=开组会,4=看书,5=写博客,6=看源码";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs)
// .jobShardingStrategyType("Shuffle").overwrite(true).build();}
启动三个作业服务器,输出如下图所示:
输出符合AverageAllocationJobShardingStrategy
作业分片策略,[0,1,6]
、[2,3]
、[4,5]
很显然是有序的,而博主自定义的作业分片策略是乱序的。
ElasticJob
如何自定义作业分片策略就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。