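Whichever approach you use, Flume delivers events to Spark over Avro: the push approach uses Flume's avro sink, and the pull approach uses Spark's SparkSink.
The pom.xml dependencies are as follows: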
<properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>2.1.1</kafka.version>
    <spark.version>2.3.2</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hbase.version>1.4.8</hbase.version>
    <mysql.version>5.1.46</mysql.version>
    <flume.version>1.9.0</flume.version>
</properties>

<dependencyManagement>
    <dependencies>
        <!--netty confluent-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.jboss.netty</groupId>
        <artifactId>netty</artifactId>
        <version>3.2.4.Final</version>
    </dependency>

    <!--Spark Streaming-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!--Spark SQL-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!--Flume- run by spark-submit need-->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-auth</artifactId>
        <version>${flume.version}</version>
    </dependency>
    <dependency>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>servlet-api</artifactId>
        <version>2.5-20110124</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume-sink_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!--flume_pull need-->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.5</version>
    </dependency>

    <!--Streaming-Flume-No version2.4.0-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!--Avro-why need: run flume get a avro wrong-java.lang.AbstractMethodError-->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.4</version>
    </dependency>

    <!--Streaming-Kafka-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.6.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest</artifactId>
        <version>0.9.1</version>
    </dependency>

    <!--Kafka-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <!--Hadoop-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <!--Hbase-->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>

    <!--JDBC-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>org.specs</groupId>
        <artifactId>specs</artifactId>
        <version>1.2.5</version>
        <scope>test</scope>
    </dependency>

    <!--log to flume-->
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>${flume.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>${flume.version}</version>
    </dependency>

    <!--project run by spark-submit wrong-->
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>

<build> ......
1. Push approach
Flume configuration file:
Create the following file under flume-1.9.0/conf:
flume_push_streaming.conf
# Name the components on this agent
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

# Describe/configure the source
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = master
simple-agent.sources.netcat-source.port = 44444

# Describe the sink
simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = master
simple-agent.sinks.avro-sink.port = 41414

# Use a channel which buffers events in memory
simple-agent.channels.memory-channel.type = memory

# Bind the source and sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
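Flume agent files follow the Java properties format, where a `#` starts a comment only at the beginning of a line. The sketch below is an illustration rather than part of the original setup (the object name `FlumeConfCheck` is made up here); it uses `java.util.Properties` to show that a comment placed on the same line as a value becomes part of that value, which is why each comment in the file should sit on its own line.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper for illustration only.
object FlumeConfCheck {
  // Parse properties-format text with the standard Java parser.
  def parse(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  def main(args: Array[String]): Unit = {
    // Comment on the same line: it ends up inside the value.
    val fused = parse("simple-agent.channels = memory-channel# Describe the source\n")
    println(fused.getProperty("simple-agent.channels"))

    // Comment on its own line: the value is clean.
    val clean = parse("simple-agent.channels = memory-channel\n# Describe the source\n")
    println(clean.getProperty("simple-agent.channels"))
  }
}
```

If the agent fails to start with an unknown channel or sink name, an inline comment of this kind is one thing worth checking.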
Spark Streaming code:
Setting up a Spark development environment in IDEA: https://blog.csdn.net/qq_38038143/article/details/89926205
package com.gyt.sparkstream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * First way to integrate Spark Streaming with Flume (push)
 */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    // Integrate Spark Streaming with Flume: receive the events pushed by the avro sink
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    // Decode each event body, split it into words, and count the words per batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
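To make the transformation chain easier to follow, here is a minimal sketch of what it computes for a single batch, written with plain Scala collections instead of a DStream. The object and method names are made up for this illustration, UTF-8 decoding is an assumption (the program above uses the platform default charset), and the empty-string filter is an addition that the original chain does not have.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for one 4-second batch of Flume event bodies.
object WordCountSketch {
  def countWords(bodies: Seq[Array[Byte]]): Map[String, Int] =
    bodies
      .map(b => new String(b, StandardCharsets.UTF_8).trim) // decode the body, drop the trailing newline
      .flatMap(_.split(" "))                                // split each line into words
      .filter(_.nonEmpty)                                   // addition: skip empty tokens
      .groupBy(identity)                                    // plays the role of reduceByKey(_ + _)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("hello spark\r\n", "hello flume\r\n").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).foreach(println)
  }
}
```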
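Mind the startup order:
Run the Spark program first, then start Flume.
Set the program arguments to the hostname and port of the avro sink, master 41414 in this setup (the first run fails because no arguments are given; then click Edit in the top-right corner of IDEA).
The program is now running.
Start Flume:
# Run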
bin/flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
Run telnet against the netcat source (master, port 44444) and type a few words.
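If telnet is not available, the same test input can be sent from a small client. This is only a sketch under the assumptions of the configuration above (netcat source on master:44444); `NetcatSender` and `sendLines` are names invented for this example.

```scala
import java.io.PrintWriter
import java.net.Socket

// Hypothetical test client for illustration only.
object NetcatSender {
  // Send each line, newline-terminated, to host:port, then close the connection.
  def sendLines(host: String, port: Int, lines: Seq[String]): Unit = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(socket.getOutputStream)
      lines.foreach(line => out.print(line + "\n"))
      out.flush()
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Host and port are taken from the netcat source settings in the agent file.
    sendLines("master", 44444, Seq("hello spark", "hello flume"))
  }
}
```

The word counts for these lines should then appear in the Spark console within the next batch interval.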
2. Pull approach
Create the following file under flume-1.9.0/conf:
flume_pull_streaming.conf
It is almost identical to the configuration above; only the sink name and type differ.
# Name the components on this agent
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

# Describe/configure the source
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = master
simple-agent.sources.netcat-source.port = 44444

# Describe the sink
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = master
simple-agent.sinks.spark-sink.port = 41414

# Use a channel which buffers events in memory
simple-agent.channels.memory-channel.type = memory

# Bind the source and sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
Spark code:
package com.gyt.sparkstream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Second way to integrate Spark Streaming with Flume (pull)
 */
object FlumePullWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(4))

    // Integrate Spark Streaming with Flume: poll the events buffered by SparkSink
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    // Decode each event body, split it into words, and count the words per batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
The startup order is reversed:
Start Flume first:
# Run
bin/flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_pull_streaming.conf \
-Dflume.root.logger=INFO,console
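The order matters because of who opens the port. In the push approach the Spark receiver listens and Flume's avro sink connects to it; in the pull approach SparkSink listens inside the Flume agent and Spark connects to it. A quick way to confirm that the listening side is up before starting the other side is a TCP probe. The sketch below is an illustration only; `PortProbe` and `isListening` are invented names, and master:41414 is taken from the configuration above.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper for illustration only.
object PortProbe {
  // True if a TCP connection to host:port succeeds within timeoutMs.
  def isListening(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, unreachable, or timed out
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Pull approach: SparkSink should already be listening before Spark starts.
    println(s"SparkSink reachable: ${isListening("master", 41414)}")
  }
}
```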
Then run the Spark program with the same arguments as before (master 41414).
Run telnet as before and type a few words.