当前位置: 代码迷 >> 综合 >> Structured Streaming 快速入门系列(三)Structured Streaming 实战之 Sink
  详细解决方案

Structured Streaming 快速入门系列(三)Structured Streaming 实战之 Sink

热度:49   发布时间:2024-02-20 12:41:52.0

文章目录

  • Sink
    • HDFS Sink
    • Kafka Sink
    • Foreach Writer
    • 自定义 Sink
    • Tigger
    • 从 Source 到 Sink 的流程
    • 错误恢复和容错语义

Sink

目标和步骤

目标

  • 能够串联两端, 理解整个流式应用, 以及其中的一些根本的原理, 比如说容错语义
  • 能够知道如何对接外部系统, 写入数据

步骤

  • HDFS Sink
  • Kafka Sink
  • Foreach Sink
  • 自定义 Sink
  • Tiggers
  • Sink 原理
  • 错误恢复和容错语义

HDFS Sink

目标和步骤

目标

能够使用 Spark 将流式数据的处理结果放入 HDFS

步骤

  • 场景和需求
  • 代码实现

场景和需求

场景

  • Kafka 往往作为数据系统和业务系统之间的桥梁
  • 数据系统一般由批量处理和流式处理两个部分组成
  • 在 Kafka 作为整个数据平台入口的场景下, 需要使用 StructuredStreaming 接收 Kafka 的数据并放置于 HDFS 上, 后续才可以进行批量处理

在这里插入图片描述
案例需求

从 Kafka 接收数据, 从给定的数据集中, 裁剪部分列, 落地于 HDFS

代码实现
步骤说明

  • 从 Kafka 读取数据, 生成源数据集
  • 连接 Kafka 生成 DataFrame
  • 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型
  • 对源数据集选择列
  • 解析 CSV 格式的数据
  • 生成正确类型的结果集
  • 落地 HDFS

整体代码

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
    item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")result.writeStream.format("parquet") // 也可以是 "orc", "json", "csv" 等.option("path", "/dataset/streaming/result/").start()

Kafka Sink

目标和步骤

目标

掌握什么时候要将流式数据落地至 Kafka, 以及如何落地至 Kafka

步骤

  • 场景
  • 代码

场景

  • 有很多时候, ETL 过后的数据, 需要再次放入 Kafka
  • 在 Kafka 后, 可能会有流式程序统一将数据落地到 HDFS 或者 HBase

在这里插入图片描述
案例需求

从 Kafka 中获取数据, 简单处理, 再次放入 Kafka

代码

步骤

  • 从 Kafka 读取数据, 生成源数据集
  • 连接 Kafka 生成 DataFrame
  • 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型
  • 对源数据集选择列
  • 解析 CSV 格式的数据
  • 生成正确类型的结果集
  • 再次落地 Kafka

代码

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
    item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")result.writeStream.format("kafka").outputMode(OutputMode.Append()).option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("topic", "streaming-bank-result").start().awaitTermination()

Foreach Writer

目标和步骤

目标

掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink, 同时能够将数据落地到 MySQL

步骤

  • 需求
  • 代码

需求

场景

大数据有一个常见的应用场景

  • 收集业务系统数据
  • 数据处理
  • 放入 OLTP 数据
  • 外部通过 ECharts 获取并处理数据

这个场景下, StructuredStreaming 就需要处理数据并放入 MySQL 或者 MongoDB, HBase 中以供 Web 程序可以获取数据, 图表的形式展示在前端

在这里插入图片描述

Foreach 模式::

起因

  • 在 Structured Streaming 中, 并未提供完整的 MySQL/JDBC 整合工具
  • 不止 MySQL 和 JDBC, 可能会有其它的目标端需要写入
  • 很多时候 Structured Streaming 需要对接一些第三方的系统, 例如阿里云的云存储, 亚马逊云的云存储等, 但是 Spark 无法对所有第三方都提供支持, 有时候需要自己编写

解决方案

在这里插入图片描述

  • 既然无法满足所有的整合需求, StructuredStreaming 提供了 Foreach, 可以拿到每一个批次的数据
  • 通过 Foreach 拿到数据后, 可以通过自定义写入方式, 从而将数据落地到其它的系统

案例需求::

在这里插入图片描述
从 Kafka 中获取数据, 处理后放入 MySQL

步骤

  • 创建 DataFrame 表示 Kafka 数据源
  • 在源 DataFrame 中选择三列数据
  • 创建 ForeachWriter 接收每一个批次的数据落地 MySQL
  • Foreach 落地数据

代码


