
Flink 1.11 SQL local demo: enabling the local web UI


1. pom dependencies

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.0</flink.version>
    <avro.version>1.8.2</avro.version>
    <java.version>1.8</java.version>
    <kubernetes.client.version>4.9.2</kubernetes.client.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>2.8.2</hadoop.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Apache Flink dependencies (kept at the default compile scope so the job can be launched from the IDE) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Table API and SQL components -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table</artifactId>
      <version>${flink.version}</version>
      <type>pom</type>
    </dependency>
    <!-- blink planner -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- flink planner -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- use the Table API & SQL for defining pipelines -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- kafka connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- flink-sql-connector-kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- flink-json -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- flink-jdbc -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.34</version>
    </dependency>
    <!-- flink web UI -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- logging backend, so the web UI address shows up in the console -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>

2. Flink SQL demo

The data format is: {"age":"98","id":"5886f218-52cf-467d-b590-872c4dc18a6c","sex":"男","time":"1606558449","userName":"张三"}  (time is a reserved keyword, so it has to be escaped with backticks in the DDL)

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  EnvironmentSettings settings = EnvironmentSettings.newInstance()
          .useBlinkPlanner()
          .inStreamingMode()
          .build();
  TableEnvironment tEnv = TableEnvironment.create(settings);

  // Kafka source table ('ip:9092' is a placeholder for the real broker address)
  String sourceSql = "create table flink_kafka_json_t2(\n" +
          "  userName STRING,\n" +
          "  age STRING,\n" +
          "  sex STRING,\n" +
          "  id STRING,\n" +
          "  `time` STRING\n" +
          ") with (\n" +
          "    'connector' = 'kafka',\n" +
          "    'topic' = 'flink_kafka_json_t2',\n" +
          "    'properties.group.id' = 'testGroup',\n" +
          "    'scan.startup.mode' = 'latest-offset',\n" +
          "    'properties.bootstrap.servers' = 'ip:9092',\n" +
          "    'format' = 'json'\n" +
          ")";
  tEnv.executeSql(sourceSql);

  // MySQL sink table
  String mysqlSql = "CREATE TABLE t_user (\n" +
          "  id STRING,\n" +
          "  userName STRING,\n" +
          "  age STRING,\n" +
          "  sex STRING\n" +
          ") WITH (\n" +
          "   'connector' = 'jdbc',\n" +
          "   'username' = 'root',\n" +
          "   'password' = '123456',\n" +
          "   'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8',\n" +
          "   'table-name' = 't_user'\n" +
          ")";
  tEnv.executeSql(mysqlSql);

  // continuous INSERT from the Kafka source into the MySQL sink
  tEnv.executeSql("insert into t_user SELECT id,userName,age,sex FROM flink_kafka_json_t2");
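To feed the demo, a record in the JSON shape shown above can be pushed to the topic. Below is a minimal sketch using the plain Kafka producer client (the kafka-clients jar comes in transitively through the Kafka connector); the broker address and topic name are assumptions that mirror the DDL above. Note also that the JDBC connector only writes into an existing table, so t_user has to be created in the MySQL test database beforehand.

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  Properties props = new Properties();
  props.put("bootstrap.servers", "ip:9092");   // same broker as in the source DDL
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      String value = "{\"age\":\"98\",\"id\":\"5886f218-52cf-467d-b590-872c4dc18a6c\","
              + "\"sex\":\"男\",\"time\":\"1606558449\",\"userName\":\"张三\"}";
      // topic must match the 'topic' option of the source table
      producer.send(new ProducerRecord<>("flink_kafka_json_t2", value));
      producer.flush();
  }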

3. To open the web UI, search the log output for "http:". The web UI address is printed there; just open it in a browser.

(Screenshot: the local Flink web UI)
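If you would rather pin the web UI to a fixed port instead of searching the logs, one alternative sketch (assuming the same blink planner settings as in step 2) is to build the table environment on top of a local stream environment that starts the web UI:

  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.configuration.RestOptions;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

  Configuration conf = new Configuration();
  conf.setInteger(RestOptions.PORT, 8081);   // fix the web UI port; any free port works

  // local environment with web UI (requires flink-runtime-web on the classpath)
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
  EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
  // ... register the same source and sink tables and run the INSERT as in step 2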

One open question: I originally printed the Kafka messages first and then ran the MySQL sink, and found that the MySQL sink never executed. In other words, how do you run multiple sinks in one program? If anyone knows, please leave a comment!!
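A possible answer, sketched under the assumption that both sink tables are already registered on tEnv: Flink 1.11 introduced StatementSet, which bundles several INSERT statements into a single job so that all sinks run together, rather than each executeSql("insert ...") being submitted on its own. The print_table name below is hypothetical and stands for whatever table was used to print the Kafka messages.

  import org.apache.flink.table.api.StatementSet;

  // Bundle the two INSERTs so they are submitted as one job and both sinks run.
  StatementSet statementSet = tEnv.createStatementSet();
  statementSet.addInsertSql("insert into print_table SELECT id,userName,age,sex FROM flink_kafka_json_t2");   // hypothetical print sink
  statementSet.addInsertSql("insert into t_user SELECT id,userName,age,sex FROM flink_kafka_json_t2");
  statementSet.execute();   // submits both INSERT statements together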
