
Spark 3 User-Defined Aggregate Functions (UDAFs)


Description

User-defined aggregate functions (UDAFs) are user-programmable routines that act on multiple rows at once and return a single aggregated value as a result. This section lists the classes required to create and register a UDAF, and includes examples of how to define and register a UDAF in Scala and invoke it in Spark SQL.

Aggregator[-IN, BUF, OUT]

The base class for user-defined aggregations. It can be used in Dataset operations to take all of the elements of a group and reduce them to a single value.
IN: the input type of the aggregation
BUF: the intermediate type of the reduction
OUT: the final output type

  • bufferEncoder: Encoder[BUF] specifies the Encoder for the intermediate value type
  • finish(reduction: BUF): OUT transforms the output of the reduction
  • merge(b1: BUF, b2: BUF): BUF merges two intermediate values
  • outputEncoder: Encoder[OUT] specifies the Encoder for the final output value type
  • reduce(b: BUF, a: IN): BUF aggregates an input value a into the current intermediate value. For performance, the function may modify b and return it instead of constructing a new object.
  • zero: BUF returns the initial value of the intermediate result for this aggregation.
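
Taken together, these members form a small contract: zero, reduce, and merge define a fold that can be combined across partitions, while finish and the two encoders turn the buffer into a typed result. As a minimal sketch of that contract (illustrative only; the examples below show a more realistic average), a trivial sum over Long values where buffer and output are both Long:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object LongSum extends Aggregator[Long, Long, Long] {
  def zero: Long = 0L                                    // initial buffer; b + zero == b
  def reduce(b: Long, a: Long): Long = b + a             // fold one input into the buffer
  def merge(b1: Long, b2: Long): Long = b1 + b2          // combine two partial buffers
  def finish(reduction: Long): Long = reduction          // the final result is the buffer itself
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong  // Encoder for BUF
  def outputEncoder: Encoder[Long] = Encoders.scalaLong  // Encoder for OUT
}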

Examples

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like this:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
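
The TypedColumn above is not limited to a whole-Dataset select; it plugs into typed group-by aggregation as well. A minimal sketch against the same Dataset (each name here forms a singleton group, so the per-group average is simply that employee's salary):

val byName = ds.groupByKey(_.name).agg(averageSalary)
byName.show()  // one (name, average_salary) row per group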

Untyped User-Defined Aggregate Functions

Typed aggregations, as described above, can also be registered as untyped aggregating UDFs for use with DataFrames. For example, a user-defined average for an untyped DataFrame can look like this:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Long, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, data: Long): Average = {
    buffer.sum += data
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Register the function to access it
spark.udf.register("myAverage", functions.udaf(MyAverage))
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
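
Registering with spark.udf.register is only needed when calling the function from SQL. The UserDefinedFunction returned by functions.udaf can also be applied directly through the DataFrame API; a minimal sketch over the same df:

import org.apache.spark.sql.functions.col

val myAverage = functions.udaf(MyAverage)
df.select(myAverage(col("salary")).as("average_salary")).show()
// same result as the SQL query above: 3750.0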