当前位置: 代码迷 >> 综合 >> Flink Demo测试------Kafka连接(Flink1.8;Hadoop3.0)
  详细解决方案

Flink Demo测试------Kafka连接(Flink1.8;Hadoop3.0)

热度:27   发布时间:2023-10-17 03:34:15.0

pom文件

 <name>Flink Quickstart Job</name><url>http://www.myorganization.org</url><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format><scala.version>2.11.11</scala.version><scala.binary.version>2.11</scala.binary.version><hadoop.version>3.0.0</hadoop.version><flink.version>1.8.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.zm.kafkaTs.KafkaOtherWriteTS</mainClass></transformer></transformers></configuration></execution></executions></plugin><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><!-- Scala Compiler --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.codehaus.mojo</groupId><artifactId>build-helper-maven-plugin</artifactId><version>1.7</version><executions><!-- Add src/main/scala to eclipse build path --><execution><id>add-source</id><phase>generate-sources</phase><goals><goal>add-source</goal></goals><configuration><sources><source>src/main/scala</source></sources></configuration></execution><!-- Add src/test/scala to eclipse build path --><execution><id>add-test-source</id><phase>generate-test-sources</phase><goals><goal>add-test-source</goal></goals><configuration><sources><source>src/test/scala</source></sources></configuration></execution></executions></plugin></plugins></build><profiles><profile><id>add-dependencies-for-IDEA</id><activation><property><name>idea.version</name></property></activation><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>compile</scope></dependency></dependencies></profile></profiles>

代码

import java.util.Propertiesimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchemaobject KafkaOtherWriteTS {def main(args: Array[String]): Unit = {
//kafka测试通过val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.enableCheckpointing(5000)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)val kafkaProps: Properties = new Properties()kafkaProps.setProperty("bootstrap.servers","***:9092")kafkaProps.setProperty("group.id",args(0))kafkaProps.setProperty("auto.offset.reset","earliest")val consumer = new FlinkKafkaConsumer010[String]("****",new SimpleStringSchema,kafkaProps)val transaction: DataStream[String] = env.addSource(consumer)transaction.print()env.execute(this.getClass.getSimpleName)}}

 

  相关解决方案