Big Data Batch Processing Compared: Spring Batch vs Flink vs Stream Parallel
Abstract: Through a comparison of real test cases, this article helps you pick the batch-processing technology that fits your own big-data workload.
Why use batch processing?
Scenarios:

- Data import
  - Imports must run inside a transaction so the data stays consistent: either every record succeeds or every record fails.
  - Real-time, online processing with strict response-time requirements.
- Batch queries
  - Real-time, online processing with strict response-time requirements.
For these scenarios, traditional row-by-row processing cannot meet the response-time requirements. The first solution that comes to mind is multithreading. Multithreading does work, but hand-rolled parallel programming is demanding and error-prone, so the next question is whether the open-source world offers ready-made solutions. The answer is: yes.
This article compares the batch-processing options and summarizes the results.
Scenario: save 10,000 records into the student table of a MySQL database.
Table structure

```sql
CREATE TABLE `student` (
  `id` int(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=220000 DEFAULT CHARSET=utf8;
```
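All of the test cases below share a StudentEntity model mapped onto this table. The exact class lives in the repository linked at the end; here is a minimal sketch of what it looks like, assuming Lombok's @Data for the getters and setters (the pom below does pull in lombok):

```java
package org.sea.spring.boot.batch.model;

import lombok.Data;

// Minimal sketch of the entity behind all the test cases.
// Fields mirror the student table; @Data generates the getters
// and setters (getId, setName, setAge, ...) the tests rely on.
@Data
public class StudentEntity {
    private Integer id;
    private String name;
    private Integer age;
}
```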
Performance comparison: Spring Batch vs Flink vs Stream Parallel
| Approach | Execution time (ms) | Notes |
|---|---|---|
| Plain for loop | 34391 | |
| parallelStream | 8384 | |
| Spring Boot Batch | 1035 | |
| Flink | 4310 | |

All timings were captured with Spring's StopWatch, which reports milliseconds.
Setup
Spring Boot project pom.xml configuration
Dependencies
```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.40</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>javax.xml.bind</groupId>
    <artifactId>jaxb-api</artifactId>
</dependency>
<!-- Flink dependencies. The flink.version property is not shown here;
     it presumably matches the explicitly versioned 1.9.0 artifacts below. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.0</version>
</dependency>
```
Note: use Spring Boot version 2.0.1.RELEASE; other versions have compatibility problems with this setup.
application.properties
```properties
server.port=8070
spring.application.name=sea-spring-boot-batch
spring.batch.initialize-schema=always
spring.jpa.generate-ddl=true
mybatis.config-location=classpath:mybatis-config.xml
mybatis.mapper-locations=classpath:mybatis/*.xml
mybatis.type-aliases-package=org.sea.spring.cloud.nacos.model
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
```
mybatis-config.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <!-- Global settings -->
    <settings>
        <!-- Enable or disable caching for all mappers globally. -->
        <setting name="cacheEnabled" value="true"/>
        <!-- Enable or disable lazy loading globally. When disabled, all associations load eagerly. -->
        <setting name="lazyLoadingEnabled" value="true"/>
        <!-- When enabled, touching any lazy property loads the whole object; otherwise each property loads on demand. -->
        <setting name="aggressiveLazyLoading" value="true"/>
        <!-- Whether a single SQL statement may return multiple result sets (depends on driver support). Default: true -->
        <setting name="multipleResultSetsEnabled" value="true"/>
        <!-- Whether column aliases may be used (depends on driver support). Default: true -->
        <setting name="useColumnLabel" value="true"/>
        <!-- Allow JDBC-generated keys (requires driver support). If true, generated keys are forced;
             some incompatible drivers can still execute. Default: false -->
        <setting name="useGeneratedKeys" value="true"/>
        <!-- How MyBatis auto-maps columns to fields. NONE: no mapping; PARTIAL: partial; FULL: everything -->
        <setting name="autoMappingBehavior" value="PARTIAL"/>
        <!-- Default executor type. SIMPLE: plain; REUSE: reuses prepared statements;
             BATCH: reuses statements and batches updates -->
        <setting name="defaultExecutorType" value="SIMPLE"/>
        <!-- Map snake_case columns to camelCase properties. -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
        <!-- Local cache scope. SESSION: data shared within the session;
             STATEMENT: per statement, no sharing. Default: SESSION -->
        <setting name="localCacheScope" value="SESSION"/>
        <!-- JDBC type to use for null values; some drivers require one.
             Default: OTHER. With NULL, inserting nulls needs no explicit type. -->
        <setting name="jdbcTypeForNull" value="NULL"/>
    </settings>
</configuration>
```
Writing the tests
The for-loop and parallelStream test cases are straightforward, so they are not covered in detail here (a sketch of the parallelStream variant follows below); the focus is Spring Boot Batch and Flink.
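For completeness, a minimal sketch of what the parallelStream variant might look like. The insert(...) method is a hypothetical placeholder for whatever thread-safe single-row insert you use (a MyBatis mapper method, a JdbcTemplate call, and so on); it is not code from the repository:

```java
package org.sea.spring.boot.batch.test;

import java.util.ArrayList;
import java.util.List;

import org.sea.spring.boot.batch.model.StudentEntity;
import org.springframework.util.StopWatch;

public class TestParallelStream {

    // Hypothetical placeholder: any thread-safe single-row insert,
    // e.g. a MyBatis mapper method or a JdbcTemplate update.
    private void insert(StudentEntity s) {
        // INSERT INTO student (name, age) VALUES (?, ?)
    }

    public void testParallel() {
        List<StudentEntity> students = new ArrayList<>(10000);
        for (int i = 0; i < 10000; i++) {
            StudentEntity s = new StudentEntity();
            s.setName("name" + i);
            s.setAge(i);
            students.add(s);
        }
        StopWatch watch = new StopWatch("testParallel");
        watch.start("save");
        // The common fork/join pool fans the inserts out across cores,
        // but each insert still pays one network round trip to MySQL.
        students.parallelStream().forEach(this::insert);
        watch.stop();
        System.out.println(watch.prettyPrint());
    }
}
```

This also explains the numbers in the table above: parallelism only divides the per-row round trips across cores, while Spring Batch and Flink batch them into far fewer statements.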
Spring Boot Batch test case
```java
package org.sea.spring.boot.batch.job;

import java.util.Iterator;
import java.util.List;

import org.sea.spring.boot.batch.model.StudentEntity;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class InputStudentItemReader implements ItemReader<StudentEntity> {

    private final Iterator<StudentEntity> iterator;

    public InputStudentItemReader(List<StudentEntity> data) {
        this.iterator = data.iterator();
    }

    @Override
    public StudentEntity read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Returning null tells Spring Batch the input is exhausted.
        if (iterator.hasNext()) {
            return this.iterator.next();
        } else {
            return null;
        }
    }
}
```
```java
package org.sea.spring.boot.batch.job;

import java.util.Set;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

public class StudentBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    @Override
    public void validate(T value) throws ValidationException {
        // Collect all JSR-303 constraint violations and report them together.
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
        if (constraintViolations.size() > 0) {
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage()).append("\n");
            }
            throw new ValidationException(message.toString());
        }
    }
}
```
```java
package org.sea.spring.boot.batch.job;

import org.sea.spring.boot.batch.model.StudentEntity;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

public class StudentItemProcessor extends ValidatingItemProcessor<StudentEntity> {

    @Override
    public StudentEntity process(StudentEntity item) throws ValidationException {
        // Run the configured validator, then pass the item through unchanged.
        super.process(item);
        return item;
    }
}
```
```java
package org.sea.spring.boot.batch.job;

import java.util.ArrayList;
import java.util.List;

import org.sea.spring.boot.batch.model.StudentEntity;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.validator.Validator;
import org.springframework.batch.support.DatabaseType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.alibaba.druid.pool.DruidDataSource;

@Configuration
@EnableBatchProcessing
public class StudentBatchConfig {

    /**
     * ItemReader definition: supplies the data to be read.
     */
    @Bean
    @StepScope
    public InputStudentItemReader reader() {
        List<StudentEntity> list = new ArrayList<StudentEntity>(10000);
        for (int i = 0; i < 10000; i++) {
            list.add(init(i));
        }
        return new InputStudentItemReader(list);
    }

    private StudentEntity init(int i) {
        StudentEntity student = new StudentEntity();
        student.setName("name" + i);
        student.setAge(i);
        return student;
    }

    /**
     * ItemProcessor definition: validates each item before writing.
     */
    @Bean
    public ItemProcessor<StudentEntity, StudentEntity> processor() {
        StudentItemProcessor processor = new StudentItemProcessor();
        processor.setValidator(studentBeanValidator());
        return processor;
    }

    @Bean
    public Validator<StudentEntity> studentBeanValidator() {
        return new StudentBeanValidator<>();
    }

    /**
     * ItemWriter definition: writes the data out.
     * Spring injects beans already in the container as parameters;
     * Spring Boot has already defined the dataSource for us.
     */
    @Bean
    public ItemWriter<StudentEntity> writer(DruidDataSource dataSource) {
        // JdbcBatchItemWriter writes to the database using JDBC batching.
        JdbcBatchItemWriter<StudentEntity> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        // The SQL statement executed for each batch.
        String sql = "INSERT INTO student (name,age) values(:name,:age)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JobRepository jobRepository(DruidDataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType(String.valueOf(DatabaseType.MYSQL));
        // The isolation-level setting below is only needed for Oracle:
        // jobRepositoryFactoryBean.setIsolationLevelForCreate(isolationLevelForCreate);
        jobRepositoryFactoryBean.afterPropertiesSet();
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * JobLauncher definition: the interface used to start a Job.
     */
    @Bean
    public SimpleJobLauncher jobLauncher(DruidDataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    /**
     * Job definition: the task we actually execute, made up of one or more Steps.
     */
    @Bean
    public Job importJob(JobBuilderFactory jobBuilderFactory, Step s1) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1) // assign the Step to the Job
                .end()
                .build();
    }

    /**
     * Step definition: wires together the ItemReader, ItemProcessor and ItemWriter.
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<StudentEntity> reader,
            ItemWriter<StudentEntity> writer, ItemProcessor<StudentEntity, StudentEntity> processor) {
        return stepBuilderFactory.get("step1")
                .<StudentEntity, StudentEntity>chunk(1000) // commit every 1000 items
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
```
```java
package org.sea.spring.boot.batch.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.sea.spring.boot.batch.SpringBootBathApplication;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StopWatch;

import lombok.extern.slf4j.Slf4j;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootBathApplication.class)
@Slf4j
public class TestBatchService {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importJob;

    @Test
    public void testBatch1() throws Exception {
        StopWatch watch = new StopWatch("testAdd1");
        watch.start("save");
        // A unique "time" parameter makes every run a fresh JobInstance.
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importJob, jobParameters);
        watch.stop();
        log.info(watch.prettyPrint());
    }
}
```
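Tip: Spring Boot also runs every Job bean automatically at application startup. If you want the job to run only when the test triggers it, you can set `spring.batch.job.enabled=false` in application.properties.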
Flink test case
```java
package org.sea.spring.boot.batch;

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.sea.spring.boot.batch.model.StudentEntity;

public class FLink2Mysql {

    private static String driverClass = "com.mysql.jdbc.Driver";
    private static String dbUrl = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false";
    private static String userName = "root";
    private static String passWord = "mysql";

    public static void add(List<StudentEntity> students) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<StudentEntity> input = env.fromCollection(students);
        // Convert each entity into a Flink Row matching the column order.
        DataStream<Row> ds = input.map(new RichMapFunction<StudentEntity, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row map(StudentEntity student) throws Exception {
                return Row.of(student.getId(), student.getName(), student.getAge());
            }
        });
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO };
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername(driverClass)
                .setDBUrl(dbUrl)
                .setUsername(userName)
                .setPassword(passWord)
                .setParameterTypes(fieldTypes)
                .setQuery("insert into student values(?,?,?)")
                .build();
        sink.emitDataStream(ds);
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void query() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // id, name, age: must match the columns returned by "select *".
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO };
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        // Read rows back from MySQL.
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverClass)
                .setDBUrl(dbUrl)
                .setUsername(userName)
                .setPassword(passWord)
                .setQuery("select * from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();
        DataStreamSource<Row> input1 = env.createInput(jdbcInputFormat);
        input1.print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
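One tuning knob worth knowing: in Flink 1.9 the JDBCAppendTableSink buffers rows and executes the JDBC batch every 5000 rows by default. The builder exposes a batch size, so a variant of the sink construction above might look like this (a sketch, assuming the Flink 1.9 API; only the builder call changes):

```java
// Optional tuning sketch (Flink 1.9): setBatchSize changes how many
// rows the sink buffers before executing the JDBC batch (default 5000).
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        .setDrivername(driverClass)
        .setDBUrl(dbUrl)
        .setUsername(userName)
        .setPassword(passWord)
        .setParameterTypes(fieldTypes)
        .setQuery("insert into student values(?,?,?)")
        .setBatchSize(1000) // flush every 1000 rows
        .build();
```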
```java
package org.sea.spring.boot.batch.test;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.sea.spring.boot.batch.FLink2Mysql;
import org.sea.spring.boot.batch.model.StudentEntity;
import org.springframework.util.StopWatch;

import lombok.extern.log4j.Log4j2;

@Log4j2
public class TestFlink {

    @Test
    public void test() {
        // Build the input data.
        StopWatch watch = new StopWatch("testAdd1");
        watch.start("build");
        List<StudentEntity> list = new ArrayList<StudentEntity>(10000);
        for (int i = 0; i < 10000; i++) {
            // Use explicit ids in a range not occupied by the other tests.
            list.add(init(i + 210000));
        }
        watch.stop();
        // Save through Flink.
        watch.start("save");
        FLink2Mysql.add(list);
        watch.stop();
        log.info(watch.prettyPrint());
    }

    private StudentEntity init(int i) {
        StudentEntity student = new StudentEntity();
        student.setId(i);
        student.setName("name" + i);
        student.setAge(i);
        return student;
    }
}
```
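FLink2Mysql.query() can be exercised the same way to verify what was written; a trivial sketch:

```java
// Trivial sketch: stream the table back out of MySQL and print it.
@Test
public void testQuery() {
    FLink2Mysql.query();
}
```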
Source code on GitHub: https://github.com/loveseaone/sea-spring-boot-batch.git
If you found this article helpful, follow the author's WeChat public account and give it a thumbs-up; it is much appreciated!
There you can also download the e-books the author has collected over more than ten years of work.
Public account: TalkNewClass