Discussion thread:
Vote thread:
ISSUE:
Release: TBD
Motivation
Multimodal data storage needs to support multimedia files, including text, images, audio, video, embedding vectors, etc. Paimon needs to support ingesting multimodal data into the lake and achieve unified storage and efficient management of multimodal data alongside structured data.
Most multimodal files are actually not large, around 1MB or even below, but there are also relatively large multimodal files, such as 10GB+ files, which pose storage challenges for us.
Consider two existing approaches:
- Multimodal data can be stored directly in columnar files, such as Parquet or Lance files. The biggest problem with this approach is the burden it places on the file format: avoiding OOM on read and write requires a streaming API in the file format so that the entire multimodal value does not have to be loaded at once. In addition, the auxiliary fields of multimodal data may be frequently changed, added, or even deleted; if such changes require the multimodal files to be rewritten along with them, the cost is very high.
- Multimodal data is stored on object storage, and Parquet references these files through pointers. The downside is that the table cannot directly manage the multimodal data, and this may produce a large number of small files, causing a significant amount of file IO during use, which decreases performance and increases cost.
We should consider a new way to satisfy this requirement: a high-performance architecture designed specifically for mixed workloads of massive small and large multimodal files, achieving high-throughput writes and low-latency reads to meet the stringent performance requirements of AI, big data, and other businesses.
A more intuitive solution is to separate multimodal storage from structured storage and manage the multimodal storage independently: introduce a blob file mechanism that stores multiple multimodal values in one file, keep all auxiliary fields in Parquet, and save the blob column separately in blob files.
Solution 1
Column split in the data evolution table: we can split the blob column apart in the data evolution table.
Prerequisite
We have already introduced Data Evolution [1]. Data Evolution Mode is a new feature for Append tables that revolutionizes how you handle data evolution, particularly when adding new columns. This mode allows you to update partial columns without rewriting entire data files. Instead, it writes new column data to separate files and intelligently merges them with the original data during read operations. This means we can use the same mechanism to store blob data separately.
Table Level
We can use this mechanism to store blob data separately, introducing a new file format, the blob file, dedicated to blob data. A blob file is a special data file, tracked by manifests, that stores only one blob-type column.
But how do we handle compaction? Could the compaction of Parquet files lead to the rewriting of blob files? We address this by allowing blob files to have different row id ranges and by decoupling blob file compaction.
When reading, based on the first_row_id and row_number in the data file manifest metadata, we can determine which files belong to the same row group and thus restore the complete rows.
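The grouping step can be sketched as follows; the class and method names here are illustrative, not actual Paimon internals. Given the first row id and row count of each file from manifest metadata, the reader picks the blob files whose row-id ranges overlap the data file's range:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: select the blob files that must be read together with a
// data file, based on first row id and row count from manifest metadata.
public final class RowGroupSketch {

    public record FileMeta(String name, long firstRowId, long rowCount) {
        long endRowId() {
            return firstRowId + rowCount; // exclusive end of the row-id range
        }
    }

    /** Blob files whose row-id range overlaps the data file's range belong to the same row group. */
    public static List<FileMeta> blobFilesFor(FileMeta dataFile, List<FileMeta> blobFiles) {
        List<FileMeta> result = new ArrayList<>();
        for (FileMeta blob : blobFiles) {
            boolean overlaps = blob.firstRowId() < dataFile.endRowId()
                    && dataFile.firstRowId() < blob.endRowId();
            if (overlaps) {
                result.add(blob);
            }
        }
        return result;
    }
}
```

Because blob files may have row-id ranges that differ from the Parquet file's, range overlap rather than exact equality is the grouping criterion in this sketch.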
Blob File
A blob file is a new file format that needs to meet the following requirements:
- Simply store multiple blob values in one file.
- Quickly query the offset and length of a blob based on its row number.
The Blob File Structure is:
Blob files support sequential scanning and quick lookup by row number.
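One simple layout that satisfies both requirements is to write the blob bytes back to back, followed by a fixed-width (offset, length) index and a footer holding the blob count. The following is a minimal sketch of such a layout, not the actual Paimon blob format:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.List;

// Illustrative blob file layout:
// [blob 0 bytes][blob 1 bytes]...[index: 16 bytes per blob][footer: 8-byte blob count]
public final class BlobFileSketch {

    public static byte[] write(List<byte[]> blobs) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long[] offsets = new long[blobs.size()];
        long pos = 0;
        for (int i = 0; i < blobs.size(); i++) {
            offsets[i] = pos;
            out.writeBytes(blobs.get(i));
            pos += blobs.get(i).length;
        }
        // Fixed-width index entries allow O(1) lookup by row number.
        ByteBuffer index = ByteBuffer.allocate(16 * blobs.size() + 8);
        for (int i = 0; i < blobs.size(); i++) {
            index.putLong(offsets[i]).putLong(blobs.get(i).length);
        }
        index.putLong(blobs.size());
        out.writeBytes(index.array());
        return out.toByteArray();
    }

    /** Returns {offset, length} of the blob at the given row number. */
    public static long[] lookup(byte[] file, int rowNumber) {
        ByteBuffer buf = ByteBuffer.wrap(file);
        long count = buf.getLong(file.length - 8);
        int indexStart = file.length - 8 - 16 * (int) count;
        long offset = buf.getLong(indexStart + 16 * rowNumber);
        long length = buf.getLong(indexStart + 16 * rowNumber + 8);
        return new long[] {offset, length};
    }
}
```

Sequential scans read the data region front to back, while point lookups jump straight to the index entry for the requested row number.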
Write New Files
We write the blob column independently into a file that contains only that column. Every time we finish writing a file, we get two kinds of ManifestEntry: one for the normal fields and one for the blob file. For example, if our table schema is (id STRING, num INT, name STRING, picture BLOB), we get two kinds of manifest entries: one with write_col (id, num, name) and the other with write_col (picture). Notice that one normal manifest entry may correspond to multiple blob manifest entries, because the blob column is usually large.
We may tag these entries to indicate that they are in the same file group (for example, by assigning them the same first row id), and we must guarantee that they are always read together. Each blob data file's metadata should contain a link to the next blob file in the same file group.
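The file-group linking described above can be sketched as follows; BlobFileMeta and chain are illustrative names, not Paimon classes. Each blob meta carries the group's first row id and a link to the next blob file in the group:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of chaining blob file metas within one file group: every
// meta shares the group's first row id, and each links to the next blob file so
// the whole group can be discovered and read together.
public final class FileGroupSketch {

    public record BlobFileMeta(String fileName, long firstRowId, String nextFileName) {}

    /** Chains blob file names into linked metas that all carry the group's first row id. */
    public static List<BlobFileMeta> chain(long firstRowId, List<String> blobFileNames) {
        List<BlobFileMeta> metas = new ArrayList<>();
        for (int i = 0; i < blobFileNames.size(); i++) {
            // The last blob file in the group has no successor.
            String next = i + 1 < blobFileNames.size() ? blobFileNames.get(i + 1) : null;
            metas.add(new BlobFileMeta(blobFileNames.get(i), firstRowId, next));
        }
        return metas;
    }
}
```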
Compact Files
There are several limitations on compaction in data evolution mode. The key points:
- Two files in the same partition can be compacted if their row ids are consecutive.
- Blob files can have their own target-file-size.
File 1 and file 2 in partition 0 can be compacted into one file.
However, file 1, file 2, file 3, and file 4 cannot all be compacted together, because their row ids are not consecutive within one partition.
Compaction with blob files:
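The adjacency constraint can be expressed as a simple check: two files are compactable only when they are in the same partition and the left file's row-id range ends exactly where the right file's begins. A sketch with illustrative names:

```java
// Illustrative check of the data-evolution compaction constraint: two files can
// be compacted only if they sit in the same partition and their row ids are
// consecutive (no gap and no overlap between the two ranges).
public final class CompactionSketch {

    public record DataFile(String partition, long firstRowId, long rowCount) {}

    public static boolean canCompact(DataFile left, DataFile right) {
        return left.partition().equals(right.partition())
                && left.firstRowId() + left.rowCount() == right.firstRowId();
    }
}
```

Because blob files keep their own row-id ranges, the same check can be applied to blob files independently of the Parquet files, which is what decouples the two compactions.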
Table API
Introduce a new type, BlobType, and allow creating a table with a blob field. By default, a BlobType field behaves the same as BinaryType in Flink SQL and Spark SQL, but if you enable 'blob-storage.enabled', blob files will be used to store the blob data.
In the Java API, you can use BlobType directly to create a table. Flink SQL and Spark SQL do not have a blob type, so you need to use a table option to specify the blob fields.
For example:
CREATE TABLE T (
    i INT,
    j BYTES
) WITH (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'j'
)
Then Paimon will use the blob type for the 'j' field.
Java API
BlobType:
public final class BlobType extends DataType {

    private static final long serialVersionUID = 1L;

    public static final int DEFAULT_SIZE = 1024 * 1024;

    private static final String FORMAT = "BLOB";

    public BlobType(boolean isNullable) {
        super(isNullable, DataTypeRoot.BLOB);
    }

    public BlobType() {
        this(true);
    }

    @Override
    public int defaultSize() {
        return DEFAULT_SIZE;
    }

    @Override
    public DataType copy(boolean isNullable) {
        return new BlobType(isNullable);
    }

    @Override
    public String asSQLString() {
        return withNullability(FORMAT);
    }

    @Override
    public <R> R accept(DataTypeVisitor<R> visitor) {
        return visitor.visit(this);
    }
}
The data type is Blob, an interface whose implementation BlobData stores the blob bytes directly. We can also have implementations that refer to a blob input stream. The interface provides lazy reads, allowing the data to be consumed later in a streaming way.
Blob:
public interface Blob {

    byte[] toBytes();

    SeekableInputStream newInputStream() throws IOException;
}
BlobData:
public class BlobData implements Blob {

    private static final long serialVersionUID = 1L;

    public static final byte BLOB_TYPE = 1;

    private final byte[] data;

    public BlobData(byte[] data) {
        this.data = data;
    }

    @Override
    public byte[] toBytes() {
        return data;
    }

    @Override
    public SeekableInputStream newInputStream() throws IOException {
        return new ByteArraySeekableStream(data);
    }

    @Override
    public boolean equals(Object o) {
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BlobData blobData = (BlobData) o;
        return Objects.deepEquals(data, blobData.data);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(data);
    }
}
For example, write blob streams:
BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit();
write.write(GenericRow.of(0, new MyBlobRef()));
commit.commit(write.prepareCommit());

class MyBlobRef implements Blob {

    File localFile;

    @Override
    public byte[] toBytes() {
        // read all bytes from the local file
    }

    @Override
    public SeekableInputStream newInputStream() throws IOException {
        // new SeekableInputStream from local file
    }
}
For example, read blob streams:
ReadBuilder readBuilder = table.newReadBuilder();
RecordReader<InternalRow> reader =
        readBuilder.newRead().createReader(readBuilder.newScan().plan());
RecordIterator<InternalRow> batch = reader.readBatch();
InternalRow row = batch.next();
Blob blob = row.getBlob(1);
// the Blob returned by the reader will be a lazy input stream
SeekableInputStream in = blob.newInputStream();
// consume the input stream
We can also add some util methods to Blob class:
Blob blob = Blob.fromLocal("/data/image.png"); // local file
Blob blob = Blob.fromPath(fileIO, "oss://data/image.png"); // remote path
Blob blob = Blob.fromUrl("https://example.com/audio.mp3"); // remote URL
Blob blob = Blob.fromByteArray(embeddingBytes); // inline data
Python API
PyPaimon has its own type system, so we can also introduce BlobType to the Python API.
For example, write blob streams:
write_builder = table.new_batch_write_builder()
write = write_builder.new_write()
commit = write_builder.new_commit()
data = {}
data['f0'] = 1
# set the blob field here and pass the data to write/commit
# (details depend on the PyPaimon write API)
You can read your BlobFile:
read_builder = table.new_read_builder().with_filter(predicate)
read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
iterator = read.to_iterator(splits)
value = next(iterator, None)
while value is not None:
    blobFile = value.get_field(1)
    # consume blobFile; BlobFile is a file-like abstraction
    value = next(iterator, None)
Discussion
- No compression in blob files: multimodal data is typically already compressed by itself. If compression is really needed in the future, we will introduce a version v2.
[1] https://paimon.apache.org/docs/master/append-table/data-evolution/



