Elastic-Job (2): Implementing a Dataflow Job

Published: 2023-11-17 12:44:45

Note: see also Elastic-Job (1): Implementing a Simple Job.

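For Dataflow-type jobs, the official documentation gives the following explanation:

  1. Whether the job uses streaming processing can be configured through DataflowJobConfiguration.
  2. In streaming mode, the job stops fetching only when fetchData returns null or an empty collection; otherwise it keeps running. In non-streaming mode, each job execution calls fetchData and processData only once and then completes.
  3. With streaming processing, processData should update the status of the data it has handled so that fetchData does not fetch it again; otherwise the job never stops. Streaming processing is modeled on the design of TbSchedule and suits continuous, uninterrupted data processing.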
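
The difference between the two modes can be illustrated with a small stand-alone simulation. This is only a sketch of the control flow described above, not Elastic-Job's actual executor: the class DataflowLoopSketch and its methods runStreaming, runOnce, and batchFetcher are hypothetical names made up for illustration, and everything else a real executor does (sharding, failover, error handling) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a Dataflow executor; NOT Elastic-Job code.
class DataflowLoopSketch {

    // Streaming mode: keep fetching until fetch returns null or an empty list.
    // Returns how many times process was called.
    static <T> int runStreaming(Supplier<List<T>> fetch, Consumer<List<T>> process) {
        int rounds = 0;
        List<T> data = fetch.get();
        while (data != null && !data.isEmpty()) {
            process.accept(data);
            rounds++;
            data = fetch.get();
        }
        return rounds;
    }

    // Non-streaming mode: one fetch and at most one process call per execution.
    static <T> int runOnce(Supplier<List<T>> fetch, Consumer<List<T>> process) {
        List<T> data = fetch.get();
        if (data == null || data.isEmpty()) {
            return 0;
        }
        process.accept(data);
        return 1;
    }

    // Fetches a copy of the first (at most) batchSize pending items.
    static Supplier<List<Integer>> batchFetcher(List<Integer> pending, int batchSize) {
        return () -> new ArrayList<>(pending.subList(0, Math.min(batchSize, pending.size())));
    }

    public static void main(String[] args) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            pending.add(i);
        }
        // "Processing" removes the items, so the next fetch does not see them again.
        int rounds = runStreaming(batchFetcher(pending, 10), batch -> pending.removeAll(batch));
        System.out.println(rounds + " rounds, " + pending.size() + " left"); // 3 rounds, 0 left
    }
}
```

If the process step did not remove (or otherwise mark) the items, runStreaming would fetch the same batch forever, which is exactly the never-ending job that point 3 warns about.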

elastic-job configuration:

package com.lucifer.config;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.lucifer.job.MyDataFlowJob;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lucifer
 * @date 2020/5/4 19:50
 * @description elastic-job configuration
 */
@Configuration
public class LiteJobConfig {

    private static final String SERVER_LISTS = "192.168.24.128:2181";
    private static final String NAMESPACE = "myDataFlowJob";

    // Return the scheduler as a real bean and let Spring call init() to start the job
    // (the original declared a void @Bean method that called init() itself)
    @Bean(initMethod = "init")
    public JobScheduler jobScheduler() {
        return new JobScheduler(zkCenter(), dataFlowJobConfiguration());
    }

    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zookeeperConfiguration =
                new ZookeeperConfiguration(SERVER_LISTS, NAMESPACE);
        CoordinatorRegistryCenter coordinatorRegistryCenter =
                new ZookeeperRegistryCenter(zookeeperConfiguration);
        // Initialize the registry center
        coordinatorRegistryCenter.init();
        return coordinatorRegistryCenter;
    }

    /**
     * Job configuration
     *
     * @return the Lite job configuration for the Dataflow job
     */
    public static LiteJobConfiguration dataFlowJobConfiguration() {
        // Core job configuration: job name, cron expression (every 10 seconds), 2 shards
        JobCoreConfiguration jobCoreConfiguration =
                JobCoreConfiguration.newBuilder("myDataFlowJob", "0/10 * * * * ?", 2).build();
        // Job type configuration; the last argument (true) enables streaming processing
        JobTypeConfiguration jobTypeConfiguration =
                new DataflowJobConfiguration(jobCoreConfiguration, MyDataFlowJob.class.getCanonicalName(), true);
        // Root job configuration
        return LiteJobConfiguration.newBuilder(jobTypeConfiguration).build();
    }
}

MyDataFlowJob:

package com.lucifer.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.lucifer.pojo.Order;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author lucifer
 * @date 2020/5/4 21:10
 * @description Data-Flow
 */
@Slf4j
public class MyDataFlowJob implements DataflowJob<Order> {

    // Simulate 100 unprocessed orders. The list is static, so it is filled in a static
    // initializer; an instance initializer would add 100 more orders per job instance.
    private static final List<Order> orders = new ArrayList<>();

    static {
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i);
            order.setStatus(0);
            orders.add(order);
        }
    }

    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // Keep unprocessed orders where orderId % total shard count == current shard item
        List<Order> orderList = orders.stream()
                .filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
                .collect(Collectors.toList());
        // Take at most 10 orders per fetch; stay null when nothing is left so the streaming job stops
        List<Order> subList = null;
        if (!orderList.isEmpty()) {
            subList = orderList.subList(0, Math.min(10, orderList.size()));
        }
        // Fetching is very fast, so sleep for a while to make the effect easier to see
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Log message: "sharding item: {}, data I fetched: {}"
        log.info("分片项:{},我抓取的数据:{}", shardingContext.getShardingItem(), subList);
        return subList;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<Order> list) {
        // Mark the orders as processed so that fetchData does not fetch them again
        list.forEach(o -> o.setStatus(1));
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Log message: "sharding item: {}, processing....."
        log.info("分片项:{},处理中.....", shardingContext.getShardingItem());
    }
}
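
The sharding rule used in fetchData (order ID modulo the total shard count equals the current shard item) and the batch-of-ten slicing can be tried in isolation. The following is a minimal sketch with plain integers instead of Order objects; the class ShardBatchSketch and its methods are hypothetical names used only for illustration. Note that slicing with a fixed end index throws IndexOutOfBoundsException when fewer items than the batch size remain, so the sketch clamps the end index with Math.min.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class for illustration; not part of the original project.
class ShardBatchSketch {

    // IDs in [0, total) that belong to the given shard: id % shardingTotalCount == shardingItem
    static List<Integer> idsForShard(int total, int shardingTotalCount, int shardingItem) {
        return IntStream.range(0, total)
                .filter(id -> id % shardingTotalCount == shardingItem)
                .boxed()
                .collect(Collectors.toList());
    }

    // First batch of at most batchSize elements, never reaching past the end of the list
    static <T> List<T> firstBatch(List<T> items, int batchSize) {
        return items.subList(0, Math.min(batchSize, items.size()));
    }

    public static void main(String[] args) {
        List<Integer> shard1 = idsForShard(100, 2, 1);
        System.out.println(shard1.size());                        // 50
        System.out.println(firstBatch(shard1, 10));               // [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
        System.out.println(firstBatch(idsForShard(7, 2, 0), 10)); // [0, 2, 4, 6]
    }
}
```

With 100 orders and 2 shards, each shard owns 50 orders, which at 10 orders per fetch means five non-empty fetches per shard before fetchData has nothing left to return.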

Entity class:

package com.lucifer.pojo;

import lombok.Data;

/**
 * @author lucifer
 * @date 2020/5/4 21:10
 * @description Order entity class
 */
@Data
public class Order {

    // Order ID
    private Integer orderId;

    // Order status, 0: unprocessed, 1: processed
    private Integer status;
}

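Console output (in the log messages, 分片项 means "sharding item", 我抓取的数据 means "data I fetched", and 处理中 means "processing"):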

2020-05-04 23:20:36.958  INFO 19320 --- [           main] com.lucifer.ElasticJobApplication        : Started ElasticJobApplication in 2.223 seconds (JVM running for 3.034)
2020-05-04 23:20:43.075  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=0, status=0), Order(orderId=2, status=0), Order(orderId=4, status=0), Order(orderId=6, status=0), Order(orderId=8, status=0), Order(orderId=10, status=0), Order(orderId=12, status=0), Order(orderId=14, status=0), Order(orderId=16, status=0), Order(orderId=18, status=0)]
2020-05-04 23:20:43.075  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=1, status=0), Order(orderId=3, status=0), Order(orderId=5, status=0), Order(orderId=7, status=0), Order(orderId=9, status=0), Order(orderId=11, status=0), Order(orderId=13, status=0), Order(orderId=15, status=0), Order(orderId=17, status=0), Order(orderId=19, status=0)]
2020-05-04 23:20:48.082  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:20:48.083  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:20:51.084  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=20, status=0), Order(orderId=22, status=0), Order(orderId=24, status=0), Order(orderId=26, status=0), Order(orderId=28, status=0), Order(orderId=30, status=0), Order(orderId=32, status=0), Order(orderId=34, status=0), Order(orderId=36, status=0), Order(orderId=38, status=0)]
2020-05-04 23:20:51.084  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=21, status=0), Order(orderId=23, status=0), Order(orderId=25, status=0), Order(orderId=27, status=0), Order(orderId=29, status=0), Order(orderId=31, status=0), Order(orderId=33, status=0), Order(orderId=35, status=0), Order(orderId=37, status=0), Order(orderId=39, status=0)]
2020-05-04 23:20:56.085  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:20:56.085  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:20:59.086  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=41, status=0), Order(orderId=43, status=0), Order(orderId=45, status=0), Order(orderId=47, status=0), Order(orderId=49, status=0), Order(orderId=51, status=0), Order(orderId=53, status=0), Order(orderId=55, status=0), Order(orderId=57, status=0), Order(orderId=59, status=0)]
2020-05-04 23:20:59.087  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=40, status=0), Order(orderId=42, status=0), Order(orderId=44, status=0), Order(orderId=46, status=0), Order(orderId=48, status=0), Order(orderId=50, status=0), Order(orderId=52, status=0), Order(orderId=54, status=0), Order(orderId=56, status=0), Order(orderId=58, status=0)]
2020-05-04 23:21:04.086  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:21:04.088  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:21:07.088  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=61, status=0), Order(orderId=63, status=0), Order(orderId=65, status=0), Order(orderId=67, status=0), Order(orderId=69, status=0), Order(orderId=71, status=0), Order(orderId=73, status=0), Order(orderId=75, status=0), Order(orderId=77, status=0), Order(orderId=79, status=0)]
2020-05-04 23:21:07.090  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=60, status=0), Order(orderId=62, status=0), Order(orderId=64, status=0), Order(orderId=66, status=0), Order(orderId=68, status=0), Order(orderId=70, status=0), Order(orderId=72, status=0), Order(orderId=74, status=0), Order(orderId=76, status=0), Order(orderId=78, status=0)]
2020-05-04 23:21:12.089  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:21:12.091  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:21:15.092  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:[Order(orderId=81, status=0), Order(orderId=83, status=0), Order(orderId=85, status=0), Order(orderId=87, status=0), Order(orderId=89, status=0), Order(orderId=91, status=0), Order(orderId=93, status=0), Order(orderId=95, status=0), Order(orderId=97, status=0), Order(orderId=99, status=0)]
2020-05-04 23:21:15.093  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:[Order(orderId=80, status=0), Order(orderId=82, status=0), Order(orderId=84, status=0), Order(orderId=86, status=0), Order(orderId=88, status=0), Order(orderId=90, status=0), Order(orderId=92, status=0), Order(orderId=94, status=0), Order(orderId=96, status=0), Order(orderId=98, status=0)]
2020-05-04 23:21:20.092  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,处理中.....
2020-05-04 23:21:20.093  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,处理中.....
2020-05-04 23:21:23.095  INFO 19320 --- [myDataFlowJob-2] com.lucifer.job.MyDataFlowJob            : 分片项:1,我抓取的数据:null
2020-05-04 23:21:23.096  INFO 19320 --- [myDataFlowJob-1] com.lucifer.job.MyDataFlowJob            : 分片项:0,我抓取的数据:null
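
The timestamps in this log follow directly from the two sleeps in the job: each fetch takes about 3 seconds and each processing step about 5 seconds, and each shard needs five batches plus one final empty fetch. The sketch below recomputes that timeline. It assumes the cron trigger fired at 23:20:40; that value is inferred from the 0/10 cron expression and the first fetch line at 23:20:43 and is not printed in the log itself. TimelineSketch is a hypothetical class written for this illustration.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it only does the arithmetic behind the log.
class TimelineSketch {

    // Expected log times for one shard: `batches` fetch/process rounds, then one empty fetch.
    static List<String> timeline(LocalTime trigger, int batches, int fetchSeconds, int processSeconds) {
        List<String> events = new ArrayList<>();
        LocalTime t = trigger;
        for (int i = 0; i < batches; i++) {
            t = t.plusSeconds(fetchSeconds);
            events.add(t + " fetch");
            t = t.plusSeconds(processSeconds);
            events.add(t + " process");
        }
        t = t.plusSeconds(fetchSeconds);
        events.add(t + " fetch (null)");
        return events;
    }

    public static void main(String[] args) {
        // Assumed trigger time 23:20:40, 5 batches, 3 s per fetch, 5 s per processing step
        timeline(LocalTime.of(23, 20, 40), 5, 3, 5).forEach(System.out::println);
    }
}
```

The computed times (23:20:43, 23:20:48, 23:20:51, and so on up to the empty fetch at 23:21:23) match the seconds shown in the log. The whole run takes 5 × 8 + 3 = 43 seconds from a single trigger, which shows the streaming behavior: fetching continued well past the 10-second cron interval until fetchData returned null.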

 
