当前位置: 代码迷 >> 综合 >> ElasticJob‐Lite:Dataflow作业
  详细解决方案

ElasticJob‐Lite:Dataflow作业

热度:26   发布时间:2023-12-01 17:49:21.0

ElasticJob的作业分类基于classtype两种类型。基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑;基于type的作业则无需编码,只需要提供相应配置即可。基于class的作业接口的方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount()getShardingItem()等方法分别获取分片总数和运行在本作业服务器的分片序列号等。

ElasticJob目前提供SimpleDataflow这两种基于class的作业类型,并提供ScriptHTTP这两种基于type的作业类型,用户可通过实现SPI接口自行扩展作业类型。

本篇博客介绍Dataflow作业。

添加依赖(3.0.1是目前最新的Releases版本):

        <dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-core</artifactId><version>3.0.1</version></dependency>

Dataflow作业用于处理数据流,需要实现DataflowJob接口。该接口提供2个方法,分别用于抓取 (fetchData)和处理 (processData) 数据。

package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import reactor.core.publisher.Flux;import java.text.SimpleDateFormat;
import java.util.*;/*** @Author: ITKaven* @Date: 2021/11/20 17:02* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/
public class MyDataflowJob implements DataflowJob<Flux<String>> {
    private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static final String[][] message = {
    {
    "java", "c", "c++", "python", "go"},{
    "docker", "k8s"},{
    "elastic-job", "elasticsearch", "zookeeper", "spring cloud alibaba"}};@Overridepublic List<Flux<String>> fetchData(ShardingContext shardingContext) {
    int item = shardingContext.getShardingItem();return Collections.singletonList(Flux.fromArray(message[item]));}@Overridepublic void processData(ShardingContext shardingContext, List<Flux<String>> list) {
    System.out.println("-------------------------------------------------------------");System.out.println(formatter.format(new Date()));System.out.println(shardingContext.getShardingParameter());list.forEach(MyDataflowJob::printData);}private static void printData(Flux<String> data) {
    data.sort().toStream().forEach(System.out::println);}
}

Flux抽象类由reactor项目提供,Spring的响应式编程就是基于reactor项目。响应式编程是一种基于数据流和变化传递的声明式编程范式。这里就不详细介绍了,代码应该很容易看懂,就是根据分片项来获取对应的字符串数组,然后根据字典序从小到大的顺序打印该字符串数组中的字符串。getShardingParameter方法用于获取对应的分片参数。

reactor项目:

        <dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.3.8.RELEASE</version></dependency>
package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;/*** @Author: ITKaven* @Date: 2021/11/20 17:05* @Blog: https://kaven.blog.csdn.net* @Leetcode: https://leetcode-cn.com/u/kavenit* @Notes:*/
public class Application {
    public static void main(String[] args) {
    new ScheduleJobBootstrap(createRegistryCenter(), new MyDataflowJob(),createJobConfiguration()).schedule();}private static CoordinatorRegistryCenter createRegistryCenter() {
    ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}private static JobConfiguration createJobConfiguration() {
    return JobConfiguration.newBuilder("MyDataflowJob", 3).shardingItemParameters("0=程序语言,1=容器技术,2=框架").description("数据流作业").cron("30 * * * * ?").overwrite(true).monitorExecution(false).misfire(true)                .build();}
}

shardingItemParameters方法用于设置分片项和分片参数的映射,分片项和分片参数用等号分割,多个分片项和分片参数用逗号分割,分片项从零开始。monitorExecution方法是用于设置是否启动monitorExecution(默认启动,默认值为true),对于短间隔作业,最好禁用monitorExecution以提高性能(会增大ZooKeeper的压力,使得ZooKeeper性能下降,因为每次cron时间间隔会写数据到ZooKeeper上,用来保证数据不会重复获取)。 如果禁用monitorExecution,它不能保证数据重复获取,并且不能失效转移,因此作业需要保持幂等性。 对于长间隔作业,最好启用monitorExecution以保证只获取一次数据。misfire方法用于设置是否启动错过任务重执行(默认启动,默认值也为true),ElasticJob不允许作业在同一时间内叠加执行。当作业的执行时长超过其运行间隔(因为某种原因),错过任务重执行能够保证作业在完成上次的任务后继续执行逾期的作业。

结果如下图所示:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
Dataflow作业就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

  相关解决方案