当前位置: 代码迷 >> 综合 >> Spark summer-framework
  详细解决方案

Spark summer-framework

热度:83   发布时间:2024-02-27 09:49:04.0

文章目录

    • 1.1 框架设计原理
    • 1.2 框架搭建
      • 1.2.1 Util
      • 1.2.2 core
  • 需求1:Top10

1.1 框架设计原理

? 框架设计思想可以采取两种模式,一种是MVC,另外一种是三层架构,由于我们这里没有页面展示的需求,所以我们暂时采取三层架构的方式。

  1. .三层架构的概念

    1. Controller:控制层,封装调度作用,数据的流转过程
    2. Service: 服务层,封装实际的计算逻辑
    3. DAO :Data Access Object,数据访问对象,专门用于和一些关系型数据互相访问,用来和数据源的连接
  2. .调用的顺序
    在这里插入图片描述

  3. .架构中一些其他的内容

    1. bean : 用来封装一个bean类,对数据的一些封装,采用样例类,声明在包对象中
    2. helper:辅助类,如累加器类
    3. Apllication : 应用程序,主程序
    4. Util : 工具类

1.2 框架搭建

在这里插入图片描述

1.2.1 Util

EnvUtils

– 如何实现三层框架共享数据呢?

  1. 实现原理:在当前线程中创建一个内存,将共享数据存放在这个内存中,这样三层架构均可以使用。
  2. 实现方式:
    a、在线程中一直就保留着可以共享数据的空间
    b、JDK API 提供了一个工具类,可以直接访问这个空间
    c、只要调用这个工具类(ThreadLocal)将数据存入到共享数据中,也可以从这个内存中调用共享内存中的数据
  3. 具体的步骤:
    a、"创建"一个共享数据 : 案例如下:
    “private val scLocal = new ThreadLocal[SparkContext] "
    b、将共享数据"放进"共享空间中
    “scLocal.set(sc)”
    c、从当前线程的共享空间中"获取"共享数据
    “scLocal.get()”
    d、将共享数据从共享空间中"清除”
    “scLocal.remove()”
package com.ifeng.summer.foramework.utilimport org.apache.spark.{
    SparkConf, SparkContext}object EnvUtils {
    // 创建一个共享数据private val scLocal = new ThreadLocal[SparkContext]//获取环境对象def getEnv() = {
    //从当前线程的共享空间中获取环境对象var sc: SparkContext = scLocal.get()if (sc == null) {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkApplication")sc = new SparkContext(sparkConf)scLocal.set(sc)}sc}// 清除对象def clean() = {
    getEnv().stop()// 将共享内存中的数据清除scLocal.remove()}}

PropertiesUtil

–动态获取连接的资源,将需要连接的资源配置文件放置在配置文件中,从配置文件中读取连接需要的资源,这样,当我们需要更换连接的资源时,只要修改配置文件即可。

这种思想是非常重要的,类似hadoop的MR,将资源和计算分离开,做资源的调度,扩展起来就非常的方便。

package com.ifeng.summer.foramework.utilimport java.util.ResourceBundleobject PropertiesUtil {
    //绑定配置文件val summer: ResourceBundle = ResourceBundle.getBundle("summers")def getValue(key: String): String = {
    //传入一个key,返回一个valuesummer.getString(key)}}

1.2.2 core

TApplication

package com.ifeng.summer.foramework.core
import java.net.{
    InetAddress, ServerSocket, Socket}import com.ifeng.summer.foramework.util.{
    EnvUtils, PropertiesUtil}
import org.apache.spark.{
    SparkConf, SparkContext}trait TApplication {
    var envdata: Any = null//第一步:初始化环境def start(t: String)(op: => Unit) = {
    if (t == "Socket") {
    envdata = new Socket(PropertiesUtil.getValue("serverhost"), PropertiesUtil.getValue("serverport").toInt)} else if (t == "ServerSocket") {
    envdata = new ServerSocket(PropertiesUtil.getValue("serverport").toInt)} else if (t == "Spark") {
    envdata = EnvUtils.getEnv()}//业务逻辑try {
    op} catch {
    case ex: Exception => println("op执行失败,原因是:" + ex.getMessage)}//环境关闭if (t == "ServerSocket") {
    val ServerSocket = envdata.asInstanceOf[ServerSocket]if (!ServerSocket.isClosed) {
    ServerSocket.close()}} else if (t == "Socket") {
    val socket = envdata.asInstanceOf[Socket]if (!socket.isClosed) {
    socket.close()}} else if (t == "Spark") {
    EnvUtils.clean()}}}

TController

