前言
学习Spark的Structured Streaming(结构化流)。
官网:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
1. 快速示例
博主这里仍然使用的是Spark 集群运行。
假设从侦听TCP套接字的数据服务器接收的文本数据的运行字数。
- 编写代码
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession._// 创建SparkSession
val spark = SparkSession.builder.appName("StructedStreaming").getOrCreate()// 创建流式DataFrame
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 先转换为字符串数据集,然后分割
val words = lines.as[String].flatMap(_.split(" "))// 按值分组并计数
val wordsCount = words.groupBy("value").count()// 输出到控制台, 并启动
val query = wordsCount.writeStream.outputMode("complete").format("console").start()// 防止查询处于活动状态时退出
query.awaitTermination()
- 先启动传输套接字
在master 节点输入:
nc -lk 9999
3. 启动Spark 应用
终端运行 spark-shell,进入scala的shell,复制代码:
在WebUI 下,也可以看到相应的Job 情况:
4. 测试数据
在nc 中输入数据:
可以看到shell中正在执行计算步骤:
计算结果:
再次输入:
你可以看到输出结果包括了之前的数据,是因为输出时设置了选项为 “complete”。
有如下三种输出模式:
也称为“增量查询”:该查询将先前运行的计数与新数据相结合,以计算更新的计数。
如,官网图例:
2. 创建DataFrame和流式数据集
使用本地文件系统,监控一个本地目录,按照csv格式解析:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.types.StructType// 创建schema
val userSchema = new StructType().add("name", "string").add("age","integer")// 创建
val csvDF = spark.readStream.option("sep",";").schema(userSchema).csv("file:///home/hadoop/test_data/")// 输出模式只能选择append
val query = csvDF.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
在/home/hadoop 下创建 test_data目录:
复制代码到shell,运行:
输入测试数据:
结果: