当前位置: 代码迷 >> 综合 >> ElasticJob‐Lite:作业分片策略介绍与源码分析
  详细解决方案

ElasticJob‐Lite:作业分片策略介绍与源码分析

热度:27   发布时间:2023-12-01 18:18:21.0

分片

弹性调度是ElasticJob最重要的功能,也是这款产品名称的由来。它是一款能够让任务通过分片进行水
平扩展的任务处理系统。

ElasticJob中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

举例说明,如果作业分为4片,用两台服务器执行,则每个服务器分到2片,分别负责作业的50%的负载,如下图所示。
在这里插入图片描述
ElasticJob并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。分片项为数字,始于0而终于分片总数减1

以上是ElasticJob的官方文档对分片的描述,而文档对作业分片策略的介绍非常简单,只给了作业分片策略的SPI名称,如下图所示:
在这里插入图片描述

作业分片策略

博主目前使用的是3.0.1版本的ElasticJob‐Lite(目前最新版本)。

        <dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-core</artifactId><version>3.0.1</version></dependency>

在这里插入图片描述
作业分片策略的SPI名称是JobShardingStrategy,是作业分片策略的顶层设计。

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;import java.util.List;
import java.util.Map;/*** 作业分片策略*/
public interface JobShardingStrategy extends TypedSPI {
    /*** 作业分片* jobInstances – 参与分片的所有作业实例(作业服务器)* jobName -作业名称* shardingTotalCount – 分片总数*/Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}

JobShardingStrategy接口的sharding方法就是用来定义作业分片的逻辑,供子类实现,目前有三个实现类:AverageAllocationJobShardingStrategyOdevitySortByNameJobShardingStrategy以及RoundRobinByNameJobShardingStrategy
在这里插入图片描述

AverageAllocationJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    @Overridepublic Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
    if (jobInstances.isEmpty()) {
    return Collections.emptyMap();}Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);addAliquant(jobInstances, shardingTotalCount, result);return result;}private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
    Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);int itemCountPerSharding = shardingTotalCount / shardingUnits.size();int count = 0;for (JobInstance each : shardingUnits) {
    List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
    shardingItems.add(i);}result.put(each, shardingItems);count++;}return result;}private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
    int aliquant = shardingTotalCount % shardingUnits.size();int count = 0;for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
    if (count < aliquant) {
    entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);}count++;}}@Overridepublic String getType() {
    return "AVG_ALLOCATION";}
}

这是一种尽量平均分配的分片策略,如果作业的分片项无法平均分配给所有的作业服务器,即作业的分片项数%作业服务器数不为零,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。 例如:

  • 如果有3个作业服务器,总分片数为9,每个作业服务器的分片项为:1=[0,1,2]2=[3,4,5]3=[6,7,8]
  • 如果有3个作业服务器,总分片数为8,每个作业服务器的分片项为:1=[0,1,6]2=[2,3,7]3=[4,5]
  • 如果有3个作业服务器,总分片数为10,每个作业服务器的分片项为:1=[0,1,2,9]2=[3,4,5]3=[6,7,8]

先给每个作业服务器分配相同数量的作业分片项(数量为:作业的分片项数/作业服务器数)。

    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
    Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);// 每个作业服务器最少应该分配的作业分片项数int itemCountPerSharding = shardingTotalCount / shardingUnits.size();int count = 0;for (JobInstance each : shardingUnits) {
    // 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1)// itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
    // 给作业分片项列表添加第i个作业分片项shardingItems.add(i);}// 将作业服务器与它执行的作业分片项列表进行关联result.put(each, shardingItems);count++;}return result;}

如果作业的分片项无法平均分配给所有的作业服务器,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。

    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
    // 无法平均分配的分片项数int aliquant = shardingTotalCount % shardingUnits.size();// 已分配的无法平均分配的分片项数int count = 0;for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
    // 是否还有无法平均分配的分片项if (count < aliquant) {
    // 分配给序号较小的作业服务器entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);}// 已分配数更新count++;}}

OdevitySortByNameJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;import java.util.Collections;
import java.util.List;
import java.util.Map;public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();@Overridepublic Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
    long jobNameHash = jobName.hashCode();if (0 == jobNameHash % 2) {
    Collections.reverse(jobInstances);}return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);}@Overridepublic String getType() {
    return "ODEVITY";}
}

其实还是使用AverageAllocationJobShardingStrategy作业分片策略进行分配,只是会先根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse操作。例如:

  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码为奇数(对作业服务器列表不进行reverse操作),每个作业服务器的分片项为:1=[0]2=[1]3=[]
  • 如果有3个作业服务器,总分片数为2,作业名的哈希码是偶数(对作业服务器列表进行reverse操作),每个作业服务器的分片项为:3=[0]2=[1]1=[]

RoundRobinByNameJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public final class RoundRobinByNameJobShardingStrategy implements JobShardingStrategy {
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();@Overridepublic Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
    return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);}private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) {
    int shardingUnitsSize = shardingUnits.size();int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;if (0 == offset) {
    return shardingUnits;}List<JobInstance> result = new ArrayList<>(shardingUnitsSize);for (int i = 0; i < shardingUnitsSize; i++) {
    int index = (i + offset) % shardingUnitsSize;result.add(shardingUnits.get(index));}return result;}@Overridepublic String getType() {
    return "ROUND_ROBIN";}
}

其实跟OdevitySortByNameJobShardingStrategy作业分片策略类似,都是使用AverageAllocationJobShardingStrategy作业分片策略进行分配,并且在分配前都会根据作业名称的哈希码将作业服务器列表中的作业服务器项改变顺序,只是变序规则不一样而已,OdevitySortByNameJobShardingStrategy作业分片策略根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse操作,而RoundRobinByNameJobShardingStrategy作业分片策略根据作业名称的哈希码的绝对值%作业服务器数的值对作业服务器列表进行rotate操作。例如:

  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为0,每个作业服务器的分片项为:1=[0]2=[1]3=[]
  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为1,每个作业服务器的分片项为:2=[0]3=[1]1=[]
  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为2,每个作业服务器的分片项为:3=[0]1=[1]2=[]

JobShardingStrategyFactory

作业的分片策略通过JobShardingStrategyFactory类(作业分片策略工厂类)的getStrategy方法获取,源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobShardingStrategyFactory {
    private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";static {
    ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);}public static JobShardingStrategy getStrategy(final String type) {
    if (Strings.isNullOrEmpty(type)) {
    return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, DEFAULT_STRATEGY).get();}return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, type).orElseThrow(() -> new JobConfigurationException("Cannot find sharding strategy using type '%s'.", type));}
}

JobShardingStrategyFactory类的静态块中使用ElasticJobServiceLoader类的registerTypedService方法加载所有作业分片策略。

    static {
    ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);}

ElasticJobServiceLoader类的相关代码如下所示,通过Java提供的SPI机制(ServiceLoader类)加载所有作业分片策略。

    private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>> TYPED_SERVICES = new ConcurrentHashMap<>();private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>();public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
    if (TYPED_SERVICES.containsKey(typedService)) {
    return;}ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));}private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
    TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());}

默认为AverageAllocationJobShardingStrategy作业分片策略,和官方文档给的示意图是对应的。

private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";

AverageAllocationJobShardingStrategy类的getType方法(ElasticJobServiceLoader类加载所有作业分片策略会将getType方法的返回值作为存储每个作业分片策略实例的第二个key值)。

    @Overridepublic String getType() {
    return "AVG_ALLOCATION";}

到这里就结束了,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

  相关解决方案