Datax的官方stream插件中, 读写插件各自实现了这些方法
StreamReader:
StreamWriter:
现在开始逐个解释每个函数
Stream插件
- 一.StreamReader
-
- 1.Job的函数:
-
- 1.1 init():
- 1.2 prepare():
- 1.3 split():
- 1.4 post():
- 1.5 entry():
- 1.6 dealColumn():
- 1.7 parseMixupFunctions():
- 2. Task
-
- 2.1 init():
- 2.2 prepare():
- 2.3 startRead():
- 2.4 post():
- 2.5 destroy():
- 2.6 buildOneColumn():
- 2.7 buildOneRecord():
- 二. StreamWriter
-
- 1. Job
-
- 1.1 init():
- 1.2 prepare():
- 1.3 split():
- 1.4 post():
- 1.5 destroy():
- 1.6 validateParameter():
- 2. Task
-
- 2.1 init():
- 2.2 prepare():
- 2.3 startwrite():
- 2.4 post():
- 2.5 destroy():
- 2.6 writeToFile():
- 2.7 recordToString():
- 2.8 buildFilePath():
一.StreamReader
1.Job的函数:
1.1 init():
这个函数主要负责插件的初始化, 其完成的工作:
- 将配置信息保存为类的属性
- 将json格式的column转换为ArrayList的column(通过dealColumn函数)
- 查看是否满足初始化条件(设置sliceRecordCount参数且不能小于1)
1.2 prepare():
stream读插件的prepare为空, 即不进行任何操作
1.3 split():
将要打印的值复制adviceNumber次传递给configurations,返回这个值
1.4 post():
stream读插件的post为空, 即不进行任何操作
1.5 entry():
stream读插件的entry为空, 即不进行任何操作
1.6 dealColumn():
这个函数负责将json格式的column转换成arraylist的column(json.parse), 原本是json的格式编程了arrayList中的字符串
1.7 parseMixupFunctions():
处理混淆函数, 大概的意思是随机在内存中产生值作为读值
2. Task
2.1 init():
首先拿到配置项(column和sliceRecordCount), 然后将这两项分别复制给类的属性
2.2 prepare():
stream读插件的task.prepare为空, 即不执行任何操作
2.3 startRead():
- 调用buildOneRecord函数生成一个记录OneRecord
- 执行sliceRecorCount次, 将OneRecord当作参数使用recordSender.sendToWriter()发送给writer
2.4 post():
stream读插件的task.post为空, 即不执行任何操作
2.5 destroy():
stream读插件的task.destroy为空, 即不执行任何操作
2.6 buildOneColumn():
在官方给的例子中没有调用这个函数, 待补充
2.7 buildOneRecord():
- 使用recordSender.createRecord()函数创建一个record,record的格式为(date: xxx, size: xxx)
- 将每一个字符串的记录转换成configuration形式后放入record中的arraylist中
- 将record作为返回值返回给调用者
二. StreamWriter
1. Job
1.1 init():
- 将配置类复制一遍给自己的类的originalConfig属性
- 如果配置了地址和文件名,那就会去验证路径是否合法等设置(一般没有这一项)
1.2 prepare():
stream写插件的Job.prepare为空, 即不执行任何操作
1.3 split():
将输出属性的配置复制readerTaskNumber次当作参数返回给调用方
1.4 post():
stream写插件的Job.post为空, 即不执行任何操作
1.5 destroy():
stream写插件的Job.destroy为空, 即不执行任何操作
1.6 validateParameter():
验证文件地址和文件名是否合法
- 验证目录是否合法
- 如果不存在目录创建目录
- 如果文件存在就删除文件
- 新建文件
2. Task
2.1 init():
- 配置写入的属性作为类的writerSliceConfig
- 配置fieldDelimiter, print, path, filename, recordNumberforSleep, sleepTime
2.2 prepare():
stream写插件的Task.prepare为空, 即不执行任何操作
2.3 startwrite():
- 如果配置了path和filename就执行写入文件的操作
- 初始化BUfferedWriter和record
- 从recordReciver.getFromReader()接受record, 依次调用recordToString函数将record转换成string后打印到屏幕中
- writer.flush()刷新
2.4 post():
stream写插件的Task.post为空, 即不执行任何操作
2.5 destroy():
stream写插件的Task.destroy为空, 即不执行任何操作
2.6 writeToFile():
文件写入, 官方例子中没用到,待补充
2.7 recordToString():
遍历record中的每个column, 加上分隔符和转义符后变成字符串后返回.
2.8 buildFilePath():
文件操作暂时没用到, 待补充