当前位置: 代码迷 >> 综合 >> 7.delta lake的merge操作详细讲解,案例及性能调优
  详细解决方案

7.delta lake的merge操作详细讲解,案例及性能调优

热度:37   发布时间:2023-12-13 16:13:10.0

上文讲解了deltalake 的update,delete及merge的基本操作。鉴于merge操作的复杂性,本文主要对其进行展开讲解。

1.merge算子操作语法

merge操作的sql表达如下:

import io.delta.tables._import org.apache.spark.sql.functions._DeltaTable.forPath(spark, "/data/events/")  .as("events")  .merge(    updatesDF.as("updates"),    "events.eventId = updates.eventId")  .whenMatched  .updateExpr(    Map("data" -> "updates.data"))  .whenNotMatched  .insertExpr(    Map(      "date" -> "updates.date",      "eventId" -> "updates.eventId",      "data" -> "updates.data"))  .execute()

merge 编码操作还是有些约束需要详细描述的。

1.1 可以有(1,2,3)个wenMatched或者whenNotMatched的子语句。其中,whenMatched操作最多有两个语句,whenNotMatched最多有一个子语句。

1.2 当源表的数据和目标表的数据满足匹配条件的时候,执行的是whenMatched语句。这些语句可以有以下几个语义:

a) whenMatched语句最多有一个update和一个delete表达。merge中的update行为仅仅更新满足条件的目标表一行数据的指定列。而delete操作会删除所有匹配的行。

b) 每个whenMatched语句都可以有一个可选的条件。如果该可选的条件存在,update和delete操作仅仅在该可选条件为true的时候,才会在匹配的目标数据上执行相应操作。

c) 如果有两个whenMatched子句,则将按照它们被指定的顺序(即,子句的顺序很重要)进行执行。第一个子句必须具有一个子句条件(否则,第二个子句将永远不会执行)。

d) 如果两个whenMatched子语句都有条件并且两个子语句的条件都不为true,那不会对目标数据进行任何修改。

c) 支持满足条件的源dataset中相关行的所有列同时更新到目标detla表的相关列,表达式如下:

whenMatched(...).updateAll()

等价于:

whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保证源表和目标表有相同的列,否则会抛出异常。

1.3 给定的条件,源表的一行数据,跟目标表没有完成匹配的时候执行whenNotMatched语句。该子语句有以下语法:

a) whenNotMatched仅仅支持insert表达。根据指定的列和相关的条件,该操作会在目标表中插入一条新的数据,当目标表中存在的列没有明确的指定的时候,就插入null。

b) whenNotMatched语句可以有可选条件。如果指定了可选条件,数据仅仅会在可选条件为true的时候才会插入。否则,源列会被忽略。

c) 也可以插入匹配目标表相关行的所有源表行的数据列,表达式:

whenNotMatched(...).insertAll()

等价于:

whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保证源表和目标表有相同的列,否则就会抛出异常。

2.schema校验

merge操作会自动校验insert和update操作产生额数据schema是否与目标表的schema匹配。规则如下:

a) 对于update和insert行为,指定的目标列必须在目标delta lake表中存在。

b) 对于updateAll和insertAll操作,源dataset必须包含所有目标表的列。源dataset可以有目标表中不存在的列,但是这些列会被忽略。当然也可以通过配置保留仅源dataset有的列。

c) 对于所有操作,如果由生成目标列的表达式生成的数据类型与目标Delta表中的对应列不同,则merge尝试将其强制转换为表中的类型。

3.自动schema转换

默认情况下,updateAll和insertAll操作仅仅会更新或插入在目标表中有的相同列名的列,对于仅仅在源dataset中存在而目标表中不存在的列,会被忽略。但是有些场景下,我们希望保留源dataset中新增的列。首先需要将前面介绍的一个参数spark.databricks.delta.schema.autoMerge.enabled设置为true。

注意:

