当前位置: 代码迷 >> 综合 >> Datax stream插件源码解析
  详细解决方案

Datax stream插件源码解析

热度:19   发布时间:2023-12-06 03:46:41.0

Datax的官方stream插件中, 读写插件各自实现了这些方法

StreamReader:
在这里插入图片描述
StreamWriter:
在这里插入图片描述

现在开始逐个解释每个函数

Stream插件

  • 一.StreamReader
    • 1.Job的函数:
      • 1.1 init():
      • 1.2 prepare():
      • 1.3 split():
      • 1.4 post():
      • 1.5 entry():
      • 1.6 dealColumn():
      • 1.7 parseMixupFunctions():
    • 2. Task
      • 2.1 init():
      • 2.2 prepare():
      • 2.3 startRead():
      • 2.4 post():
      • 2.5 destroy():
      • 2.6 buildOneColumn():
      • 2.7 buildOneRecord():
  • 二. StreamWriter
    • 1. Job
      • 1.1 init():
      • 1.2 prepare():
      • 1.3 split():
      • 1.4 post():
      • 1.5 destroy():
      • 1.6 validateParameter():
    • 2. Task
      • 2.1 init():
      • 2.2 prepare():
      • 2.3 startwrite():
      • 2.4 post():
      • 2.5 destroy():
      • 2.6 writeToFile():
      • 2.7 recordToString():
      • 2.8 buildFilePath():

一.StreamReader

1.Job的函数:

1.1 init():

这个函数主要负责插件的初始化, 其完成的工作:

  1. 将配置信息保存为类的属性
  2. 将json格式的column转换为ArrayList的column(通过dealColumn函数)
  3. 查看是否满足初始化条件(设置sliceRecordCount参数且不能小于1)

1.2 prepare():

stream读插件的prepare为空, 即不进行任何操作

1.3 split():

将要打印的值复制adviceNumber次传递给configurations,返回这个值

1.4 post():

stream读插件的post为空, 即不进行任何操作

1.5 entry():

stream读插件的entry为空, 即不进行任何操作

1.6 dealColumn():

这个函数负责将json格式的column转换成arraylist的column(json.parse), 原本是json的格式编程了arrayList中的字符串

1.7 parseMixupFunctions():

处理混淆函数, 大概的意思是随机在内存中产生值作为读值

2. Task

2.1 init():

首先拿到配置项(column和sliceRecordCount), 然后将这两项分别复制给类的属性

2.2 prepare():

stream读插件的task.prepare为空, 即不执行任何操作

2.3 startRead():

  1. 调用buildOneRecord函数生成一个记录OneRecord
  2. 执行sliceRecorCount次, 将OneRecord当作参数使用recordSender.sendToWriter()发送给writer

2.4 post():

stream读插件的task.post为空, 即不执行任何操作

2.5 destroy():

stream读插件的task.destroy为空, 即不执行任何操作

2.6 buildOneColumn():

在官方给的例子中没有调用这个函数, 待补充

2.7 buildOneRecord():

  1. 使用recordSender.createRecord()函数创建一个record,record的格式为(date: xxx, size: xxx)
  2. 将每一个字符串的记录转换成configuration形式后放入record中的arraylist中
  3. 将record作为返回值返回给调用者

二. StreamWriter

1. Job

1.1 init():

  1. 将配置类复制一遍给自己的类的originalConfig属性
  2. 如果配置了地址和文件名,那就会去验证路径是否合法等设置(一般没有这一项)

1.2 prepare():

stream写插件的Job.prepare为空, 即不执行任何操作

1.3 split():

将输出属性的配置复制readerTaskNumber次当作参数返回给调用方

1.4 post():

stream写插件的Job.post为空, 即不执行任何操作

1.5 destroy():

stream写插件的Job.destroy为空, 即不执行任何操作

1.6 validateParameter():

验证文件地址和文件名是否合法

  1. 验证目录是否合法
  2. 如果不存在目录创建目录
  3. 如果文件存在就删除文件
  4. 新建文件

2. Task

2.1 init():

  1. 配置写入的属性作为类的writerSliceConfig
  2. 配置fieldDelimiter, print, path, filename, recordNumberforSleep, sleepTime

2.2 prepare():

stream写插件的Task.prepare为空, 即不执行任何操作

2.3 startwrite():

  1. 如果配置了path和filename就执行写入文件的操作
  2. 初始化BUfferedWriter和record
  3. 从recordReciver.getFromReader()接受record, 依次调用recordToString函数将record转换成string后打印到屏幕中
  4. writer.flush()刷新

2.4 post():

stream写插件的Task.post为空, 即不执行任何操作

2.5 destroy():

stream写插件的Task.destroy为空, 即不执行任何操作

2.6 writeToFile():

文件写入, 官方例子中没用到,待补充

2.7 recordToString():

遍历record中的每个column, 加上分隔符和转义符后变成字符串后返回.

2.8 buildFilePath():

文件操作暂时没用到, 待补充

  相关解决方案