import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
    item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")class MySQLWriter extends ForeachWriter[Row] {
    val driver = "com.mysql.jdbc.Driver"var statement: Statement = _var connection: Connection  = _val url: String = "jdbc:mysql://Bigdata01:3306/streaming-bank-result"val user: String = "root"val pwd: String = "000000"override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)connection = DriverManager.getConnection(url, user, pwd)this.statement = connection.createStatementtrue}override def process(value: Row): Unit = {
    statement.executeUpdate(s"insert into bank values(" +s"${value.getAs[Int]("age")}, " +s"${value.getAs[Int]("job")}, " +s"${value.getAs[Int]("balance")} )")}override def close(errorOrNull: Throwable): Unit = {
    connection.close()}
}result.writeStream.foreach(new MySQLWriter).start().awaitTermination()

自定义 Sink

目标和步骤

目标

  • Foreach 倾向于一次处理一条数据, 如果想拿到 DataFrame 幂等的插入外部数据源, 则需要自定义 Sink
  • 了解如何自定义 Sink

步骤

  • Spark 加载 Sink 流程分析
  • 自定义 Sink

Spark 加载 Sink 流程分析
Sink 加载流程

writeStream 方法中会创建一个 DataStreamWriter 对象

def writeStream: DataStreamWriter[T] = {
    if (!isStreaming) {
    logicalPlan.failAnalysis("'writeStream' can be called only on streaming Dataset/DataFrame")}new DataStreamWriter[T](this)
}

在 DataStreamWriter 对象上通过 format 方法指定 Sink 的短名并记录下来

def format(source: String): DataStreamWriter[T] = {
    this.source = sourcethis
}

最终会通过 DataStreamWriter 对象上的 start 方法启动执行, 其中会通过短名创建 DataSource

val dataSource =DataSource(df.sparkSession,className = source,  //传入的 Sink 短名 options = extraOptions.toMap,partitionColumns = normalizedParCols.getOrElse(Nil))

在创建 DataSource 的时候, 会通过一个复杂的流程创建出对应的 Source 和 Sink

lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)

在这个复杂的创建流程中, 有一行最关键的代码, 就是通过 Java 的类加载器加载所有的 DataSourceRegister

val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

在 DataSourceRegister 中会创建对应的 Source 或者 Sink

trait DataSourceRegister {
    def shortName(): String       // 提供短名
}trait StreamSourceProvider {
    def createSource(          //创建 SourcesqlContext: SQLContext,metadataPath: String,schema: Option[StructType],providerName: String,parameters: Map[String, String]): Source
}trait StreamSinkProvider {
    def createSink( //创建 Sink sqlContext: SQLContext,parameters: Map[String, String],partitionColumns: Seq[String],outputMode: OutputMode): Sink
}

自定义 Sink 的方式

根据前面的流程说明, 有两点非常重要

  • Spark 会自动加载所有 DataSourceRegister 的子类, 所以需要通过 DataSourceRegister 加载 Source 和 Sink
  • Spark 提供了 StreamSinkProvider 用以创建 Sink, 提供必要的依赖

所以如果要创建自定义的 Sink, 需要做两件事

  • 创建一个注册器, 继承 DataSourceRegister 提供注册功能, 继承 StreamSinkProvider 获取创建 Sink 的必备依赖
  • 创建一个 Sink 子类

自定义 Sink

步骤

  • 读取 Kafka 数据
  • 简单处理数据
  • 创建 Sink
  • 创建 Sink 注册器
  • 使用自定义 Sink

代码

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
    item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}.as[(Int, Int, Int)].toDF("age", "job", "balance")class MySQLSink(options: Map[String, String], outputMode: OutputMode) extends Sink {
    override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val userName = options.get("userName").orNullval password = options.get("password").orNullval table = options.get("table").orNullval jdbcUrl = options.get("jdbcUrl").orNullval properties = new Propertiesproperties.setProperty("user", userName)properties.setProperty("password", password)data.write.mode(outputMode.toString).jdbc(jdbcUrl, table, properties)}
}class MySQLStreamSinkProvider extends StreamSinkProvider with DataSourceRegister {
    override def createSink(sqlContext: SQLContext,parameters: Map[String, String],partitionColumns: Seq[String],outputMode: OutputMode): Sink = {
    new MySQLSink(parameters, outputMode)}override def shortName(): String = "mysql"
}result.writeStream.format("mysql").option("username", "root").option("password", "000000").option("table", "streaming-bank-result").option("jdbcUrl", "jdbc:mysql://Bigdata01:3306/test").start().awaitTermination()

Tigger

目标和步骤
目标

掌握如何控制 StructuredStreaming 的处理时间

