1. pom依赖
<!-- Build-wide settings: source encoding, Java level and the versions shared by the dependencies below. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.0</flink.version>
    <avro.version>1.8.2</avro.version>
    <java.version>1.8</java.version>
    <kubernetes.client.version>4.9.2</kubernetes.client.version>
    <!-- Scala suffix appended to the Flink artifact ids below. -->
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>2.8.2</hadoop.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- No <scope> is declared on these dependencies, so they use Maven's default (compile) scope
         and are NOT treated as "provided". -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table API and SQL components -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.version}</version>
        <type>pom</type>
    </dependency>

    <!-- blink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- flink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- use the Table API & SQL for defining pipelines -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- kafka connector -->
    <!-- NOTE(review): both flink-connector-kafka and flink-sql-connector-kafka are declared;
         presumably only one of them is needed - confirm before trimming. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- flink-sql-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- flink-json -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- flink-jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.34</version>
    </dependency>

    <!-- flink-webUI -->
    <!-- Uses ${scala.binary.version} like every other Flink artifact instead of a hard-coded _2.11 suffix. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Logging: SLF4J API plus the simple binding -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
2. flinksql demo
数据格式为: {"age":"98","id":"5886f218-52cf-467d-b590-872c4dc18a6c","sex":"男","time":"1606558449","userName":"张三"} (time 是 Flink SQL 的保留关键字,所以在建表语句中该字段名要用反引号 ` 括起来)
EnvironmentSettings build = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(build);
//kafka sourceString sourceSql = "create table flink_kafka_json_t2(\n" +" userName STRING,\n" +" age STRING,\n" +" sex STRING,\n" +" id STRING,\n" +" `time` STRING\n" +") with (\n" +" 'connector' = 'kafka',\n" +" 'topic' = 'flink_kafka_json_t2',\n" +" 'properties.group.id' = 'testGroup',\n"+" 'scan.startup.mode' = 'latest-offset',\n" +" 'properties.bootstrap.servers' = 'ip:9092',\n" +" 'format' = 'json'\n" +")";tEnv.executeSql(sourceSql);//sink到mysqlString mysqlsql = "CREATE TABLE t_user (\n" +" id STRING,\n" +" userName STRING,\n" +" age STRING,\n" +" sex STRING\n" +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'username' = 'root',\n" +" 'password' = '123456',\n" +" 'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8',\n" +" 'table-name' = 't_user'\n" +")";tEnv.executeSql(mysqlsql);tEnv.executeSql("insert into t_user SELECT id,userName,age,sex FROM flink_kafka_json_t2");
3. Web UI: 程序启动后,在运行日志中搜索 "http:" 即可找到 Web UI 的地址,在浏览器中打开该地址即可。