本博文的程序读取Hadoop HDFS中的文件,使用正则表达式解析出规定格式的数据,然后将其加载到Spark SQL中(注册为临时表)以便查询。
如果对正则表达式不太了解,请参阅《正则表达式30分钟入门教程》。
文件内容大致为:
CREATE TABLE IF NOT EXISTS `rs_user` (
`id` mediumint(8) unsigned NOT NULL AUTO_INCREMENT,
`uid` mediumint(8) unsigned DEFAULT NULL,
`url` varchar(255) DEFAULT NULL,
`title` varchar(1024) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=gbk AUTO_INCREMENT=59573 ;
INSERT INTO `rs_user` (`id`, `uid`, `url`, `title`) VALUES
(1, 269781, 'http://rs.xidian.edu.cn/forum.php?mod=viewthread&tid=721360', '[体育][其他][2002年亚运会羽毛球男单决赛 陶菲克vs李炫一][rmvb][国语]'),
(2, 256188, 'http://rs.xidian.edu.cn/forum.php?mod=viewthread&tid=721360', '[体育][其他][2002年亚运会羽毛球男单决赛 陶菲克vs李炫一][rmvb][国语]'),
package com.spark.firstApp
import org.apache.spark.SparkContext
import org.apache.spark._
import org.apache.log4j.{Level, Logger}
/**
 * Spark driver that reads a SQL dump of the `rs_user` table from HDFS,
 * extracts the `(id, uid, 'url', 'title')` value tuples with a regular
 * expression, registers them as the temporary table `people` and prints the
 * titles belonging to one uid.
 *
 * Usage: `HelloSpark [inputPath]` -- when no argument is given the original
 * hard-coded path `/user/root/home/rs_user.sql` is used.
 */
object HelloSpark {

  /**
   * One value tuple of the dump.
   *
   * @param id    numeric primary key
   * @param uid   user id, kept as text exactly as it appears in the dump
   * @param url   URL literal, including its surrounding single quotes
   * @param title title literal, including its surrounding single quotes
   */
  case class Person(id: Int, uid: String, url: String, title: String)

  /** Input read when no path is supplied on the command line. */
  private val DefaultInputPath = "/user/root/home/rs_user.sql"

  /**
   * Matches one value tuple and captures its four fields.
   *
   * Capture groups replace the former `split(", ")` on the matched text,
   * which cut a title short whenever the title itself contained ", ".
   * The id requires at least one digit so that it can always be parsed as a
   * number; the uid pattern is unchanged. The quotes stay inside groups 3 and
   * 4 so stored values are the same as before.
   */
  private val RowPattern =
    """(\d+), (\d*), ('http://[a-z/.?&=0-9]*'), ('[^']+')""".r

  /**
   * Extracts every value tuple found in one line of the dump.
   *
   * @param line a single line of text from the dump file
   * @return the parsed rows in order of appearance; empty when the line holds
   *         no tuple (e.g. `CREATE TABLE` lines). A tuple whose id does not
   *         fit in an `Int` is skipped instead of failing the whole job.
   */
  def parseRows(line: String): List[Person] =
    RowPattern.findAllMatchIn(line).flatMap { m =>
      scala.util.Try(m.group(1).toInt).toOption
        .map(id => Person(id, m.group(2), m.group(3), m.group(4)))
    }.toList

  /**
   * Program entry point.
   *
   * @param args optional; `args(0)` overrides the input path
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's and Jetty's verbose logging.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val conf = new SparkConf().setAppName("HelloSpark")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      // Brings the RDD-to-DataFrame conversion (`toDF`) into scope.
      import sqlContext.implicits._

      val people = sc.textFile(inputPath).flatMap(line => parseRows(line)).toDF()
      people.registerTempTable("people")

      val teen = sqlContext.sql("SELECT title from people where uid='199988'")
      teen.map(t => "title: " + t).collect().foreach(println)
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }
}
提交任务:
[email protected]:/# spark-submit --master spark://192.168.0.10:7077 --class com.spark.firstApp.HelloSpark --executor-memory 100m /root/IdeaProjects/FirstSparkApp/out/artifacts/FirstSparkAppJar/FirstSparkAppJar.jar
输出结果:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/04/15 21:53:56 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/04/15 21:53:56 INFO Remoting: Starting remoting
15/04/15 21:53:57 INFO Remoting: Remoting started; listening on addresses :[akka.tcp:[email protected]:52584]
15/04/15 21:53:57 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/15 21:53:57 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54183
15/04/15 21:54:03 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/04/15 21:54:03 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/15 21:54:12 WARN util.SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
15/04/15 21:54:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/15 21:54:21 WARN snappy.LoadSnappy: Snappy native library not loaded
15/04/15 21:54:21 INFO mapred.FileInputFormat: Total input paths to process : 1
title: ['[其他][视频][LOL][微笑卷毛1月13号双排三场合集][微笑卷毛解说][mp4]']
title: ['[其他][视频][LOL][SMZ24解说:S5盲僧李青的全场gank之旅_高清][SMZ24解说][mp4]']