Today I gave my teammates a presentation titled "From Calcite to Tampering with Flink SQL"; the Markdown version of the handout is pasted below.
The talk packs in a lot: Calcite fundamentals, how the Blink Planner executes queries, the optimizer and its optimization rules, and more. I will follow up later with dedicated articles revisiting the key points.
From Calcite to Tampering with Flink SQL
August 26th, 2021
For NiceTuan Real-Time Team
Prerequisites
- Basic understanding of
  - Flink DataStream runtime (3-layered DAGs, stream partitions, etc.)
  - Database system concepts
  - SQL queries
  - Scala language, just in case
(Review) Some Relational Algebra
Textbook - Database System Concepts, 6th Edition [Abraham Silberschatz et al. 2011]
- But Wikipedia is more than enough
- Relational algebra is a theory that uses algebraic structures with a well-founded semantics for modeling data and defining queries on it
- The theory was introduced by Edgar F. Codd
- Projection (Π)
- Selection (σ)
- Rename (ρ)
- Natural join (⋈) & equi-join
- Left outer join (⟕)
- Right outer join (⟖)
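As a tiny illustration (my own, using the `student` table from the demo below): the query `SELECT name FROM student WHERE age < 20` corresponds to

$$\Pi_{name}\bigl(\sigma_{age < 20}(student)\bigr)$$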
Calcite In A Nutshell
What is it
As you already know, "Flink does not reinvent the wheel, but leverages Apache Calcite to deal with most SQL-related work."
Apache Calcite is a foundational software framework that provides query processing, optimization, and query language support to many popular open-source data processing systems such as Apache Hive, Apache Storm, Apache Flink, Druid, and MapD.
Architecture
- From Apache Calcite: A Foundational Framework for Optimized Query Processing Over Heterogeneous Data Sources [Edmon Begoli et al. SIGMOD 2018]
Fundamental Concepts
`Catalog`
- A metadata store & handler for schemas, tables, etc.

`SqlNode`
- A parsed SQL tree (i.e. AST)
  - `SqlLiteral` - constant value (`1`, `FALSE`, ...)
  - `SqlIdentifier` - identifier
  - `SqlCall` - call to functions, operators, etc.
    - `SqlSelect` / `SqlJoin` / `SqlOrderBy` / ...

`RelNode`
- A relational (algebraic) expression
  - `LogicalTableScan`
  - `LogicalProject`
  - `LogicalFilter`
  - `LogicalCalc`
  - ...

`RexNode`
- A (typed) row-level expression
  - `RexLiteral`
  - `RexVariable`
  - `RexCall`
  - ...

`RelTrait` & `RelTraitDef`
- A set of physical properties & their definitions carried by a relational expression
  - `Convention` - working scope, mainly a single data source
  - `RelCollation` - ordering method of data (and sort keys)
  - `RelDistribution` - distribution method of data

`RelOptPlanner`
- A query optimizer, which transforms a relational expression into a semantically equivalent relational expression, according to a given set of rules and a cost model
  - `HepPlanner` - RBO, greedy, heuristic
  - `VolcanoPlanner` - CBO, dynamic programming, Volcano-flavored

`RelOptRule`
- A (usually empirical) rule which defines the transformation routine for RBO
  - `RelOptRuleOperand` - used by the rule to determine the section of `RelNode`s to be optimized
  - `RuleSet` - self-explanatory

`RelOptCost`
- An interface for optimizer cost in terms of number of rows processed, CPU cost, and I/O cost

`RelMetadataProvider`
- An interface for obtaining metadata about relational expressions to support the optimization process
  - Min / max row count
  - Data size
  - Expression lineage
  - Distinctness / uniqueness
  - ...

`RelOptCluster`
- The environment during the optimization of a query
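To keep `RelNode` and `RexNode` apart, here is a small annotated plan (purely illustrative; it assumes the `student` table used in the demo below):

```
SELECT id FROM student WHERE age < 20

LogicalProject(id=[$0])                  <- RelNode: relational operator, produces rows
  LogicalFilter(condition=[<($3, 20)])   <- RelNode; its condition <($3, 20) is a RexCall,
                                            a RexNode over input field $3 and the literal 20
    LogicalTableScan(table=[[student]])  <- RelNode: leaf, scans the table
```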
Process Flow
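In one line, the flow demonstrated in the next section is:

```
SQL text --parser--> SqlNode --validator--> SqlNode --SqlToRelConverter--> RelNode --HepPlanner / VolcanoPlanner--> optimized RelNode
```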
A Quick Calcite Show
Prepare Schema and SQL
```java
SchemaPlus rootSchema = Frameworks.createRootSchema(true);

// Register two tables with explicit row types
rootSchema.add("student", new AbstractTable() {
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.Builder builder = new Builder(DEFAULT_TYPE_FACTORY);
        builder.add("id", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.BIGINT));
        builder.add("name", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.VARCHAR));
        builder.add("class", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.VARCHAR));
        builder.add("age", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.INTEGER));
        return builder.build();
    }
});

rootSchema.add("exam_result", new AbstractTable() {
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.Builder builder = new Builder(DEFAULT_TYPE_FACTORY);
        builder.add("student_id", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.BIGINT));
        builder.add("score1", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.FLOAT));
        builder.add("score2", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.FLOAT));
        return builder.build();
    }
});

// The query to be parsed, validated, and optimized below
String sql = /* language=SQL */
    "SELECT a.id, a.name, SUM(b.score1 * 0.7 + b.score2 * 0.3) AS total_score " +
    "FROM student a " +
    "INNER JOIN exam_result b ON a.id = b.student_id " +
    "WHERE a.age < 20 AND b.score1 > 60.0 " +
    "GROUP BY a.id, a.name";
```
Parsing
```java
FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
    .parserConfig(SqlParser.config().withCaseSensitive(false).withLex(Lex.MYSQL_ANSI))
    .defaultSchema(rootSchema)
    .build();

// Pass the parser config defined above; SqlParser.create(sql) alone would fall back to the defaults
SqlParser parser = SqlParser.create(sql, frameworkConfig.getParserConfig());
SqlNode originalSqlNode = parser.parseStmt();
System.out.println(originalSqlNode.toString());
```
```
--- Original SqlNode ---
SELECT `A`.`ID`, `A`.`NAME`, SUM(`B`.`SCORE1` * 0.7 + `B`.`SCORE2` * 0.3) AS `TOTAL_SCORE`
FROM `STUDENT` AS `A`
INNER JOIN `EXAM_RESULT` AS `B` ON `A`.`ID` = `B`.`STUDENT_ID`
WHERE `A`.`AGE` < 20 AND `B`.`SCORE1` > 60.0
GROUP BY `A`.`ID`, `A`.`NAME`
```
Validation
```java
Properties cxnConfig = new Properties();
cxnConfig.setProperty(
    CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
    String.valueOf(frameworkConfig.getParserConfig().caseSensitive()));

CalciteCatalogReader catalogReader = new CalciteCatalogReader(
    CalciteSchema.from(rootSchema),
    CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null),
    DEFAULT_TYPE_FACTORY,
    new CalciteConnectionConfigImpl(cxnConfig)
);

// SqlValidatorImpl1 is presumably a thin subclass of Calcite's SqlValidatorImpl
// (whose constructor is protected, so it cannot be instantiated directly)
SqlValidator validator = new SqlValidatorImpl1(
    frameworkConfig.getOperatorTable(),
    catalogReader,
    DEFAULT_TYPE_FACTORY
);

SqlNode validatedSqlNode = validator.validate(originalSqlNode);
System.out.println(validatedSqlNode.toString());
```
```
--- Validated SqlNode ---
SELECT `A`.`ID`, `A`.`NAME`, SUM(`B`.`SCORE1` * 0.7 + `B`.`SCORE2` * 0.3) AS `TOTAL_SCORE`
FROM `STUDENT` AS `A`
INNER JOIN `EXAM_RESULT` AS `B` ON `A`.`id` = `B`.`student_id`
WHERE `A`.`age` < 20 AND `B`.`score1` > 60.0
GROUP BY `A`.`id`, `A`.`name`
```

Note how validation resolved the column references against the catalog, so they now appear in their declared (lower-case) form.
Planning
```java
RelOptCluster relOptCluster = RelOptCluster.create(
    new VolcanoPlanner(), new RexBuilder(DEFAULT_TYPE_FACTORY));

SqlToRelConverter relConverter = new SqlToRelConverter(
    null,          // no ViewExpander needed here
    validator,
    catalogReader,
    relOptCluster,
    frameworkConfig.getConvertletTable()
);

RelRoot relRoot = relConverter.convertQuery(validatedSqlNode, false, true);
RelNode originalRelNode = relRoot.rel;
System.out.println(RelOptUtil.toString(originalRelNode));
```
```
--- Original RelNode ---
LogicalProject(ID=[$0], NAME=[$1], TOTAL_SCORE=[$2])
  LogicalAggregate(group=[{0, 1}], TOTAL_SCORE=[SUM($2)])
    LogicalProject(id=[$0], name=[$1], $f2=[+(*($5, 0.7:DECIMAL(2, 1)), *($6, 0.3:DECIMAL(2, 1)))])
      LogicalFilter(condition=[AND(<($3, 20), >($5, 60.0:DECIMAL(3, 1)))])
        LogicalJoin(condition=[=($0, $4)], joinType=[inner])
          LogicalTableScan(table=[[student]])
          LogicalTableScan(table=[[exam_result]])
```
Optimization
- Predicate (filter) pushdown past the join into the table scans, using `HepPlanner` and the `FILTER_INTO_JOIN` rule
  - $\sigma_{R.a\,\theta\,a' \,\wedge\, S.b\,\theta\,b'}(R \bowtie S) = (\sigma_{R.a\,\theta\,a'}\,R) \bowtie (\sigma_{S.b\,\theta\,b'}\,S)$
  - In our example, `a.age < 20` is pushed down to the `student` scan and `b.score1 > 60.0` to the `exam_result` scan
- A `HepProgram` defines the order of rules to be attempted
```java
// FILTER_INTO_JOIN pushes filter predicates into / below the join
HepProgram hepProgram = new HepProgramBuilder()
    .addRuleInstance(CoreRules.FILTER_INTO_JOIN)
    .addMatchOrder(HepMatchOrder.BOTTOM_UP)
    .build();

HepPlanner hepPlanner = new HepPlanner(hepProgram);
hepPlanner.setRoot(originalRelNode);
RelNode optimizedRelNode = hepPlanner.findBestExp();
System.out.println(RelOptUtil.toString(optimizedRelNode));
```
```
--- Optimized RelNode ---
LogicalProject(ID=[$0], NAME=[$1], TOTAL_SCORE=[$2])
  LogicalAggregate(group=[{0, 1}], TOTAL_SCORE=[SUM($2)])
    LogicalProject(id=[$0], name=[$1], $f2=[+(*($5, 0.7:DECIMAL(2, 1)), *($6, 0.3:DECIMAL(2, 1)))])
      LogicalJoin(condition=[=($0, $4)], joinType=[inner])
        LogicalFilter(condition=[<($3, 20)])
          LogicalTableScan(table=[[student]])
        LogicalFilter(condition=[>($1, 60.0:DECIMAL(3, 1))])
          LogicalTableScan(table=[[exam_result]])
```
- Rules can do a lot more; for instance, several `CoreRules` can be chained in one `HepProgram`, as sketched below
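A minimal sketch (my own, not part of the demo) chaining a few more built-in Calcite rules; whether each rule actually fires depends on the plan it is given:

```java
// Rules are attempted in the order they are registered; HepPlanner applies them greedily.
HepProgram program = new HepProgramBuilder()
    .addRuleInstance(CoreRules.FILTER_INTO_JOIN)   // push filters below joins
    .addRuleInstance(CoreRules.PROJECT_MERGE)      // merge stacked projections
    .addRuleInstance(CoreRules.PROJECT_REMOVE)     // drop trivial (identity) projections
    .build();

HepPlanner planner = new HepPlanner(program);
planner.setRoot(originalRelNode);
System.out.println(RelOptUtil.toString(planner.findBestExp()));
```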
Dive Into Blink Stream Planner
Overview
- Parsing & validation
- Logical planning
- All-over optimization w/ physical planning
- Execution planning & codegen (only a brief today)
SQL for Example
- Will not cover sophisticated things (e.g. sub-queries, aggregate functions, window TVFs) for now
- Just an ordinary streaming ETL process, which will be optimized later
```sql
INSERT INTO expdb.print_joined_result
SELECT FROM_UNIXTIME(a.ts / 1000, 'yyyy-MM-dd HH:mm:ss') AS tss, a.userId, a.eventType, a.siteId, b.site_name AS siteName
FROM expdb.kafka_analytics_access_log_app
/*+ OPTIONS('scan.startup.mode'='latest-offset','properties.group.id'='DiveIntoBlinkExp') */ a
LEFT JOIN rtdw_dim.mysql_site_war_zone_mapping_relation
FOR SYSTEM_TIME AS OF a.procTime AS b ON CAST(a.siteId AS INT) = b.site_id
WHERE a.userId > 3 + 4;
```
Parsing & Validation
- Build the `flink-sql-parser` module, and you'll get the exact parser for the Flink SQL dialect
- Call stack

```
// parsing
parse:54, CalciteParser (org.apache.flink.table.planner.parse)
parse:96, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)

// validation
-- goes to org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator#validate()
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate:150, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
validate:108, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
convert:201, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:99, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)
```
- `SqlNode` tree
  - Note that the `FOR SYSTEM_TIME AS OF` syntax is translated to a `SqlSnapshot` node
Logical Planning
- Call stack
- Obviously, these are a bunch of recursive processes
```
-- goes to Calcite SqlToRelConverter
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:168, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:160, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:967, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:936, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:275, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlInsert:595, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:268, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:99, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)
```
- Logical planning in Flink SQL yields a tree of `Operation`s (e.g. `ModifyOperation`, `QueryOperation`)
  - Just wrappers of `RelNode`s
- `RelNode` tree
  - `SqlJoin` → `LogicalCorrelate` (in Calcite this means nested-loop join)
  - `SqlSnapshot` → `LogicalSnapshot`
  - etc.
- Output of the `EXPLAIN` statement
```
-- In fact this is the original logical plan
== Abstract Syntax Tree ==
LogicalSink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- LogicalProject(tss=[FROM_UNIXTIME(/($0, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss')], userId=[$1], eventType=[$2], siteId=[$6], siteName=[$10])
   +- LogicalFilter(condition=[>($1, +(3, 4))])
      +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{6, 8}])
         :- LogicalProject(ts=[$0], userId=[$1], eventType=[$2], columnType=[$3], fromType=[$4], grouponId=[$5], siteId=[$6], merchandiseId=[$7], procTime=[PROCTIME()])
         :  +- LogicalTableScan(table=[[hive, expdb, kafka_analytics_access_log_app]], hints=[[[OPTIONS inheritPath:[] options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
         +- LogicalFilter(condition=[=(CAST($cor0.siteId):INTEGER, $0)])
            +- LogicalSnapshot(period=[$cor0.procTime])
               +- LogicalTableScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]])
```

Note that at this point the filter `>($1, +(3, 4))` is not constant-folded yet, and the `LogicalCorrelate` + `LogicalSnapshot` pair has not been rewritten into a lookup join; both happen during optimization below.
All-Over Optimization w/ Physical Planning
- Call stack
  - `CommonSubGraphBasedOptimizer` is a Flink-implemented optimizer that divides the logical plan into sub-graphs by `SinkBlock`s, and reuses common sub-graphs whenever available
  - For most scenarios, the logical plan is merely a single tree (`optimizeTree`)
```
-- goes to org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram#optimize()
optimizeTree:163, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:79, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:284, PlannerBase (org.apache.flink.table.planner.delegation)
translate:168, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1516, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:738, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:854, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:728, TableEnvironmentImpl (org.apache.flink.table.api.internal)
```
- `FlinkChainedProgram` breaks down into several `FlinkHepProgram`s (resembling Calcite's `HepProgram`), which define the order of rules to be attempted with `HepPlanner`
  - This time with a lot more rules, of course
- Flink SQL handles the entire physical planning process with `RelOptRule`s, along with logical/physical optimization
  - All `RuleSet`s are presented in `FlinkStreamRuleSets`; some of them are shipped natively with Calcite
- `FlinkStreamProgram` actually builds up the program sequence
  - The names are quite straightforward though
- At the end of `LOGICAL`, specialized `ConverterRule`s convert Calcite `RelNode`s into `FlinkLogicalRel`s
  - e.g. `LogicalCalc` → `FlinkLogicalCalcConverter` → `FlinkLogicalCalc`
  - i.e. the convention is converted to `FLINK_LOGICAL`
  - The logical optimization phase is somewhat hard to observe
- The optimized `StreamPhysicalRel` tree
  - Physical planning rules are almost all `ConverterRule`s (a generic sketch of the shape follows the plan output below)
    - `FlinkLogicalRel` → `StreamPhysicalRel`, convention `FLINK_LOGICAL` → `STREAM_PHYSICAL`
    - e.g. `FlinkLogicalCalc` → `StreamPhysicalCalcRule` → `StreamPhysicalCalc`
  - `HepRelVertex` is the wrapper of `RelNode` in `HepPlanner`
- Output of the `EXPLAIN` statement
```
== Optimized Physical Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
         +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
```
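For intuition, here is a minimal, generic sketch of what a Calcite `ConverterRule` looks like, using Calcite's built-in Enumerable convention as the target. Flink's actual `StreamPhysicalCalcRule` lives in the planner's Scala code and targets `STREAM_PHYSICAL`; the class below is purely illustrative.

```java
import org.apache.calcite.adapter.enumerable.EnumerableCalc;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.logical.LogicalCalc;

// Illustrative only: a ConverterRule declares "input trait -> output trait"
// and implements convert(), re-creating the node in the target convention.
public class MyCalcConverterRule extends ConverterRule {

    public MyCalcConverterRule() {
        super(LogicalCalc.class, Convention.NONE, EnumerableConvention.INSTANCE,
                "MyCalcConverterRule");
    }

    @Override
    public RelNode convert(RelNode rel) {
        LogicalCalc calc = (LogicalCalc) rel;
        RelTraitSet newTraits = calc.getTraitSet().replace(EnumerableConvention.INSTANCE);
        // Also ask the planner to convert the input into the target convention
        RelNode newInput = convert(calc.getInput(), EnumerableConvention.INSTANCE);
        return new EnumerableCalc(calc.getCluster(), newTraits, newInput, calc.getProgram());
    }
}
```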
Let's pick two rules for a closer look.
TEMPORAL_JOIN_REWRITE - LogicalCorrelateToJoinFromLookupTableRuleWithFilter
This rule matches

```
+- LogicalCorrelate
   :- [RelNode related to stream table]
   +- LogicalFilter(condition)
      +- LogicalSnapshot(time_attr)
         +- [RelNode related to temporal table]
```

and transforms into

```
+- LogicalJoin(condition)
   :- [RelNode related to stream table]
   +- LogicalSnapshot(time_attr)
      +- [RelNode related to temporal table]
```
PHYSICAL - StreamPhysicalLookupJoinRule - SnapshotOnTableScanRule
This rule matches

```
+- FlinkLogicalJoin(condition)
   :- [RelNode related to stream table]
   +- FlinkLogicalSnapshot(time_attr)
      +- FlinkLogicalTableSourceScan [w/ LookupTableSource]
```

and transforms into a `StreamPhysicalLookupJoin`.
Execution Planning & Codegen
- Call stack
```
-- goes to separate FlinkPhysicalRel#translateToExecNode()
generate:74, ExecNodeGraphGenerator (org.apache.flink.table.planner.plan.nodes.exec)
generate:54, ExecNodeGraphGenerator (org.apache.flink.table.planner.plan.nodes.exec)
translateToExecNodeGraph:312, PlannerBase (org.apache.flink.table.planner.delegation)
translate:164, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1518, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:740, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:856, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:730, TableEnvironmentImpl (org.apache.flink.table.api.internal)

-- goes to separate ExecNodeBase#translateToPlan() & StreamExecNode#translateToPlanInternal()
translateToPlan:70, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:165, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1518, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:740, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:856, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:730, TableEnvironmentImpl (org.apache.flink.table.api.internal)
```
- The `ExecNodeGraph` DAG
  - A JSON representation of this DAG can be acquired or executed via `tableEnv.asInstanceOf[TableEnvironmentInternal].getJsonPlan(sql)` / `executeJsonPlan(plan)`, as sketched below
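For example (a sketch only; `TableEnvironmentInternal` is an internal interface, so this may change between versions):

```java
// Assuming `tableEnv` and `sql` are the TableEnvironment and the INSERT statement used above
String jsonPlan = ((TableEnvironmentInternal) tableEnv).getJsonPlan(sql);  // serialize the ExecNodeGraph as JSON
((TableEnvironmentInternal) tableEnv).executeJsonPlan(jsonPlan);           // run a previously serialized plan
```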
- Output of the `EXPLAIN` statement
```
== Optimized Execution Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME((ts / 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[(userId > 7)])
         +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
```
- `StreamExecNode` → `Transformation` → generated DataStream `Operator` / `Function` code
  - e.g. `StreamExecCalc` → `OneInputTransformation` → `OneInputStreamOperator` / `FlatMapFunction`
- Generated code will be dynamically compiled into Java classes through Janino
  - You can view all generated code by setting debug output of `CompileUtils` (see the logging snippet below)
  - Too long to paste here; refer to https://pastebin.com/NCMSxh5h
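For instance, assuming the Log4j 2 properties format used by the Flink distribution's default `log4j.properties`, an entry along these lines should surface the generated code in the logs (the logger key name is arbitrary; the class lives in `flink-table-runtime-blink`):

```properties
# Hypothetical logger entry for viewing generated code
logger.codegen.name = org.apache.flink.table.runtime.generated.CompileUtils
logger.codegen.level = DEBUG
```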
We'll leave detailed explanation of this part for the next lecture
Get Our Hands Dirty
Question
- Is there any hidden trouble in the simple example program shown above?
- Focus on the `LookupJoin` and consider its cache locality
  - In extreme conditions, a looked-up key/value pair can be re-cached N times (once in each parallel sub-task), because lookup requests for the same key may land on any sub-task
Define An Option
- Distributing lookup keys to sub-tasks by hash seems better
- In `ExecutionConfigOptions`...

```java
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_LOOKUP_DISTRIBUTE_BY_KEY =
    key("table.exec.lookup.distribute-by-key")
        .defaultValue(false)
        .withDescription(
            "Specifies whether to distribute lookups to sub-tasks by hash value of lookup key.");
```
Customize A Rule
- When to apply this rule? --- After physical planning
- What should we do? --- Insert a hash-by-key operation before `StreamPhysicalLookupJoin`
  - `FlinkRelDistribution` will do the work
  - Physical redistribution means a `StreamPhysicalExchange` node
- Note that there are 5 kinds of `RelTrait` in Flink SQL
```scala
class HashDistributedLookupJoinRule extends RelOptRule(
  operand(classOf[StreamPhysicalLookupJoin], any()),
  "HashDistributedLookupJoinRule") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val tableConfig = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
    tableConfig.getConfiguration.getBoolean(ExecutionConfigOptions.TABLE_EXEC_LOOKUP_DISTRIBUTE_BY_KEY)
  }

  override def onMatch(call: RelOptRuleCall): Unit = {
    val originalLookupJoin: StreamPhysicalLookupJoin = call.rel(0)
    val joinInfo = originalLookupJoin.joinInfo
    val traitSet = originalLookupJoin.getTraitSet

    // Require a hash distribution on the lookup keys of the stream side
    val requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys)
    val hashDistributedTraitSet = traitSet
      .replace(requiredDistribution)
      .replace(FlinkConventions.STREAM_PHYSICAL)
      .replace(RelCollations.EMPTY)
      .replace(traitSet.getTrait(ModifyKindSetTraitDef.INSTANCE))
      .replace(traitSet.getTrait(UpdateKindTraitDef.INSTANCE))

    // Wrap the lookup join's input (the stream side) with an Exchange node
    val hashDistributedInput = new StreamPhysicalExchange(
      originalLookupJoin.getCluster,
      hashDistributedTraitSet,
      originalLookupJoin.getInput,
      requiredDistribution)

    call.transformTo(originalLookupJoin.copy(
      originalLookupJoin.getTraitSet,
      util.Arrays.asList(hashDistributedInput)))
  }
}

object HashDistributedLookupJoinRule {
  val INSTANCE: RelOptRule = new HashDistributedLookupJoinRule
}
```
- There's a helper method `FlinkExpandConversionRule#satisfyDistribution()` (also used in two-stage aggregation), how lucky

```scala
val hashDistributedInput = FlinkExpandConversionRule.satisfyDistribution(
  FlinkConventions.STREAM_PHYSICAL,
  originalLookupJoin.getInput,
  requiredDistribution
)
```
Put Into Rule Set
At the tail of `FlinkStreamRuleSets`

```scala
val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
  // hash distributed lookup join rule
  HashDistributedLookupJoinRule.INSTANCE,
  // optimize agg rule
  TwoStageOptimizedAggregateRule.INSTANCE,
  // incremental agg rule
  IncrementalAggregateRule.INSTANCE,
  // optimize window agg rule
  TwoStageOptimizedWindowAggregateRule.INSTANCE
)
```
Have A Try
- Rebuild the `flink-table-api-java` & `flink-table-planner-blink` modules
- `SET table.exec.lookup.distribute-by-key=true`
```
== Optimized Physical Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Exchange(distribution=[hash[siteId0]])
         +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
            +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])

== Optimized Execution Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME((ts / 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Exchange(distribution=[hash[siteId0]])
         +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[(userId > 7)])
            +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
```