Preface
This chapter shows how to integrate Spark Streaming with Kafka, i.e. how to read data from a Kafka topic as a real-time stream.
My Kafka setup is a cluster; if you do not have one, a single-broker setup works as well.
1. Install ZooKeeper
Reference: https://blog.csdn.net/qq_38038143/article/details/84203344
2. Install Kafka
Note: choose kafka_2.11-2.1.1, i.e. the Scala 2.11 build; the referenced post uses the 2.12 build.
Reference: https://blog.csdn.net/qq_38038143/article/details/88779174
3. Create a test topic
- Start ZooKeeper
- Start Kafka
- Create the topic:
kafka-topics.sh --zookeeper master:2181 --create --topic kafka_streaming --replication-factor 1 --partitions 1
- Write and read data: open two terminals
# Terminal 1 - producer
kafka-console-producer.sh --broker-list slave1:9092 \
--topic kafka_streaming

# Terminal 2 - consumer
kafka-console-consumer.sh --topic kafka_streaming \
--bootstrap-server slave1:9092 \
--from-beginning
If the producer can send messages and the consumer can read them, the setup is working.
4. pom.xml dependencies
Dependencies that are not needed (Hadoop, HBase, etc.) can be omitted.
<properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>2.1.1</kafka.version>
    <spark.version>2.4.0</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hbase.version>1.4.8</hbase.version>
    <mysql.version>5.1.46</mysql.version>
</properties>

<dependencyManagement>
    <dependencies>
        <!-- netty confluent -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.6.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest</artifactId>
        <version>0.9.1</version>
    </dependency>
    <!-- Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <!-- Hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- HBase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- JDBC -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>org.specs</groupId>
        <artifactId>specs</artifactId>
        <version>1.2.5</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
.......
5. Code
Note: keep a Kafka producer running and writing data to the topic the whole time; otherwise there is nothing for the job to consume and you will not see any output.
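If typing into the console producer by hand gets tedious, a small standalone producer can feed the topic instead. The sketch below is only an illustration (the object name SimpleKafkaProducer is made up here): it uses the kafka-clients dependency already declared in the pom and assumes the slave1:9092 broker and kafka_streaming topic used throughout this post.

package com.gyt.sparkstream

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical test producer: writes one line per second into the
// kafka_streaming topic so the streaming job has something to count.
object SimpleKafkaProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "slave1:9092")   // assumed broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      for (i <- 1 to 100) {
        // Each record value is a short sentence; the job splits it on spaces.
        producer.send(new ProducerRecord[String, String]("kafka_streaming", s"hello spark streaming kafka $i"))
        Thread.sleep(1000)
      }
    } finally {
      producer.close()
    }
  }
}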
Reference the official example in the Spark installation directory:
examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
Run in IDEA:
Run it once first, then open Edit Configurations in the upper-right corner of IDEA and set the program arguments:
The arguments are space-separated; in order they are the broker list, the consumer group id, and the topic name. Adjust them to your own setup.
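For example (these values match the spark-submit invocation in step 6 below):

slave1:9092 kafka_streaming_group3 kafka_streaming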
Code: KafkaDirectWordCount.scala
package com.gyt.sparkstream

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object KafkaDirectWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <groupId> <topics>")
      System.exit(1)
    }

    val Array(brokers, groupId, topics) = args

    // Create context with 5 second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

    // Create direct inputStream
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val wordCount = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCount.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
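A note on the design: this is the direct (receiver-less) approach from the spark-streaming-kafka-0-10 integration. LocationStrategies.PreferConsistent distributes the Kafka partitions evenly across the available executors, and ConsumerStrategies.Subscribe subscribes to the topic set under the given consumer group. With the producer running, every 5-second batch prints the (word, count) pairs computed from the lines received in that batch.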
Run it; each batch's word counts are printed to the console.
6. Deploy to the Spark server
Package the project with Maven. From the project directory:
Run: mvn clean package -DskipTests
(I actually ran mvn install here, because an earlier build had later reported errors; running clean package as above is enough.)
The JAR is generated (by default under the project's target/ directory).
Submit it to the server:
--class specifies the main class, i.e. its full package path in the project
--packages declares the Kafka integration dependency, which spark-submit downloads automatically
followed by the JAR path and the three program arguments
spark-submit \
--class com.gyt.sparkstream.KafkaDirectWordCount \
--master local[2] \
--name KafkaDirectWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \
/home/hadoop/lib/sparkstream-1.0-SNAPSHOT.jar \
slave1:9092 kafka_streaming_group3 kafka_streaming
Spark first downloads the --packages dependencies and then starts running.
Full project on GitHub:
https://github.com/GYT0313/Spark-Learning