当前位置: 代码迷 >> 综合 >> Datax DB2插件开发
  详细解决方案

Datax DB2插件开发

热度:6   发布时间:2023-12-06 03:45:58.0

对于datax来说, 其实所有的关系型数据库步骤都差不多, 不同的就在于不同的数据库对于SQL的要求:
例如

开发流程

  • 1. 新建module
  • 2.DB2配置文件处理
  • 3. Java代码
  • 4.构建项目的配置文件
  • 5. 编译后测试运行

1. 新建module

在这里插入图片描述

2.DB2配置文件处理

  1. Datax/db2reader/src/main/resources/{json文件}: 负责生成jar包和json模板
  2. Datax/db2reader/pom.xml: 负责maven编译,配置依赖项
  3. Datax/db2reader/src/main/assembly/package.xml: 用来选择文件生成的地址(assembly文件夹需要自己创建)

代码分别为:

  1. plugin.json和plugin_job_template.json(位于Datax/db2reader/src/main/resource/下)
// plugin.json
{
    "name": "db2reader","class": "com.alibaba.datax.plugin.reader.db2reader.Db2Reader","description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.","developer": "alibaba"
}
// plugin_job_template.json
{
    "name": "db2reader","parameter": {
    "username": "","password": "","column": [],"connection": [{
    "jdbcUrl": [],"table": []}],"where": ""}
}
  1. pom.xml(位于Datax/db2reader下)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>datax-all</artifactId><groupId>com.alibaba.datax</groupId><version>0.0.1-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>db2reader</artifactId><name>db2reader</name><packaging>jar</packaging><dependencies><dependency><groupId>com.alibaba.datax</groupId><artifactId>datax-common</artifactId><version>${datax-project-version}</version><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></dependency><dependency><groupId>com.alibaba.datax</groupId><artifactId>plugin-rdbms-util</artifactId><version>${datax-project-version}</version></dependency><!-- https://mvnrepository.com/artifact/com.ibm.db2/jcc --><dependency><groupId>com.ibm.db2</groupId><artifactId>jcc</artifactId><version>11.5.4.0</version></dependency></dependencies><build><plugins><!-- compiler plugin --><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>${jdk-version}</source><target>${jdk-version}</target><encoding>${project-sourceEncoding}</encoding></configuration></plugin><!-- assembly plugin --><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptors><descriptor>src/main/assembly/package.xml</descriptor></descriptors><finalName>datax</finalName></configuration><executions><execution><id>dwzip</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
  1. package.xml(位于Datax/db2reader/src/main/assembly/下, assembly文件夹需要手动创建)
<assemblyxmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"><id></id><formats><format>dir</format></formats><includeBaseDirectory>false</includeBaseDirectory><fileSets><fileSet><directory>src/main/resources</directory><includes><include>plugin.json</include><include>plugin_job_template.json</include></includes><outputDirectory>plugin/reader/db2reader</outputDirectory></fileSet><fileSet><directory>target/</directory><includes><include>db2reader-0.0.1-SNAPSHOT.jar</include></includes><outputDirectory>plugin/reader/db2reader</outputDirectory></fileSet></fileSets><dependencySets><dependencySet><useProjectArtifact>false</useProjectArtifact><outputDirectory>plugin/reader/db2reader/libs</outputDirectory><scope>runtime</scope></dependencySet></dependencySets>
</assembly>

3. Java代码

这里要注意几个修改sql的地方,分别位于job的init和split

package com.alibaba.datax.plugin.reader.db2reader;import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;public class Db2Reader extends Reader {
    private static final DataBaseType DATABASE_TYPE = DataBaseType.DB2;public static class Job extends Reader.Job{
    private static final Logger Log = LoggerFactory.getLogger(Job.class);private Configuration originalConfig = null;private CommonRdbmsReader.Job commonRdbmsReaderJob;@Overridepublic void init(){
    this.originalConfig = super.getPluginJobConf();Integer userConfigFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE, 1024);if (userConfigFetchSize < 1){
    throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,String.format("您配置的 fetchSize 有误,fetchSize:[%d] 值不能小于 1.",userConfigFetchSize));}// 处理表结构String tableName = this.originalConfig.getString(String.format("%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE));tableName = String.format("\"%s\"", tableName);this.originalConfig.set(String.format("%s[0].%s[0]", Constant.CONN_MARK, Key.TABLE), tableName);this.originalConfig.set(Constant.FETCH_SIZE, userConfigFetchSize);this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);this.commonRdbmsReaderJob.init(this.originalConfig);}@Overridepublic void preCheck(){
    init();this.commonRdbmsReaderJob.preCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic List<Configuration> split(int adviceNumber){
    // 处理column字段String column = this.originalConfig.getString(Key.COLUMN);String[] columns;columns = column.split(",");String newColumns = "";String thisColumn;for(int j=0; j<columns.length; j++){
    thisColumn = String.format("\"%s\",", columns[j]);newColumns = newColumns + thisColumn;}newColumns = newColumns.substring(0, newColumns.length()-1);this.originalConfig.set(Key.COLUMN, newColumns);return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);}@Overridepublic void post(){
    this.commonRdbmsReaderJob.post(this.originalConfig);}@Overridepublic void destroy(){
    this.commonRdbmsReaderJob.destroy(this.originalConfig);}}public static class Task extends Reader.Task{
    private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderTask;@Overridepublic void init(){
    this.readerSliceConfig = super.getPluginJobConf();this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId());this.commonRdbmsReaderTask.init(this.readerSliceConfig);}@Overridepublic void startRead(RecordSender recordSender){
    int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,super.getTaskPluginCollector(), fetchSize);}@Overridepublic void post(){
    this.commonRdbmsReaderTask.post(this.readerSliceConfig);}@Overridepublic void destroy(){
    this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);}}}

4.构建项目的配置文件

修改package.xml和pom.xml(位于Datax/下)
在这里插入图片描述
在这里插入图片描述

5. 编译后测试运行

  1. 执行maven命令进行编译
 mvn -U clean package assembly:assembly -Dmaven.test.skip=true
  1. 构建json文件进行测试
{
    "job": {
    "setting": {
    "speed": {
    "channel": "5"},"errorLimit": {
    "record": 1}},"content": [{
    "reader": {
    "name": "db2reader","parameter": {
    "username": "username","password": "password","column": ["id","age","name","noId","street","phoneNumber","email","bankNo","domain","mac","ip","date"],"connection": [{
    "table": ["Student"],"jdbcUrl": ["jdbc:db2://ip:port/database_name"]}]}},"writer": {
    "name": "mysql8writer","parameter": {
    "writeMode": "insert","username": "username","password": "password","column": ["`id`","`age`","`name`","`noId`","`street`","`phoneNumber`","`email`","`bankNo`","`domain`","`mac`","`ip`","`date`"],"session": ["set session sql_mode='ANSI'"],"preSql": [],"connection": [{
    "jdbcUrl": "jdbc:mysql://ip:port/database_name?characterEncoding=utf8","table": ["`Person3`"]}]}}}]}
}
  1. 运行查看结果
python datax.py mysql8test.json

在这里插入图片描述
4. 成功
在这里插入图片描述
去数据库查看, 同步成功