Proposers

Approvers

  • @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • ...

Status

Current state:

  • UNDER DISCUSSION
  • IN PROGRESS
  • ABANDONED
  • COMPLETED (tick)
  • INACTIVE

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Z-order is a technique that allows you to map multidimensional data to a single dimension. We can use this feature to improve query performance.


Background

Z-ordering is not currently supported by open-source query engines or open-source data lake components, so it makes sense to introduce this feature to Hudi.


Introducing Z-order

Z-order is a technique that allows you to map multidimensional data to a single dimension.

According to Wikipedia: imagine that you have a collection of (X, Y) coordinate pairs laid out on a 2-dimensional plane. Using Z-ordering, you can arrange those 2D pairs on a 1-dimensional line. Importantly, values that were close together in the 2D plane tend to remain close to each other on the line. The figure below shows the Z-values for the two-dimensional case with integer coordinates 0 ≤ x ≤ 7, 0 ≤ y ≤ 7 (shown both in decimal and binary). Interleaving the binary coordinate values yields the binary z-values as shown. Connecting the z-values in their numerical order produces the recursively Z-shaped curve. Two-dimensional Z-values are also known as quadkeys.


If we sort the data by z-value and split it evenly into four files, a point query that filters on either X or Y can skip half of the files, because each file then covers only half of the X range and half of the Y range. The larger the data set, the greater the benefit. In other words, files laid out in z-order give good data skipping on multiple fields at once. Z-ordering is also not limited to 2-dimensional space: it generalizes to any number of dimensions.
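
The skipping claim can be checked on the 8 × 8 grid described above. The following is a minimal sketch in plain Scala (no Hudi or Spark APIs); the object and method names are hypothetical and chosen only for illustration. It sorts the 64 points by z-value, cuts them into four equal "files", and counts how many files contain at least one row matching a point filter.

```scala
object DataSkippingDemo {
  /** Z-value of a 3-bit (x, y) pair; the y bit is placed above the x bit at each level. */
  def z(x: Int, y: Int): Int =
    (0 until 3).foldLeft(0) { (acc, i) =>
      acc | (((x >> i) & 1) << (2 * i)) | (((y >> i) & 1) << (2 * i + 1))
    }

  /** All points with 0 <= x <= 7 and 0 <= y <= 7. */
  val points: Seq[(Int, Int)] = for (x <- 0 to 7; y <- 0 to 7) yield (x, y)

  /** Four "files" of 16 points each, in z-value order. */
  val files: Seq[Seq[(Int, Int)]] =
    points.sortBy { case (x, y) => z(x, y) }.grouped(16).toList

  /** Number of files that contain at least one point matching the predicate. */
  def filesTouched(pred: ((Int, Int)) => Boolean): Int = files.count(_.exists(pred))
}
```

Under these assumptions, `DataSkippingDemo.filesTouched(_._1 == 5)` and `DataSkippingDemo.filesTouched(_._2 == 2)` both evaluate to 2, so either filter only needs two of the four files.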

Implementation

At a high level, z-order support has three components: z-value generation, statistical info preservation, and query engine integration. The next three subsections discuss each in detail.


Z-value generation

The core mechanism underlying Z-ordering is the ability to translate a tuple of several field values into a single number (call it the z-value). Following Wikipedia, we simply interleave the bits of each field in the record.

For example, suppose we want to calculate the z-value of two unsigned fields (X=97, Y=214). First, represent X and Y as bits:

X value:   01100001
Y value:   11010110

Second, starting with the leftmost bit of Y, interleave the bits of Y and X to obtain the z-value:

Z value: 1011011000101001
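
The worked example above can be reproduced with a few lines of plain Scala. This is a minimal sketch for two small unsigned values only; the object and method names are hypothetical and are not part of the proposed Hudi API.

```scala
object ZValueSketch {
  /**
   * Interleaves the low `bits` bits of x and y into one z-value,
   * starting with the most significant bit of y, then the same bit of x.
   */
  def interleave(x: Int, y: Int, bits: Int = 8): Long = {
    var z = 0L
    var i = bits - 1
    while (i >= 0) {
      z = (z << 1) | ((y >> i) & 1) // next bit of y
      z = (z << 1) | ((x >> i) & 1) // next bit of x
      i -= 1
    }
    z
  }
}
```

With X=97 and Y=214, `ZValueSketch.interleave(97, 214).toBinaryString` yields `1011011000101001`, matching the z-value above.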

As shown above, for incrementing unsigned integer values the z-value is generated by interleaving bits. Because the bits of every dimension are interleaved in the final z-value, sorting by z-value naturally forms a z-order curve that clusters well on both the X and Y fields. In practice, however, the following problems must be solved before z-values can be generated by this rule.

1. Real data comes in many types. How do we handle data that is not of unsigned int type?

Answer: we provide two ways to solve this problem.

  • For each data type, define a conversion method that converts values to bits. The conversion must preserve ordering: the lexicographic order of the resulting bits must match the order of the original values.

          Unsigned int converter:

          Convert the value to bits directly; the resulting bits have the same lexicographic order as the original values.


          Int converter:

          We cannot convert the value to bits directly, because in Java the first bit of a signed number is the sign bit, which is 1 for negative numbers.

          

          Decimal value    Two’s complement signed integer
          0                0000 0000
          1                0000 0001
          2                0000 0010
          126              0111 1110
          127              0111 1111
          -128             1000 0000
          -127             1000 0001
          -126             1000 0010
          -2               1111 1110
          -1               1111 1111


           From the table above, the bit patterns of -1 and -2 sort after those of 1 and 2, which is wrong.

           Fortunately, a small change to the conversion solves this problem: flip the first (sign) bit of every value in the table above.

          Decimal value    Two’s complement signed integer    With first bit flipped    Resulting unsigned integer decimal value
          0                0000 0000                          1000 0000                 128
          1                0000 0001                          1000 0001                 129
          2                0000 0010                          1000 0010                 130
          126              0111 1110                          1111 1110                 254
          127              0111 1111                          1111 1111                 255
          -128             1000 0000                          0000 0000                 0
          -127             1000 0001                          0000 0001                 1
          -126             1000 0010                          0000 0010                 2
          -2               1111 1110                          0111 1110                 126
          -1               1111 1111                          0111 1111                 127


          Now the bit patterns of -1 and -2 sort before those of 1 and 2, and the lexicographic order of the bits matches the order of the original values, so this method meets the requirements of z-ordering.
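
The sign-bit flip from the table can be sketched in plain Scala as follows. The names are hypothetical; `flipByte` mirrors the 8-bit table above, and `intToOrderedBytes` applies the same idea to a 32-bit Int, emitting bytes most significant first so that an unsigned byte-wise comparison matches the numeric order.

```scala
object SignFlipSketch {
  /** Flips the sign bit of a signed byte and returns the result as an unsigned value (0 to 255). */
  def flipByte(b: Byte): Int = (b ^ 0x80) & 0xff

  /** Flips the sign bit of an Int and returns its 4 bytes, most significant byte first. */
  def intToOrderedBytes(v: Int): Array[Byte] = {
    val flipped = v ^ Int.MinValue // Int.MinValue has only the sign bit set
    Array(
      (flipped >>> 24).toByte,
      (flipped >>> 16).toByte,
      (flipped >>> 8).toByte,
      flipped.toByte
    )
  }
}
```

For instance, `SignFlipSketch.flipByte((-1).toByte)` gives 127 and `SignFlipSketch.flipByte(0.toByte)` gives 128, as in the last column of the table.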

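          Float/Double/Long converter:

          Follow the same conversion as for the int type. Note that for Float/Double, flipping only the sign bit orders non-negative values correctly; negative IEEE 754 values additionally need their remaining bits inverted so that the bit order matches the numeric order.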
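
As a hedged illustration of how such converters could look (this is an assumption, not the RFC's stated implementation, and the names are hypothetical): a Long can reuse the sign-bit flip unchanged, while a commonly used order-preserving encoding for IEEE 754 doubles flips the sign bit of non-negative values and inverts all bits of negative values.

```scala
object FloatingPointOrderSketch {
  /** Sign-bit flip for Long, identical in spirit to the Int case. */
  def longToOrderedBits(v: Long): Long = v ^ Long.MinValue

  /**
   * Maps a double to 64 bits whose unsigned order matches the numeric order
   * (NaN handling is left out of this sketch).
   */
  def doubleToOrderedBits(d: Double): Long = {
    val bits = java.lang.Double.doubleToLongBits(d)
    if (bits < 0) ~bits          // negative: invert every bit
    else bits ^ Long.MinValue    // non-negative: flip only the sign bit
  }
}
```

The resulting values are meant to be compared as unsigned 64-bit numbers, for example with `java.lang.Long.compareUnsigned`.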

          Decimal/Date/Timestamp converter:

          First convert the value to a long, then follow the same conversion as for the int type.

          UTF-8 String converter:

          The UTF-8 bytes of a string have the same lexicographic order as the string itself, so we can produce the final result with the following steps:

          If the string is exactly 8 bytes long, use its UTF-8 representation as is.
          If the string is shorter than 8 bytes, append zero-valued bytes to its UTF-8 representation until it is exactly 8 bytes long.
          If the string is longer than 8 bytes, use only the first 8 bytes of its UTF-8 representation.
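
The three string rules above collapse into a single pad-or-truncate step. The sketch below is plain Scala with a hypothetical name (`utf8To8Bytes`); it relies on `java.util.Arrays.copyOf`, which truncates a longer array and zero-pads a shorter one.

```scala
import java.nio.charset.StandardCharsets

object Utf8PrefixSketch {
  /** Returns exactly 8 bytes: the UTF-8 bytes of `s`, truncated or zero-padded on the right. */
  def utf8To8Bytes(s: String): Array[Byte] =
    java.util.Arrays.copyOf(s.getBytes(StandardCharsets.UTF_8), 8)
}
```

Because only the first 8 bytes survive, any two strings that share an 8-byte prefix map to the same value.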

The conversion methods above look sufficient, but they cannot solve the following problems:

       Problem 1: If every dimension consists of consecutive non-negative integers starting from 0, the calculated z-values follow the z-order curve.
                         However, if the dimension values are not consecutive integers starting from 0, the calculated z-values cover only part of the complete z-order curve and cluster poorly.
                         The conversion methods above clearly cannot guarantee that the converted values are consecutive non-negative integers starting from 0.
        Problem 2: For string values, the conversion can map strings with a common prefix to the same 8-byte value, because truncating each UTF-8 string to 8 bytes loses a great deal of precision.

                          e.g. "https://www.baidu.com" and "https://www.google.com" have the same 8-byte value.


/** Generates z-values by converting each z-order field to 8 bytes and interleaving the results */

// Rows are finally sorted by the appended z-value column (at position fieldNum).
val newRDD = df.rdd.map { row =>
  // Convert each z-order field to an 8-byte, order-preserving representation.
  val values = zFields.map { case (index, field) =>
    field.dataType match {
      case LongType =>
        ZOrderingUtil.longTo8Byte(row.getLong(index))
      case DoubleType =>
        ZOrderingUtil.doubleTo8Byte(row.getDouble(index))
      case IntegerType =>
        ZOrderingUtil.intTo8Byte(row.getInt(index))
      case FloatType =>
        ZOrderingUtil.doubleTo8Byte(row.getFloat(index).toDouble)
      case StringType =>
        ZOrderingUtil.utf8To8Byte(row.getString(index))
      case DateType =>
        ZOrderingUtil.longTo8Byte(row.getDate(index).getTime)
      case TimestampType =>
        ZOrderingUtil.longTo8Byte(row.getTimestamp(index).getTime)
      case ByteType =>
        ZOrderingUtil.byteTo8Byte(row.getByte(index))
      case ShortType =>
        ZOrderingUtil.intTo8Byte(row.getShort(index).toInt)
      case d: DecimalType =>
        ZOrderingUtil.longTo8Byte(row.getDecimal(index).longValue())
      case _ =>
        null // unsupported types are dropped by the filter below
    }
  }.filter(v => v != null).toArray
  // Interleave the bits of all converted fields into a single z-value.
  val zValues = ZOrderingUtil.interleaveMulti8Byte(values)
  Row.fromSeq(row.toSeq ++ Seq(zValues))
}.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)))



To solve these two problems, a boundary-based interleaved index is a good fit.

  • Boundary-based Interleaved Index

          Sample the data. From the sampled data, select the specified number of boundaries for each field participating in z-order and sort them.
          Each field value is mapped to its index within the boundaries, and that index then participates in the calculation of the z-value.
          Since boundary indexes are always consecutive non-negative integers starting from 0, they fully meet the requirements of the interleaving calculation.
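
The boundary mapping described above can be sketched in plain Scala for a single Long-typed field. Everything here is illustrative: the names `pickBounds` and `boundIndex` are hypothetical, and picking evenly spaced values from the sorted sample is an assumption about how boundaries might be chosen, not the RFC's stated procedure.

```scala
object BoundaryIndexSketch {
  /** Picks up to `numBounds` boundaries from a sample, evenly spaced in sorted order. */
  def pickBounds(sample: Seq[Long], numBounds: Int): Array[Long] = {
    val sorted = sample.distinct.sorted.toArray
    if (sorted.length <= numBounds) sorted
    else Array.tabulate(numBounds) { i =>
      sorted(((i + 1).toLong * sorted.length / (numBounds + 1)).toInt)
    }
  }

  /** Number of boundaries strictly below `v`, found by binary search; always in 0..bounds.length. */
  def boundIndex(v: Long, bounds: Array[Long]): Int = {
    var lo = 0
    var hi = bounds.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (bounds(mid) < v) lo = mid + 1 else hi = mid
    }
    lo
  }
}
```

Whatever the original value range is, the indexes returned by `boundIndex` are small non-negative integers starting from 0, which is what the interleaving step needs.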


/** Generates z-values from boundary indexes */

val indexRdd = internalRdd.mapPartitionsInternal { iter =>
  // Sampled, sorted boundaries for each z-order field, broadcast to every executor.
  val bounds = boundBroadCast.value
  val origin_Projections = sortingExpressions.map { se =>
    UnsafeProjection.create(Seq(se), outputAttributes)
  }

  iter.map { unsafeRow =>
    val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map {
      case ((rowProject, lazyOrdering), index) =>
        val row = rowProject(unsafeRow)
        val decisionBound = new DecisionBound(sampleRdd, lazyOrdering)
        if (row.isNullAt(0)) {
          // Nulls are mapped past the last boundary index.
          bounds(index).length + 1
        } else {
          decisionBound.getBound(row, bounds(index).asInstanceOf[Array[InternalRow]])
        }
    }.toArray.map(ZOrderingUtil.toBytes(_))
    // Interleave the 4-byte boundary indexes into a single z-value.
    val zValues = ZOrderingUtil.interleaveMulti4Byte(interleaveValues)
    val mutablePair = new MutablePair[InternalRow, Array[Byte]]()

    mutablePair.update(unsafeRow, zValues)
  }
}.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)


2. Values of different dimensions have different bit widths. For example, how do we interleave the bits of a short and an int?

Answer: align every field value to 64 bits (with the boundary-based interleaved index, aligning every field value to 32 bits is sufficient).
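
Once every field has been aligned to the same width, interleaving any number of fields is a simple bit-by-bit round robin. The following plain-Scala sketch shows the idea on equal-length byte arrays; the object and method names are hypothetical and are not claimed to match `ZOrderingUtil`.

```scala
object InterleaveMultiSketch {
  /**
   * Interleaves equal-length byte arrays bit by bit: bit 0 of every field
   * (in the given order), then bit 1 of every field, and so on.
   * Bit 0 is the most significant bit of the first byte.
   */
  def interleave(fields: Seq[Array[Byte]]): Array[Byte] = {
    require(fields.nonEmpty && fields.forall(_.length == fields.head.length),
      "all fields must be aligned to the same width")
    val bitsPerField = fields.head.length * 8
    val out = new Array[Byte](fields.length * fields.head.length)
    var outBit = 0
    for (bit <- 0 until bitsPerField; f <- fields) {
      val srcBit = (f(bit / 8) >> (7 - bit % 8)) & 1
      if (srcBit == 1) {
        out(outBit / 8) = (out(outBit / 8) | (1 << (7 - outBit % 8))).toByte
      }
      outBit += 1
    }
    out
  }
}
```

For example, interleaving the single-byte fields Y=214 and X=97 (in that order) produces the two bytes 0xB6 0x29, i.e. the bit string 1011011000101001.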


3. The z-value is produced by interleaving the bits of every dimension. If the z-value exceeds 64 bits (the width of a long), how do we store, represent, and compare z-values in Hudi?

Answer: store the z-value as an Array[Byte] (we limit the length of this array to 1024, as Amazon DynamoDB does). Note that HBase already sorts row keys as Array[Byte] internally, so we can reuse that code directly as the comparator for z-values.
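
For reference, comparing z-values stored as byte arrays amounts to an unsigned, lexicographic byte comparison. The sketch below is a minimal stand-alone version in plain Scala with hypothetical names; it is not copied from HBase.

```scala
object ZValueOrderingSketch {
  /** Orders byte arrays lexicographically, treating each byte as unsigned (0 to 255). */
  val unsignedLexicographic: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n) {
        val c = (a(i) & 0xff) - (b(i) & 0xff)
        if (c != 0) return c
        i += 1
      }
      // All shared bytes are equal: the shorter array sorts first.
      a.length - b.length
    }
  }
}
```

The `& 0xff` mask matters: without it, a byte such as 0x80 would be treated as -128 and sort before 0x7f.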


Statistical info preservation

After the data has been clustered by z-order, we need to collect min-max and null-count information. With this statistical info, Hudi can filter at the file level directly.

Which information will be collected?

The min-max values and the null value count of every z-order column will be collected.
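
To make the collected statistics concrete, the sketch below computes them for one column of one file in plain Scala. The `ColumnStats` type and `collect` method are hypothetical names used only for illustration; nullable values are modeled as `Option[Long]`.

```scala
object ColumnStatsSketch {
  /** Min, max and null count of one column in one file; min/max are empty if every value is null. */
  final case class ColumnStats(min: Option[Long], max: Option[Long], numNulls: Long)

  def collect(values: Seq[Option[Long]]): ColumnStats = {
    val present = values.flatten
    ColumnStats(
      min = if (present.isEmpty) None else Some(present.min),
      max = if (present.isEmpty) None else Some(present.max),
      numNulls = values.count(_.isEmpty).toLong
    )
  }
}
```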

How to collect this information?

There are two ways to achieve this.

  • Collect the information by reading the Parquet footer, which stores min-max values and null value counts. This method is not universal: it is only valid for Parquet files, and not all statistics stored in the Parquet footer are correct. For example, Parquet stores wrong statistics for timestamp values.


/** Collects statistic info from Parquet footers */

val sc = df.sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(conf)
// Use at least one partition, even when there are fewer than three input files.
val numParallelism = math.max(1, inputFiles.size / 3)
val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
try {
  val description = s"Listing parquet column statistics"
  sc.setJobDescription(description)
  sc.parallelize(inputFiles, numParallelism).mapPartitions { paths =>
    val hadoopConf = serializableConfiguration.value
    paths.map(new Path(_)).flatMap { filePath =>
      // One block per row group; each block carries per-column statistics.
      val blocks = ParquetFileReader.readFooter(hadoopConf, filePath).getBlocks().asScala
      blocks.flatMap(b => b.getColumns().asScala
          .map(col => (col.getPath().toDotString(),
            FileStats(col.getStatistics().minAsString(), col.getStatistics().maxAsString(), col.getStatistics.getNumNulls.toInt))))
        .groupBy(x => x._1).mapValues(v => v.map(vv => vv._2))
        // Merge row-group statistics into file-level statistics; null counts add up across row groups.
        .mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max, value.map(_.num_nulls).sum)).toSeq
        .map(x => ColumnFileStats(filePath.getName(), x._1, x._2.minVal, x._2.maxVal, x._2.num_nulls))
    }.filter(p => cols.contains(p.colName))
  }.collect()
} finally {
  sc.setJobDescription(previousJobDescription)
}
  • Calculate the information with Spark SQL, which collects statistics for all column types correctly but is less efficient.


/** Collects statistic info with Spark SQL */

val inputFiles = df.inputFiles
val conf = df.sparkSession.sparkContext.hadoopConfiguration
val values = cols.flatMap(c => Seq(
  min(col(c)).as(c + "_minValue"),
  max(col(c)).as(c + "_maxValue"),
  count(c).as(c + "_noNullCount")))
val valueCounts = count("*").as("totalNum")
val projectValues = Seq(col("file")) ++ cols.flatMap(c =>
  Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))


val result = df.select(input_file_name() as "file", col("*"))
  .groupBy($"file")
  .agg(valueCounts, values: _*).select(projectValues: _*)
result

Example output (z-order columns: col, col1):

file    col_minValue    col_maxValue    col_num_nulls    col1_minValue    col1_maxValue    col1_num_nulls
xx-1    49152           65535           0                65536            98303            0
xx-2    65536           90111           0                65536            98303            0

  • How to save the statistics info

We can introduce an _index_ directory to store the statistics info. Moreover, we can use an internal Hudi COW table to hold it, just like Hudi metadata.

Integrations with query engines

Generally speaking, no extra work is needed on the query engine side, since most query engines automatically use the min-max values stored in Parquet/ORC files.

However, if the statistics info can also be used in the SQL preparation phase, z-order becomes even more effective.

How to apply the statistics info to Spark SQL

  • Load the index table into indexDataFrame.
  • Construct a data filter for indexDataFrame from the original query filter.
  • Query indexDataFrame to select candidate files.
  • Use the candidate files to reconstruct hudiMemoryIndex.

The only question is how to construct the data filter for indexDataFrame: use the min-max values and null counts to build it.
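
The filter rewrite can be illustrated without Spark. The sketch below, in plain Scala with hypothetical names, keeps a file for `col = v` only when `min <= v <= max`. Treating `col IS NULL` as "null count greater than zero" is an assumption made for this sketch.

```scala
object DataSkippingSketch {
  /** Per-file statistics of one column, as stored in the index table. */
  final case class FileStats(file: String, min: Long, max: Long, numNulls: Long)

  /** Files that may contain rows with col = v. */
  def candidatesForEquals(stats: Seq[FileStats], v: Long): Seq[String] =
    stats.filter(s => s.min <= v && v <= s.max).map(_.file)

  /** Files that may contain rows with col IS NULL (assumption: numNulls > 0). */
  def candidatesForIsNull(stats: Seq[FileStats]): Seq[String] =
    stats.filter(_.numNulls > 0).map(_.file)
}
```

With statistics such as `FileStats("xx-1", 49152, 65535, 0)` and `FileStats("xx-2", 65536, 90111, 0)`, a query for the value 70000 only needs to read xx-2.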


/** Converts a query filter into a filter on the index table */

def createZindexFilter(condition: Expression): Expression = {

  val minValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_minValue").expr
  val maxValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_maxValue").expr
  val num_nulls = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_num_nulls").expr

  condition match {
    // col = value: keep files where minValue <= value <= maxValue
    case EqualTo(attribute: AttributeReference, value: Literal) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case EqualTo(value: Literal, attribute: AttributeReference) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))

    case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(equalNullSafe.left)
      EqualTo(num_nulls(colName), equalNullSafe.right)
    ............

Rollout/Adoption Plan


  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>