当前位置: 代码迷 >> 综合 >> ElasticJob‐Lite:Simple作业
  详细解决方案

ElasticJob‐Lite:Simple作业

热度:68   发布时间:2023-12-01 18:27:04.0

ElasticJob

以下关于ElasticJob的介绍来自官方文档:

ElasticJob是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目ElasticJob‐LiteElasticJob‐Cloud组成。它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。它的各个产品使用统一的作业API,开发者仅需一次开发,即可随意部署。ElasticJob已于2020528日成为Apache ShardingSphere的子项目。

ElasticJob能够让开发工程师不再担心任务的线性吞吐量提升等非功能需求,使他们能够更加专注于面向业务编码设计;同时,它也能够解放运维工程师,使他们不必再担心任务的可用性和相关管理需求,只通过轻松的增加服务节点即可达到自动化运维的目的。

环境要求:

  • Java 8及其以上版本。
  • Maven 3.5.0及其以上版本。
  • ZooKeeper 3.6.0及其以上版本。

这里先介绍子项目ElasticJob-Lite的使用,ElasticJob‐Lite的架构如下图所示:
ElasticJob‐Lite Architecture
ElasticJob‐Lite会将作业注册到ZooKeeper上,在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改作业名称将视为新的作业。

添加依赖(3.0.1是目前最新的Releases版本):

        <dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-core</artifactId><version>3.0.1</version></dependency>

在这里插入图片描述

ElasticJob的作业分类基于classtype两种类型。基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑;基于type的作业则无需编码,只需要提供相应配置即可。基于class的作业接口的方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount()getShardingItem()等方法分别获取分片总数和运行在本作业服务器的分片序列号等。

ElasticJob目前提供SimpleDataflow这两种基于class的作业类型,并提供ScriptHTTP这两种基于type的作业类型,用户可通过实现SPI接口自行扩展作业类型。

这篇博客只会介绍Simple作业,其他的作业类型以及扩展作业类型以后博主也会给大家介绍。

Simple作业

定义Simple作业需要实现SimpleJob接口,而SimpleJob接口只定义了一个方法,并且继承了ElasticJob接口。

public interface SimpleJob extends ElasticJob {
    void execute(ShardingContext shardingContext);
}

定义一个Simple作业,任务被分成三个可并行的子任务,而每个子任务只是输出当前时间和各自执行的任务分片。

package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Author: ITKaven* @Date: 2021/11/20 17:02* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/
public class MySimpleJob implements SimpleJob {
    private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {
    switch (shardingContext.getShardingItem()) {
    case 0:System.out.println(formatter.format(new Date()) + " : ShardingItem[0]");break;case 1:System.out.println(formatter.format(new Date()) + " : ShardingItem[1]");break;case 2:System.out.println(formatter.format(new Date()) + " : ShardingItem[2]");break;default:System.out.println(formatter.format(new Date()) + " : Unknown ShardingItem");}}
}

定义了作业,还需要对作业进行配置,比如作业的名称、分片数、cron时间表达式以及是否需要失效转移等,主要通过JobConfiguration类来完成这些配置,它提供了构建者风格的方法,比如下面的作业配置,作业名称为MySimpleJob、作业分片数为3,并且在每一分钟的第30秒执行任务,调用overwrite方法用来设置在作业启动时是否将本地配置覆盖到注册中心(默认不覆盖,所以本地修改了cron时间表达式会不起作用),如果需要覆盖(方法传入true),则每次启动时都将使用本地配置(即以本地的作业配置为主,不然本地修改作业配置不会起作用)。调用failover方法用于设置是否开启失效转移(仅适用于开启了 monitorExecution,默认开启 monitorExecution,但默认不开启失效转移),ElasticJob不会在本次执行过程中进行重新分片(给作业节点分配作业分片),而是等待下次调度之前才开启重新分片流程。当作业执行过程中服务器宕机,失效转移允许将该次未完成的任务在另一作业节点上补偿执行。

开启失效转移功能,ElasticJob会监控作业每一分片的执行状态,并将其写入注册中心,供其他节点感知。在一次运行耗时较长且间隔较长的作业场景,失效转移是提升作业运行实时性的有效手段;对于间隔较短的作业,会产生大量与注册中心的网络通信,对集群的性能产生影响。而且间隔较短的作业并未见得关注单次作业的实时性,可以通过下次作业执行的重分片使所有的分片正确执行,因此不建议短间隔作业开启失效转移。 另外需要注意的是,作业本身的幂等性,是保证失效转移正确性的前提。

    private static JobConfiguration createJobConfiguration() {
    return JobConfiguration.newBuilder("MySimpleJob", 3).cron("30 * * * * ?").overwrite(true).failover(true).build();}

作业节点一:
在这里插入图片描述
作业节点二(当作业节点三宕机后,会补偿执行作业节点三未执行的作业分片):
在这里插入图片描述
作业节点三(模拟宕机):
在这里插入图片描述
接下来还需要连接注册中心(使用ZooKeeper),目前默认只支持ZooKeeper作为注册中心(CoordinatorRegistryCenter接口只有ZookeeperRegistryCenter一个实现类)。
在这里插入图片描述
在这里插入图片描述
192.168.1.184:9000ZooKeeper服务端提供的客户端连接套接字,而my-job就是作业的命名空间。

    private static CoordinatorRegistryCenter createRegistryCenter() {
    ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}

作业的各种配置和注册中心已经准备好了,就可以启动作业了,通过ScheduleJobBootstrap类来实现,主要就是将注册中心、作业定义以及作业配置传入启动作业的实例中。

    public static void main(String[] args) {
    new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(),createJobConfiguration()).schedule();}

汇总到Application类:

package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;/*** @Author: ITKaven* @Date: 2021/11/20 17:05* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/
public class Application {
    public static void main(String[] args) {
    new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(),createJobConfiguration()).schedule();}private static CoordinatorRegistryCenter createRegistryCenter() {
    ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}private static JobConfiguration createJobConfiguration() {
    return JobConfiguration.newBuilder("MySimpleJob", 3).cron("30 * * * * ?").overwrite(true).failover(true).build();}
}

结果如下图所示:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
cron时间表达式的介绍可以参考以下博客:

  • cron时间表达式简介
  • Cron表达式详解

Simple作业就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

  相关解决方案