当前位置: 代码迷 >> 综合 >> RDD算子大全,动作算子(active,non-lazy),装换算子(transformation,lazy)
  详细解决方案

RDD算子大全,动作算子(active,non-lazy),装换算子(transformation,lazy)

热度:75   发布时间:2024-02-21 16:59:17.0

动作算子

  • 动作算子
    • count
    • countByKey
    • countByValue
    • collect
    • top
    • take
    • takeOrdered(n)
    • takeSample(withReplacement,num,seed)
    • first
    • reduce
    • foreach
    • foreachPartition
    • lookup
    • max
    • min
    • saveAsTextFile(path)
    • saveAsSequenceFile(path)
    • saveAsObjectFile(path)
    • aggregate
    • fold(num)(func)
    • zip
  • 转换算子

本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行
所有的动作算子都是急迫型(non-lazy),RDD遇到Action就会立即计算

有向无环图

动作算子

count

作用:返回数据集中元素的个数
举例

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count//6

countByKey

作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
举例

val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd.countByKey//scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

countByValue

作用:根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
举例

val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd.countByValue//scala.collection.Map[(Int, Int),Long] = Map((3,6) -> 1, (1,4) -> 1, (1,3) 

collect

作用:以Array返回RDD的所有元素。一般在过滤或者处理足够小的结果的时候使用
举例

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.collect//Array[Int] = Array(1, 2, 3, 4, 5, 6)

top

作用:自定义一个排序规则(倒序),返回最大的n个数组成的数组
举例

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.top(3)// Array[Int] = Array(6, 5, 4)

take

作用:返回当前RDD中的前n个元素
举例

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.take(3)// Array[Int] = Array(1, 2, 3)

takeOrdered(n)

作用:返回该RDD排序后的前n个元素组成的数组
举例

val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd.takeOrdered(3)// Array[Int] = Array(2, 3, 4)

takeSample(withReplacement,num,seed)

作用

  • withReplacement:结果中是否可重复
  • num:取多少个
  • seed:随机种子
  • 返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定的随机数生成器种子
  • takeSample()函数和sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组

举例

val rdd1 = sc.makeRDD(1 to 10)
rdd1.takeSample(true,4,10)//Array[Int] = Array(10, 10, 2, 3)

first

作用:返回当前RDD的第一个元素
举例

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.first// 1

reduce

作用:根据指定函数,对RDD中的元素进行两两计算,返回计算结果
举例

val a=sc.parallelize(1 to 100)
a.reduce((x,y)=>x+y)
或
a.reduce(_+_)  //与上面意思一样// 5050val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1)))
b.reduce((x,y)=>{
    (x._1+y._1,x._2+y._2)}) // (AABBC,6)

foreach

作用:对RDD中的每个元素都使用指定函数,做循环,无返回值
举例

val rd=sc.parallelize(1 to 10)
rd.foreach(println)//1 2 3 4 5 6 7 8 9 10

foreachPartition

作用:遍历的数据是每个partition的数据,返回的是一个迭代器
举例

    val rdd = sc.parallelize(List(1,2,3,4,5,6,7),2)val value = rdd.map((_, 1)).cache()value.partitionBy(new MyPartition(2)).foreachPartition(x=>println(x.toList))//List((5,1), (6,1), (7,1))
//List((1,1), (2,1), (3,1), (4,1))

lookup

作用:用于PairRDD,返回K对应的所有V值
举例

val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.lookup('a')//Array(1, 2)

max

作用:取RDD中元素的最大值
举例

val y=sc.parallelize(10 to 30)
y.max//30

min

作用:取RDD中元素的最小值
举例

val y=sc.parallelize(10 to 30)
y.min//10

saveAsTextFile(path)

作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
举例

val rdd=sc.parallelize(1 to 10,2)
rdd.saveAsTextFile("hdfs://hadoop000:8020/data/rddsave/")//会把RDD的结果分成2个区的文件保存到hdfs上

saveAsSequenceFile(path)

作用:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
举例

saveAsObjectFile(path)

作用:用于将RDD中的元素序列化成对象,存储到文件中。
举例

aggregate

作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致
举例

var rdd1 = sc.makeRDD(1 to 10,2)
//将该RDD所有元素相加得到结果
rdd.aggregate(0)(_+_,_+_)//Int = 55

fold(num)(func)

作用:折叠操作,aggregate的简化操作,将每个分区里面的元素通过seqOp和初始值进行聚合
举例

var rdd1 = sc.makeRDD(1 to 10,2)
//将该RDD所有元素相加得到结果
rdd.fold(0)(_+_)//Int = 55

zip

作用:将两个RDD的元素组合成一个K/V的RDD,需要两个RDD的partition个数和元素个数相同,不然会报异常
举例

val rdd = sc.parallelize(Array(1,2,3,4,5))
val rdd1 = sc.parallelize(Array("A","B","C","D","E"))var rdd2 = rdd.zip(rdd1)
println(rdd2.collect().mkString(" "))  // (1,A),(2,B),(3,C),(4,D),(5,E)

转换算子

常用的转换算子

map,filter,flatMap,mapPartitions,mapPartitonsWithIndex,sample,takeSample,union,intersection,distinct,partitionBy,reduceByKey,groupByKey,combinByKey,aggregateByKey,foldByKey,sortByKey,sortBy,join,cogroup,cartesian,pipe,coalesce,repartition,repartitionAndSortWithinPartitons,glom,mapValues,subtract

网上找到一篇比较全的转换算子大全,之后会转载一下

  相关解决方案