Status
Current state: ACCEPTED
Discussion thread: mail of vote result
JIRA: SAMZA-2421
Released:
Problem
Samza jobs can currently produce to Kafka, HDFS, EventHubs and ElasticSearch systems. We wish to enable jobs to send messages to Azure Blob Storage. However, at this point consuming from Azure Blob Storage is out-of-scope and hence no plans are made for a SystemConsumer counterpart.
Azure Background
Azure Blob Storage: The storage system consists of a three-level hierarchy: a storage account can include an unlimited number of containers, and a container can store an unlimited number of blobs. However, a container cannot contain other containers.
A Block Blob is composed of blocks, which are uploaded first; a commit operation then seals these blocks into a blob. If the commit operation is omitted, no blob is formed and the uploaded blocks are purged from storage. Each block can be up to 100 MB, and a blob can contain up to 50,000 blocks.
Proposed Changes
A native SystemProducer in Samza will be introduced which will send messages to Azure Blob Storage. A Samza job can choose this producer by configuring to use the corresponding SystemFactory and providing Azure account credentials such as account name and key.
Samza jobs produce to a System Stream Partition (SSP) of the destination "system". This SystemProducer maps the Azure container to the System, and the blob name is formatted as Stream/Partition/timestamp. Samza SystemProducers receive each message as an OutgoingMessageEnvelope, which carries an SSP. This producer uses those fields to identify the Azure container and form the blob name. The partition field of the envelope is optional; when it is empty, the blob name is formatted as Stream/timestamp. If two tasks produce to the same SSP, two different blobs are created with the same stream/partition prefix but different timestamps. However, it is recommended to turn on the config that adds a random string suffix, to avoid blob name collisions.
For example, an SSP with system = "oss-testcontainer", stream = "OSSWikipediaFeedStreamEvent" and an empty partition gives a blob name of the form Stream/timestamp, where the timestamp is 2019/12/20/16/45-16-0.
Similarly, an SSP with system = "oss-testcontainer", stream = "OSSWikipediaFeedStreamEvent" and partition = "partition-0" gives a blob name of the form Stream/Partition/timestamp.
If several blobs are created for the same SSP over a period of time, they all share the same prefix and differ only in the timestamp. Azure treats "/" in the blob name as a virtual folder separator when organizing the UI.
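The naming scheme described above can be sketched as a small helper. This is a minimal illustration, not the producer's actual code: the class BlobNameSketch, its method names, the "-" separator before the random suffix and the hex alphabet of the suffix are all assumptions made for this example. The timestamp is passed in already formatted, since its exact pattern is not specified here.

```java
import java.util.UUID;

/** Hypothetical helper (not part of Samza) that builds blob names as described above. */
final class BlobNameSketch {
    private BlobNameSketch() { }

    /**
     * @param stream       stream name of the SSP, used as the blob name prefix
     * @param partition    partition of the SSP; null or empty means "no partition"
     * @param timestamp    already formatted timestamp, e.g. "2019/12/20/16/45-16-0"
     * @param randomSuffix optional suffix; null or empty means "no suffix"
     */
    static String blobName(String stream, String partition, String timestamp, String randomSuffix) {
        StringBuilder name = new StringBuilder(stream);
        if (partition != null && !partition.isEmpty()) {
            name.append('/').append(partition);
        }
        name.append('/').append(timestamp);
        if (randomSuffix != null && !randomSuffix.isEmpty()) {
            // Assumption: the suffix is joined with '-'; the real separator may differ.
            name.append('-').append(randomSuffix);
        }
        return name.toString();
    }

    /** Eight random hex characters; the real suffix alphabet is an assumption. */
    static String randomSuffix() {
        return UUID.randomUUID().toString().replace("-", "").substring(0, 8);
    }

    public static void main(String[] args) {
        String ts = "2019/12/20/16/45-16-0";
        System.out.println(blobName("OSSWikipediaFeedStreamEvent", "", ts, null));
        System.out.println(blobName("OSSWikipediaFeedStreamEvent", "partition-0", ts, randomSuffix()));
    }
}
```

Because "/" is interpreted as a virtual folder separator, every blob produced for one SSP appears under the same folder path in the Azure UI.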
The flow of data is as follows.
Messages to be sent to Azure Blob Storage are buffered in-memory.
Every time the buffer reaches the maximum block size, the block is uploaded to Azure Blob Storage asynchronously using the stageBlock API provided by Azure.
List of all blocks uploaded for a blob is maintained in memory.
During a flush of the SystemProducer, for each stream-partition, the remaining buffer is uploaded as a new block and the block list is committed using the commitBlockList API provided by Azure. The commit creates the Block Blob from the list of uploaded blocks.
Committing the block list has to be a blocking call that waits for all pending block uploads of the blob to finish before the commit, as this is expected by Azure Blob Storage.
Messages sent through the SystemProducer after a flush are part of a new blob. Hence a timestamp is added to the blob name, as there can be multiple blobs (one per flush of the SystemProducer) for the same SSP. This timestamp corresponds to the time when the first message is received by the SystemProducer after a commit.
Optionally, a random string can be suffixed to the blob name to avoid blob name collisions for cases when two tasks write to the same SSP at the same time.
Blocks of different streams can be uploaded in parallel which leads to better network utilization.
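The flow above can be sketched with plain JDK classes. This is a simplified illustration under stated assumptions: FakeBlobStore is an in-memory stand-in for Azure Blob Storage (its stageBlock and commitBlockList methods only mimic the names of the Azure operations and are not the Azure SDK), the block ID scheme is invented, and one BlockUploadFlowSketch instance models a single blob between two flushes.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockUploadFlowSketch {
    /** In-memory stand-in for Azure Blob Storage; NOT the Azure SDK. */
    static final class FakeBlobStore {
        final Map<String, byte[]> stagedBlocks = new ConcurrentHashMap<>();
        final Map<String, List<String>> committedBlobs = new ConcurrentHashMap<>();

        void stageBlock(String blockId, byte[] data) {
            stagedBlocks.put(blockId, data);
        }

        void commitBlockList(String blobName, List<String> blockIds) {
            committedBlobs.put(blobName, new ArrayList<>(blockIds));
        }
    }

    private final FakeBlobStore store;
    private final ExecutorService uploader;
    private final String blobName;
    private final int maxBlockSizeBytes;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final List<String> blockIds = new ArrayList<>();
    private final List<CompletableFuture<Void>> pendingUploads = new ArrayList<>();

    BlockUploadFlowSketch(FakeBlobStore store, ExecutorService uploader, String blobName, int maxBlockSizeBytes) {
        this.store = store;
        this.uploader = uploader;
        this.blobName = blobName;
        this.maxBlockSizeBytes = maxBlockSizeBytes;
    }

    /** Buffers a message in memory and stages a block once the buffer reaches the max block size. */
    void write(byte[] message) {
        buffer.write(message, 0, message.length);
        if (buffer.size() >= maxBlockSizeBytes) {
            stageBufferedBlock();
        }
    }

    /** Stages the remaining buffer, blocks until all uploads finish, then commits the block list. */
    void flush() {
        if (buffer.size() > 0) {
            stageBufferedBlock();
        }
        CompletableFuture.allOf(pendingUploads.toArray(new CompletableFuture[0])).join();
        store.commitBlockList(blobName, blockIds);
    }

    private void stageBufferedBlock() {
        String blockId = blobName + "#" + blockIds.size(); // hypothetical block ID scheme
        byte[] block = buffer.toByteArray();
        buffer.reset();
        blockIds.add(blockId); // the block list is kept in memory until commit
        pendingUploads.add(CompletableFuture.runAsync(() -> store.stageBlock(blockId, block), uploader));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        FakeBlobStore store = new FakeBlobStore();
        BlockUploadFlowSketch writer = new BlockUploadFlowSketch(store, pool, "stream/timestamp", 4);
        writer.write(new byte[] {1, 2, 3});
        writer.write(new byte[] {4, 5, 6});
        writer.flush();
        pool.shutdown();
        System.out.println("committed blocks: " + store.committedBlobs.get("stream/timestamp"));
    }
}
```

The blocking join before the commit mirrors the requirement that all pending block uploads finish before the block list is committed; sharing one executor across writers is what lets blocks of different streams upload in parallel.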
Public interfaces
SystemProducer API
SystemProducer.send(String source, OutgoingMessageEnvelope ome)
source: the origin of the message.
Example: for messages coming from a Samza job’s task, the source is the task name; for coordinator messages, it is “SamzaContainer”; for checkpoint messages, it is the task name; for diagnostics messages, it is “DiagnosticManager”; for metrics-related messages, it is either “ApplicationMaster” or “StreamProcessor”; and so on.
ome.systemStream:
SystemStream.system: Azure container name. Azure account name is provided as a config.
SystemStream.stream: Azure blob name prefix.
ome.partition: empty or partition. If not empty, the blob name is “ome.stream/ome.partition/timestamp”; otherwise it is “ome.stream/timestamp”.
Example: to send message to SSP system = Azure container name = “azure-container”, Stream = “azure-stream”, the blob with name “azure-stream/..” is created in Azure container “azure-container”. The blob can be found at https://<azure-account-name>.blob.core.windows.net/azure-container/azure-stream/.../timestamp where the blob name itself is azure-stream/../timestamp
ome.keySerializerName: no-op serde
ome.messageSerializerName: no-op serde
ome.message: supports a single message.
ome.key: key for the message.
SystemProducer.flush(String source): flush for all the streams associated with that source. During flush, the SystemProducer waits till all the pending blocks for all streams are asynchronously uploaded and then blobs are created. All messages coming after a flush will go into a new blob.
Metrics
This SystemProducer emits the following metrics.
- sent messages count
- sent bytes count
- compressed bytes count
- message send error count
- block upload count
- blob commit count
- Azure connection error count
It emits these metrics for three groups: "aggregate", the system name, and the source of the OutgoingMessageEnvelope.
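As an illustration of the three metric groups, the sketch below updates the same counter under "aggregate", the system name and the message source on every send. The class ProducerMetricsSketch, the metric names "sent-messages" and "sent-bytes", and the use of plain AtomicLong counters instead of Samza's metrics registry are assumptions made for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical counter holder; the real producer would use Samza's metrics classes. */
final class ProducerMetricsSketch {
    static final String AGGREGATE = "aggregate";

    private final String systemName;
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    ProducerMetricsSketch(String systemName) {
        this.systemName = systemName;
    }

    /** Records one sent message of the given size under all three groups. */
    void recordSend(String source, long bytes) {
        for (String group : new String[] {AGGREGATE, systemName, source}) {
            add(group, "sent-messages", 1);
            add(group, "sent-bytes", bytes);
        }
    }

    /** Returns the current value of a counter, or 0 if it was never updated. */
    long get(String group, String metric) {
        AtomicLong counter = counters.get(group + "." + metric);
        return counter == null ? 0 : counter.get();
    }

    private void add(String group, String metric, long delta) {
        counters.computeIfAbsent(group + "." + metric, k -> new AtomicLong()).addAndGet(delta);
    }
}
```

With this layout, the "aggregate" group gives a job-wide total, the system group isolates traffic to one Azure container, and the source group attributes traffic to an individual task.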
Configs
This SystemProducer allows the following properties to be configurable.
Name | Default | Description |
---|---|---|
systems.<azure-container-name>.azureblob.account.name | | Azure account name. Required. |
systems.<azure-container-name>.azureblob.account.key | | Azure account key. Required. |
systems.<azure-container-name>.azureblob.proxy.use | "false" | if true, proxy will be used to connect to Azure. |
systems.<azure-container-name>.azureblob.proxy.hostname | | host name of the proxy; used only if proxy.use is true |
systems.<azure-container-name>.azureblob.proxy.port | | port of the proxy; used only if proxy.use is true |
systems.<azure-container-name>.azureblob.compression.type | "none" | type of compression to be used before uploading blocks. Can be "none" or "gzip" |
systems.<azure-container-name>.azureblob.maxFlushThresholdSize | 10485760 (10 MB) | max size of the uncompressed block to be uploaded in bytes. Maximum size allowed by Azure is 100MB. |
systems.<azure-container-name>.azureblob.maxBlobSize | Long.MAX_VALUE - unlimited | max size of the uncompressed blob in bytes. With the default value the size is unlimited, capped only by the Azure BlockBlob size limit of 4.75 TB (100 MB per block X 50,000 blocks). |
systems.<azure-container-name>.azureblob.maxMessagesPerBlob | Long.MAX_VALUE - unlimited | max number of messages per blob |
systems.<azure-container-name>.azureblob.threadPoolCount | 2 | number of threads for the asynchronous uploading of blocks |
systems.<azure-container-name>.azureblob.blockingQueueSize | Thread Pool Count * 2 | size of the queue to hold blocks ready to be uploaded by asynchronous threads. If all threads are busy uploading then blocks are queued and if queue is full then main thread will start uploading which will block processing of incoming messages |
systems.<azure-container-name>.azureblob.flushTimeoutMs | 3 mins | timeout to finish uploading all blocks before committing a blob |
systems.<azure-container-name>.azureblob.closeTimeoutMs | 5 mins | timeout to finish committing all the blobs currently being written to. This does not include the flush timeout per blob |
systems.<azure-container-name>.azureblob.suffixRandomStringToBlobName | false | if true, a random string of 8 chars is suffixed to the blob name to prevent name collision when more than one Samza tasks are writing to the same SSP. It is advisable to set this to true |
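A minimal sketch of how a few of these properties could be read, assuming the job config is available as a plain Map of strings. The class AzureBlobConfigSketch and its method names are hypothetical. The upload executor uses a bounded queue with CallerRunsPolicy as one way to obtain the behavior described for blockingQueueSize (the submitting thread uploads when all threads are busy and the queue is full); whether the actual implementation does this is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical config reader for the properties in the table above. */
final class AzureBlobConfigSketch {
    private final Map<String, String> config;
    private final String prefix;

    AzureBlobConfigSketch(Map<String, String> config, String systemName) {
        this.config = config;
        this.prefix = "systems." + systemName + ".azureblob.";
    }

    String accountName() {
        return required("account.name");
    }

    String accountKey() {
        return required("account.key");
    }

    long maxFlushThresholdSize() {
        return Long.parseLong(config.getOrDefault(prefix + "maxFlushThresholdSize", "10485760"));
    }

    int threadPoolCount() {
        return Integer.parseInt(config.getOrDefault(prefix + "threadPoolCount", "2"));
    }

    /** Defaults to twice the thread pool count when not configured. */
    int blockingQueueSize() {
        String value = config.get(prefix + "blockingQueueSize");
        return value == null ? threadPoolCount() * 2 : Integer.parseInt(value);
    }

    /** Bounded pool; when the queue is full, the submitting thread runs the upload itself. */
    ThreadPoolExecutor newUploadExecutor() {
        int threads = threadPoolCount();
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(blockingQueueSize()),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private String required(String key) {
        String value = config.get(prefix + key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required config: " + prefix + key);
        }
        return value;
    }
}
```

For a system named azure-container, setting only systems.azure-container.azureblob.account.name and systems.azure-container.azureblob.account.key is enough for this sketch; every other value falls back to the default listed in the table.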
Implementation / Test Plan
AzureBlobSystemProducer which implements the existing org.apache.samza.system.SystemProducer interface.
    public class AzureBlobSystemProducer implements SystemProducer {
      // register(), start() and stop() of SystemProducer are omitted here.

      /**
       * send messages from the source to Azure Blob Storage
       * get or create a writer for the <source, stream-partition> pair
       * pass the envelope to the writer
       */
      public void send(String source, OutgoingMessageEnvelope messageEnvelope) {
      }

      /**
       * commit all blobs pertaining to this source
       * close and delete all writers associated with this source
       * next message written for this source will create new writers.
       */
      public void flush(String source) {
      }
    }
AzureBlobWriter interface which is used by AzureBlobSystemProducer to send messages to Azure Blob Storage.
    public interface AzureBlobWriter {
      /**
       * write given message envelope to the blob opened
       */
      void write(OutgoingMessageEnvelope messageEnvelope);

      /**
       * Asynchronously upload messages written as a blob.
       * Messages written after this go as a new block.
       */
      void flush();

      /**
       * Close the writer and all of its underlying components.
       * At the end of close, all the messages sent to the writer should be persisted in a blob.
       * flush should be called explicitly before close.
       * It is not the responsibility of close to upload blocks.
       * After close, no other operations can be performed.
       */
      void close();
    }
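The delegation from the producer to its writers can be sketched as follows. This is an illustration only: WriterRegistrySketch and its nested Writer interface are hypothetical stand-ins that take a plain byte[] and a stream-partition string instead of an OutgoingMessageEnvelope, so that the example needs nothing beyond the JDK.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class WriterRegistrySketch {
    /** Mirrors the shape of AzureBlobWriter, but takes a byte[] instead of an envelope. */
    interface Writer {
        void write(byte[] message);
        void flush();
        void close();
    }

    // source -> (stream-partition -> writer)
    private final Map<String, Map<String, Writer>> writersBySource = new ConcurrentHashMap<>();
    private final Function<String, Writer> writerFactory;
    private final AtomicInteger writersCreated = new AtomicInteger();

    WriterRegistrySketch(Function<String, Writer> writerFactory) {
        this.writerFactory = writerFactory;
    }

    /** Gets or creates the writer for the <source, stream-partition> pair and hands it the message. */
    void send(String source, String streamPartition, byte[] message) {
        writersBySource
            .computeIfAbsent(source, s -> new ConcurrentHashMap<>())
            .computeIfAbsent(streamPartition, sp -> {
                writersCreated.incrementAndGet();
                return writerFactory.apply(sp);
            })
            .write(message);
    }

    /** Flushes and closes every writer of the source; the next send creates new writers. */
    void flush(String source) {
        Map<String, Writer> writers = writersBySource.remove(source);
        if (writers == null) {
            return;
        }
        for (Writer writer : writers.values()) {
            writer.flush(); // flush is called explicitly before close
            writer.close();
        }
    }

    int writersCreated() {
        return writersCreated.get();
    }
}
```

Removing the writers on flush is what makes messages sent after a flush land in a new blob: the next send for the same pair finds no writer and creates a fresh one.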
AvroAzureBlobWriter is an AzureBlobWriter implementation to write messages which are Avro records.
- Schema of the incoming Avro records is expected to be the same for all messages between two flushes of the SystemProducer. This is because an Avro file expects all of its entries to conform to a schema and here blob is an Avro file.
- OutgoingMessageEnvelope.key will be ignored here. This is because the Avro blobs created consist of only the schema taken from OutgoingMessageEnvelope.message.getSchema and all the messages, which are Avro records. There is no way to insert a key into the records, as that would make the record incompatible with the schema. Other implementations of AzureBlobWriter can utilize the key in a meaningful fashion.
- Optionally, OutgoingMessageEnvelope.message can be a byte array, provided the first message to the SystemProducer or the first message sent after a flush is an Avro record. The first message must be an Avro record in order to obtain the schema for the Avro file. If the subsequent messages are byte arrays, they are appended to the Avro file directly, without any check that each byte array is indeed an encoded Avro record adhering to the schema of the Avro file. It is the responsibility of the user of the SystemProducer to ensure that the byte arrays are valid; otherwise the resulting blob will be unreadable.
- SystemProducer is agnostic to schema evolution as long as the user adheres to the same schema between two flushes. Hence, if the user wants to change the schema, a flush prior to the change is mandatory. This is because all messages sent between two flushes go into a single Avro file and are expected to adhere to the schema of the file.
    public class AvroAzureBlobWriter implements AzureBlobWriter {
      // Avro's DataFileWriter is used to write the incoming Avro records.
      // The sink for DataFileWriter is AzureBlobOutputStream.

      /**
       * write the message to blob via DataFileWriter which writes to AzureBlobOutputStream
       */
      public void write(OutgoingMessageEnvelope messageEnvelope) {
      }

      /**
       * flush the contents of DataFileWriter and underlying AzureBlobOutputStream
       * this operation uploads a block.
       */
      public void flush() {
      }

      /**
       * Close the DataFileWriter and underlying AzureBlobOutputStream
       * this operation commits the blob with all blocks uploaded so far.
       */
      public void close() {
      }
    }
AvroAzureBlobWriter uses AzureBlobOutputStream for buffering a block and uploading it to Azure and committing a blob.
    // OutputStream is an abstract class, so it is extended rather than implemented.
    public class AzureBlobOutputStream extends OutputStream {
      // ByteArrayOutputStream is used as the in-memory buffer to hold a block until ready to upload
      // BlockBlobAsyncClient is used to upload blocks and commit blobs on Azure Blob Storage
      // write(int), which OutputStream also requires, is omitted here.

      /**
       * write is invoked by DataFileWriter.append() while writing an Avro record.
       * the byte[] is buffered in ByteArrayOutputStream
       * if buffer size reaches max block size, buffer contents are uploaded as a block
       */
      public void write(byte[] b, int off, int len) {
      }

      /**
       * flush is invoked by DataFileWriter.flush().
       * uploads buffer contents as a block
       */
      public void flush() {
      }

      /**
       * close is invoked by DataFileWriter.close()
       * The list of blocks uploaded so far are committed to form a blob.
       */
      public void close() {
      }
    }
Test Plan
- Unit tests for AzureBlobSystemProducer, AvroAzureBlobWriter and AzureBlobOutputStream.
- Sample Samza job which produces to Azure Blob Storage using this SystemProducer natively.
Migration Plan and Compatibility
A Samza job can use this SystemProducer by configuring systems.<Azure-container-name>.samza.factory to be the SystemFactory of this producer. The job configs also need to include other essential Azure connection info such as account name, account key and port details. Thus any Samza job can migrate to this SystemProducer by just changing its configs.
Since this is a new SystemProducer being added, it is fully backward compatible.
Rejected Alternatives
Buffering and uploads
Design 1: No Buffering
In this design, messages are not buffered in memory; each message is uploaded as its own block.
The problem with this approach is poor QPS and very high network usage. Additionally, the number of transactions to Azure Blob Storage will be very high, which results in a high cost to serve.
Design 2: File Buffering
In this design, the messages are buffered in the file system. A file is created per SSP and messages are written to it in one of the formats Azure Blob Storage accepts. During SystemProducer.flush(), the Azure Blob Storage API TransferManager.uploadFileToBlockBlob can be leveraged: it takes the max block size as input, splits the file into multiple blocks, stages all the blocks and commits the block list to the block blob. This API uploads the blocks in parallel and then waits for their completion before committing the block list.
The problem with this approach is the use of the file system and the overhead associated with it, such as disk corruption and reading from and writing to disk.
Design 3: In-Memory Buffering with BlockingGet
In this design, an in-memory buffer is used for messages, but blocks are uploaded to Azure in a blocking manner.
The problem with this approach is that waiting for each upload to finish slows the QPS across multiple streams.
SystemProducer API
AzureBlobSystemProducer extends samza-azure’s AsyncSystemProducer instead of natively implementing SystemProducer
AsyncSystemProducer is an abstract class requiring its subclass to implement sendAsync which performs the actual sending of the OutgoingMessageEnvelope to the underlying system and returns a CompletableFuture. This future is then tracked by the AsyncSystemProducer and waited upon during flush().
Con: The proposed design for buffering and upload of blocks requires that an asynchronous upload of a block does not happen on every message send. Hence, a CompletableFuture cannot be generated for every message send, making it infeasible to extend AsyncSystemProducer.
Granularity of OutgoingMessageEnvelope: SystemProducer accepts entire blob in the OutgoingMessageEnvelope.
Con: it would make user code in the Samza job very complex, as the job would need to buffer messages, create and stage blocks, and stitch them together appropriately.
Serialization: user of SystemProducer serializes and SystemProducer always gets byte array.
Con: if SystemProducer gets a byte[] then it is hard to specify a delimiter between two messages and the schema of the message to ensure it can be read from Azure blob storage.
Partition key optional: user of SystemProducer can omit the partition key in the OutgoingMessageEnvelope to improve performance.
Consider two cases: (case 1) the blob name is based on stream/timestamp and (case 2) the blob name is based on stream/partition/timestamp. In case 1 there is a single blob at a time for a stream, while in case 2 there are as many blobs as partitions. This affects upload performance and RAM usage as follows. Suppose the block size for upload is set to 10 MB, the number of partitions is 1000 and the throughput per partition is 5 MB/min.
For case 1, since there is a single block being filled per stream and blocks are uploaded as soon as they are full, uploads happen continuously throughout the minute. At the flush at the end of the minute, at most one 10 MB block remains to be uploaded before the blob is created. RAM usage is ~10 MB, as only that much is kept in memory before upload.
For case 2, with the same block size of 10 MB, there are now 1000 blobs to be created and 1000 blocks sitting in memory, as none of them hits the 10 MB limit within the minute. At flush, there are 1000 blocks of 5 MB each to be uploaded, which saturates the upload channels and causes timeouts. In addition, no uploads happen until the flush, and RAM usage grows to 5 GB within a minute. To partially overcome this issue, the block size can be tuned down to 1 MB, which still means a RAM usage of 1 GB by flush time.
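The arithmetic behind these figures can be checked with a small helper. This is a rough upper-bound model, assuming each open blob holds at most one partially filled block in memory and ignoring blocks that are queued for upload; the class and method names are hypothetical.

```java
/** Hypothetical back-of-the-envelope model of buffered memory just before a flush. */
final class BufferMemorySketch {
    private BufferMemorySketch() { }

    /**
     * @param concurrentBlobs        number of blobs being written at the same time
     * @param blockSizeMb            configured max block size in MB
     * @param mbPerBlobPerFlushCycle data arriving per blob between two flushes, in MB
     * @return upper bound, in MB, of data held in partially filled blocks
     */
    static long bufferedMbBeforeFlush(int concurrentBlobs, long blockSizeMb, long mbPerBlobPerFlushCycle) {
        // A block is uploaded as soon as it is full, so each blob buffers at most
        // min(block size, data received since the last flush).
        return concurrentBlobs * Math.min(blockSizeMb, mbPerBlobPerFlushCycle);
    }

    public static void main(String[] args) {
        // Case 1: one blob per stream receiving 1000 x 5 MB per minute, 10 MB blocks.
        System.out.println("case 1: " + bufferedMbBeforeFlush(1, 10, 5000) + " MB");
        // Case 2: 1000 blobs receiving 5 MB per minute each, 10 MB blocks.
        System.out.println("case 2: " + bufferedMbBeforeFlush(1000, 10, 5) + " MB");
        // Case 2 with the block size tuned down to 1 MB.
        System.out.println("case 2, 1 MB blocks: " + bufferedMbBeforeFlush(1000, 1, 5) + " MB");
    }
}
```

Under this model the three calls return 10 MB, 5000 MB and 1000 MB respectively, matching the ~10 MB, 5 GB and 1 GB figures in the text.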
Azure blob type
Azure Blob Storage supports three kinds of blobs: Page blob, Append blob and Block blob.
Page blob is a blob which is divided into 512-byte pages and supports random read and write operations.
Append blob is composed of blocks and is optimized for append operations. It does not expose its blockIds, and the maximum block size supported is 4 MB.
Block blob is also composed of blocks, but it exposes its blockIds and its maximum block size is 100 MB.
Con: if Page or Append blob is chosen then the number of transactions to Azure Blob Storage will be much higher and also result in increased network calls.