Flume Hive sink: pitfalls when connecting Flume to Hive, and why I finally dropped Hive in favor of HDFS. Discussion and pointers welcome

Published: 2023-12-22 02:01:52.0

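In this project I wanted Flume to write data straight into a Hive table rather than onto HDFS, using Hive as the sink. The Flume version is 1.9.0.

Startup initially failed with a series of errors: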

NoClassDefFoundError: org/apache/hadoop/hive/ql/session/SessionState

NoClassDefFoundError: org/apache/hadoop/hive/cli/CliSessionState

NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf

NoClassDefFoundError: org/apache/hadoop/conf/Configuration

java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer

java.lang.ClassNotFoundException: com.esotericsoftware.kryo.Serializer

NoClassDefFoundError: org/antlr/runtime/RecognitionException

Fix:

Copy the relevant jars over to Flume wholesale.
For example, on CDH the jar directory is:

/data/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars

From inside that directory:

scp hive-* root@172.28.65.106:/usr/local/flume/lib
scp hadoop-* root@172.28.65.106:/usr/local/flume/lib
scp antlr-* root@172.28.65.106:/usr/local/flume/lib
scp kryo-2.22.jar root@172.28.65.106:/usr/local/flume/lib
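
Before copying everything, it can help to find out which jar actually provides a missing class. The following is a minimal sketch, not part of the original setup: the function names `class_to_entry` and `find_jar_for_class` are hypothetical, and it assumes `unzip` is installed on the machine.

```shell
#!/bin/bash
# Hypothetical helper: convert a class name taken from a
# NoClassDefFoundError / ClassNotFoundException message into the
# entry path used inside a jar file.
class_to_entry() {
  # Accepts both the slash form (org/antlr/runtime/...) and the
  # dotted form (com.esotericsoftware.kryo...).
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Hypothetical helper: print every jar under a directory that
# contains the given class. Assumes `unzip` is available.
find_jar_for_class() {
  local lib_dir="$1" entry jar
  entry="$(class_to_entry "$2")"
  for jar in "$lib_dir"/*.jar; do
    [ -f "$jar" ] || continue
    if unzip -l "$jar" 2>/dev/null | grep -qF -- "$entry"; then
      echo "$jar"
    fi
  done
}
```

For example, `find_jar_for_class /data/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars com.esotericsoftware.kryo.Serializer` would list the candidate jars for the kryo error above, and running it against /usr/local/flume/lib shows whether the class is already on Flume's side.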
 

Set up the Flume configuration file:

# example.conf: A single-node Flume configuration
 
# Name the components on this agent
video_hive.sources = r1
video_hive.sinks = k1
video_hive.channels = c1
 
# Describe/configure the source
video_hive.sources.r1.type = netcat
video_hive.sources.r1.bind = localhost
video_hive.sources.r1.port = 44444
 
# Describe the sink
video_hive.sinks.k1.type = hive
video_hive.sinks.k1.channel = c1
video_hive.sinks.k1.hive.metastore = thrift://dev07.hadoop.openpf:9083
#video_hive.sinks.k1.hive.metastore = thrift://172.28.23.21:9083
video_hive.sinks.k1.hive.database = recommend_video
video_hive.sinks.k1.hive.table = video_test
#video_hive.sinks.k1.hive.table = user_video_action_log
video_hive.sinks.k1.hive.partition = %Y-%m-%d
#video_hive.sinks.k1.autoCreatePartitions = false
video_hive.sinks.k1.useLocalTimeStamp = true
video_hive.sinks.k1.batchSize = 1500
#video_hive.sinks.k1.round = true
#video_hive.sinks.k1.roundValue = 10
#video_hive.sinks.k1.roundUnit = minute
video_hive.sinks.k1.serializer = DELIMITED
video_hive.sinks.k1.serializer.delimiter = ","
video_hive.sinks.k1.serializer.serdeSeparator = ','
video_hive.sinks.k1.serializer.fieldnames = timestamp,userid,videoid,totaltime,playtime,hits,rate,praise
 
# Use a channel which buffers events in memory
video_hive.channels.c1.type = memory
video_hive.channels.c1.capacity = 2000
video_hive.channels.c1.transactionCapacity = 1500
 
# Bind the source and sink to the channel
video_hive.sources.r1.channels = c1
video_hive.sinks.k1.channel = c1
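
To try this configuration out, a test record has to match the comma delimiter and the field order given in serializer.fieldnames. A minimal sketch follows; the function name `make_test_event` and all sample values are made up for illustration, and sending the record assumes `nc` is installed and the agent is already running.

```shell
#!/bin/bash
# Build one comma-delimited test record in the order listed in
# serializer.fieldnames:
#   timestamp,userid,videoid,totaltime,playtime,hits,rate,praise
make_test_event() {
  local ts
  ts="$(date +%s)"
  # Every value except the timestamp is invented sample data.
  printf '%s,%s,%s,%s,%s,%s,%s,%s\n' "$ts" "u001" "v001" "120" "60" "1" "5" "0"
}

# To send it to the netcat source defined above (agent must be running):
#   make_test_event | nc localhost 44444
```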
Pay attention to the CREATE TABLE statement (clustered into buckets, transactional, stored as ORC):

create table if not exists video_test ( `timestamp` string, 
  `userid` string, 
  `videoid` string, 
  `totaltime` string, 
  `playtime` string, 
  `rate` string, 
  `hits` string, 
  `praise` string ) COMMENT '用户视频日志'
  partitioned by (date string)
  clustered by (userid) into 5 buckets
  row format delimited fields terminated by ','
  stored as orc
  tblproperties("transactional"='true'); 
Last resort:

If it still does not work, copy hdfs-site.xml, hive-conf.properties, hive-site.xml and hive-env.sh into Flume's configuration directory, e.g. /usr/local/flume/conf

Done!

 

For reference, a script that starts Flume in the background:

#!/bin/sh
 
FLUME_HOME=/usr/local/flume
 
#
#nohup java ${JAVA_OPT} -jar ${APPLICATION_JAR} --spring.profiles.active=dev --spring.config.location="config/" >/dev/null 2>&1 &
nohup flume-ng agent --conf ${FLUME_HOME}/conf --conf-file ${FLUME_HOME}/conf/flume-hive.properties --name video_hive -Dflume.root.logger=INFO,LOGFILE &
 

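Important: the problems with Hive:

The Hive table that Flume writes to must be stored as ORC. After loading data from the log file into the Hive table, queries with a WHERE clause misbehaved (queries without a filter worked fine). I also could not find any setting for a different storage format, so in the end I gave up on Hive and switched to HDFS.

[Screenshot: query error]

Pointing Flume directly at a Hive table stored as textfile also runs into problems.

Overall, the Flume-to-Hive approach still feels immature, with all sorts of small problems.

Discussion and comments are welcome!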

 

------------------------------------------------------------------------------------------------------------------------------------------------------------

Flume to HDFS:
# example.conf: A single-node Flume configuration
 
# Name the components on this agent
video_hdfs.sources = s1
video_hdfs.sinks = k1
video_hdfs.channels = c1
 
# Describe/configure the source
##netcat
#video_hdfs.sources.s1.type = netcat
#video_hdfs.sources.s1.bind = localhost
#video_hdfs.sources.s1.port = 44444
##exec
#video_hdfs.sources = s1
#video_hdfs.sources.s1.type = exec
#video_hdfs.sources.s1.command = tail -F /home/recommend/recom-video/logs/user-video.log
#video_hdfs.sources.s1.channels = c1
 
##TAILDIR
video_hdfs.sources.s1.type = TAILDIR
# Location of the position (metadata) file
video_hdfs.sources.s1.positionFile = /usr/local/flume/conf/taildir_position.json
# Files to monitor
video_hdfs.sources.s1.filegroups = f1
video_hdfs.sources.s1.filegroups.f1 = /home/recommend/recom-video/logs/user-video.log
video_hdfs.sources.s1.fileHeader = true
 
 
# Describe the sink
video_hdfs.sinks.k1.type = hdfs
video_hdfs.sinks.k1.channel = c1
video_hdfs.sinks.k1.type = hdfs
video_hdfs.sinks.k1.hdfs.path = hdfs://nameservice1/user/hive/warehouse/recommend_video.db/video_test/dayid=%Y%m%d
video_hdfs.sinks.k1.hdfs.fileType = DataStream
video_hdfs.sinks.k1.hdfs.writeFormat=TEXT
video_hdfs.sinks.k1.hdfs.filePrefix = events-
video_hdfs.sinks.k1.hdfs.fileSuffix = .log
video_hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
video_hdfs.sinks.k1.hdfs.round = true
video_hdfs.sinks.k1.hdfs.roundValue = 1
video_hdfs.sinks.k1.hdfs.roundUnit = hour
 
# Use a channel which buffers events in memory
video_hdfs.channels.c1.type = memory
video_hdfs.channels.c1.capacity = 20000
video_hdfs.channels.c1.transactionCapacity = 15000
 
# Bind the source and sink to the channel
video_hdfs.sources.s1.channels = c1
video_hdfs.sinks.k1.channel = c1
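This stores the log files directly in HDFS under the directory of the corresponding Hive table and partition. However, Hive queries do not return the data this way.

 

Testing showed that once the corresponding partition is created manually, the data becomes queryable: the partition directory exists in HDFS, but Hive only reads partitions that are registered in the metastore.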
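
Registering a partition by hand can be sketched as below. The function name `build_add_partition_sql` is hypothetical, the date 20231222 is only a sample value, and the `hive` calls are left as comments because they need a working Hive client. `MSCK REPAIR TABLE` is a standard Hive statement that registers partition directories named like dayid=..., shown here as a possible alternative that I have not tested on this cluster.

```shell
#!/bin/bash
# Hypothetical helper: build the HiveQL that registers one day's
# partition for a table partitioned by dayid.
build_add_partition_sql() {
  local db="$1" table="$2" dayid="$3"
  printf "USE %s; ALTER TABLE %s ADD IF NOT EXISTS PARTITION (dayid='%s');\n" \
    "$db" "$table" "$dayid"
}

# Usage (requires a working hive client):
#   hive -e "$(build_add_partition_sql recommend_video video_test 20231222)"
# Possible alternative that scans the table directory for partitions:
#   hive -e "USE recommend_video; MSCK REPAIR TABLE video_test;"
```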

The CREATE TABLE statement can simply use the default textfile format.

create table if not exists video_test ( 
`timestamp` string, 
userid string, 
videoid string, 
totaltime string, 
playtime string, 
rate string, 
hits string, 
praise string ) COMMENT '用户视频日志' 
partitioned by (dayid string)
row format delimited fields terminated by ',' ;
 

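To be continued.

To tie the Hive table to the synced logs, the table partitions need to be created on a schedule. First, write the following script: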

create_partitions_video_log.sh

#!/bin/bash
 
#start
now=`date +%Y%m%d%H%M%S`
echo "now: "${now}
echo "START... NOW DATE:" $(date +"%Y-%m-%d %H:%M:%S")
 
# Get tomorrow's date
#todayday=`date -d now +%Y%m%d`
tomorrowday=`date -d "+1 day" +%Y%m%d`
echo "the date is: ${tomorrowday}..."
#echo "the date is: ${todayday}..."
 
#
#hive -e "USE recommend_video; ALTER TABLE t_di_user_action_log_id ADD IF NOT EXISTS PARTITION (dayid='${tomorrowday}');"
/usr/bin/hive -e "USE recommend_video; ALTER TABLE video_test ADD IF NOT EXISTS PARTITION (dayid='${tomorrowday}');"
The crontab entry on the Linux server is as follows:

#02 21 * * 1 sh /data/ceshi/hbase-hive-music.sh > /data/ceshi/music.log &
#22 20 * * 1 /data/ceshi/hbase-hive-novel.sh > /data/ceshi/novel.log &
#30 11 * * 1 sh /data/ceshi/hbase-hive-video.sh
#45 11 * * 1 sh /data/ceshi/hbase-hive-mother.sh
#0 12 * * 1 sh /data/ceshi/hbase-hive-sports.sh
06 16 * * * /bin/bash /root/lpz/create_partitions_video_log.sh >> /root/lpz/video_log.log 2>&1
Various errors may come up along the way (they can differ from server to server):

Unable to determine Hadoop version information.
'hadoop version' returned:

Error: JAVA_HOME is not set and could not be found.

Fix:

Add to /etc/profile: export HADOOP_VERSION=2.6.0-cdh5.11.2

Add to /usr/bin/hive: export JAVA_HOME=/data/jdk1.8.0_171

I am not sure why these have to be added explicitly, since the profile was already configured.
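
One likely explanation, offered as an assumption rather than something verified on this server: cron starts jobs in a minimal, non-login environment and does not read /etc/profile, so variables set there never reach the script. A minimal sketch of loading the environment inside the script instead is below. The function name `load_env` is hypothetical, and the fallback values are the ones quoted in the fix above.

```shell
#!/bin/bash
# Hypothetical helper: source a profile file if it is readable, then
# fall back to explicit defaults for the variables hive needs.
load_env() {
  local profile="${1:-/etc/profile}"
  if [ -r "$profile" ]; then
    . "$profile"
  fi
  # Fallbacks taken from the fix above; adjust to the local install.
  export JAVA_HOME="${JAVA_HOME:-/data/jdk1.8.0_171}"
  export HADOOP_VERSION="${HADOOP_VERSION:-2.6.0-cdh5.11.2}"
}
```

Calling `load_env` near the top of create_partitions_video_log.sh, before the hive command, would keep the settings in one place instead of editing /usr/bin/hive.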

 

Appendix: tuned configuration file for Kafka to HDFS:

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
 
# Name the components on this agent
kafka_hdfs.sources = s1
kafka_hdfs.sinks = k1
kafka_hdfs.channels = c1
 
 
#kafka sources
kafka_hdfs.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
kafka_hdfs.sources.s1.batchSize = 100
kafka_hdfs.sources.s1.batchDurationMillis = 10000
kafka_hdfs.sources.s1.kafka.bootstrap.servers = dev05.hadoop.openpf:9092, dev06.hadoop.openpf:9092, dev07.hadoop.openpf:9092
kafka_hdfs.sources.s1.kafka.topics = recom_video_hjq
kafka_hdfs.sources.s1.kafka.consumer.group.id = flume-cdh
 
kafka_hdfs.sources.s1.interceptors = i1
kafka_hdfs.sources.s1.interceptors.i1.type = com.cmcc.hy.recommend.flume.VideoLogInterceptor$Builder
 
 
# Describe the sink
kafka_hdfs.sinks.k1.type = hdfs
kafka_hdfs.sinks.k1.channel = c1
kafka_hdfs.sinks.k1.type = hdfs
#kafka_hdfs.sinks.k1.hdfs.path = hdfs://nameservice1/user/hive/warehouse/recommend_video.db/t_di_user_action_log_id/dayid=%Y%m%d
kafka_hdfs.sinks.k1.hdfs.path = hdfs://nameservice1/tmp/recommend/lpz-cdh/%Y%m%d
kafka_hdfs.sinks.k1.hdfs.fileType = DataStream
kafka_hdfs.sinks.k1.hdfs.writeFormat=TEXT
kafka_hdfs.sinks.k1.hdfs.filePrefix = events
kafka_hdfs.sinks.k1.hdfs.fileSuffix = .log
kafka_hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
kafka_hdfs.sinks.k1.hdfs.rollInterval=600
kafka_hdfs.sinks.k1.hdfs.rollSize=1024000
kafka_hdfs.sinks.k1.hdfs.rollCount=1000
#kafka_hdfs.sinks.k1.hdfs.round = true
#kafka_hdfs.sinks.k1.hdfs.roundValue = 1
#kafka_hdfs.sinks.k1.hdfs.roundUnit = hour
 
# Use a channel which buffers events in memory
kafka_hdfs.channels.c1.type = memory
kafka_hdfs.channels.c1.capacity = 20000
kafka_hdfs.channels.c1.transactionCapacity = 1000
 
# Bind the source and sink to the channel
kafka_hdfs.sources.s1.channels = c1
kafka_hdfs.sinks.k1.channel = c1
 