package com.ifeng.summer.foramework.coretrait TController {
    //执行控制def execute() : Unit}

TService

package com.ifeng.summer.foramework.coretrait TService {
    //数据分析def analysis() : Any//数据分析def analysis(data :Any) : Any}

TDAO

package com.ifeng.summer.foramework.coreimport com.ifeng.summer.foramework.util.EnvUtils
import org.apache.spark.rdd.RDDtrait TDAO {
    def readFile(path :String) ={
    val fileRDD: RDD[String] = EnvUtils.getEnv().textFile(path)fileRDD}}

需求1:Top10

在这里插入图片描述

bean

package object bean {
    //用户访问动作表case class UserVisitAction(date: String,//用户点击行为的日期user_id: String,//用户的IDsession_id: String,//Session的IDpage_id: String,//某个页面的IDaction_time: String,//动作的时间点search_keyword: String,//用户搜索的关键词click_category_id: String,//某一个商品品类的IDclick_product_id: String,//某一个商品的IDorder_category_ids: String,//一次订单中所有品类的ID集合order_product_ids: String,//一次订单中所有商品的ID集合pay_category_ids: String,//一次支付中所有品类的ID集合pay_product_ids: String,//一次支付中所有商品的ID集合city_id: String //城市 id)
}

HotCategoryTOP10ApplicationReview

import com.atguigu.core.hotcategorytop10review.controller.HotCategoryTop10ControllerReview
import com.atguigu.summer.framework.core.TApplicationobject HotCategoryTOP10ApplicationReview  extends App with   TApplication{
    start("Spark"){
    val hotCategoryTop10ControllerReview = new HotCategoryTop10ControllerReviewhotCategoryTop10ControllerReview.execute()}}

HotCategoryTop10ControllerReview

import com.atguigu.core.hotcategorytop10review.service.HotCategoryTop10ServiceReview
import com.atguigu.summer.framework.core.TControllerclass HotCategoryTop10ControllerReview extends  TController{
    private val hotCategoryTop10ServiceReview = new HotCategoryTop10ServiceReviewoverride def execute(): Unit = {
    val result: Array[(String, (Int, Int, Int))] = hotCategoryTop10ServiceReview.analysis()result.foreach(println)}
}

HotCategoryTop10DAOReview

import com.atguigu.core.hotcategorytop10review.bean.UserVisitAction
import com.atguigu.summer.framework.core.TDAO
import org.apache.spark.rdd.RDD/*** @Description 资源连接层* ** @author lianzhipeng* @create 2020-06-11 0:38:25*/
class HotCategoryTop10DAOReview extends TDAO {
    def getUserVisitAction() = {
    // 读取路径下的数据val fileRDD: RDD[String] = readFile("input/user_visit_action.txt")// 将数据封装对象fileRDD.map(data => {
    val datasArray: Array[String] = data.split("_")UserVisitAction(datasArray(0),datasArray(1),datasArray(2),datasArray(3),datasArray(4),datasArray(5),datasArray(6),datasArray(7),datasArray(8),datasArray(9),datasArray(10),datasArray(11),datasArray(12))})}
}

HotCategoryTop10ServiceReview

import java.ioimport com.atguigu.core.hotcategorytop10review.bean
import com.atguigu.core.hotcategorytop10review.dao.HotCategoryTop10DAOReview
import com.atguigu.summer.framework.core.TService
import org.apache.spark.rdd.RDD/*** @Description 计算逻辑层*** @author lianzhipeng* @create 2020-06-11 0:37:41*/
class HotCategoryTop10ServiceReview  extends  TService{
    private val hotCategoryTop10DAOReview = new HotCategoryTop10DAOReviewoverride def analysis() = {
    // 获取数据,数据已经被封装成一个一个的对象val UserRDD: RDD[bean.UserVisitAction] = hotCategoryTop10DAOReview.getUserVisitAction()//1. 数据进行切分,根据每条数据是哪种行为,将数据转换为:// (品类,(1,0,0)) : 点击行为// (品类,(0,1,0)) : 下单行为// (品类,(0,0,1)) : 支付行为val cagegoryToOneRDD: RDD[(String, (Int, Int, Int))] = UserRDD.flatMap(UserBean => {
    if (UserBean.click_category_id != "-1") {
    List((UserBean.click_category_id, (1, 0, 0)))} else if (UserBean.order_category_ids != "null") {
    val ids: Array[String] = UserBean.order_category_ids.split(",")ids.map(id => (id, (0, 1, 0)))} else if (UserBean.pay_category_ids != "null") {
    val ids: Array[String] = UserBean.pay_category_ids.split(",")ids.map(id => (id, (0, 0, 1)))} else {
    Nil}})//2. 按照品类进行分组聚合:// 品类,(clickCount,orderCont,payCount)val categorySumRDD: RDD[(String, (Int, Int, Int))] = cagegoryToOneRDD.reduceByKey {
    case ((click, order, pay), (click1, order1, pay1)) => {
    (click + click1, order + order1, pay + pay1)}}//3. 排序取前10val result: Array[(String, (Int, Int, Int))] = categorySumRDD.sortBy(_._2,false).take(10)result}override def analysis(data: Any): Any = ???
}

在这里插入图片描述

  相关解决方案