当前位置: 代码迷 >> 综合 >> Datax 源码流程解析
  详细解决方案

Datax 源码流程解析

热度:20   发布时间:2023-12-06 03:46:14.0

这篇博客主要用来记录自己对datax中stream流的运行过程以及几个关键的运行逻辑:

运行逻辑

  • 一. engine.entry(args)部分
    • 1. 解析命令行的参数
    • 2. 配置configuration
    • 3. 新建一个engine对象
  • 二. engine.start(configurarion)部分
    • 1. 绑定configuration的信息给ColumnCast类和LoadUtil类
    • 2. 判断当前工作是Job还是Taskgroup
  • 三. Jobcontainer.start()
    • 1.运行preHandle()函数
    • 2.init()函数
    • 3. prepare()函数
    • 4. split()函数
    • 5. schedule()函数
    • 6. post()函数
    • 7. postHandle()函数
    • 8. invokeHooks()函数

一. engine.entry(args)部分

这一部分主要有下面这几个任务:

1. 解析命令行的参数

通过调用java的cli包来解析命令行的参数, 并且解析之后将其复制给自己的, 主要是mode,Jobid以及job, 实验时的这三个键对应的值即为

Key Value
mode “standalone”
jobid “-1”
job “{path}\stream2stream.json”

2. 配置configuration

  1. 根据第一步得到的job(即json文件的地址)去打开json文件, 并将其内容存为configuration类的jsonobject属性
  2. 然后去读取core.json文件里的配置项同样将其配置为configuration类中的jsonobject属性
  3. 将configuration自己配置里面的写插件名和读插件名单独取出做成ArrayList后再作为configuration的值
  4. 此时, configuration的属性和值为:
Key Value
core 包含container,dataxserver, transport, statictics的配置
entry “jvm” : “Xms1G-Xmx1G”
common 包含时间, 地区, 编码格式的配置
plugin reader和writer插件的配置(name, path, description, developer, class)
job 这次任务的reader和writer的列属性, 以及并发管道的数量

3. 新建一个engine对象

将configuration作为参数启动engine对象的start方法.

二. engine.start(configurarion)部分

1. 绑定configuration的信息给ColumnCast类和LoadUtil类

  1. 首先是帮i的那个ColumnCast类, 里面会初始化三个Cast分别为StringCast, DateCast, BytesCast
Key Value
StringCast 负责时间, 地区, 编码格式等的配置
DateCast 时间格式的配置
BytesCast 编码格式的配置
  1. 配置LoadUtil, 将所有的配置复制一遍, 方便后面的插件获取这些数据

2. 判断当前工作是Job还是Taskgroup

根据当前工作是形式(配置项中core的配置内容)来判断运行形式, 如果是Job则初始化JobContainer,否则初始化TaskGroupContainer.然后进行运行container.start()方法来执行

三. Jobcontainer.start()

首先进行复制配置项的操作, 将之前做的配置类复制一遍赋值给userConf

1.运行preHandle()函数

steam2stream没有预处理, 所以这个函数什么都不做

2.init()函数

  1. 首先取出JobID, 如果小于0, 再将其改成0, 并且用这个jobID作为线程的线程名
  2. 初始化JobPluginCollector对象
  3. 调用initJobReader()方法
    3.1: 根据插件的种类和插件名得到插件的配置
    3.2: 在对应的插件加载到 jarLoaderCenter中
    3.3: 保存当前的classLoader, 并将当前线程的classLoader设置为所给classloader
    3.4: 初始化jobReader
    3.5: 设置reader的jobconfig, readerconfig, jobPluginCollector
    3.6: 调用插件的init()函数
    3.7: 将当前线程的类加载器设置为保存的类加载
    3.8: 返回jobReader
  4. 调用initJobWriter()方法, 流程和initJobreader几乎一样

3. prepare()函数

  1. 保存当前classLoader,并将当前线程的classLoader设置为所给classLoader
  2. 执行插件的prepare()函数
  3. 将当前线程的类加载器设置为保存的类加载

4. split()函数

  1. 配置channel number
  2. 调用插件的split函数, 得到读写的task任务的配置
  3. 得到transformar的配置list
  4. 将读写的配置合并后赋值给当前类的datax_job_content属性
  5. 返回contentConfig.size, 也就是拆分task的数量

5. schedule()函数

  1. 得到每个task任务的channel数和task的数量并且取较小值为needChannelNumber
  2. 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
  3. 初始化调度器
  4. 使用scheduler.schedule启动调度
    1. 收集Job的信息
    2. 每隔一段时间打印日志信息
  5. 记录时间和运行情况

6. post()函数

  1. 设置当前的Jobloader
  2. 依次执行读写插件的post函数
  3. 保存Jobloader

7. postHandle()函数

  1. 暂时没有调用过, 待补充

8. invokeHooks()函数

  1. 调用getContainerCommuicator().collect()函数初始化comm和invoker
  2. 调用invokeAll(), 一般也用不到, 待补充