
PageRank Algorithm in Spark (Scala + Python)


Reposted from: https://plmsmile.github.io/2017/03/13/Spark-PairRDD/


PageRank

The Python version of PageRank

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""PageRank algorithm.

author = PuLiming
Run: bin/spark-submit files/pagerank.py data/mllib/pagerank_data.txt 10
"""
from __future__ import print_function

import re
import sys
from operator import add

from pyspark import SparkConf, SparkContext


def compute_contribs(urls, rank):
    """Distribute a page's rank to its neighbor urls.

    Args:
        urls: the neighbor urls of the target url
        rank: the current rank of the target url
    Yields:
        (url, contribution): one neighbor url and its share of the rank
    """
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)


def split_url(url_line):
    """Split one line of the url file.

    Args:
        url_line: one line of the file, e.g. '1 2'
    Returns:
        url, neighbor_url
    """
    parts = re.split(r'\s+', url_line)  # split on whitespace
    return parts[0], parts[1]


def compute_pagerank(sc, url_data_file, iterations):
    """Compute the rank of each page.

    Args:
        sc: SparkContext
        url_data_file: input data file
        iterations: number of iterations
    Returns:
        status: 0 on success
    """
    # Read the url file: ['1 2', '1 3', '2 1', '3 1']
    lines = sc.textFile(url_data_file)
    # Build a pair RDD (url, neighbor_urls): [(1, [2, 3]), (2, [1]), (3, [1])]
    links = lines.map(split_url).distinct().groupByKey() \
                 .mapValues(list).cache()
    # Initialize every url's rank to 1.0: [(1, 1.0), (2, 1.0), (3, 1.0)]
    ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))
    for i in range(iterations):
        # Join each url's neighbor list with its current rank, then
        # contribute the rank to each neighbor: (url, contribution)
        contribs = links.join(ranks).flatMap(
            lambda url_urls_rank: compute_contribs(url_urls_rank[1][0],
                                                   url_urls_rank[1][1]))
        # Sum the contributions per url, then apply the damping factor
        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    for (link, rank) in ranks.collect():
        print("%s has rank %s." % (link, rank))
    return 0


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: python pagerank.py <data.txt> <iterations>", file=sys.stderr)
        sys.exit(-1)
    # Data file and iteration count
    url_data_file = sys.argv[1]
    iterations = int(sys.argv[2])
    # Configure the SparkContext
    conf = SparkConf().setAppName('PythonPageRank')
    conf.setMaster('local')
    sc = SparkContext(conf=conf)
    ret = compute_pagerank(sc, url_data_file, iterations)
    sys.exit(ret)

The Scala version of PageRank

import org.apache.spark.{HashPartitioner, SparkContext}

val sc = new SparkContext(...)
// Partition links up front, since it is joined with ranks on every iteration
val links = sc.objectFile[(String, Seq[String])]("links")
  .partitionBy(new HashPartitioner(100))
  .persist()
// mapValues keeps the partitioning of links
var ranks = links.mapValues(_ => 1.0)
// Iterate 10 times
for (i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
ranks.saveAsTextFile("ranks")
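Both versions implement the same damped update rule: on each iteration, every page p receives rank(q) / |out(q)| from each page q that links to it, and its new rank is rank(p) = 0.15 + 0.85 × Σ rank(q) / |out(q)|, with the sum running over all pages q linking to p. Note this is the common Spark-example variant of PageRank: assuming every page has at least one outgoing link, the ranks sum to the number of pages rather than to 1.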

Advantages of this Scala version of the PageRank implementation:

  • links is joined with ranks on every iteration, so hash-partitioning it once up front with partitionBy means the joins no longer shuffle it across the network, saving considerable communication overhead
  • persisting links keeps it in memory so that every iteration can reuse it
  • ranks is first created with mapValues, which preserves the parent RDD's partitioning, so even the first join is cheap
  • after reduceByKey the result is already hash-partitioned, and following it with mapValues preserves that partitioning, so the next join with links stays efficient (see the sketch after this list)
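To make the partitioner-preservation point concrete, here is a minimal sketch, not from the original post: the three-page toy graph, the object name, and the local[2] master are made up for illustration. It inspects RDD.partitioner directly to show why mapValues is preferred over map here:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitioningDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("PartitioningDemo").setMaster("local[2]"))

    // A tiny link graph (hypothetical data), hash-partitioned once up front
    val links = sc.parallelize(Seq(
        ("1", Seq("2", "3")), ("2", Seq("1")), ("3", Seq("1"))))
      .partitionBy(new HashPartitioner(4))
      .persist()

    // mapValues cannot change keys, so Spark keeps the parent's partitioner
    val ranks = links.mapValues(_ => 1.0)
    println(ranks.partitioner == links.partitioner) // true: join needs no shuffle

    // map could change keys, so the partitioner is dropped and a later
    // join with links would have to shuffle
    val ranksViaMap = links.map { case (url, _) => (url, 1.0) }
    println(ranksViaMap.partitioner) // None

    sc.stop()
  }
}

Run locally, this should print true followed by None, matching the bullet points above: only the key-preserving transformations (mapValues, and reduceByKey's output) carry the partitioner forward.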