hadoop(配置文件都在$HADOOP_HOME/etc/hadoop)
- hadoop.env.sh
#export JAVA_HOME=${JAVA_HOME} export JAVA_HOME=/opt/modules/jdk1.8.0_11#配置java_home
- core-site.xml
<configuration><property><name>fs.default.name</name><value>hdfs://pyspark-1.bigload.com:8020</value></property> </configuration>
- hdfs-site.xml
<configuration><property><name>dfs.namenode.name.dir</name><value>/opt/modules/hadoop-2.6.0-cdh5.7.0/tmp/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>/opt/modules/hadoop-2.6.0-cdh5.7.0/tmp/dfs/data</value></property><!--副本数--><property><name>dfs.replication</name><value>1</value></property> </configuration>
- mapred-site.xml(cp mapred-site.xml.template mapred-site.xml)
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property> </configuration>
- yarn-site.xml
<configuration><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property> </configuration>
- 格式化
bin/hadoop namenode –format
- 无秘钥登录
ssh无秘钥登录 cd ~/.ssh 主节点 NameNode 1)生成一对公钥与秘钥 ssh-keygen -t rsa 2)拷贝公钥到各个机器上 ssh-copy-id pyspark-1.bigload.com ssh-copy-id localhost ssh-copy-id 0.0.0.0
- 启动hdfs
sbin/start-dfs.sh
- 游览器查看(http://pyspark-1.bigload.com:50070),传个文件
bin/hadoop dfs -mkdir -p /test bin/hadoop fs -put README.txt /test
- 启动yarn
sbin/start-yarn.sh
- 游览器查看(http://pyspark-1.bigload.com:8088/)
maven
- 解压
tar -zxf apache-maven-3.3.9-bin.tar.gz -C /opt/modules/
- 建立local/repo文件夹,并且修改config/settings.xml
创建local/repo文件夹mkdir -p local/reposettings.xml<localRepository>/opt/modules/apache-maven-3.3.9/local/repo</localRepository>并添加镜像:<mirror><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><mirrorOf>central</mirrorOf></mirror>
python
yum源(阿里云)http://www.cnblogs.com/lin1/p/5607121.htmlyum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel gcctar -zxf Python-3.6.9.tgz
cd Python-3.6.9
./configure --prefix=/opt/modules/python3/
cd /opt/modules/python3
make && make install
spark
解压
修改为对应的版本(dev/make-distribution.sh)
初始
VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ 2>/dev/null | grep -v "INFO" | tail -n 1)
SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ 2>/dev/null\
| grep -v "INFO"\
| tail -n 1)
SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\
| grep -v "INFO"\
| tail -n 1)
SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ 2>/dev/null\
| grep -v "INFO"\
| fgrep --count "<id>hive</id>";\
# Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
# because we use "set -o pipefail"
echo -n)
替换为下面对应的参数值
VERSION=2.4.4
SCALA_VERSION=2.11.12
SPARK_HADOOP_VERSION=2.6.0-cdh5.7.0
SPARK_HIVE=1e.spark pom.xml 添加 cdh reponsitory
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>如果不添加会出现如下错误信息:
Failed to execute goal on project spark-launcher_2.11: Could not resolve dependencies for project org.apache.spark:spark-launcher_2.11:jar:2.1.0: Could not find artifact org.apache.hadoop:hadoop-client:jar:2.5.0-cdh5.3.6[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :spark-launcher_2.11
-rf :spark-launcher_2.11
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.7.0编译时间会有些长,会生成一个spark-2.4.4-bin-2.6.0-cdh5.7.0.tgz
tar -zxf spark-2.4.4-bin-2.6.0-cdh5.7.0.tgz -C /opt/modules
随后进入编译后的文件夹
cd /opt/modules/spark-2.4.4-bin-2.6.0-cdh5.7.0
可以用bin/spark-shell测试一下配置SPARK默认使用 PYTHON3
在 bin/pyspark 文件中添加 export PYSPARK_PYTHON=python3
(python3已经加入环境变量)
http://www.bubuko.com/infodetail-2294107.html
官网:http://spark.apache.org/
源码:https://github.com/apache/spark
spark core核心rdd
什么是rdd,弹性分布式数据集(Resilient Distributed Dataset )
1)rdd是一个抽象类
2)带泛型的,可以支持多种类型:String,User,Person
rdd特性
不可变
分区
并行计算的
1.A list of partitions RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的list;将数据加载为RDD时,一般会遵循数据的本地性(一般一个hdfs里的block会加载为一个partition)。 2.A function for computing each split 一个函数计算每一个分片,RDD的每个partition上面都会有function,也就是函数应用,其作用是实现RDD之间partition的转换。 3.A list of dependencies on other RDDs RDD会记录它的依赖 ,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。为了容错(重算,cache,checkpoint),也就是说在内存中的RDD操作时出错或丢失会进行重算。 rdd1==>rdd2==>rdd3==>rdd4 rdd2=f1(rdd1) rdd3=f2(rdd2) rdd4=f3(rdd3)
4.Optionally,a Partitioner for Key-value RDDs 可选项,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区,例如这里自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面 5.Optionally, a list of preferred locations to compute each split on 最优的位置去计算,也就是数据的本地性。(数据在哪优先吧作业调度到数据所在的节点进行计算:移动数据不如移动计算) |
图解rdd
sparkcontext & sparkconf
Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext
您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
该appName
参数是您的应用程序显示在集群UI上的名称。 master
是Spark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master
在程序中进行硬编码,而是在其中启动应用程序spark-submit
并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。
rdd创建方式
1.并行集合
>>> data=[1,2,3,4,5]
>>> distData=sc.parallelize(data)
>>> distData
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
>>> data
[1, 2, 3, 4, 5]
>>> disData.collect()
Traceback (most recent call last):File "<stdin>", line 1, in <module>
NameError: name 'disData' is not defined
>>> distData.collect()
[1, 2, 3, 4, 5]
>>> distData.reduce(lambda a,b:a+b)
15
2.外部数据集
PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。
可以使用
SparkContext
的textFile
方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://
,s3a://
等URI),并读取其作为行的集合。这是一个示例调用:
>>> distFile = sc.textFile("data.txt")
一旦创建,
distFile
就可以通过数据集操作对其进行操作。例如,我们可以使用map
和reduce
操作将所有行的大小相加,如下所示:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
。关于使用Spark读取文件的一些注意事项:
如果在本地文件系统上使用路径,则还必须在工作节点上的相同路径上访问该文件。将文件复制到所有工作服务器,或者使用网络安装的共享文件系统。
Spark的所有基于文件的输入方法(包括
textFile
)都支持在目录,压缩文件和通配符上运行。例如,你可以使用textFile("/my/directory")
,textFile("/my/directory/*.txt")
和textFile("/my/directory/*.gz")
。该
textFile
方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中的块默认为128MB),但是您也可以通过传递更大的值来请求更大数量的分区。请注意,分区不能少于块。除文本文件外,Spark的Python API还支持其他几种数据格式:
SparkContext.wholeTextFiles
使您可以读取包含多个小文本文件的目录,并将每个小文本文件作为(文件名,内容)对返回。与相比textFile
,会在每个文件的每一行返回一条记录。
RDD.saveAsPickleFile
并SparkContext.pickleFile
支持以包含腌制Python对象的简单格式保存RDD。批处理用于咸菜序列化,默认批处理大小为10。SequenceFile和Hadoop输入/输出格式
请注意,此功能当前已标记
Experimental
,仅供高级用户使用。将来可能会替换为基于Spark SQL的读/写支持,在这种情况下,Spark SQL是首选方法。
sc.textFile("file:///opt/datas/README.md").collect()
sc.textFile("hdfs://pyspark-1.bigload.com/hadoop/README.md").collect()
spark应用程序开发及运行
我是把一切环境都是装载了虚拟机上,发现在Windows还需要再做一次环境绝望之余看见了能远程连接,换上自己的虚拟机的ip就行了,反正是怎么省事怎么来,日后要是出现问题了再改吧,感谢分享
Windows上的PyCharm 远程连接调试pyspark
from pyspark import SparkConf,SparkContext
#创建sparkconf:设置的是spark相关的参数信息
conf=SparkConf().setMaster("local[2]").setAppName("spark0301")
#创建sparkcontext
sc=SparkContext(conf=conf)#业务逻辑
data=[1,3,44,5]
distData=sc.parallelize(data)
print(distData.collect())sc.stop()