当前位置: 代码迷 >> PB >> spark transform系列_groupByKey
  详细解决方案

spark transform系列_groupByKey

热度:685   发布时间:2016-04-29 05:10:12.0
spark transform系列__groupByKey

这个操作的作用根据相同的key的所有的value存储到一个集合中的一个玩意.

def?groupByKey():?RDD[(K,?Iterable[V])]?=?self.withScope?{
??groupByKey(defaultPartitioner(self))
}

在 做groupByKey的操作时,由于需要根据key对数据进行重新的分区操作,因此这个操作需要有一个partitioner的实例.默认是hash算 子.这个操作根据当前操作的RDD中是否有partitioner,同时这个partitioner与当前的传入的partitioner的实例是否相同 来判断是否需要执行shuffle操作.

如果是默认的hashPartitioner时,检查spark.default.parallelism配置是否有配置,如果有分区个数按这个配置来设置,否则使用当前进行此groupByKey操作的rdd的partitions来设置.

?

def?groupByKey(partitioner:?Partitioner):?RDD[(K,?Iterable[V])]

?=?self.withScope?{

这里同样与reduceByKey的操作一样,通过调用combineByKeyWithClassTag的函数来进行处理,

不同的是,withClassTag的合并操作是一个CompactBuffer[V]类型.

这里生成aggregator实例需要的三个函数时,

createCombiner:如果key目前还有值时,根据当前传入的key-value中的value生成一个CompactBuffer的实例,并存储到key对应的位置,

mergeValue:传入一个key-value时,如果key对应的CompactBuffer已经存在,把这个value添加到这个buffer中.

mergeCombiners:这个主要在shuffle结束时,把key相同的多个buffer进行合并.

需要注意的是,在执行groupByKey的操作时,会把mapSideCombine设置为false,表示不执行map端的聚合.

为 什么groupByKey不做mapSideCombine的操作呢,因为在groupByKey的操作中,会先根据相同的key,把value存储到一 个buffer中,这个地方并不会设计到map端combine的操作会减少多少的网络传输的开效,如果做map combine操作时,反而增加了map端writer的内存使用.

? //?groupByKey?shouldn't?use?map?side?combine?because?map?side?combine?does?not
??//?reduce?the?amount?of?data?shuffled?and?requires?all?map?side?data?be?inserted
??//?into?a?hash?table,?leading?to?more?objects?in?the?old?gen.
??val?createCombiner?=?(v:?V)?=>?CompactBuffer(v)
??val?mergeValue?=?(buf:?CompactBuffer[V],?v:?V)?=>?buf?+=?v
??val?mergeCombiners?=?(c1:?CompactBuffer[V],?c2:?CompactBuffer[V])?=>?c1?++=?c2
??val?bufs?=?combineByKeyWithClassTag[CompactBuffer[V]](
????createCombiner,?mergeValue,?mergeCombiners,?partitioner,?mapSideCombine?=?false)
??bufs.asInstanceOf[RDD[(K,?Iterable[V])]]
}

?

在combineByKeyWithClassTag的操作函数中的处理:

mapSideCombine的传入参数为false.

这个地方,根据上面的三个函数,生成Aggregator,这里的K,V,C分别代表key的类型,value的类型,C在groupByKey的操作中是一个CompactBuffer[V]的类型

val?aggregator?=?new?Aggregator[K,?V,?C](
??self.context.clean(createCombiner),
??self.context.clean(mergeValue),
??self.context.clean(mergeCombiners))

这里主要是看看当前的partitioner是否与当前执行这个操作的rdd的partitioner实例相同.相同就不在需要执行shuffle操作,否则就需要执行shuffle操作,生成新的ShuffledRDD实例.
if?(self.partitioner?==?Some(partitioner))?{
??self.mapPartitions(iter?=>?{
????val?context?=?TaskContext.get()
????new?InterruptibleIterator(context,?aggregator.combineValuesByKey(iter,

????????context))
??},?preservesPartitioning?=?true)
}?else?{
??new?ShuffledRDD[K,?V,?C](self,?partitioner)
????.setSerializer(serializer)
????.setAggregator(aggregator)
????.setMapSideCombine(mapSideCombine)
}

?

在Aggregator的操作中,如果mapSideCombine的参数为false时,通过Aggregator中的combineValuesByKey函数执行数据的合并操作.如果mapSideCombine的参数为true时,通过Aggregator中的combineCombinersByKey函数执行数据的合并操作(只执行第三个函数,因为map端已经把结果合并成了C的类型).

在Aggregator的合并操作中,通过ExternalAppendOnlyMap实例来进行数据的合并(insertAll).这个实例会最大可能的使用内存,如果内存实在不够用时,考虑对内存中的数据进行spill到磁盘的操作.

  相关解决方案