a. schema自动增加仅仅是针对updateAll操作或者insertAll操作,或者两者。

b. 仅仅顶层的列会被更改,而不是嵌套的列。

c. 更新和插入操作不能显式引用目标表中不存在的目标列(即使其中有updateAll或insertAll作为子句之一)。 

4.schema推断与否对比

据一些例子,进行schema自动推断与不自动推断的对比

对比一

目标列(key,value),源列(key,value,newValue),对源源表执行下面的sql操作:

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insertAll()  .execute()

没有使用自动schema推断的话:目标表的schema信息是不会变的。仅仅key,value列被更新。

使用了schema推断的话:表的schema就会演变为(key,value,newValue)。updateAll操作,会更新value和newValue列。对于insertAll操作会插入整行(key,value,newValue)。

对比二

目标表(key,oldValue),源表(key,newValue),对源表执行下面的sql:

targetDeltaTable.alias("t").merge(sourceDataFrame.alias("s"),"t.key = s.key").whenMatched().updateAll().whenNotMatched().insertAll().execute()

不使用schema推断:updateAll和insertAll操作都会抛异常。

使用schema推断:表的shema会演变为(key,oldValue,newValue)。updateAll操作会更新key和value列,而oldValue列不变。insertAll操作会插入(key,null,newValue),oldValue会插入null。

对比三

目标表(key,oldValue),源表(key,newValue),对源表执行下面的sql

targetDeltaTable.alias("t").merge(sourceDataFrame.alias("s"),"t.key = s.key").whenMatched().update(Map("newValue" -> col("s.newValue"))).whenNotMatched().insertAll().execute()

不使用schema推断:update操作会抛出异常,因为newValue在目标表中并不存在。

使用schema推断:update操作会抛出异常,因为newValue在目标表中并不存在。

对比四:

目标表(key,oldValue),源表(key,newValue),对源表执行下面的sql

targetDeltaTable.alias("t").merge(sourceDataFrame.alias("s"),"t.key = s.key").whenMatched().updateAll().whenNotMatched().insert(Map("key" -> col("s.key"),"newValue" -> col("s.newValue"))).execute()

不使用schema推断:insert操作会抛出异常,因为newValue在目标表中并不存在。

使用schema推断:insert操作依然会抛出异常,因为newValue在目标表中并不存在。

5.性能调优

下面几个方法可以有效减少merge的处理时间:

a.减少匹配查找的数据量

默认情况下,merge操作会扫描整个delta lake表找到满足条件的数据。可以加些谓词,以减少数据量。比如,数据是以country和date进行分区的,而你只想更新特定国家的昨天的数据。就可以增加一些条件,比如:

events.date = current_date() AND events.country = 'USA'

这样就只会处理指定分区的数据,大大减少了数据扫描量。也可以避免不同分区之间操作的一些冲突。

b.合并文件

如果数据存储的时候有很多小文件,就会降低数据的读取速度。可以合并小文件成一些大文件,来提升读取的速度。后面会说到这个问题。

c.控制shuffle的分区数

为了计算和更新数据,merge操作会对数据进行多次shuffle。shuffle过程中task数量是由参数spark.sql.shuffle.partitions来设置,默认是200。该参数不仅能控制shuffle的并行度,也能决定输出的文件数。增加这个值虽然可以增加并行度,但也相应的增加了产生小文件数。

d.写出数据之间进行重分区

对与分区表,merge操作会产生很多小文件,会比shuffle分区数多很多。原因是每个shuffle任务会为多分区表产生更多的文件,这可能会是一个性能瓶颈。所以,很多场景中使用表的分区列对数据进行写入前重分区是很有效的。可以通过设置spark.delta.merge.repartitionBeforeWrite为true来生效。

推荐阅读:

6. delta lake 的curd操作

5.数据湖deltalake流表的读写

4.数据湖之schema校验

3.数据湖deltalake之时间旅行及版本管理

  相关解决方案