pom文件
<name>Flink Quickstart Job</name><url>http://www.myorganization.org</url><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format><scala.version>2.11.11</scala.version><scala.binary.version>2.11</scala.binary.version><hadoop.version>3.0.0</hadoop.version><flink.version>1.8.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.0.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.zm.kafkaTs.KafkaOtherWriteTS</mainClass></transformer></transformers></configuration></execution></executions></plugin><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><!-- Scala Compiler --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.codehaus.mojo</groupId><artifactId>build-helper-maven-plugin</artifactId><version>1.7</version><executions><!-- Add src/main/scala to eclipse build path --><execution><id>add-source</id><phase>generate-sources</phase><goals><goal>add-source</goal></goals><configuration><sources><source>src/main/scala</source></sources></configuration></execution><!-- Add src/test/scala to eclipse build path --><execution><id>add-test-source</id><phase>generate-test-sources</phase><goals><goal>add-test-source</goal></goals><configuration><sources><source>src/test/scala</source></sources></configuration></execution></executions></plugin></plugins></build><profiles><profile><id>add-dependencies-for-IDEA</id><activation><property><name>idea.version</name></property></activation><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>compile</scope></dependency></dependencies></profile></profiles>
代码
package com.zm.mysqlTsimport org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._object Data2Mysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval DString: DataStream[String] = env.readTextFile(args(0))val result: DataStream[(String, Integer)] = DString.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).map(x=>{(x._1,(new Integer(x._2)))})val sink: Conn2Mysql = new Conn2Mysql("jdbc:mysql://ip:3306/zm_nuwa?useUnicode=true&characterEncoding=utf-8&useSSL=false","bi_sa","56RyEjcTMJN")// result.print()result.addSink(sink)//启动env.execute()}}
Conn2Mysql类
package com.zm.mysqlTsimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._class Conn2Mysql(url:String,user:String,pwd:String) extends RichSinkFunction[(String,Integer)]{var conn:Connection=_var pres:PreparedStatement = _
// var username = "root";
// var password = "123456";
// var dburl = "jdbc:mysql://192.168.6.132:3306/hgs?useUnicode=true&characterEncoding=utf-8&useSSL=false";var sql = "replace into zjf_wc(word,num) values(?,?)";override def invoke(value:(String, Integer) ) {pres.setString(1, value._1);pres.setInt(2,value._2);pres.executeUpdate();System.out.println("values :" +value._1+"--"+value._2);}override def open( parameters:Configuration) {Class.forName("com.mysql.jdbc.Driver");conn = DriverManager.getConnection(url, user, pwd);pres = conn.prepareStatement(sql);super.close()}override def close() {pres.close();conn.close();}}