
Spark-trap


Contents

    • 1. When saving to HDFS, the number of result files equals the number of partitions of the final RDD; how to merge the output into a single file
    • 2. When saving to HDFS, only the output directory can be customized; the file name is controlled by the HDFS OutputFormat and consists of the partition number plus a long UUID. How to customize the file name?
      • 2.1 Create an OutputFormat class that extends FileOutputFormat[Any, Any] and override getRecordWriter
      • 2.2 Specify the OutputFormat class when saving to HDFS from Spark code
    • 3. Renaming the result files produced by a DataFrame
      • 3.1 The custom-OutputFormat approach above has many problems:
        • `It is easier to save with the DataFrame's csv writer first and then rename the file` ==>
      • 3.2 Renaming an HDFS file from a Scala program
        • `This approach has its own problem: with large data volumes, multiple partitions write to HDFS in parallel and produce multiple result files; renaming them all to the same name fails, because HDFS does not allow renaming to an existing file name, let alone merge the files automatically. So they have to be merged into one new file. ==>`
      • 3.3 Merging the files under a given directory into a single new file with a given name
    • 4. Reading and writing CSV files with Spark
      • 4.1 Before Spark 2.0, reading and writing CSV files with Spark SQL required Databricks' spark-csv library; since Spark 2.0, Spark SQL supports CSV natively
      • 4.2 Options
        • 4.2.1 Read
        • 4.2.2 Write
      • 4.3 jpmml-evaluator-spark: gbtValue is not defined when saving to CSV
  • 5. `Trap: when Spark SQL queries a Hive table, it does not simply hand the SQL to Hive and return the result.`
  • 6. Logging inside Spark
    • Notes:
    • usage
  • 7. Things to watch out for in Spark code
    • {1} The sc object cannot be used inside an operator closure, otherwise a serialization error is thrown
    • {2} Pulling too much data to the driver, e.g. with collect or take(bigNum), leads to driver-side errors; avoid it. Any computation that seems to need the data on the driver can be decomposed across the nodes
    • {3} Some Scala collection functions are lazily evaluated, e.g. map, and must be followed by an operation that forces execution, whereas foreach runs immediately

1. When saving to HDFS, the number of result files equals the number of partitions of the final RDD; how to merge the output into a single file

*Simply set the number of partitions to 1 before writing.*

/* result handling */
// write to HDFS
resultDF.coalesce(1)
  .write.mode(saveMode = SaveMode.Overwrite)
  .option("header", "true")
  .csv(outputPath)

2. When saving to HDFS, only the output directory can be customized; the file name is controlled by the HDFS OutputFormat and consists of the partition number plus a long UUID. How to customize the file name?

  1. Create an OutputFormat class and override getRecordWriter; the RecordWriter it returns controls the output path, the file name and the exact content that gets written
  2. Convert the DataFrame or RDD to a pair RDD, then call one of the saveAs... methods, which let you specify the OutputFormat

2.1 Create an OutputFormat class that extends FileOutputFormat[Any, Any] and override getRecordWriter

package cn.dc

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{JobContext, RecordWriter, TaskAttemptContext}

/**
 * CustomOutputFormat extends FileOutputFormat[Any, Any] and overrides getRecordWriter;
 * the returned RecordWriter[Any, Any] decides the output path, the file name and the
 * content written for each key/value pair.
 */
class CustomOutputFormat extends FileOutputFormat[Any, Any] {

  override def getRecordWriter(job: TaskAttemptContext): RecordWriter[Any, Any] = {
    val fileSystem = FileSystem.newInstance(job.getConfiguration)
    // FileOutputFormat.getOutputPath(job) returns the output path passed as args(0) to
    // saveAsNewAPIHadoopFile, just like FileOutputFormat.setOutputPath(job, new Path("output1"))
    // in a plain MapReduce driver.
    val fs = fileSystem.create(new Path(FileOutputFormat.getOutputPath(job) + "_result.txt"))

    new RecordWriter[Any, Any] {
      override def write(key: Any, value: Any): Unit = {
        val separator = "#"
        val charSet = "UTF-8"
        // write the key
        fs.write(key.toString.getBytes(charSet), 0, key.toString.getBytes(charSet).length)
        // write the separator between key and value
        fs.write(separator.getBytes(charSet), 0, separator.getBytes(charSet).length)
        // write the value
        fs.write(value.toString.getBytes(charSet), 0, value.toString.getBytes(charSet).length)
        fs.write("\n".getBytes(charSet), 0, "\n".getBytes(charSet).length)
        fs.flush()
      }

      override def close(context: TaskAttemptContext): Unit = {
        if (fs != null) fs.close()
      }
    }
  }
}

2.2 Specify the OutputFormat class when saving to HDFS from Spark code

/* MapReduce only handles key/value data, so the DataFrame is converted to a pair RDD
   before saveAsNewAPIHadoopFile can be called. */
resultDF.coalesce(1).rdd.map(r => "" -> r)
  /* outputPath                   output directory; note it only receives the _SUCCESS marker,
                                  the actual result file goes to the path built inside the RecordWriter
     classOf[String]              key type
     classOf[String]              value type
     classOf[CustomOutputFormat]  the custom OutputFormat class to write with */
  .saveAsNewAPIHadoopFile(outputPath, classOf[String], classOf[String], classOf[CustomOutputFormat], new Configuration())

sc.stop()

3. Renaming the result files produced by a DataFrame

3.1 The custom-OutputFormat approach above has many problems:

  1. It only supports pair RDDs, so the RDD has to be converted first.
  2. FileOutputFormat wraps the value output in square brackets by default, which has to be stripped.
  3. It cannot save directly as CSV; the fields still have to be joined with commas manually.
  4. The existence of the output path has to be checked by hand.
  5. It throws assorted errors.

It is easier to save with the DataFrame's csv writer first and then rename the file ==>

3.2 Renaming an HDFS file from a Scala program

/*
 * Rename the HDFS result file:
 *   list the files under the output directory  Array[FileStatus] -> Array[Path] -> Array[String]
 *   then rename the matching file.
 */
val configuration = new Configuration()
val fileSystem = FileSystem.get(configuration)
val path = new Path(outputPath + "/" + dateTime)
val fsArr: Array[FileStatus] = fileSystem.listStatus(path)
val pathArr: Array[Path] = FileUtil.stat2Paths(fsArr)
val pathStrArr: Array[String] = pathArr.filter(fileSystem.getFileStatus(_).isFile()).map(_.toString)

// keep only result files starting with "part-", skipping markers such as the _SUCCESS file
val sourceFile = "part-"
for (i <- 0.until(pathStrArr.length)) {
  val beforeFilePath = pathStrArr(i)
  /* index just after the last "/", e.g.
     hdfs://ns/testdata/data/test/part-00000-e9d534da-d1e1-47fa-9d0e-ef9734d900d4-c000.csv */
  val lastIndex = beforeFilePath.lastIndexOf("/") + 1
  val beforeFileName = beforeFilePath.substring(lastIndex)
  if (beforeFileName.contains(sourceFile)) {
    val afterFilePath = beforeFilePath.substring(0, lastIndex) + s"$dateTime-result.csv"
    fileSystem.rename(new Path(beforeFilePath), new Path(afterFilePath))
  }
}

This approach has its own problem: with large data volumes, multiple partitions have to write to HDFS in parallel, producing multiple result files. Renaming them all to the same name fails, because HDFS does not allow renaming a file to a name that already exists, and it certainly does not merge the files automatically. So the files have to be merged into one new file. ==>

3.3 Merging the files under a given directory into a single new file with a given name
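
One way to do the merge is with Hadoop's FileUtil.copyMerge (a minimal sketch; copyMerge is available up to Hadoop 2.x and was removed in Hadoop 3, where the concatenation has to be done by hand). outputPath and dateTime are the same values used in the rename example above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)

// directory holding the part-* files written by the DataFrame
val srcDir = new Path(outputPath + "/" + dateTime)
// single merged result file with the desired name
val dstFile = new Path(outputPath + s"/$dateTime-result.csv")

// copyMerge concatenates every file under srcDir into dstFile;
// deleteSource = true removes the source directory afterwards,
// and the last argument is a string appended after each file (null = nothing)
FileUtil.copyMerge(fs, srcDir, fs, dstFile, true, conf, null)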

4. Reading and writing CSV files with Spark

4.1 Before Spark 2.0, reading and writing CSV files with Spark SQL required Databricks' spark-csv library; since Spark 2.0, Spark SQL supports CSV natively

<!-- spark-csv -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

4.2 Options

  • All options are documented in the official API docs:
    • DataFrameReader
    • DataFrameWriter

4.2.1 Read

  • sep (default ,): sets a single character as a separator for each field and value.

  • encoding (default UTF-8): decodes the CSV files by the given encoding type.

  • quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. If you would like to turn off quotations, you need to set not null but an empty string. This behaviour is different from com.databricks.spark.csv.

    quote: the quote character; the default is the double quote ".

  • escape (default \): sets a single character used for escaping quotes inside an already quoted value.

  • charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise.

  • comment (default empty string): sets a single character used for skipping lines beginning with this character. By default, it is disabled.

  • header (default false): uses the first line as names of columns.

    header: treat the first line as column names instead of data.

  • enforceSchema (default true): If it is set to true, the specified or inferred schema will be forcibly applied to datasource files, and headers in CSV files will be ignored. If the option is set to false, the schema will be validated against all headers in CSV files in the case when the header option is set to true. Field names in the schema and column names in CSV headers are checked by their positions taking into account spark.sql.caseSensitive. Though the default value is true, it is recommended to disable the enforceSchema option to avoid incorrect results.

  • inferSchema (default false): infers the input schema automatically from data. It requires one extra pass over the data.

    inferSchema: infer column types automatically from the data.

  • samplingRatio (default is 1.0): defines fraction of rows used for schema inferring.

  • ignoreLeadingWhiteSpace (default false): a flag indicating whether or not leading whitespaces from values being read should be skipped.

  • ignoreTrailingWhiteSpace (default false): a flag indicating whether or not trailing whitespaces from values being read should be skipped.

  • nullValue (default empty string): sets the string representation of a null value. Since 2.0.1, this applies to all supported types including the string type.

    nullValue: the string that represents a null value.

  • emptyValue (default empty string): sets the string representation of an empty value.

    emptyValue: the string that represents an empty string.

  • nanValue (default NaN): sets the string representation of a non-number value.

  • positiveInf (default Inf): sets the string representation of a positive infinity value.

  • negativeInf (default -Inf): sets the string representation of a negative infinity value.

  • dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.

  • timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.

  • maxColumns (default 20480): defines a hard limit of how many columns a record can have.

  • maxCharsPerColumn (default -1): defines the maximum number of characters allowed for any given value being read. By default, it is -1 meaning unlimited length.

  • mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. It supports the following case-insensitive modes. Note that Spark tries to parse only required columns in CSV under column pruning. Therefore, corrupt records can be different based on required set of fields. This behavior can be controlled by spark.sql.csv.parser.columnPruning.enabled (enabled by default).

  • PERMISSIVE : when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets other fields to null. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. A record with less/more tokens than schema is not a corrupted record to CSV. When it meets a record having fewer tokens than the length of the schema, sets null to extra fields. When the record has more tokens than the length of the schema, it drops extra tokens.

  • DROPMALFORMED : ignores the whole corrupted records.

  • FAILFAST : throws an exception when it meets corrupted records.

  • columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field having malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord.

  • multiLine (default false): parse one record, which may span multiple lines.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// case class describing one CSV row; its fields provide the schema below
case class User(id: Int, name: String, age: Int)

object CSVReader {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()

    val result = spark.read.format("csv")
      .option("delimiter", "|")
      .option("header", "true")
      .option("quote", "'")
      .option("nullValue", "\\N")
      .option("inferSchema", "true")
      // supply the schema from the case class, so inferSchema is not needed
      .schema(ScalaReflection.schemaFor[User].dataType.asInstanceOf[StructType])
      .load("test-in/csv/csv_without_header.csv")
      // the DataFrame column names can also be (re)set with toDF
      .toDF("id", "name", "age")

    result.show()
    result.printSchema()
  }
}
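
As a supplement to the mode / columnNameOfCorruptRecord options above, here is a minimal sketch (the file path and schema are made up) of how PERMISSIVE mode can keep malformed rows by declaring the corrupt-record column in the schema:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// the corrupt-record column must exist in the schema, otherwise malformed rows are dropped
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("_corrupt_record", StringType)
))

val corruptAware = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .load("test-in/csv/users.csv")   // hypothetical input file

// inspect the rows that failed to parse
corruptAware.filter(col("_corrupt_record").isNotNull).show(false)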

4.2.2 Write

  • sep (default ,): sets a single character as a separator for each field and value.

  • quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. If an empty string is set, it uses u0000 (null character).

  • escape (default \): sets a single character used for escaping quotes inside an already quoted value.

  • charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise.

  • escapeQuotes (default true): a flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character.

  • quoteAll (default false): a flag indicating whether all values should always be enclosed in quotes. Default is to only escape values containing a quote character.

  • header (default false): writes the names of columns as the first line.

  • nullValue (default empty string): sets the string representation of a null value.

    nullValue: the placeholder written for null values; pass null if no placeholder string is wanted at all.

  • emptyValue (default ""): sets the string representation of an empty value.

  • encoding (by default it is not set): specifies encoding (charset) of saved csv files. If it is not set, the UTF-8 charset will be used.

  • compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate).

  • dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.

  • timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.

  • In Spark 2.1.1, saving CSV files with Spark SQL trims leading and trailing whitespace from string values by default. Since Spark 2.2.0 this behaviour can be turned off with the options below:

    ignoreLeadingWhiteSpace: trims leading whitespace.

    ignoreTrailingWhiteSpace: trims trailing whitespace.

    • ignoreLeadingWhiteSpace (default true): a flag indicating whether or not leading whitespaces from values being written should be skipped.
    • ignoreTrailingWhiteSpace (default true): a flag indicating whether or not trailing whitespaces from values being written should be skipped.

    result.write.mode(SaveMode.Overwrite)
      .option("delimiter", "|")
      .option("quote", "")
      .option("ignoreLeadingWhiteSpace", false)
      .option("ignoreTrailingWhiteSpace", false)
      .option("nullValue", null)
      .format("csv")
      .save("test-out/csv/")
    

    4.3 jpmml-evaluator-spark: gbtValue is not defined when saving to CSV

    Using jpmml-evaluator-spark to score a DataFrame with a PMML model.
    When I save the transformed DataFrame as CSV with the Scala Spark csv method, it fails with "gbtValue is not defined".
    Strangely, this only happens when the input DataFrame was produced by a SQL query against a Hive table. If I build the input DataFrame directly from an HDFS file, the save succeeds.
    Both inputs produce resultDF without problems, yet saving to CSV makes jpmml report "gbtValue is not defined". Why is jpmml involved in the save at all?

  1. Is there any difference between a DataFrame read from an HDFS file and one read from a Hive table? show and printSchema look identical; only a few column types differ, because the types of the HDFS-based DataFrame were inferred automatically.
  2. Saving the result DataFrame as CSV should have nothing to do with jpmml any more, so why does jpmml raise the gbtValue error?

5. Trap: when Spark SQL queries a Hive table, it does not simply hand the SQL to Hive and return the result.

  1. With pmml-spark, if the input DataFrame is wrong, e.g. the header row is treated as data, you get baffling errors such as "not access value 'xxx'" or "gbtValue is not defined". A real trap!
  2. The Hive table was configured to skip the first line on load; queries from Hue and Hive look fine, but Spark SQL still returns the header line of the underlying CSV file as a data row. It has to be handled separately in the Spark SQL code, as sketched below.
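
A minimal sketch of that separate handling (the table and column names are hypothetical): since the header line comes back as an ordinary row whose cells contain the column names, it can simply be filtered out after the query.

// Spark SQL ignores Hive's skip.header.line.count table property,
// so the CSV header line shows up as a data row; drop it explicitly.
val raw = spark.sql("select id, name, age from test_db.user_csv")   // hypothetical table

// the header row carries the literal column name in every field
val cleaned = raw.filter(raw("name") =!= "name")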

6. Logging inside Spark

Notes:

  1. OFF < FATAL < ERROR < WARN < INFO < DEBUG < TRACE < ALL
    A logger configured at a given level also emits everything at the levels to its left in this ordering; e.g. a logger set to INFO still prints WARN, ERROR and FATAL messages.

usage

Merge your own log output into the log output Spark produces by default.

Note that the import must be org.slf4j.LoggerFactory.

import org.slf4j.LoggerFactory
val log = LoggerFactory.getLogger(classOf[SparkMockData1])
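
A minimal sketch of how the logger might then be used (SparkMockData1 is the class from the snippet above; the messages are illustrative):

import org.slf4j.LoggerFactory

class SparkMockData1 {
  // slf4j delegates to the same log4j backend Spark uses,
  // so these lines show up interleaved with Spark's own output
  private val log = LoggerFactory.getLogger(classOf[SparkMockData1])

  def run(outputPath: String): Unit = {
    log.info("writing result to {}", outputPath)
    log.warn("output path {} already exists and will be overwritten", outputPath)
  }
}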

7. Things to watch out for in Spark code

{1} The sc (SparkContext) object cannot be used inside an operator closure, otherwise a serialization error is thrown

{2} Pulling too much data to the driver, e.g. with collect or take(bigNum), leads to driver-side errors (memory / result-size limits); avoid it. Any computation that seems to need the data on the driver can be decomposed so that it runs on the individual nodes, as sketched below.
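
A minimal sketch of the idea (the numbers are made up): aggregate on the executors so that only the final value travels to the driver, instead of collecting everything first.

val amounts = sc.parallelize(1 to 1000000).map(_.toDouble)

// Risky: amounts.collect().sum materializes every element in driver memory
// (and may hit spark.driver.maxResultSize or exhaust driver memory).

// Better: the summation runs on the executors; only one Double reaches the driver.
val total = amounts.reduce(_ + _)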

{3} Some Scala collection functions are lazily evaluated, e.g. map (in particular on iterators); they must be followed by an operation that forces evaluation, whereas foreach executes immediately.

Example: inside mapPartitionsWithIndex, writing data to HDFS with a for loop works; calling map on the iterator supplied by mapPartitionsWithIndex does nothing, while switching to foreach executes the writes (see the sketch below).
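
A sketch of that pitfall (writeToHdfs is a hypothetical stand-in for the real per-record write):

def writeToHdfs(partition: Int, record: String): Unit = {
  // placeholder for the actual HDFS write
}

val rdd = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

// Lazy: Iterator.map only builds a new iterator; nothing consumes it,
// so writeToHdfs never runs and nothing is written.
rdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(record => writeToHdfs(idx, record))
  Iterator(idx)
}.count()

// Eager: Iterator.foreach walks the whole partition immediately,
// so the write happens for every record.
rdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.foreach(record => writeToHdfs(idx, record))
  Iterator(idx)
}.count()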