步骤

  • 微批次处理
  • 连续流处理

微批次处理

什么是微批次

在这里插入图片描述
并不是真正的流, 而是缓存一个批次周期的数据, 后处理这一批次的数据

通用流程

步骤

  • 根据 Spark 提供的调试用的数据源 Rate 创建流式 DataFrame
  • Rate 数据源会定期提供一个由两列 timestamp, value 组成的数据, value 是一个随机数

处理和聚合数据, 计算每个个位数和十位数各有多少条数据

  • 对 value 求 log10 即可得出其位数
  • 后按照位数进行分组, 最终就可以看到每个位数的数据有多少个

代码

val spark = SparkSession.builder().master("local[6]").appName("socket_processor").getOrCreate()import org.apache.spark.sql.functions._
import spark.implicits._spark.sparkContext.setLogLevel("ERROR")val source = spark.readStream.format("rate").load()val result = source.select(log10('value) cast IntegerType as 'key, 'value).groupBy('key).agg(count('key) as 'count).select('key, 'count).where('key.isNotNull).sort('key.asc)

默认方式划分批次

介绍

默认情况下的 Structured Streaming 程序会运行在微批次的模式下, 当一个批次结束后, 下一个批次会立即开始处理

步骤

指定落地到 Console 中, 不指定 Trigger

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").start().awaitTermination()

按照固定时间间隔划分批次

介绍

使用微批次处理数据, 使用用户指定的时间间隔启动批次, 如果间隔指定为 0, 则尽可能快的去处理, 一个批次紧接着一个批次

  • 如果前一批数据提前完成, 待到批次间隔达成的时候再启动下一个批次
  • 如果前一批数据延后完成, 下一个批次会在前面批次结束后立即启动
  • 如果没有数据可用, 则不启动处理

步骤

通过 Trigger.ProcessingTime() 指定处理间隔

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.ProcessingTime("2 seconds")).start().awaitTermination()

一次性划分批次

介绍

只划分一个批次, 处理完成以后就停止 Spark 工作, 当需要启动一下 Spark 处理遗留任务的时候, 处理完就关闭集群的情况下, 这个划分方式非常实用

步骤

使用 Trigger.Once 一次性划分批次

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.Once()).start().awaitTermination()

连续流处理

介绍

  • 微批次会将收到的数据按照批次划分为不同的 DataFrame, 后执行 DataFrame, 所以其数据的处理延迟取决于每个 DataFrame 的处理速度, 最快也只能在一个 DataFrame 结束后立刻执行下一个, 最快可以达到 100ms 左右的端到端延迟
  • 而连续流处理可以做到大约 1ms 的端到端数据处理延迟
  • 连续流处理可以达到 at-least-once 的容错语义
  • 从 Spark 2.3 版本开始支持连续流处理, 我们所采用的 2.2 版本还没有这个特性, 并且这个特性截止到 2.4 依然是实验性质, 不建议在生产环境中使用

操作

步骤

使用特殊的 Trigger 完成功能

代码

result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.Continuous("1 second")).start().awaitTermination()

限制

  • 只支持 Map 类的有类型操作
  • 只支持普通的的 SQL 类操作, 不支持聚合
  • Source 只支持 Kafka
  • Sink 只支持 Kafka, Console, Memory

从 Source 到 Sink 的流程

目标和步骤

目标

理解 Source 到 Sink 的整体原理

步骤

从 Source 到 Sink 的流程

从 Source 到 Sink 的流程

在这里插入图片描述

  • 在每个 StreamExecution 的批次最开始, StreamExecution 会向 Source 询问当前 Source 的最新进度, 即最新的 offset
  • StreamExecution 将 Offset 放到 WAL 里
  • StreamExecution 从 Source 获取 start offset, end offset 区间内的数据
  • StreamExecution 触发计算逻辑 logicalPlan 的优化与编译
  • 计算结果写出给 Sink
    • 调用 Sink.addBatch(batchId: Long, data: DataFrame) 完成
    • 此时才会由 Sink 的写入操作开始触发实际的数据获取和计算过程
  • 在数据完整写出到 Sink 后, StreamExecution 通知 Source 批次 id 写入到 batchCommitLog, 当前批次结束

错误恢复和容错语义

目标和步骤

目标

理解 Structured Streaming 中提供的系统级别容错手段

步骤

  • 端到端
  • 三种容错语义
  • Sink 的容错

端到端

在这里插入图片描述

  • Source 可能是 Kafka, HDFS
  • Sink 也可能是 Kafka, HDFS, MySQL 等存储服务
  • 消息从 Source 取出, 经过 Structured Streaming 处理, 最后落地到 Sink 的过程, 叫做端到端

