Elastic-Job的使用心得
场景介绍:
最近由于公司要求,项目的定时任务作业统一使用elastic-job改造;而之前没有接触过,然后在熟悉它,和改造使用的过程中就碰到了一些和业务结合的问题,最终在同事及前辈的帮助下得以解决;个中过程还是有所体会和所得,故记录下来。
Elastic-Job简介
elastic-job是当当开源的一款非常好用的作业框架,在这之前,我们开发定时任务一般都是使用quartz或者spring-task。(哈哈,项目之前用的quartz),其实使用Elastic-Job的最大好处或者说最终目的是为了多节点部署和水平扩展。
Elastic-Job产品线说明
elastic-job在2.×之后,出了两个产品线:Elastic-Job-Lite和Elastic-Job-Cloud。我们一般使用Elastic-Job-Lite就能够满足需求(实际上也是这样的!),项目中使用的也是Elastic-Job-Lite。1.×系列对应的就只有Elastic-Job-Lite,并且在2.×里修改了一些核心类名,差别虽大,原理类似,建议使用2.×系列。本项目中使用的是版本是2.1.5。
Elastic-Job-Lite原理
我这里也不做说明,其实我也不是很明白,只是懂他的意思而已。就知道一点就好了:elastic底层的任务调度还是使用的quartz,通过zookeeper来动态给job节点分片。具体的原理,可以自行百度,我相信会有很多你要的。
代码演示
step1. 引入框架的jar包
<!-- 引入elastic-job-lite核心模块 -->
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version>
</dependency>
<!-- 使用springframework自定义命名空间时引入 -->
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version>
</dependency>
step2. 编写job
public class MyElasticJob implements SimpleJob {@Autowired
private TaskService taskService;@Override
public void execute(ShardingContext shardingContext) {System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"));/*** 这里是实际要执行的任务(作业)*/doExecute();
}
}
Step3. Spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"xmlns:job="http://www.dangdang.com/schema/ddframe/job"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.dangdang.com/schema/ddframe/reghttp://www.dangdang.com/schema/ddframe/reg/reg.xsdhttp://www.dangdang.com/schema/ddframe/jobhttp://www.dangdang.com/schema/ddframe/job/job.xsd">
<reg:zookeeper id="sample-job"namespace="hep-web-task"base-sleep-time-milliseconds="1000"max-sleep-time-milliseconds="3000"session-timeout-milliseconds="6000"connection-timeout-milliseconds="3000"max-retries="3"server-lists="192.168.159.179:2181"/>
<job:simple id="MyJob" class="com.×××.×××.web.controller.elastic.MySpringElasticJob"registry-center-ref="sample-job"cron="0 42 15 * * ?"sharding-total-count="1"disabled="false"monitor-execution="true"failover="true"misfire="true"overwrite="true"description="样例任务作业">
</job:simple>
</beans>
作业类型
elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。这里主要讲解前两者。Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。
SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job。
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。 可通过DataflowJobConfiguration配置是否流式处理。 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。 实际开发中,Dataflow类型的job还是很有好用的。
其他功能
上述介绍的是最精简常用的功能。elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面。读者可以在http://elasticjob.io/自行学习。
使用体会
遇到的问题
本项目中任务的功能是通过前端页面的管理模式,任务信息保存在数据库中(和elastic-job的运维平台类似)。在经过对elastic-job的认识和各种百度后,依然没有好的或自认为可行的方案,陷入了无从下手的境地,主要是不知怎样以配置的方式(或是代码的方式);其实都有试过但还是不能满足业务需要。另外就是当这个问题解决后(转化方式后)又碰到了cron表达式时间点与当前日期时间的判断问题(当解决后,见解决方式,这也不是问题啦,哈哈哈)。
解决方式
第一个问题,在困扰了许久后,在前辈的点化和帮助下, 采取只注册一个大的任务到Elastic-Job,而它的任务就是定时扫描库(我数据库保存的任务信息),当满足条件(时间点 cron表达式方式)才执行任务;这样确实巧妙的解决了之前的困扰,但是又产生出第二个问题,即如何判断满足条件(判断cron表达式的执行时间与当前时间是否一致),又一次经过一段时间的摸索(主要是当时项目时间宽裕,哈哈哈);终于在自己的不断搜索下,找到了解决方式:具体见这个博客http://blog.csdn.net/cpf2016/article/details/46625457,在此要感谢博主的热情分享。 这里还是要说一下思路:首先,在elastic-job中注册一个大的任务定期【(周期时间粒度)可自行根据实际需要设置】去查询数据库里的任务信息表,然后循环匹配该任务信息的Cron表达式与当前日期时间是否吻合,若是的(这里可以适当放宽些,即并非要相等才吻合,相差一分钟也是可以的,当然看你自己的想法和需要了)则执行作业;其次是判断Cron表达式与当前日期时间吻合,其实是通过Cron表达式取得最近一次的执行时间与当前日期时间比较(这样瞬间就把难题解决了,再次感谢上面提到的博文及博主);最后就是考虑到实际的任务作业的执行时间或任务列表的数量,后期会采用kafka的消息队列模式供web端消费执行,这样就不会现行的系统(当然只是这样想,具体还是不是很明确)望大神多多指教!