
Spark SQL: JDBC test (MySQL)

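The Scala method below exercises Spark SQL's JDBC data source against MySQL: three ways to read a table (a single-partition read, a range-partitioned read, and the DataFrameReader format("jdbc") API), registration of the result as a temporary view, collecting rows to the driver with collect() and collectAsList(), and three ways to write the result back to MySQL. It assumes a class-level logger and a project-local ConfigUtils object holding the JDBC URL, user, password, and driver class.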
import java.util
import java.util.Properties

import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

/**
 * Data source: JDBC
 * @param spark the active SparkSession
 */
def testJDBC(spark: SparkSession): Unit = {
  // Read data from the MySQL instance on machine 1.
  println("======================== Reading MySQL: method 1 ================================")
  // Without partitioning options the read uses a single partition.
  val url1: String = "jdbc:mysql://127.0.0.1/test?useUnicode=true&characterEncoding=utf-8"
  val table1 = "tb_score"
  val properties1: Properties = new Properties()
  properties1.setProperty("user", "root")
  properties1.setProperty("password", "root")
  properties1.setProperty("driver", "com.mysql.jdbc.Driver")
  val jdbcDF1: DataFrame = spark.read.jdbc(url1, table1, properties1)
  jdbcDF1.show()
  println(jdbcDF1.rdd.getNumPartitions) // check the parallelism; returns 1 by default
  val jdbcDF1New: DataFrame = jdbcDF1.union(jdbcDF1) // concatenate the DataFrame with itself
  jdbcDF1New.show()
  logger.info("First MySQL read finished!") // assumes a class-level logger, e.g. SLF4J

  println("======================== Reading MySQL: method 2 ================================")
  // Custom partition count: Spark turns (lowerBound, upperBound, numPartitions) into one
  // range predicate per partition on colName. The bounds only set the stride; rows outside
  // them are still read by the first and last partitions.
  val url2: String = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8"
  val table2 = "tb_score"
  val colName: String = "userid"
  val lowerBound = 1
  val upperBound = 10000
  val numPartitions = 10
  val properties2: Properties = new Properties()
  properties2.setProperty("user", "root")
  properties2.setProperty("password", "root")
  properties2.setProperty("driver", "com.mysql.jdbc.Driver")
  val jdbcDF2: DataFrame =
    spark.read.jdbc(url2, table2, colName, lowerBound, upperBound, numPartitions, properties2)
  jdbcDF2.show()
  println(jdbcDF2.rdd.getNumPartitions) // check the parallelism; returns 10

  println("======================== Reading MySQL: method 3 ================================")
  val jdbcDF3: DataFrame = spark.read.format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8")
    .option("user", "root")
    .option("password", "root")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "tb_score")
    // .option("dbtable", "(select subject,score from tb_score) s") // a subquery also works, but it must be parenthesized and given an alias
    .load()
  jdbcDF3.show()
  println(jdbcDF3.rdd.getNumPartitions) // check the parallelism; no partitioning options were given, so this returns 1

  println("--------------------------------------------------------------------------")
  jdbcDF1.createTempView("tb_score")
  // jdbcDF1.createOrReplaceTempView("tb_score")
  // jdbcDF1.createGlobalTempView("tb_score") // registers a global temp view shared by all sessions of the same Spark application; it must be referenced through the global_temp database, as below:
  val newspark: SparkSession = spark.newSession()
  // val res: DataFrame = newspark.sql("SELECT userid,score FROM global_temp.tb_score") // querying from a fresh SparkSession confirms the global temp view is visible across sessions
  val results: DataFrame = spark.sql("SELECT userid,score FROM tb_score") // query the local temp view
  val scoreflag = 90
  val results1: DataFrame = spark.sql("SELECT userid,score FROM tb_score").where(s"score > $scoreflag")
  results.show()
  results1.show()

  // Load the rows of the MySQL table into a local collection.
  // collect()
  val mysqlRows: Array[Row] = results.collect()
  var lst = List[String]() // a collection to hold values from the table
  for (row <- mysqlRows) {
    val userid: String = row.getAs[String]("userid")
    val score: Double = row.getAs[Double]("score") // read but unused; only userid goes into the list
    lst = userid :: lst
  }
  // With lst in hand, any further driver-side processing can follow.
  println(lst.mkString(","))

  // collectAsList()
  val ls: util.List[Row] = results.collectAsList()
  var lis = List[String]()
  for (i <- 0 until ls.size()) {
    val exe_pro: String = ls.get(i).get(0) + "-" + ls.get(i).get(1) // e.g. 001-90.0
    println(exe_pro)
    lis = exe_pro :: lis
  }
  println(lis.mkString(","))

  // stripMargin with multi-line strings:
  // val tmpfiled2 = spark.sql(
  //   """
  //     |select brand,regionname,branchname,workno,employeename,entrydate,historyoffercount from t_cal_tmp1
  //   """.stripMargin // https://blog.csdn.net/weixin_38750084/article/details/99643789
  // )

  println("======================== Writing MySQL: method 1 ================================")
  // Note on mode(SaveMode.Overwrite): Overwrite drops the table and recreates it automatically,
  // while Append leaves it in place, so with Append you can define the table yourself
  // (create it in the database beforehand with the column types you want).
  val connProperties = new Properties()
  connProperties.setProperty("user", ConfigUtils.mysqlUser) // ConfigUtils is a project-local config helper
  connProperties.setProperty("password", ConfigUtils.mysqlPw)
  connProperties.setProperty("driver", ConfigUtils.mysqlDriver)
  results.show()
  // Save the result DataFrame to a MySQL table (created automatically).
  results.write.mode(SaveMode.Overwrite)
    .jdbc(ConfigUtils.mysqlUrl, "sparksql_auo_cre_new_tb_score1", connProperties) // Append appends, Overwrite replaces

  println("======================== Writing MySQL: method 2 ================================")
  results.write.mode(SaveMode.Overwrite)
    .option("createTableColumnTypes", "userid text,score text") // column types used when Spark creates the table
    .jdbc(ConfigUtils.mysqlUrl, "sparksql_auo_cre_new_tb_score2", connProperties)

  println("======================== Writing MySQL: method 3 ================================")
  results.write.mode(SaveMode.Overwrite).format("jdbc")
    .option("url", ConfigUtils.mysqlUrl)
    .option("user", "root")
    .option("password", "root")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "sparksql_auo_cre_new_tb_score3")
    .save()
}
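For context, a minimal sketch of how testJDBC might be driven; the object name, master URL, and app name are placeholders (not part of the original post), and the MySQL connector jar is assumed to be on the classpath:

import org.apache.spark.sql.SparkSession

object SparkJdbcTestApp {
  def main(args: Array[String]): Unit = {
    // local[*] and the app name are placeholders for a local smoke test.
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksql-jdbc-test")
      .master("local[*]")
      .getOrCreate()
    try {
      testJDBC(spark) // assumes testJDBC is defined on (or imported into) this object
    } finally {
      spark.stop()
    }
  }
}

And since the comments above note that Append, unlike Overwrite, leaves an existing table in place, a sketch of an Append-mode write into a table created beforehand (the table name tb_score_manual is illustrative):

// tb_score_manual is assumed to already exist in MySQL with user-defined column types.
results.write
  .mode(SaveMode.Append)
  .jdbc(ConfigUtils.mysqlUrl, "tb_score_manual", connProperties)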

 
