2015-09-02 14:46:27,681-[TS] DEBUG Executor task launch worker-0 org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection - code for input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]:
How is the following information in the log produced? Are these the columns and their data types?
input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]
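The list reads like one entry per input column together with its data type: the projection was generated for five StringType columns, and each entry looks like the toString of a bound reference to that column, which the code generator joins with "," when logging. Below is a minimal Scala sketch, not Spark's actual source, of how such a line could be assembled; BoundRef is a hypothetical stand-in for Catalyst's BoundReference.

object LogLineSketch {
  // Hypothetical stand-in for a bound input-column reference
  case class BoundRef(ordinal: Int, dataType: String) {
    override def toString: String = s"input[$ordinal, $dataType]"
  }

  def main(args: Array[String]): Unit = {
    val expressions = (0 until 5).map(i => BoundRef(i, "StringType"))
    // Prints: code for input[0, StringType],input[1, StringType],...,input[4, StringType]:
    println(s"code for ${expressions.mkString(",")}:")
  }
}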
Code:
1. The UnsafeProjection abstract class

/**
 * A projection that returns UnsafeRow.
 */
abstract class UnsafeProjection extends Projection {
  // Converts an InternalRow into an UnsafeRow
  override def apply(row: InternalRow): UnsafeRow
}
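For context, here is a minimal sketch of creating such a projection for five string columns and applying it, which is exactly the situation the log line above refers to. It assumes the Spark 1.5 internal APIs UnsafeProjection.create(StructType), InternalRow.fromSeq and UTF8String.fromString behave as named; treat it as an illustration rather than a recommended usage.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Five StringType columns, matching "input[0..4, StringType]" in the log
    val schema = StructType((0 until 5).map(i => StructField(s"c$i", StringType)))
    // create() triggers code generation of a SpecificUnsafeProjection-style subclass
    val projection = UnsafeProjection.create(schema)
    val row = InternalRow.fromSeq((0 until 5).map(i => UTF8String.fromString(s"value$i")))
    val unsafeRow = projection.apply(row)
    // Size = fixed-width region plus the variable-length string bytes
    println(unsafeRow.getSizeInBytes)
  }
}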
2. The pointTo method of UnsafeRow
/**
 * Update this UnsafeRow to point to different backing data.
 *
 * @param baseObject the base object
 * @param baseOffset the offset within the base object
 * @param numFields the number of fields in this row
 * @param sizeInBytes the size of this row's backing data, in bytes
 */
public void pointTo(Object baseObject, long baseOffset, int numFields, int sizeInBytes) {
  assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
  this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
  this.baseObject = baseObject;
  this.baseOffset = baseOffset;
  this.numFields = numFields;
  this.sizeInBytes = sizeInBytes;
}
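The 48-byte initial buffer in the generated code below follows from the UnsafeRow layout: a null bit set rounded up to whole 8-byte words, followed by one 8-byte slot per field, with variable-length data appended after that. A small Scala sketch of that arithmetic for 5 fields; the formula presumably mirrors calculateBitSetWidthInBytes, but treat it as an illustration rather than Spark's source.

object UnsafeRowLayoutSketch {
  // One bit per field, rounded up to whole 64-bit words, expressed in bytes
  def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  def main(args: Array[String]): Unit = {
    val numFields = 5
    val bitSetBytes = bitSetWidthInBytes(numFields)   // 8
    val fixedRegion = bitSetBytes + numFields * 8     // 8 + 5 * 8 = 48
    println(s"null bit set: $bitSetBytes bytes, fixed-width region: $fixedRegion bytes")
    // This 48 matches "new byte[48]" and "cursor12 = 48" in the generated projection:
    // the variable-length string bytes are written starting at offset 48.
  }
}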
3. The dynamically generated subclass of UnsafeProjection

package org.apache.spark.sql.test;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;

// The Spark code base contains no concrete subclass of UnsafeProjection; its subclasses are generated dynamically.
// UnsafeProjection is an abstract class, so the generated subclass must implement the apply method.
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {

  // The expressions this projection evaluates
  private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
  // The result returned by the apply method
  private UnsafeRow convertedStruct10;
  private byte[] buffer11;
  private int cursor12;

  public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
    this.expressions = expressions;
    this.convertedStruct10 = new UnsafeRow();
    // buffer11 is a byte array of length 48
    this.buffer11 = new byte[48];
    this.cursor12 = 0;
  }

  // Scala.Function1 need this
  // public Object apply(Object row) {
  //   return apply((InternalRow) row);
  // }

  public UnsafeRow apply(InternalRow i) {
    // cursor12 starts at 48, the same as the length of buffer11
    cursor12 = 48;
    // Re-point convertedStruct10 (an UnsafeRow) at the buffer:
    // the second argument is baseOffset (Platform.BYTE_ARRAY_OFFSET here),
    // the third argument (5) is the number of columns in the row,
    // the fourth argument, cursor12, is the row's sizeInBytes (the size of this row's backing data, in bytes)
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, cursor12);

    /* input[0, StringType] */
    // Is the first column null?
    boolean isNull0 = i.isNullAt(0);
    // If not null, fetch its value as a UTF8String via getUTF8String
    UTF8String primitive1 = isNull0 ? null : (i.getUTF8String(0));
    // Add the byte size of the first column (0 if null) to cursor12
    int numBytes14 = cursor12 + (isNull0 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive1));
    // If the buffer is too small for the new total size, grow it
    if (buffer11.length < numBytes14) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes14 * 2];
      // Copy buffer11's data into tmpBuffer13, then assign tmpBuffer13 back to buffer11; buffer11 is now enlarged
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET, tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    // Re-point the row at the (possibly reallocated) buffer with the updated size
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes14);
    if (isNull0) {
      convertedStruct10.setNullAt(0);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 0, cursor12, primitive1);
    }

    /* input[1, StringType] */
    boolean isNull2 = i.isNullAt(1);
    UTF8String primitive3 = isNull2 ? null : (i.getUTF8String(1));
    int numBytes15 = cursor12 + (isNull2 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive3));
    if (buffer11.length < numBytes15) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes15 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET, tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes15);
    if (isNull2) {
      convertedStruct10.setNullAt(1);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 1, cursor12, primitive3);
    }

    /* input[2, StringType] */
    boolean isNull4 = i.isNullAt(2);
    UTF8String primitive5 = isNull4 ? null : (i.getUTF8String(2));
    int numBytes16 = cursor12 + (isNull4 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive5));
    if (buffer11.length < numBytes16) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes16 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET, tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes16);
    if (isNull4) {
      convertedStruct10.setNullAt(2);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 2, cursor12, primitive5);
    }

    /* input[3, StringType] */
    boolean isNull6 = i.isNullAt(3);
    UTF8String primitive7 = isNull6 ? null : (i.getUTF8String(3));
    int numBytes17 = cursor12 + (isNull6 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive7));
    if (buffer11.length < numBytes17) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes17 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET, tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes17);
    if (isNull6) {
      convertedStruct10.setNullAt(3);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 3, cursor12, primitive7);
    }

    /* input[4, StringType] */
    boolean isNull8 = i.isNullAt(4);
    UTF8String primitive9 = isNull8 ? null : (i.getUTF8String(4));
    int numBytes18 = cursor12 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive9));
    if (buffer11.length < numBytes18) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes18 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET, tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes18);
    if (isNull8) {
      convertedStruct10.setNullAt(4);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 4, cursor12, primitive9);
    }

    return convertedStruct10;
  }
}
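To make the cursor arithmetic above concrete: assuming UTF8StringWriter.getSize rounds each string's byte length up to a multiple of 8 (word alignment), a row of five short ASCII strings ends up at 48 fixed bytes plus 8 variable-length bytes per column. A standalone Scala sketch of that bookkeeping, illustrative only and not Spark's writer:

object CursorSketch {
  // Assumption: variable-length values are padded to whole 8-byte words,
  // mirroring what UTF8StringWriter.getSize is assumed to do above.
  def roundUpToWord(numBytes: Int): Int = ((numBytes + 7) / 8) * 8

  def main(args: Array[String]): Unit = {
    val values = Seq("a", "bb", "ccc", "dddd", "eeeee")  // five StringType columns
    var cursor = 48                                      // fixed-width region: 8-byte bit set + 5 * 8-byte slots
    for ((v, ordinal) <- values.zipWithIndex) {
      val written = roundUpToWord(v.getBytes("UTF-8").length)
      println(s"input[$ordinal, StringType]: write $written bytes at offset $cursor")
      cursor += written
    }
    println(s"final sizeInBytes = $cursor")              // 48 + 5 * 8 = 88
  }
}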