当前位置: 代码迷 >> 综合 >> pyspark 浅析
  详细解决方案

pyspark 浅析

热度:36   发布时间:2023-12-09 22:43:14.0

Spark的scala也好, 还是pyspark也好, 它们的编程思想都是函数式编程, 关于函数式编程的解析可以看这篇文章:

http://www.ruanyifeng.com/blog/2012/04/functional_programming.html

函数式编程只是返回新的值, 不修改原有的值, 所以在对RDD操作时一定要注意, 不要用对RDD操作以后, 引用了老的变量.

import pysparksc = pyspark.SparkContext()rdd = sc.parallelize([1, 2, 3, 4, 5, 6, -1, 0])
rdd.sortBy(lambda x: x)
print rdd.collect()  # not ordered, because you are referencing the original variable 'rdd'rdd = sc.parallelize([1, 2, 3, 4, 5, 6, -1, 0])
rdd.sortBy(lambda x: x).persist()
print rdd.collect()  # not ordered, same reason as aboverdd = sc.parallelize([1, 2, 3, 4, 5, 6, -1, 0])
print rdd.sortBy(lambda x : x).collect() # ordered 

 

collect函数的作用把RDD的收集到当前的节点上, 注意如果RDD过大, 可能当前节点内存不能存放RDD,导致报错

 

Spark driver 是用来解析用户提交的代码, 并把这些代码转换成集群要执行的任务. 把这些任务交给master来分发执行.

master可以有很多选择, 比如yarn就是一个master(参考: https://blog.csdn.net/chengyuqiang/article/details/77864246#comments)

pyspark --master yarn --queue queue1 --deploy-mode client

 

通过配置spark-site.xml的HADOOP_CONF_DIR就能让spark知道yarn的访问路径

Spark 还分 client 模式和 cluster 模式, 详见: 

https://blog.csdn.net/SummerMangoZz/article/details/72627518

https://blog.csdn.net/Trigl/article/details/72732241

简单来说如果你向spark集群提交任务的那台电脑和集群很近, 延迟很低, 那client就够了, 但是如果你用的是VPN在家里向公司的某个国外的集群提交spark任务, 那么由于网络延迟很大, 就用cluster模式吧

 

 

Spark也有类似hadoop中分布式缓存的实现, 叫broadcast variables.

Spark中有一个全局计数的实现叫Accumulators

Spark的map可以基于每一行, 去应用lambda表达式对各个元素进行转换, 也可以基于Partition来去转换, 以节省数据库的链接这样宝贵的资源.

reduce里指定的变换函数, 返回的结果必须和序列的元素的结构一致, 因为reducer的原理是 把第一个和第二个元素的转换结果和第三个元素再一起转换:

import pysparksc = pyspark.SparkContext()rdd = sc.parallelize([1, 2, 3], 1)
# calculate the mean value
#the lambda function must return the same structure 
print reduce( lambda x,y: x/y,rdd.map(lambda x: [x, 1.0]).reduce(lambda x,y:[x[0]+y[0],x[1]+y[1]])) 

 

 

关于 Spark 的 fold 算子: 

# -*- coding: utf-8 -*-import pysparksc = pyspark.SparkContext()
baseline = 5
partition_count = 3
rdd = sc.parallelize([1, 2, 3], partition_count)
print rdd.fold(baseline, lambda x,y:x+y)
#equals
print sum([1, 2, 3]) + (len([1, 2, 3]) - 1) * baseline + (partition_count-1) * baseline

 

Spark-submit的参数: 

spark-submit --master yarn --deploy-mode client
--executor-memory 1G
--queue yarnqueue
--num-executors 6
--executor-cores 1
somepythonfile.py

executor-memory:每个执行器的内存拥有量

num-execuotrs: 默认为2(yarn only)

queue: 使用yarn的哪个queue(yarn only)

executor-cores: 每个执行器的核心使用数, 默认1(yarn only)

 

 

 

其他参考

https://www.cnblogs.com/lujinhong2/p/4666748.html