三种容错语义

at-most-once

在这里插入图片描述

  • 在数据从 Source 到 Sink 的过程中, 出错了, Sink 可能没收到数据, 但是不会收到两次, 叫做 at-most-once
  • 一般错误恢复的时候, 不重复计算, 则是 at-most-once

at-least-once

在这里插入图片描述

  • 在数据从 Source 到 Sink 的过程中, 出错了, Sink 一定会收到数据, 但是可能收到两次, 叫做 at-least-once
  • 一般错误恢复的时候, 重复计算可能完成也可能未完成的计算, 则是 at-least-once

exactly-once

在这里插入图片描述

  • 在数据从 Source 到 Sink 的过程中, 虽然出错了, Sink 一定恰好收到应该收到的数据, 一条不重复也一条都不少, 即是 exactly-once
  • 想做到 exactly-once 是非常困难的

Sink 的容错

在这里插入图片描述
故障恢复一般分为 Driver 的容错和 Task 的容错

  • Driver 的容错指的是整个系统都挂掉了
  • Task 的容错指的是一个任务没运行明白, 重新运行一次

因为 Spark 的 Executor 能够非常好的处理 Task 的容错, 所以我们主要讨论 Driver 的容错, 如果出错的时候

  • 读取 WAL offsetlog 恢复出最新的 offsets
    当 StreamExecution 找到 Source 获取数据的时候, 会将数据的起始放在 WAL offsetlog 中, 当出错要恢复的时候, 就可以从中获取当前处理批次的数据起始, 例如 Kafka 的 Offset

  • 读取 batchCommitLog 决定是否需要重做最近一个批次
    当 Sink 处理完批次的数据写入时, 会将当前的批次 ID 存入 batchCommitLog, 当出错的时候就可以从中取出进行到哪一个批次了, 和 WAL 对比即可得知当前批次是否处理完

  • 如果有必要的话, 当前批次数据重做

    • 如果上次执行在 (5) 结束前即失效, 那么本次执行里 Sink 应该完整写出计算结果
    • 如果上次执行在 (5) 结束后才失效, 那么本次执行里 Sink 可以重新写出计算结果 (覆盖上次结果), 也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)

这样即可保证每次执行的计算结果, 在 Sink 这个层面, 是 不重不丢 的, 即使中间发生过失效和恢复, 所以 Structured Streaming 可以做到 exactly-once

容错所需要的存储

存储

  • offsetlog 和 batchCommitLog 关乎于错误恢复
  • offsetlog 和 batchCommitLog 需要存储在可靠的空间里
  • offsetlog 和 batchCommitLog 存储在 Checkpoint 中
  • WAL 其实也存在于 Checkpoint 中

指定 Checkpoint

只有指定了 Checkpoint 路径的时候, 对应的容错功能才可以开启

aggDF.writeStream.outputMode("complete").option("checkpointLocation", "path/to/HDFS/dir") //指定 Checkpoint 的路径, 这个路径对应的目录必须是 HDFS 兼容的文件系统.format("memory").start()

需要的外部支持

如果要做到 exactly-once, 只是 Structured Streaming 能做到还不行, 还需要 Source 和 Sink 系统的支持

  • Source 需要支持数据重放

当有必要的时候, Structured Streaming 需要根据 start 和 end offset 从 Source 系统中再次获取数据, 这叫做重放

  • Sink 需要支持幂等写入

如果需要重做整个批次的时候, Sink 要支持给定的 ID 写入数据, 这叫幂等写入, 一个 ID 对应一条数据进行写入, 如果前面已经写入, 则替换或者丢弃, 不能重复

所以 Structured Streaming 想要做到 exactly-once, 则也需要外部系统的支持, 如下

Source

Sources 是否可重放 原生内置支持 注解
HDFS 可以 已支持 包括但不限于 Text, JSON, CSV, Parquet, ORC
Kafka 可以 已支持 Kafka 0.10.0+
RateStream 可以 已支持 以一定速率产生数据
RDBMS 可以 待支持 预计后续很快会支持
Socket 不可以 已支持 主要用途是在技术会议和讲座上做 Demo

Sink

Sinks 是否幂等写入 原生内置支持 注解
HDFS 可以 支持 包括但不限于 Text, JSON, CSV, Parquet, ORC
ForeachSink 可以 支持 可定制度非常高的 Sink, 是否可以幂等取决于具体的实现
RDBMS 可以 待支持 预计后续很快会支持
Kafka 不可以 支持 Kafka 目前不支持幂等写入, 所以可能会有重复写入
  相关解决方案