动作算子
- 动作算子
-
- count
- countByKey
- countByValue
- collect
- top
- take
- takeOrdered(n)
- takeSample(withReplacement,num,seed)
- first
- reduce
- foreach
- foreachPartition
- lookup
- max
- min
- saveAsTextFile(path)
- saveAsSequenceFile(path)
- saveAsObjectFile(path)
- aggregate
- fold(num)(func)
- zip
- 转换算子
本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行
所有的动作算子都是急迫型(non-lazy),RDD遇到Action就会立即计算
动作算子
count
作用:返回数据集中元素的个数
举例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count//6
countByKey
作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
举例:
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd.countByKey//scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
countByValue
作用:根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
举例:
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd.countByValue//scala.collection.Map[(Int, Int),Long] = Map((3,6) -> 1, (1,4) -> 1, (1,3)
collect
作用:以Array返回RDD的所有元素。一般在过滤或者处理足够小的结果的时候使用
举例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.collect//Array[Int] = Array(1, 2, 3, 4, 5, 6)
top
作用:自定义一个排序规则(倒序),返回最大的n个数组成的数组
举例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.top(3)// Array[Int] = Array(6, 5, 4)
take
作用:返回当前RDD中的前n个元素
举例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.take(3)// Array[Int] = Array(1, 2, 3)
takeOrdered(n)
作用:返回该RDD排序后的前n个元素组成的数组
举例:
val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd.takeOrdered(3)// Array[Int] = Array(2, 3, 4)
takeSample(withReplacement,num,seed)
作用:
- withReplacement:结果中是否可重复
- num:取多少个
- seed:随机种子
- 返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定的随机数生成器种子
- takeSample()函数和sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组
举例:
val rdd1 = sc.makeRDD(1 to 10)
rdd1.takeSample(true,4,10)//Array[Int] = Array(10, 10, 2, 3)
first
作用:返回当前RDD的第一个元素
举例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.first// 1
reduce
作用:根据指定函数,对RDD中的元素进行两两计算,返回计算结果
举例:
val a=sc.parallelize(1 to 100)
a.reduce((x,y)=>x+y)
或
a.reduce(_+_) //与上面意思一样// 5050val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1)))
b.reduce((x,y)=>{
(x._1+y._1,x._2+y._2)}) // (AABBC,6)
foreach
作用:对RDD中的每个元素都使用指定函数,做循环,无返回值
举例:
val rd=sc.parallelize(1 to 10)
rd.foreach(println)//1 2 3 4 5 6 7 8 9 10
foreachPartition
作用:遍历的数据是每个partition的数据,返回的是一个迭代器
举例:
val rdd = sc.parallelize(List(1,2,3,4,5,6,7),2)val value = rdd.map((_, 1)).cache()value.partitionBy(new MyPartition(2)).foreachPartition(x=>println(x.toList))//List((5,1), (6,1), (7,1))
//List((1,1), (2,1), (3,1), (4,1))
lookup
作用:用于PairRDD,返回K对应的所有V值
举例:
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.lookup('a')//Array(1, 2)
max
作用:取RDD中元素的最大值
举例:
val y=sc.parallelize(10 to 30)
y.max//30
min
作用:取RDD中元素的最小值
举例:
val y=sc.parallelize(10 to 30)
y.min//10
saveAsTextFile(path)
作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
举例:
val rdd=sc.parallelize(1 to 10,2)
rdd.saveAsTextFile("hdfs://hadoop000:8020/data/rddsave/")//会把RDD的结果分成2个区的文件保存到hdfs上
saveAsSequenceFile(path)
作用:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
举例:
saveAsObjectFile(path)
作用:用于将RDD中的元素序列化成对象,存储到文件中。
举例:
aggregate
作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致
举例:
var rdd1 = sc.makeRDD(1 to 10,2)
//将该RDD所有元素相加得到结果
rdd.aggregate(0)(_+_,_+_)//Int = 55
fold(num)(func)
作用:折叠操作,aggregate的简化操作,将每个分区里面的元素通过seqOp和初始值进行聚合
举例:
var rdd1 = sc.makeRDD(1 to 10,2)
//将该RDD所有元素相加得到结果
rdd.fold(0)(_+_)//Int = 55
zip
作用:将两个RDD的元素组合成一个K/V的RDD,需要两个RDD的partition个数和元素个数相同,不然会报异常
举例:
val rdd = sc.parallelize(Array(1,2,3,4,5))
val rdd1 = sc.parallelize(Array("A","B","C","D","E"))var rdd2 = rdd.zip(rdd1)
println(rdd2.collect().mkString(" ")) // (1,A),(2,B),(3,C),(4,D),(5,E)
转换算子
常用的转换算子
map,filter,flatMap,mapPartitions,mapPartitonsWithIndex,sample,takeSample,union,intersection,distinct,partitionBy,reduceByKey,groupByKey,combinByKey,aggregateByKey,foldByKey,sortByKey,sortBy,join,cogroup,cartesian,pipe,coalesce,repartition,repartitionAndSortWithinPartitons,glom,mapValues,subtract
网上找到一篇比较全的转换算子大全,之后会转载一下