1、数据处理
导入需要的库
from pyspark import StorageLevel
from pyspark.sql import functions as F
from pyspark.sql.types import StringType,IntegerType
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col
2、导入数据(parquet、csv、json、等格式)
导入parquet
sc = SparkContext("local", "test")
hive_context = HiveContext(sc)
local_path ="hdfs路径"total_data = hive_context.read.load(local_path)
导入csv
sc = SparkContext("local", "test2")
hive_context = HiveContext(sc)
local_path = 'hdfs路径'
total_data = hive_context.read.csv(local_path,header=True)
3、数据简单处理
以kaggle中 旧金山犯罪记录分类数据为例,数据下载地址:
https://www.kaggle.com/c/sf-crime/data。
#查看前5条数据
data.show(5)
#查看数据结构
data.printSchema()
#查看数据等列名、行数、列数data.columns
data.count()
len(data.columns)
# 描述制定列 describe
#如果我们要看一下数据框中某指定列的概要信息,我们会用describe方法。
#这个方法会提供我们指定列的统计概要信息,如果没有指定列名,它会提供这个数据框对象的统计信息。ata.describe('Category').show()
data.describe().show()
#查询多列 可以使用select
data.select('Category','Descript').show()
# 查询不重复的多列组合
data.select('Category','Descript').distinct().show()
#过滤数据 filter data.filter(data.Category == 'WEAPON LAWS').show()
#一共多少条记录被筛选出来
data.filter(data.Category == 'WEAPON LAWS').count()
# 基于多个条件过滤
data.filter((data.Category == 'WEAPON LAWS')& (data.DayOfWeek == 'Wednesday')).show()
#分组数据 groupby
data.groupBy("Category").count().show()
#数据排序 orderBy
#默认 升序
data.groupBy("Category") \.count() \.orderBy(col("count")) \.show()#可以降序
data.groupBy("Category") \.count() \.orderBy(col("count").desc()) \.show()