问题描述
我有一段Java 8代码,以便通过使用Spark SQL API计算Annova统计信息,如下面的SNIPPET 1中所示。 该代码段是根据可用的原始SCALA代码量身定制的
题
当我将其作为火花作业运行时,出现以下SNIPPET 2中指示的错误,该错误发生在名为“ joined”的dataFrame中。 发生错误的部分已标记有注释“ // !!!!!!! 在下面的SNIPPET 1中。 按照这个变量的定义,即“ joined” ,我从上面指出的URL提供了原始的SCALA版本。 您能否指出Java版本中缺少的内容? 谢谢。
问题的实质在以下文本中以粗体突出显示:
线程“ main” org.apache.spark.sql.AnalysisException中的异常:给定输入列, 无法解析' c.sum(valueSq))
' [b.sum(value),d.cat,a.count,c.cat ,c.sum(valueSq),b.cat,d.avg(value),a.cat] ;;
'项目[cat#51,count#74L,sum(value)#70,'c.sum(valueSq)),'avg(value))]
片段1:
private static AnovaStats computeAnovaStats(SparkSession spark, Dataset<Row> outliersDF){
outliersDF.createOrReplaceTempView("outliersDF");
Dataset<Row> anovaBaseDF =
spark.sql("SELECT usercode as cat, cast((frequency) as double) as value FROM outliersDF");
anovaBaseDF.createOrReplaceTempView("anovaBaseDF");
Dataset<Row> newDF =
spark.sql(
"SELECT " +
"A.cat, A.value, " +
"cast((A.value * A.value) as double) as valueSq, " +
"((A.value - B.avg) * (A.value - B.avg)) as diffSq " +
"FROM anovaBaseDF A " +
"JOIN " +
"(SELECT cat, avg(value) as avg FROM anovaBaseDF GROUP BY cat) B " +
"WHERE A.cat = B.cat");
RelationalGroupedDataset grouped = newDF.groupBy("cat");
Dataset<Row> sums = grouped.sum("value");
Dataset<Row> counts = grouped.count();
long numCats = counts.count();
Dataset<Row> sumsq = grouped.sum("valueSq");
Dataset<Row> avgs = grouped.avg("value");
double totN = toDouble(counts.agg(org.apache.spark.sql.functions.sum("count")).first().get(0));
double totSum = toDouble(sums.agg(org.apache.spark.sql.functions.sum("sum(value)")).first().get(0));
double totSumSq = toDouble(sumsq.agg(org.apache.spark.sql.functions.sum("sum(valueSq)")).first().get(0));
double totMean = totSum / totN;
double dft = totN - 1;
double dfb = numCats - 1;
double dfw = totN - numCats;
//!!!! VARIABLE UNDER QUESTION IS AS FOLLOWS !!!!
Dataset<Row> joined =
(counts.as("a")
.join(sums.as("b"), (col("a.cat").$eq$eq$eq(col("b.cat"))))
.join(sumsq.as("c"), (col("a.cat").$eq$eq$eq(col("c.cat"))))
.join(avgs.as("d"), (col("a.cat").$eq$eq$eq(col("d.cat"))))
.select(col("a.cat"), col("count"), col("sum(value)"),
col("sum(valueSq))"), col("avg(value))")));
/*
The original SCALA version of the local variable "joined", which is of type
"Dataset<Row>", is as follows:
val joined = (counts.as("a").join(sums.as("b"), $"a.cat" ===
$"b.cat")).join(sumsq.as("c"),$"a.cat" ===
$"c.cat").join(avgs.as("d"),$"a.cat"===$"d.cat").select($"a.cat",$"count",$"sum(value)",$"sum(valueSq)",$"avg(value)")
*/
Dataset<Row> finaldf = joined.withColumn("totMean", lit(totMean));
JavaPairRDD<String, Double> ssb_tmp =
finaldf.javaRDD()
.mapToPair(x -> new Tuple2(x.getString(0), ((toDouble(x.get(4)) - toDouble(x.get(4))) * (toDouble(x.get(5)) * toDouble(x.get(4)) - toDouble(x.get(4)) * toDouble(x.get(1))))));
Dataset<Row> ssbDR = spark.sqlContext().createDataset(JavaPairRDD.toRDD(ssb_tmp), Encoders.tuple(Encoders.STRING(),Encoders.DOUBLE())).toDF();
double ssb = ssbDR.agg(org.apache.spark.sql.functions.sum("_2")).first().getDouble(0);
Dataset<Row> ssw_tmp = grouped.sum("diffSq");
double ssw = toDouble(ssw_tmp.agg(org.apache.spark.sql.functions.sum("sum(diffSq)")).first().get(0));
double sst = ssb + ssw;
double msb = ssb / dfb;
double msw = ssw / dfw;
double fValue = msb / msw;
double etaSq = ssb / sst;
double omegaSq = (ssb - ((numCats - 1) * msw))/(sst + msw);
AnovaStats anovaStats = new AnovaStats(dfb, dfw, fValue, etaSq, omegaSq);
return anovaStats;
}
private static double toDouble(Object value){
double retVal = 0d;
if(value instanceof Double){
retVal = ((Double) value).doubleValue();
} else if (value instanceof Long){
retVal = ((Long) value).doubleValue();
} else if (value == null){
retVal = 0d;
}
return retVal;
}
片段2:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`c.sum(valueSq))`' given input columns: [b.sum(value), d.cat, a.count, c.cat, c.sum(valueSq), b.cat, d.avg(value), a.cat];;
'Project [cat#51, count#74L, sum(value)#70, 'c.sum(valueSq)), 'avg(value))]
+- AnalysisBarrier
+- Join Inner, (cat#51 = cat#175)
:- Join Inner, (cat#51 = cat#154)
: :- Join Inner, (cat#51 = cat#139)
: : :- SubqueryAlias a
: : : +- Aggregate [cat#51], [cat#51, count(1) AS count#74L]
: : : +- Project [cat#51, value#52, cast((value#52 * value#52) as double) AS valueSq#56, ((value#52 - avg#55) * (value#52 - avg#55)) AS diffSq#57]
: : : +- Filter (cat#51 = cat#59)
: : : +- Join Inner
: : : :- SubqueryAlias A
: : : : +- SubqueryAlias anovabasedf
: : : : +- Project [usercode#10 AS cat#51, cast(frequency#0L as double) AS value#52]
: : : : +- SubqueryAlias outliersdf
: : : : +- Filter ((cast(frequency#0L as double) >= -718.5) && (cast(frequency#0L as double) <= 1413.5))
: : : : +- Project [flowId#6, StateId#9, usercode#10, frequency#0L]
: : : : +- Filter (frequency#0L > cast(30 as bigint))
: : : : +- SubqueryAlias T
: : : : +- SubqueryAlias basedf
: : : : +- Project [flowId#6, StateId#9, usercode#10, frequency#0L]
: : : : +- Sort [flowId#6 ASC NULLS FIRST, StateId#9 ASC NULLS FIRST, usercode#10 ASC NULLS FIRST], true
: : : : +- Aggregate [flowId#6, StateId#9, usercode#10], [flowId#6, StateId#9, usercode#10, count(instanceuserid#25) AS frequency#0L]
1楼
如注释中所示,有一个拼写错误(即sum(valueSq)而不是sum(valueSq))。 相同的avg(value),而不是avg(value)))。 可行的解决方案如下:
private static AnovaStats computeAnovaStats(SparkSession spark, Dataset<Row> outliersDF, int flowId){
outliersDF.createOrReplaceTempView("outliersDF");
Dataset<Row> anovaBaseDF =
spark.sql("SELECT usercode as cat, cast((frequency) as double) as value FROM outliersDF");
anovaBaseDF.createOrReplaceTempView("anovaBaseDF");
Dataset<Row> newDF =
spark.sql(
"SELECT " +
"A.cat, A.value, " +
"cast((A.value * A.value) as double) as valueSq, " +
"((A.value - B.avg) * (A.value - B.avg)) as diffSq " +
"FROM anovaBaseDF A " +
"JOIN " +
"(SELECT cat, avg(value) as avg FROM anovaBaseDF GROUP BY cat) B " +
"WHERE A.cat = B.cat");
RelationalGroupedDataset grouped = newDF.groupBy("cat");
Dataset<Row> sums = grouped.sum("value");
Dataset<Row> counts = grouped.count();
long numCats = counts.count();
Dataset<Row> sumsq = grouped.sum("valueSq");
Dataset<Row> avgs = grouped.avg("value");
double totN = toDouble(counts.agg(org.apache.spark.sql.functions.sum("count")).first().get(0));
double totSum = toDouble(sums.agg(org.apache.spark.sql.functions.sum("sum(value)")).first().get(0));
double totSumSq = toDouble(sumsq.agg(org.apache.spark.sql.functions.sum("sum(valueSq)")).first().get(0));
double totMean = totSum / totN;
double dft = totN - 1;
double dfb = numCats - 1;
double dfw = totN - numCats;
Dataset<Row> joined =
(counts.as("a")
.join(sums.as("b"), (col("a.cat").equalTo(col("b.cat"))))
.join(sumsq.as("c"), (col("a.cat").equalTo(col("c.cat"))))
.join(avgs.as("d"), (col("a.cat").equalTo(col("d.cat"))))
.select(col("a.cat"), col("count"), col("sum(value)"),
col("sum(valueSq)"), col("avg(value)")));
Dataset<Row> finaldf = joined.withColumn("totMean", lit(totMean));
JavaPairRDD<String, Double> ssb_tmp =
finaldf.javaRDD()
.mapToPair(x -> new Tuple2(x.getString(0), ((toDouble(x.get(4)) - toDouble(x.get(4))) * (toDouble(x.get(5)) * toDouble(x.get(4)) - toDouble(x.get(4)) * toDouble(x.get(1))))));
Dataset<Row> ssbDR = spark.sqlContext().createDataset(JavaPairRDD.toRDD(ssb_tmp), Encoders.tuple(Encoders.STRING(),Encoders.DOUBLE())).toDF();
double ssb = ssbDR.agg(org.apache.spark.sql.functions.sum("_2")).first().getDouble(0);
Dataset<Row> ssw_tmp = grouped.sum("diffSq");
double ssw = toDouble(ssw_tmp.agg(org.apache.spark.sql.functions.sum("sum(diffSq)")).first().get(0));
double sst = ssb + ssw;
double msb = ssb / dfb;
double msw = ssw / dfw;
double fValue = msb / msw;
double etaSq = ssb / sst;
double omegaSq = (ssb - ((numCats - 1) * msw))/(sst + msw);
AnovaStats anovaStats = new AnovaStats(dfb, dfw, fValue, etaSq, omegaSq, flowId);
return anovaStats;
}