Spark customization, part 6: the SQL version of start.scala

The previous version of start.scala used HiveContext; this one uses SQLContext, so no recompilation of Spark is needed.
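Concretely, the only setup difference is which context is created from the existing SparkContext sc inside the script. A minimal sketch for comparison (the HiveContext line mirrors the previous article and is shown only as a reminder; it is not part of this script):

    // SQL-only context: works on any Spark build, no Hive support required
    val ssc = new org.apache.spark.sql.SQLContext(sc)

    // The previous start.scala used HiveContext instead, which needs a Spark build with Hive support:
    // val ssc = new org.apache.spark.sql.hive.HiveContext(sc)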

# cat testperson.txt    # fields are separated by the Tab key

zs 10 30.0
li 12 32.0

# spark-shell -i start.scala

scala> help

Then follow the prompts and run the steps one by one.
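For example, with the testperson.txt above, a session would go roughly like this (the commands come from the script's built-in help; the REPL's own result echoes are omitted and the printed columns are tab-separated):

scala> "create table testperson (name string,age int,weight double)" from "testperson.txt"

scala> "select * from testperson" go
zs      10      30.0
li      12      32.0

scala> "select * from testperson" saveto "somelocalfile.txt"

scala> "testperson" isok

The full script follows.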

import org.apache.spark.sql.SchemaRDD

var FIELD_SEPERATOR = "\t"
var RECORD_SEPERATOR = "\n"
var lastrdd : SchemaRDD = null

// Path helpers that work for both local files and HDFS.
object MyFileUtil extends java.io.Serializable {
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.FileStatus
    import scala.collection.mutable.ListBuffer

    // Normalize a path to an absolute file:// or hdfs: URI.
    def regularFile(filepath:String):String = {
        if(filepath == "") {
            filepath
        } else if(filepath.startsWith("hdfs:")) {
            filepath
        } else if(filepath.startsWith("file:")) {
            filepath
        } else if(filepath.startsWith("/")) {
            "file://" + filepath
        } else {
            val workdir = System.getProperty("user.dir")
            "file://" + workdir + "/" + filepath
        }
    }

    var SAFEMINPATH_LENGTH : Int = 24

    def getFileSystem(filepath:String) = {
        if(filepath.startsWith("hdfs:")) {
            FileSystem.get(new org.apache.hadoop.conf.Configuration())
        } else if(filepath.startsWith("file:")) {
            FileSystem.getLocal(new org.apache.hadoop.conf.Configuration())
        } else {
            throw new Exception("file path invalid")
        }
    }

    // Refuse to delete suspiciously short paths as a safety guard.
    def deletePath(filepath:String) = {
        if(filepath.length < SAFEMINPATH_LENGTH)
            throw new Exception("file path is too short")
        var fs : FileSystem = getFileSystem(filepath)
        if (fs.exists(new Path(filepath))) {
            fs.delete(new Path(filepath), true)
        }
    }

    def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null) {
        if ( fs.exists(path) ) {
            val substatuslist = fs.listStatus(path)
            for(substatus <- substatuslist){
                if(statuslist != null)
                    statuslist.append(substatus)
                if(substatus.isDir()){
                    listFile(fs,substatus.getPath(),pathlist)
                }else{
                    pathlist.append(substatus.getPath())
                }
            }
        }
    }

    // True if the file (or directory tree) contains any data.
    def hasContext(filepath:String) = {
        val realpath = regularFile(filepath)
        val fs = getFileSystem(realpath)
        val pathlist = ListBuffer[Path]()
        val statuslist = ListBuffer[FileStatus]()
        listFile(fs,new Path(realpath),pathlist,statuslist)
        var length:Long = 0
        for( status <- statuslist )
            length += status.getLen()
        length > 0
    }
}

org.apache.spark.repl.Main.interp.command("""
// Wraps SchemaRDD with go (print), result (collect) and saveto (local file or HDFS).
class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable {

    def go() = {
        var startstr = ""
        var endstr = RECORD_SEPERATOR
        val result = rdd.collect
        result.foreach( x =>
            print(x.mkString(startstr,FIELD_SEPERATOR,endstr))
        )
    }

    def result() = {
        rdd.collect
    }

    def saveto(output: String) = {
        import org.apache.hadoop.io.{NullWritable,Text}
        var startstr = ""
        var endstr = RECORD_SEPERATOR
        if(output.startsWith("hdfs:")) {
            val outputpath = MyFileUtil.regularFile(output)
            MyFileUtil.deletePath(outputpath)
            rdd.map(x =>
                (NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR)))
            ).saveAsHadoopFile[
                org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text]
            ](outputpath)
        } else {
            val outputpath = MyFileUtil.regularFile(output)
            MyFileUtil.deletePath(outputpath)
            val result = rdd.collect()
            val writer = new java.io.FileWriter(output)
            result.foreach(x =>
                writer.write(x.mkString(startstr,FIELD_SEPERATOR,endstr))
            )
            writer.close()
        }
    }
}
object MySchemaRDD {
    implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) = new MySchemaRDD(rdd)
}
""")

val ssc = new org.apache.spark.sql.SQLContext(sc)
import ssc._
import MySchemaRDD._

// Builds the code that defines a case class for a table and registers it with the SQLContext.
def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = {
    val members = tabledef.trim.split(",").map(_.trim.split(" ").filter(""!=)).map(x => (x(0).trim,x(1).trim.head.toString.toUpperCase+x(1).trim.tail))
    val classmemberdef = members.map(x => (x._1+":"+x._2)).mkString(",")
    val convertstr = members.map(x => x._2).zipWithIndex.map(x => "t("+x._2+").to"+x._1).mkString(",")
    return s"""
        case class ${classname}(${classmemberdef})
        val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}")).map(t=>${classname}(${convertstr}))
        ssc.registerRDDAsTable(schemardd,"${tablename}")
    """
}

org.apache.spark.repl.Main.interp.command("""
// String DSL: "select ..." go / saveto / result, "create table ..." from "file", "name" isok.
class MyCommandTranslator(cmd:String) extends java.io.Serializable {

    def go()(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.go()
    }

    def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.saveto(output)
    }

    def result()(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.result()
    }

//    def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.go()
//    }
//
//    def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.saveto(output)
//    }
//
//    def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.result()
//    }

    def defineas(tabledef:String) = {
        if( tabledef != "" ) {
            org.apache.spark.repl.Main.interp.command(
                getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef)
            )
        } else {
            org.apache.spark.repl.Main.interp.command(
                s"ssc.registerRDDAsTable(${cmd},\"${cmd}\")"
            )
        }
    }

    def from(filepath:String) {
        if( cmd.trim.startsWith("create table ") ) {
            val tablename = cmd.trim.substring(13).trim().split(" ")(0)
            val leftstr = cmd.substring(13).trim().substring(tablename.length).trim()
            val tabledef = leftstr.substring(1,leftstr.length-1).trim()
            val realfile = MyFileUtil.regularFile(filepath)
            org.apache.spark.repl.Main.interp.command(
                "val "+tablename+" = sc.textFile(\""+realfile+"\")"
            )
            new MyCommandTranslator(tablename).defineas(tabledef)
        } else {
            println("usage:")
            println("\"create table sometablename (field1 string,field2 int...)\" from \"somefile or hdfs:somepath\"")
        }
    }

    def isok() = {
        if(cmd.contains(".") || cmd.contains("/")) {
            MyFileUtil.hasContext(cmd)
        } else {
            val res = sql(s"select count(*) from ${cmd}").result()
            val count = res(0).getLong(0)
            count > 0
        }
    }
}
object MyCommandTranslator {
    implicit def stringToTranslator(cmd:String) = new MyCommandTranslator(cmd)

    def show(tabledata:Array[org.apache.spark.sql.Row]) = {
        tabledata.foreach( x => println(x.mkString("\t")))
    }
}
""")

def to = MyCommandTranslator
import MyCommandTranslator._

val onetable = sql("select 1 as id")
ssc.registerRDDAsTable(onetable,"onetable")

def help = {
    println("""example:
        "create table testperson (name string,age int,weight double)" from "testperson.txt"
        "select * from testperson" go
        "select * from testperson" saveto "somelocalfile.txt"
        "select * from testperson" saveto "hdfs:/basedir/parentdir/testperson"
        "testperson" isok
        "somelocalfile.txt" isok
        "hdfs:/basedir/parentdir/testperson" isok
        val data = "select * from testperson" result
        to show data
        val somerdddata = sc.textFile("hdfs:/basedir/parentdir/testperson")
        "somerdddata" defineas "name string,age int,weight double"
        "select * from somerdddata" go
        if you want to see the help of the environment, please type :help
        """)
}
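For reference, the "create table ..." from "..." DSL works by having getRegisterString generate a small snippet that the REPL then compiles. For the testperson example, the generated code is essentially the following sketch (you never type this yourself; FIELD_SEPERATOR is interpolated as a literal tab, shown here as \t):

    case class TESTPERSON(name:String,age:Int,weight:Double)
    val schemardd = testperson.map(_.split("\t")).map(t => TESTPERSON(t(0).toString,t(1).toInt,t(2).toDouble))
    ssc.registerRDDAsTable(schemardd,"testperson")

Here testperson is the RDD that from creates beforehand via sc.textFile on the normalized file path.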

