
DataX Plugin Secondary Development: Fixing the Parquet Log Problem



Date: December 31, 2021

References:

https://blog.csdn.net/wuleidaren/article/details/106395549

https://cloud.tencent.com/developer/ask/123020

1. Problem Background

1.1 Discovering the Problem

In earlier posts we developed a custom plugin for DataX, and it works fine. But there is one problem: on every run, the DataX log fills up with a large amount of useless parquet output, similar to the following:

[Screenshot: DataX run log flooded with parquet log lines]

This badly hurts log readability during a run: the useful information is buried under the noise.

1.2 Pinpointing the Problem

Having found the problem, we need a fix.

Searching around, I found that plenty of people had run into similar issues, but the solutions on offer were either absent or vague.

So I went to the parquet source myself and found that the parquet project has a logging flaw of its own: its internal Log class uses the old java.util.logging.Logger rather than a logging framework such as Apache Log4j or SLF4J, and its configuration is hard-coded, so much of its output cannot be suppressed by the logback framework.

The log-print calls in the parquet.hadoop package:

[Screenshot: log-print statements in the parquet.hadoop package source]

The Log class in parquet.hadoop, and the logger it actually uses:

[Screenshot: the parquet.hadoop Log class wrapping java.util.logging.Logger]

Because these two logger classes are unrelated (parquet writes through java.util.logging, while DataX's logback knows nothing about it), the output cannot be filtered by log level.
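
To make the mismatch concrete, here is a minimal sketch of the pattern (illustrative only, not parquet's actual source): a class that logs through java.util.logging with a hard-coded handler. JUL and logback keep separate logger registries, so nothing in logback.xml applies to these messages.

import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

// Illustrative sketch of the pattern, not parquet's real Log class.
public class HardCodedJulLogger {

    private static final Logger LOG = Logger.getLogger("parquet.hadoop.Example");

    static {
        // Handler and level are wired up in code rather than read from a
        // logging framework's configuration, so logback cannot intercept them.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.INFO);
        LOG.addHandler(handler);
        LOG.setLevel(Level.INFO);
    }

    public static void main(String[] args) {
        LOG.info("printed regardless of what logback.xml configures");
    }
}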

2. Solving the Problem

2.1 Finding a Similar Case

After more digging, this blog offered a glimmer of hope: https://blog.csdn.net/wuleidaren/article/details/106395549
Its second approach, on inspection, turned out to be quite impractical, and its third approach came without a concrete example, so it was unclear where to even start.

In the end, persistence paid off: in this Tencent Cloud thread, https://cloud.tencent.com/developer/ask/123020,
someone had run into a similar problem on Spark. A little thought shows the two problems are essentially the same, just surfacing in different places. After several rounds of adjustment and experimentation, the problem was finally solved.

2.2 The Solution

Configure console log output for java.util.logging in DataX and use the resulting class (binding) conflict to limit parquet's useless log output. Testing confirms this works.

2.2.1 Steps

1> Create the file parquet-logging.properties under the ${DATAX_HOME}/conf directory, with the following contents:

# Note: not every line here may be necessary; they are included to cover all
# possible class/facility names. Tailor this to your needs.
.level=WARNING
java.util.logging.ConsoleHandler.level=WARNING
parquet.handlers=java.util.logging.ConsoleHandler
parquet.hadoop.handlers=java.util.logging.ConsoleHandler
org.apache.parquet.handlers=java.util.logging.ConsoleHandler
org.apache.parquet.hadoop.handlers=java.util.logging.ConsoleHandler
parquet.level=WARNING
parquet.hadoop.level=WARNING
org.apache.parquet.level=WARNING
org.apache.parquet.hadoop.level=WARNING
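
A quick way to convince yourself the file takes effect (a hypothetical check class, not part of DataX or parquet) is to run any JUL-logging class with the same system property. With .level=WARNING loaded, the INFO line below should disappear while the WARNING line remains:

import java.util.logging.Logger;

// Hypothetical sanity check. Run with:
//   java -Djava.util.logging.config.file=${DATAX_HOME}/conf/parquet-logging.properties JulConfigCheck
public class JulConfigCheck {

    public static void main(String[] args) {
        // The logger name falls under the "parquet.hadoop" prefix configured above.
        Logger log = Logger.getLogger("parquet.hadoop.JulConfigCheck");
        log.info("INFO - suppressed once .level=WARNING is in effect");
        log.warning("WARNING - still printed");
    }
}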

2> Modify the datax.py launch script

Alongside the other path constants (after the isWindows block), add the path of the parquet logging configuration file:

PAR_LOG_CONF_FILE = ("%s/conf/parquet-logging.properties") % (DATAX_HOME)

and modify the DEFAULT_PROPERTY_CONF variable to pass that file to the JVM:

DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s -Djava.util.logging.config.file=%s" % (DATAX_HOME, LOGBACK_FILE, PAR_LOG_CONF_FILE)
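
With this change, every job launch hands the JUL configuration file to the JVM. The relevant part of the command that datax.py assembles (illustrative; placeholders as in ENGINE_COMMAND, other flags elided) looks like:

java -server ${jvm} -Dfile.encoding=UTF-8 ... -Dlogback.configurationFile=${DATAX_HOME}/conf/logback.xml -Djava.util.logging.config.file=${DATAX_HOME}/conf/parquet-logging.properties -classpath ${DATAX_HOME}/lib/*:. ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}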

3> Test with a parquet job

Run a DataX job that uses hdfsreader (with parquet read support) or hdfsparquetwriter, for example:

python ${DATAX_HOME}/bin/datax.py test_hdfswriter_parquet.job
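
The job file itself is not reproduced here; structurally it is the standard DataX job skeleton (the reader/writer names and parameter blocks below are placeholders to fill in for your environment):

{
    "job": {
        "setting": {
            "speed": {
                "channel": "1"
            }
        },
        "content": [
            {
                "reader": {
                    "name": "your-reader-name",
                    "parameter": {}
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {}
                }
            }
        ]
    }
}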

Check the execution log:

[Screenshot: DataX run log with the parquet noise gone]

The parquet log lines no longer appear.

4> Confirm why the method works

Looking closely at the DataX log, you can see a few lines about the parquet class conflict, like this:

[Screenshot: DataX log lines showing the parquet SLF4J binding conflict]

This shows that after java.util.logging.ConsoleHandler was configured as parquet's log handler, two bindings were present, and DataX chose the latter (shaded.parquet.org.slf4j.helpers.NOPLoggerFactory), which suppresses the logs below the WARNING level.
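
The effect of the NOP binding is easy to reproduce in isolation (a generic SLF4J sketch, not parquet's shaded internals): once LoggerFactory resolves to a NOP factory, every logging call becomes a no-op, which is exactly why the parquet noise disappears.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Generic sketch: when no usable SLF4J binding wins, LoggerFactory falls
// back to a NOP logger and the calls below print nothing at all.
public class NopBindingDemo {

    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(NopBindingDemo.class);
        log.info("silently discarded under the NOP binding");
        log.warn("also discarded by the NOP logger");
    }
}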

2.2.2 Summary

With that, the parquet log problem is solved. When working through problems like this, it pays to stay flexible; one blocked road should not be the end of it.

Here, for example, there was no published case of suppressing these logs in DataX, so we borrowed a similar case from another scenario; and when the logs could not be suppressed directly, we switched to the log-binding approach, sidestepping parquet's bundled Log class and reaching the same result by a roundabout route.

Finally, here is the full datax.py:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import codecs
import json
import os
import platform
import re
import signal
import socket
import subprocess
import sys
import time
from optparse import OptionGroup
from optparse import OptionParser
from string import Template

ispy2 = sys.version_info.major == 2


def isWindows():
    return platform.system() == 'Windows'


DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'

if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
PAR_LOG_CONF_FILE = ("%s/conf/parquet-logging.properties") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s -Djava.util.logging.config.file=%s" % (DATAX_HOME, LOGBACK_FILE, PAR_LOG_CONF_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}


def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


def suicide(signum, e):
    global child_process
    if ispy2:
        print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)
    else:
        print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), sys.stderr)
    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    if ispy2:
        print >> sys.stderr, "DataX Process was killed ! you did ?"
    else:
        print("DataX Process was killed ! you did ?", sys.stderr)
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)
    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader", type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer", type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)
    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser


def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader, reader, reader)
    writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer, writer, writer)
    print(readerRef)
    print(writerRef)
    jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print(jobGuid)
    jobTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "content": [
                {
                    "reader": {},
                    "writer": {}
                }
            ]
        }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    try:
        readerPar = readPluginTemplate(readerTemplatePath)
    except:
        print("Read reader[%s] template error: can\'t find file %s" % (reader, readerTemplatePath))
    try:
        writerPar = readPluginTemplate(writerTemplatePath)
    except:
        print("Read writer[%s] template error: can\'t find file %s" % (writer, writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar
    jobTemplate['job']['content'][0]['writer'] = writerPar
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))


def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
        return json.load(f)


def isUrl(path):
    if not path:
        return False
    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())
    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
    if options.mode:
        commandMap["mode"] = options.mode
    # jobResource may be a URL or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]
    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params
    if options.jobid:
        commandMap["jobid"] = options.jobid
    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource
    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print('''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
''' % DATAX_VERSION)
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])
    startCommand = buildStartCommand(options, args)
    # print startCommand
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()
    sys.exit(child_process.returncode)

If you want to download these files, or the DataX hdfsreader (with parquet read support) and hdfsparquetwriter (parquet write support) plugins, see my repository: https://gitee.com/jackielee4cn/DataX.git

Comments, corrections, and stars are all welcome.
