文章目录
- Sink
-
- HDFS Sink
- Kafka Sink
- Foreach Writer
- 自定义 Sink
- Tigger
- 从 Source 到 Sink 的流程
- 错误恢复和容错语义
Sink
目标和步骤
目标
- 能够串联两端, 理解整个流式应用, 以及其中的一些根本的原理, 比如说容错语义
- 能够知道如何对接外部系统, 写入数据
步骤
- HDFS Sink
- Kafka Sink
- Foreach Sink
- 自定义 Sink
- Tiggers
- Sink 原理
- 错误恢复和容错语义
HDFS Sink
目标和步骤
目标
能够使用 Spark 将流式数据的处理结果放入 HDFS
步骤
- 场景和需求
- 代码实现
场景和需求
场景
- Kafka 往往作为数据系统和业务系统之间的桥梁
- 数据系统一般由批量处理和流式处理两个部分组成
- 在 Kafka 作为整个数据平台入口的场景下, 需要使用 StructuredStreaming 接收 Kafka 的数据并放置于 HDFS 上, 后续才可以进行批量处理
案例需求
从 Kafka 接收数据, 从给定的数据集中, 裁剪部分列, 落地于 HDFS
代码实现
步骤说明
- 从 Kafka 读取数据, 生成源数据集
- 连接 Kafka 生成 DataFrame
- 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型
- 对源数据集选择列
- 解析 CSV 格式的数据
- 生成正确类型的结果集
- 落地 HDFS
整体代码
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")result.writeStream.format("parquet") // 也可以是 "orc", "json", "csv" 等.option("path", "/dataset/streaming/result/").start()
Kafka Sink
目标和步骤
目标
掌握什么时候要将流式数据落地至 Kafka, 以及如何落地至 Kafka
步骤
- 场景
- 代码
场景
- 有很多时候, ETL 过后的数据, 需要再次放入 Kafka
- 在 Kafka 后, 可能会有流式程序统一将数据落地到 HDFS 或者 HBase
案例需求
从 Kafka 中获取数据, 简单处理, 再次放入 Kafka
代码
步骤
- 从 Kafka 读取数据, 生成源数据集
- 连接 Kafka 生成 DataFrame
- 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型
- 对源数据集选择列
- 解析 CSV 格式的数据
- 生成正确类型的结果集
- 再次落地 Kafka
代码
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")result.writeStream.format("kafka").outputMode(OutputMode.Append()).option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("topic", "streaming-bank-result").start().awaitTermination()
Foreach Writer
目标和步骤
目标
掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink, 同时能够将数据落地到 MySQL
步骤
- 需求
- 代码
需求
场景
大数据有一个常见的应用场景
- 收集业务系统数据
- 数据处理
- 放入 OLTP 数据
- 外部通过 ECharts 获取并处理数据
这个场景下, StructuredStreaming 就需要处理数据并放入 MySQL 或者 MongoDB, HBase 中以供 Web 程序可以获取数据, 图表的形式展示在前端
Foreach 模式::
起因
- 在 Structured Streaming 中, 并未提供完整的 MySQL/JDBC 整合工具
- 不止 MySQL 和 JDBC, 可能会有其它的目标端需要写入
- 很多时候 Structured Streaming 需要对接一些第三方的系统, 例如阿里云的云存储, 亚马逊云的云存储等, 但是 Spark 无法对所有第三方都提供支持, 有时候需要自己编写
解决方案
- 既然无法满足所有的整合需求, StructuredStreaming 提供了 Foreach, 可以拿到每一个批次的数据
- 通过 Foreach 拿到数据后, 可以通过自定义写入方式, 从而将数据落地到其它的系统
案例需求::
从 Kafka 中获取数据, 处理后放入 MySQL
步骤
- 创建 DataFrame 表示 Kafka 数据源
- 在源 DataFrame 中选择三列数据
- 创建 ForeachWriter 接收每一个批次的数据落地 MySQL
- Foreach 落地数据
代码
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}
.as[(Int, Int, Int)]
.toDF("age", "job", "balance")class MySQLWriter extends ForeachWriter[Row] {
val driver = "com.mysql.jdbc.Driver"var statement: Statement = _var connection: Connection = _val url: String = "jdbc:mysql://Bigdata01:3306/streaming-bank-result"val user: String = "root"val pwd: String = "000000"override def open(partitionId: Long, version: Long): Boolean = {
Class.forName(driver)connection = DriverManager.getConnection(url, user, pwd)this.statement = connection.createStatementtrue}override def process(value: Row): Unit = {
statement.executeUpdate(s"insert into bank values(" +s"${value.getAs[Int]("age")}, " +s"${value.getAs[Int]("job")}, " +s"${value.getAs[Int]("balance")} )")}override def close(errorOrNull: Throwable): Unit = {
connection.close()}
}result.writeStream.foreach(new MySQLWriter).start().awaitTermination()
自定义 Sink
目标和步骤
目标
- Foreach 倾向于一次处理一条数据, 如果想拿到 DataFrame 幂等的插入外部数据源, 则需要自定义 Sink
- 了解如何自定义 Sink
步骤
- Spark 加载 Sink 流程分析
- 自定义 Sink
Spark 加载 Sink 流程分析
Sink 加载流程
writeStream 方法中会创建一个 DataStreamWriter 对象
def writeStream: DataStreamWriter[T] = {
if (!isStreaming) {
logicalPlan.failAnalysis("'writeStream' can be called only on streaming Dataset/DataFrame")}new DataStreamWriter[T](this)
}
在 DataStreamWriter 对象上通过 format 方法指定 Sink 的短名并记录下来
def format(source: String): DataStreamWriter[T] = {
this.source = sourcethis
}
最终会通过 DataStreamWriter 对象上的 start 方法启动执行, 其中会通过短名创建 DataSource
val dataSource =DataSource(df.sparkSession,className = source, //传入的 Sink 短名 options = extraOptions.toMap,partitionColumns = normalizedParCols.getOrElse(Nil))
在创建 DataSource 的时候, 会通过一个复杂的流程创建出对应的 Source 和 Sink
lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
在这个复杂的创建流程中, 有一行最关键的代码, 就是通过 Java 的类加载器加载所有的 DataSourceRegister
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
在 DataSourceRegister 中会创建对应的 Source 或者 Sink
trait DataSourceRegister {
def shortName(): String // 提供短名
}trait StreamSourceProvider {
def createSource( //创建 SourcesqlContext: SQLContext,metadataPath: String,schema: Option[StructType],providerName: String,parameters: Map[String, String]): Source
}trait StreamSinkProvider {
def createSink( //创建 Sink sqlContext: SQLContext,parameters: Map[String, String],partitionColumns: Seq[String],outputMode: OutputMode): Sink
}
自定义 Sink 的方式
根据前面的流程说明, 有两点非常重要
- Spark 会自动加载所有 DataSourceRegister 的子类, 所以需要通过 DataSourceRegister 加载 Source 和 Sink
- Spark 提供了 StreamSinkProvider 用以创建 Sink, 提供必要的依赖
所以如果要创建自定义的 Sink, 需要做两件事
- 创建一个注册器, 继承 DataSourceRegister 提供注册功能, 继承 StreamSinkProvider 获取创建 Sink 的必备依赖
- 创建一个 Sink 子类
自定义 Sink
步骤
- 读取 Kafka 数据
- 简单处理数据
- 创建 Sink
- 创建 Sink 注册器
- 使用自定义 Sink
代码
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[6]").appName("kafka integration").getOrCreate()import spark.implicits._val source = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "Bigdata01:9092,Bigdata02:9092,Bigdata03:9092").option("subscribe", "streaming-bank").option("startingOffsets", "earliest").load().selectExpr("CAST(value AS STRING)").as[String]val result = source.map {
item =>val arr = item.replace("\"", "").split(";")(arr(0).toInt, arr(1).toInt, arr(5).toInt)
}.as[(Int, Int, Int)].toDF("age", "job", "balance")class MySQLSink(options: Map[String, String], outputMode: OutputMode) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val userName = options.get("userName").orNullval password = options.get("password").orNullval table = options.get("table").orNullval jdbcUrl = options.get("jdbcUrl").orNullval properties = new Propertiesproperties.setProperty("user", userName)properties.setProperty("password", password)data.write.mode(outputMode.toString).jdbc(jdbcUrl, table, properties)}
}class MySQLStreamSinkProvider extends StreamSinkProvider with DataSourceRegister {
override def createSink(sqlContext: SQLContext,parameters: Map[String, String],partitionColumns: Seq[String],outputMode: OutputMode): Sink = {
new MySQLSink(parameters, outputMode)}override def shortName(): String = "mysql"
}result.writeStream.format("mysql").option("username", "root").option("password", "000000").option("table", "streaming-bank-result").option("jdbcUrl", "jdbc:mysql://Bigdata01:3306/test").start().awaitTermination()
Tigger
目标和步骤
目标
掌握如何控制 StructuredStreaming 的处理时间
步骤
- 微批次处理
- 连续流处理
微批次处理
什么是微批次
并不是真正的流, 而是缓存一个批次周期的数据, 后处理这一批次的数据
通用流程
步骤
- 根据 Spark 提供的调试用的数据源 Rate 创建流式 DataFrame
- Rate 数据源会定期提供一个由两列 timestamp, value 组成的数据, value 是一个随机数
处理和聚合数据, 计算每个个位数和十位数各有多少条数据
- 对 value 求 log10 即可得出其位数
- 后按照位数进行分组, 最终就可以看到每个位数的数据有多少个
代码
val spark = SparkSession.builder().master("local[6]").appName("socket_processor").getOrCreate()import org.apache.spark.sql.functions._
import spark.implicits._spark.sparkContext.setLogLevel("ERROR")val source = spark.readStream.format("rate").load()val result = source.select(log10('value) cast IntegerType as 'key, 'value).groupBy('key).agg(count('key) as 'count).select('key, 'count).where('key.isNotNull).sort('key.asc)
默认方式划分批次
介绍
默认情况下的 Structured Streaming 程序会运行在微批次的模式下, 当一个批次结束后, 下一个批次会立即开始处理
步骤
指定落地到 Console 中, 不指定 Trigger
代码
result.writeStream.outputMode(OutputMode.Complete()).format("console").start().awaitTermination()
按照固定时间间隔划分批次
介绍
使用微批次处理数据, 使用用户指定的时间间隔启动批次, 如果间隔指定为 0, 则尽可能快的去处理, 一个批次紧接着一个批次
- 如果前一批数据提前完成, 待到批次间隔达成的时候再启动下一个批次
- 如果前一批数据延后完成, 下一个批次会在前面批次结束后立即启动
- 如果没有数据可用, 则不启动处理
步骤
通过 Trigger.ProcessingTime() 指定处理间隔
代码
result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.ProcessingTime("2 seconds")).start().awaitTermination()
一次性划分批次
介绍
只划分一个批次, 处理完成以后就停止 Spark 工作, 当需要启动一下 Spark 处理遗留任务的时候, 处理完就关闭集群的情况下, 这个划分方式非常实用
步骤
使用 Trigger.Once 一次性划分批次
代码
result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.Once()).start().awaitTermination()
连续流处理
介绍
- 微批次会将收到的数据按照批次划分为不同的 DataFrame, 后执行 DataFrame, 所以其数据的处理延迟取决于每个 DataFrame 的处理速度, 最快也只能在一个 DataFrame 结束后立刻执行下一个, 最快可以达到 100ms 左右的端到端延迟
- 而连续流处理可以做到大约 1ms 的端到端数据处理延迟
- 连续流处理可以达到 at-least-once 的容错语义
- 从 Spark 2.3 版本开始支持连续流处理, 我们所采用的 2.2 版本还没有这个特性, 并且这个特性截止到 2.4 依然是实验性质, 不建议在生产环境中使用
操作
步骤
使用特殊的 Trigger 完成功能
代码
result.writeStream.outputMode(OutputMode.Complete()).format("console").trigger(Trigger.Continuous("1 second")).start().awaitTermination()
限制
- 只支持 Map 类的有类型操作
- 只支持普通的的 SQL 类操作, 不支持聚合
- Source 只支持 Kafka
- Sink 只支持 Kafka, Console, Memory
从 Source 到 Sink 的流程
目标和步骤
目标
理解 Source 到 Sink 的整体原理
步骤
从 Source 到 Sink 的流程
从 Source 到 Sink 的流程
- 在每个 StreamExecution 的批次最开始, StreamExecution 会向 Source 询问当前 Source 的最新进度, 即最新的 offset
- StreamExecution 将 Offset 放到 WAL 里
- StreamExecution 从 Source 获取 start offset, end offset 区间内的数据
- StreamExecution 触发计算逻辑 logicalPlan 的优化与编译
- 计算结果写出给 Sink
- 调用 Sink.addBatch(batchId: Long, data: DataFrame) 完成
- 此时才会由 Sink 的写入操作开始触发实际的数据获取和计算过程
- 在数据完整写出到 Sink 后, StreamExecution 通知 Source 批次 id 写入到 batchCommitLog, 当前批次结束
错误恢复和容错语义
目标和步骤
目标
理解 Structured Streaming 中提供的系统级别容错手段
步骤
- 端到端
- 三种容错语义
- Sink 的容错
端到端
- Source 可能是 Kafka, HDFS
- Sink 也可能是 Kafka, HDFS, MySQL 等存储服务
- 消息从 Source 取出, 经过 Structured Streaming 处理, 最后落地到 Sink 的过程, 叫做端到端
三种容错语义
at-most-once
- 在数据从 Source 到 Sink 的过程中, 出错了, Sink 可能没收到数据, 但是不会收到两次, 叫做 at-most-once
- 一般错误恢复的时候, 不重复计算, 则是 at-most-once
at-least-once
- 在数据从 Source 到 Sink 的过程中, 出错了, Sink 一定会收到数据, 但是可能收到两次, 叫做 at-least-once
- 一般错误恢复的时候, 重复计算可能完成也可能未完成的计算, 则是 at-least-once
exactly-once
- 在数据从 Source 到 Sink 的过程中, 虽然出错了, Sink 一定恰好收到应该收到的数据, 一条不重复也一条都不少, 即是 exactly-once
- 想做到 exactly-once 是非常困难的
Sink 的容错
故障恢复一般分为 Driver 的容错和 Task 的容错
- Driver 的容错指的是整个系统都挂掉了
- Task 的容错指的是一个任务没运行明白, 重新运行一次
因为 Spark 的 Executor 能够非常好的处理 Task 的容错, 所以我们主要讨论 Driver 的容错, 如果出错的时候
-
读取 WAL offsetlog 恢复出最新的 offsets
当 StreamExecution 找到 Source 获取数据的时候, 会将数据的起始放在 WAL offsetlog 中, 当出错要恢复的时候, 就可以从中获取当前处理批次的数据起始, 例如 Kafka 的 Offset -
读取 batchCommitLog 决定是否需要重做最近一个批次
当 Sink 处理完批次的数据写入时, 会将当前的批次 ID 存入 batchCommitLog, 当出错的时候就可以从中取出进行到哪一个批次了, 和 WAL 对比即可得知当前批次是否处理完 -
如果有必要的话, 当前批次数据重做
- 如果上次执行在 (5) 结束前即失效, 那么本次执行里 Sink 应该完整写出计算结果
- 如果上次执行在 (5) 结束后才失效, 那么本次执行里 Sink 可以重新写出计算结果 (覆盖上次结果), 也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)
这样即可保证每次执行的计算结果, 在 Sink 这个层面, 是 不重不丢 的, 即使中间发生过失效和恢复, 所以 Structured Streaming 可以做到 exactly-once
容错所需要的存储
存储
- offsetlog 和 batchCommitLog 关乎于错误恢复
- offsetlog 和 batchCommitLog 需要存储在可靠的空间里
- offsetlog 和 batchCommitLog 存储在 Checkpoint 中
- WAL 其实也存在于 Checkpoint 中
指定 Checkpoint
只有指定了 Checkpoint 路径的时候, 对应的容错功能才可以开启
aggDF.writeStream.outputMode("complete").option("checkpointLocation", "path/to/HDFS/dir") //指定 Checkpoint 的路径, 这个路径对应的目录必须是 HDFS 兼容的文件系统.format("memory").start()
需要的外部支持
如果要做到 exactly-once, 只是 Structured Streaming 能做到还不行, 还需要 Source 和 Sink 系统的支持
- Source 需要支持数据重放
当有必要的时候, Structured Streaming 需要根据 start 和 end offset 从 Source 系统中再次获取数据, 这叫做重放
- Sink 需要支持幂等写入
如果需要重做整个批次的时候, Sink 要支持给定的 ID 写入数据, 这叫幂等写入, 一个 ID 对应一条数据进行写入, 如果前面已经写入, 则替换或者丢弃, 不能重复
所以 Structured Streaming 想要做到 exactly-once, 则也需要外部系统的支持, 如下
Source
Sources | 是否可重放 | 原生内置支持 | 注解 |
---|---|---|---|
HDFS | 可以 | 已支持 | 包括但不限于 Text, JSON, CSV, Parquet, ORC |
Kafka | 可以 | 已支持 | Kafka 0.10.0+ |
RateStream | 可以 | 已支持 | 以一定速率产生数据 |
RDBMS | 可以 | 待支持 | 预计后续很快会支持 |
Socket | 不可以 | 已支持 | 主要用途是在技术会议和讲座上做 Demo |
Sink
Sinks | 是否幂等写入 | 原生内置支持 | 注解 |
---|---|---|---|
HDFS | 可以 | 支持 | 包括但不限于 Text, JSON, CSV, Parquet, ORC |
ForeachSink | 可以 | 支持 | 可定制度非常高的 Sink, 是否可以幂等取决于具体的实现 |
RDBMS | 可以 | 待支持 | 预计后续很快会支持 |
Kafka | 不可以 | 支持 | Kafka 目前不支持幂等写入, 所以可能会有重复写入 |