当前位置: 代码迷 >> 综合 >> 11-Structured Streaming -- Scala版本
  详细解决方案

11-Structured Streaming -- Scala版本

热度:79   发布时间:2023-12-04 14:08:41.0

前言

学习Spark的Structured Streaming(结构化流)。
官网:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

1. 快速示例

博主这里仍然使用的是Spark 集群运行。
假设从侦听TCP套接字的数据服务器接收的文本数据的运行字数。

  1. 编写代码
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession._// 创建SparkSession
val spark = SparkSession.builder.appName("StructedStreaming").getOrCreate()// 创建流式DataFrame
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 先转换为字符串数据集,然后分割
val words = lines.as[String].flatMap(_.split(" "))// 按值分组并计数
val wordsCount = words.groupBy("value").count()// 输出到控制台, 并启动
val query = wordsCount.writeStream.outputMode("complete").format("console").start()// 防止查询处于活动状态时退出
query.awaitTermination()
  1. 先启动传输套接字
    在master 节点输入:
nc -lk 9999

在这里插入图片描述
3. 启动Spark 应用
终端运行 spark-shell,进入scala的shell,复制代码:
在这里插入图片描述
在WebUI 下,也可以看到相应的Job 情况:
在这里插入图片描述
4. 测试数据
在nc 中输入数据:
在这里插入图片描述
可以看到shell中正在执行计算步骤:
在这里插入图片描述
计算结果:
在这里插入图片描述
再次输入:
在这里插入图片描述
在这里插入图片描述
你可以看到输出结果包括了之前的数据,是因为输出时设置了选项为 “complete”。
有如下三种输出模式:
在这里插入图片描述
也称为“增量查询”:该查询将先前运行的计数与新数据相结合,以计算更新的计数。

如,官网图例:
在这里插入图片描述

2. 创建DataFrame和流式数据集

使用本地文件系统,监控一个本地目录,按照csv格式解析:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.types.StructType// 创建schema
val userSchema = new StructType().add("name", "string").add("age","integer")// 创建
val csvDF = spark.readStream.option("sep",";").schema(userSchema).csv("file:///home/hadoop/test_data/")// 输出模式只能选择append
val query = csvDF.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

在/home/hadoop 下创建 test_data目录:
在这里插入图片描述
复制代码到shell,运行:
在这里插入图片描述
输入测试数据:
在这里插入图片描述
结果:
在这里插入图片描述

完!

  相关解决方案