什么是 State Processor API
官方文档说明:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/libs/state_processor_api.html
目的
使用 State Processor API，可以借助 Flink 的批处理 DataSet API 读取、写入和修改 savepoint 与 checkpoint；由于 DataSet API 与 Table API 可以互操作，还可以使用 Table API 或 SQL 查询来分析和处理状态数据，从而帮助定位作业中的问题。
使用方式介绍
引入 pom 依赖
<!-- State Processor API dependency, needed to read checkpoints. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.12.3</version>
</dependency>
读取 keyed state 时，使用 readKeyedState 方法，传入算子的 uid 和一个 KeyedStateReaderFunction<KeyType, OutputType> 函数来获取对应的 state（要读取哪个算子的状态，就传入作业中该算子的 uid）。注意：在 KeyedStateReaderFunction 中注册的状态描述符，其名称和类型必须与原作业中的保持一致，否则无法正确读取状态。
package com.d4t.dataplatform.runner;

import java.io.Serializable;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import com.d4t.dataplatform.runner.functions.rulerunner.RuleCalculateProcessFunction;

/**
 * @author sanhongbo
 * @date 2022/1/10
 * @description Reads keyed state from a Flink checkpoint
 */
public class ReadCheckpoint {
public static void main(String[] args) throws Exception {
final String Uid = RuleCalculateProcessFunction.class.getSimpleName();final ParameterTool parameterTool = ParameterTool.fromArgs(args);final String checkpointPath = parameterTool.get("checkpoint.path");// set up the batch execution environmentfinal ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSet<KeyedMapState> keyedMapStateDataSet = Savepoint.load(env, checkpointPath ,new MemoryStateBackend()).readKeyedState(Uid, new ReaderFunction());keyedMapStateDataSet.writeAsText("hdfs:///flink/state/test");// execute programenv.execute("read the list state");}static class KeyedMapState implements Serializable {
String key;String mapKey;Object value;@Overridepublic String toString() {
return "KeyedMapState{" +key + ',' +mapKey + ',' +value +'}';}}static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedMapState> {
private transient MapState<String, Object> mapState;@Overridepublic void open(Configuration parameters) {
/*** 状态描述符*/final MapStateDescriptor<String, Object> DESCRIPTOR_MAP_STATE =new MapStateDescriptor<>("XXXXXXX", String.class, Object.class);mapState = getRuntimeContext().getMapState(DESCRIPTOR_MAP_STATE);}@Overridepublic void readKey(String key,Context ctx,Collector<KeyedMapState> out) throws Exception {
Iterable<String> keys = mapState.keys();for (String s : keys) {
if(s.contains("XXXXXX")){
KeyedMapState km = new KeyedMapState();km.key = key;km.mapKey = s;km.value = mapState.get(s);out.collect(km);}}}}}
打包运行
# Keep the parallelism (-p) aligned with the original job that wrote this checkpoint;
# otherwise the reader is prone to running out of memory.
# NOTE(review): with the generic "-t yarn-per-job" target, legacy -y* options such as -ynm
# may be ignored; -Dyarn.application.name=map-state may be needed instead — confirm.
flink run -d -p 6 -t yarn-per-job \
  -Dtaskmanager.memory.process.size=3072mb \
  -Dtaskmanager.memory.managed.size=0 \
  -ynm map-state \
  -c com.d4t.dataplatform.runner.ReadCheckpoint \
  runner-0.1-jar-with-dependencies.jar \
  --job.name map-state \
  --checkpoint.path hdfs:///flink/checkpoint/rule_runner_20220108/820008c1f70ed755109219f40fc4efb9/chk-558
执行完成后，将结果文件拉取到本地并合并（writeAsText 在并行度大于 1 时会在输出路径下为每个并行实例各生成一个文件）：
# Download every part file written by writeAsText into a dedicated local directory,
# then concatenate them into a single file named "total".
# awk '/^-/' keeps only regular-file lines, so the "Found N items" header line and any
# sub-directories are not passed to copyToLocal.
mkdir -p state_parts
hdfs dfs -ls /flink/state/test/ | awk '/^-/ {print $NF}' | xargs -I{} hdfs dfs -copyToLocal {} state_parts/
# Concatenate only the downloaded parts (not everything in the cwd), and use '>' so a
# re-run overwrites "total" instead of appending duplicate lines.
cat state_parts/* > total
# One-step alternative: hdfs dfs -getmerge /flink/state/test/ total