这篇博客主要用来记录自己对datax中stream流的运行过程以及几个关键的运行逻辑:
运行逻辑
- 一. engine.entry(args)部分
-
- 1. 解析命令行的参数
- 2. 配置configuration
- 3. 新建一个engine对象
- 二. engine.start(configurarion)部分
-
- 1. 绑定configuration的信息给ColumnCast类和LoadUtil类
- 2. 判断当前工作是Job还是Taskgroup
- 三. Jobcontainer.start()
-
- 1.运行preHandle()函数
- 2.init()函数
- 3. prepare()函数
- 4. split()函数
- 5. schedule()函数
- 6. post()函数
- 7. postHandle()函数
- 8. invokeHooks()函数
一. engine.entry(args)部分
这一部分主要有下面这几个任务:
1. 解析命令行的参数
通过调用java的cli包来解析命令行的参数, 并且解析之后将其复制给自己的, 主要是mode,Jobid以及job, 实验时的这三个键对应的值即为
Key | Value |
---|---|
mode | “standalone” |
jobid | “-1” |
job | “{path}\stream2stream.json” |
2. 配置configuration
- 根据第一步得到的job(即json文件的地址)去打开json文件, 并将其内容存为configuration类的jsonobject属性
- 然后去读取core.json文件里的配置项同样将其配置为configuration类中的jsonobject属性
- 将configuration自己配置里面的写插件名和读插件名单独取出做成ArrayList后再作为configuration的值
- 此时, configuration的属性和值为:
Key | Value |
---|---|
core | 包含container,dataxserver, transport, statictics的配置 |
entry | “jvm” : “Xms1G-Xmx1G” |
common | 包含时间, 地区, 编码格式的配置 |
plugin | reader和writer插件的配置(name, path, description, developer, class) |
job | 这次任务的reader和writer的列属性, 以及并发管道的数量 |
3. 新建一个engine对象
将configuration作为参数启动engine对象的start方法.
二. engine.start(configurarion)部分
1. 绑定configuration的信息给ColumnCast类和LoadUtil类
- 首先是帮i的那个ColumnCast类, 里面会初始化三个Cast分别为StringCast, DateCast, BytesCast
Key | Value |
---|---|
StringCast | 负责时间, 地区, 编码格式等的配置 |
DateCast | 时间格式的配置 |
BytesCast | 编码格式的配置 |
- 配置LoadUtil, 将所有的配置复制一遍, 方便后面的插件获取这些数据
2. 判断当前工作是Job还是Taskgroup
根据当前工作是形式(配置项中core的配置内容)来判断运行形式, 如果是Job则初始化JobContainer,否则初始化TaskGroupContainer.然后进行运行container.start()方法来执行
三. Jobcontainer.start()
首先进行复制配置项的操作, 将之前做的配置类复制一遍赋值给userConf
1.运行preHandle()函数
steam2stream没有预处理, 所以这个函数什么都不做
2.init()函数
- 首先取出JobID, 如果小于0, 再将其改成0, 并且用这个jobID作为线程的线程名
- 初始化JobPluginCollector对象
- 调用initJobReader()方法
3.1: 根据插件的种类和插件名得到插件的配置
3.2: 在对应的插件加载到 jarLoaderCenter中
3.3: 保存当前的classLoader, 并将当前线程的classLoader设置为所给classloader
3.4: 初始化jobReader
3.5: 设置reader的jobconfig, readerconfig, jobPluginCollector
3.6: 调用插件的init()函数
3.7: 将当前线程的类加载器设置为保存的类加载
3.8: 返回jobReader - 调用initJobWriter()方法, 流程和initJobreader几乎一样
3. prepare()函数
- 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader
- 执行插件的prepare()函数
- 将当前线程的类加载器设置为保存的类加载
4. split()函数
- 配置channel number
- 调用插件的split函数, 得到读写的task任务的配置
- 得到transformar的配置list
- 将读写的配置合并后赋值给当前类的datax_job_content属性
- 返回contentConfig.size, 也就是拆分task的数量
5. schedule()函数
- 得到每个task任务的channel数和task的数量并且取较小值为needChannelNumber
- 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
- 初始化调度器
- 使用scheduler.schedule启动调度
- 收集Job的信息
- 每隔一段时间打印日志信息
- 记录时间和运行情况
6. post()函数
- 设置当前的Jobloader
- 依次执行读写插件的post函数
- 保存Jobloader
7. postHandle()函数
- 暂时没有调用过, 待补充
8. invokeHooks()函数
- 调用getContainerCommuicator().collect()函数初始化comm和invoker
- 调用invokeAll(), 一般也用不到, 待补充