One of the core use-cases for Apache Hudi is enabling seamless, efficient database ingestion into your data lake. Even though this model has been discussed widely and many users have already adopted it, content on how to actually go about it is sparse. 

In this blog, we will build an end-to-end solution for capturing changes from a MySQL instance running on AWS RDS to a Hudi table on S3, using capabilities in the Hudi 0.5.1 release.


We can break up the problem into two pieces. 

  1. Extracting change logs from MySQL : Surprisingly, this is still a pretty tricky problem to solve and often Hudi users get stuck here. Thankfully, at least for AWS users, there is the Database Migration Service (DMS for short), which performs this change capture and uploads the changes as parquet files to S3.
  2. Applying these change logs to your data lake table : Once the change logs are available in some form, the next step is to apply them incrementally to your table. This mundane task can be fully automated using the Hudi DeltaStreamer tool. 


The actual end-to-end architecture looks something like this.  



Let's now illustrate how one can accomplish this using a simple orders table stored in MySQL (these instructions should broadly apply to other database engines like Postgres or Aurora as well, though the SQL/syntax may change).

CREATE DATABASE hudi_dms;
USE hudi_dms;

CREATE TABLE orders(
   order_id INTEGER, 
   order_qty INTEGER, 
   customer_name VARCHAR(100),
   updated_at TIMESTAMP DEFAULT NOW() ON UPDATE NOW(),
   created_at TIMESTAMP DEFAULT NOW(),
   CONSTRAINT orders_pk PRIMARY KEY(order_id)
);

INSERT INTO orders(order_id, order_qty, customer_name) VALUES(1, 10, 'victor');
INSERT INTO orders(order_id, order_qty, customer_name) VALUES(2, 10, 'peter');

In the table, order_id is the primary key, which will be enforced on the Hudi table as well. Since a batch of change records can contain changes to the same primary key, we also include updated_at and created_at fields, which are kept up to date as writes happen to the table. 

Extracting Change Logs from MySQL

Before we can configure DMS, we first need to prepare the MySQL instance for change capture, by ensuring backups are enabled and the binlog is turned on. 
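On RDS MySQL, enabling automated backups is what turns the binlog on, and DMS additionally needs row-based logging and binlogs that stick around long enough to be read. A minimal sanity-check sketch (the 24-hour retention is just an illustrative value):

-- should return ROW; if not, set binlog_format=ROW in the instance's parameter group
SHOW VARIABLES LIKE 'binlog_format';

-- RDS-specific procedure to keep binlogs around long enough for DMS to consume them
CALL mysql.rds_set_configuration('binlog retention hours', 24);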



Now, proceed to create endpoints in DMS that capture MySQL data and store it in S3 as parquet files (an illustrative AWS CLI sketch follows the list below).

  • Source hudi-source-db endpoint, which points to the DB server and provides basic authentication details 
  • Target parquet-s3 endpoint, which points to the bucket and folder on S3 where the change log records are stored as parquet files
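If you prefer the AWS CLI to the console, the endpoint creation looks roughly like the sketch below; the server name, credentials and IAM role ARN are placeholders to substitute with your own, while the bucket and folder match the S3 paths used throughout this post.

aws dms create-endpoint \
  --endpoint-identifier hudi-source-db \
  --endpoint-type source --engine-name mysql \
  --server-name <your-rds-endpoint> --port 3306 \
  --username <user> --password <password>

aws dms create-endpoint \
  --endpoint-identifier parquet-s3 \
  --endpoint-type target --engine-name s3 \
  --s3-settings '{"ServiceAccessRoleArn":"<role-arn-with-s3-write-access>","BucketName":"hudi-dms-demo","BucketFolder":"orders","DataFormat":"parquet"}'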


Then proceed to create a migration task, as below. Give it a name, connect the source to the target, and be sure to pick the right Migration type as shown below, to ensure ongoing changes are continuously replicated to S3. Also make sure to specify the rules DMS uses to decide which MySQL schemas/tables to replicate. In this example, we simply whitelist the orders table under the hudi_dms schema, as created in the SQL above. 
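As a rough CLI equivalent, the task creation would look something like the following; the endpoint and replication-instance ARNs are placeholders, and the table-mappings rule is what whitelists hudi_dms.orders.

aws dms create-replication-task \
  --replication-task-identifier hudi-orders-task \
  --migration-type full-load-and-cdc \
  --source-endpoint-arn <hudi-source-db-endpoint-arn> \
  --target-endpoint-arn <parquet-s3-endpoint-arn> \
  --replication-instance-arn <replication-instance-arn> \
  --table-mappings '{"rules":[{"rule-type":"selection","rule-id":"1","rule-name":"1","object-locator":{"schema-name":"hudi_dms","table-name":"orders"},"rule-action":"include"}]}'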




Starting the DMS task should result in an initial load, like below. 

Simply reading the raw initial load file should give the same values as the upstream table.

scala> spark.read.parquet("s3://hudi-dms-demo/orders/hudi_dms/orders/*").sort("updated_at").show

+--------+---------+-------------+-------------------+-------------------+
|order_id|order_qty|customer_name|         updated_at|         created_at|
+--------+---------+-------------+-------------------+-------------------+
|       2|       10|        peter|2020-01-20 20:12:22|2020-01-20 20:12:22|
|       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
+--------+---------+-------------+-------------------+-------------------+

Applying Change Logs using Hudi DeltaStreamer

Now, we are ready to start consuming the change logs. Hudi DeltaStreamer runs as a Spark job on your favorite workflow scheduler (it also supports a continuous mode using the --continuous flag, where it runs as a long-running Spark job). It tails a given path on S3 (or any DFS implementation) for new files and issues an upsert to a target Hudi dataset. The tool checkpoints itself automatically, so to ingest repeatedly, all one needs to do is keep executing the DeltaStreamer periodically. 


With an initial load already on S3, we then run the following command (the deltastreamer command, from here on) to ingest the full load first and create a Hudi dataset on S3. 

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  hudi-utilities-bundle_2.11-0.5.1-SNAPSHOT.jar \
  --table-type COPY_ON_WRITE \
  --source-ordering-field updated_at \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://hudi-dms-demo/hudi_orders --target-table hudi_orders \
  --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
  --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
  --hoodie-conf hoodie.datasource.write.recordkey.field=order_id,hoodie.datasource.write.partitionpath.field=customer_name,hoodie.deltastreamer.source.dfs.root=s3://hudi-dms-demo/orders/hudi_dms/orders
  

A few things are going on here: 

  • First, we specify the --table-type as COPY_ON_WRITE. Hudi also supports a MERGE_ON_READ table type, which you can choose instead. 
  • To handle cases where the input parquet files contain multiple updates/deletes or inserts/updates to the same record, we use updated_at as the ordering field. This ensures that the change record with the latest timestamp is the one reflected in Hudi. 
  • We specify a target base path and a target table name, both needed for creating and writing to the Hudi table. 
  • We use a special payload class - AWSDmsAvroPayload - to handle the different change operations correctly. The parquet files generated by DMS have an Op field that indicates whether a given change record is an insert (I), delete (D) or update (U), and the payload implementation uses this field to decide how to handle a given change record. 
  • You may also notice a special transformer class AWSDmsTransformer being specified. The reason here is tactical, but important: the initial load files do not contain an Op field, so the transformer adds one to the schema (left blank for those records, as you can see in the Op column of the initial-load rows below), keeping it consistent with the later change files. 
  • Finally, we specify the record key for the Hudi table to be the same as the upstream table's primary key. Then we specify partitioning by customer_name and the root folder of the DMS output (an equivalent properties-file sketch follows this list).
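For reference, the same overrides could instead be placed in a properties file and handed to the deltastreamer command via its --props option; the S3 location used below is just an illustrative choice.

# illustrative dfs-source.properties
hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=customer_name
hoodie.deltastreamer.source.dfs.root=s3://hudi-dms-demo/orders/hudi_dms/orders

The spark-submit invocation would then carry --props s3://hudi-dms-demo/config/dfs-source.properties in place of the inline --hoodie-conf overrides.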


Once the command is run, the Hudi table should be created and have the same records as the upstream table (with all the _hoodie fields as well). 

scala> spark.read.format("org.apache.hudi").load("s3://hudi-dms-demo/hudi_orders/*/*.parquet").show
+-------------------+--------------------+------------------+----------------------+--------------------+--------+---------+-------------+-------------------+-------------------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|order_id|order_qty|customer_name|         updated_at|         created_at| Op|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+---------+-------------+-------------------+-------------------+---+
|     20200120205028|  20200120205028_0_1|                 2|                 peter|af9a2525-a486-40e...|       2|       10|        peter|2020-01-20 20:12:22|2020-01-20 20:12:22|   |
|     20200120205028|  20200120205028_1_1|                 1|                victor|8e431ece-d51c-4c7...|       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|   |
+-------------------+--------------------+------------------+----------------------+--------------------+--------+---------+-------------+-------------------+-------------------+---+


Now, let's do an insert and an update 

INSERT INTO orders(order_id, order_qty, customer_name) VALUES(3, 30, 'sandy');
UPDATE orders set order_qty = 20 where order_id = 2;

This will add a new parquet file to the DMS output folder, and when the deltastreamer command is run again, it will apply these changes to the Hudi table. 
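If you want to see the new files DMS wrote before re-running the ingest, a quick listing of the output root (the same path the deltastreamer command tails) does the trick:

aws s3 ls s3://hudi-dms-demo/orders/hudi_dms/orders/ --recursive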

So, querying the Hudi table now would yield 3 rows, and the _hoodie_commit_time accurately reflects when these writes happened. You can notice that order_qty for order_id=2 is updated from 10 to 20! 

+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| Op|order_id|order_qty|customer_name|         updated_at|         created_at|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+
|     20200120211526|  20200120211526_0_1|                 2|                 peter|af9a2525-a486-40e...|  U|       2|       20|        peter|2020-01-20 21:11:47|2020-01-20 20:12:22|
|     20200120211526|  20200120211526_1_1|                 3|                 sandy|566eb34a-e2c5-44b...|  I|       3|       30|        sandy|2020-01-20 21:11:24|2020-01-20 21:11:24|
|     20200120205028|  20200120205028_1_1|                 1|                victor|8e431ece-d51c-4c7...|   |       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+

A nice debugging aid would be to read all of the DMS output now and sort it by updated_at, as shown below, which should give us the sequence of changes that happened on the upstream table. As we can see, the Hudi table above is a compacted snapshot of this raw change log.  
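This is the same raw read we issued against the initial load earlier, just repeated now that more change files have landed:

scala> spark.read.parquet("s3://hudi-dms-demo/orders/hudi_dms/orders/*").sort("updated_at").show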

+----+--------+---------+-------------+-------------------+-------------------+ 
|  Op|order_id|order_qty|customer_name|         updated_at|         created_at|
+----+--------+---------+-------------+-------------------+-------------------+
|null|       2|       10|        peter|2020-01-20 20:12:22|2020-01-20 20:12:22|
|null|       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
|   I|       3|       30|        sandy|2020-01-20 21:11:24|2020-01-20 21:11:24|
|   U|       2|       20|        peter|2020-01-20 21:11:47|2020-01-20 20:12:22|
+----+--------+---------+-------------+-------------------+-------------------+

Initial load with no Op field value, followed by an insert and an update.


Now, let's do a delete and some inserts 

DELETE FROM orders WHERE order_id = 2;
INSERT INTO orders(order_id, order_qty, customer_name) VALUES(4, 40, 'barry');
INSERT INTO orders(order_id, order_qty, customer_name) VALUES(5, 50, 'nathan');


This should result in more files on S3, written by DMS, which the deltastreamer command will continue to process incrementally (i.e., only the newly written files are read each time). 

Running the deltastreamer command again would result in the following state for the Hudi table. You can notice the two new records and that order_id=2 is now gone.


+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| Op|order_id|order_qty|customer_name|         updated_at|         created_at|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+
|     20200120212522|  20200120212522_1_1|                 5|                nathan|3da94b20-c70b-457...|  I|       5|       50|       nathan|2020-01-20 21:23:00|2020-01-20 21:23:00|
|     20200120212522|  20200120212522_2_1|                 4|                 barry|8cc46715-8f0f-48a...|  I|       4|       40|        barry|2020-01-20 21:22:49|2020-01-20 21:22:49|
|     20200120211526|  20200120211526_1_1|                 3|                 sandy|566eb34a-e2c5-44b...|  I|       3|       30|        sandy|2020-01-20 21:11:24|2020-01-20 21:11:24|
|     20200120205028|  20200120205028_1_1|                 1|                victor|8e431ece-d51c-4c7...|   |       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
+-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---------+-------------+-------------------+-------------------+


Our little informal change log query yields the following.

+----+--------+---------+-------------+-------------------+-------------------+
|  Op|order_id|order_qty|customer_name|         updated_at|         created_at|
+----+--------+---------+-------------+-------------------+-------------------+
|null|       2|       10|        peter|2020-01-20 20:12:22|2020-01-20 20:12:22|
|null|       1|       10|       victor|2020-01-20 20:12:31|2020-01-20 20:12:31|
|   I|       3|       30|        sandy|2020-01-20 21:11:24|2020-01-20 21:11:24|
|   U|       2|       20|        peter|2020-01-20 21:11:47|2020-01-20 20:12:22|
|   D|       2|       20|        peter|2020-01-20 21:11:47|2020-01-20 20:12:22|
|   I|       4|       40|        barry|2020-01-20 21:22:49|2020-01-20 21:22:49|
|   I|       5|       50|       nathan|2020-01-20 21:23:00|2020-01-20 21:23:00|
+----+--------+---------+-------------+-------------------+-------------------+



Note that the delete and the update have the same updated_at value, and thus they could very well be ordered differently here. In short, this way of looking at the changelog has its caveats. For a true changelog of the Hudi table itself, you can issue an incremental query (a sketch follows).
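For illustration, a minimal sketch of such an incremental query, assuming the 0.5.x datasource read options (option names may differ in later releases) and using the first commit time from the tables above as the begin instant:

// pulls only the records committed after the given instant
val changes = spark.read.format("org.apache.hudi").
  option("hoodie.datasource.view.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", "20200120205028").
  load("s3://hudi-dms-demo/hudi_orders")  // base path, no globs, for incremental queries

changes.show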


And Life goes on .....  Hope this was useful to all the data engineers out there! 





24 Comments

  1. Thanks Vinoth for putting this blog together.  It has been very helpful.

    Followed all the steps as you had outlined and it has worked well except at a couple of places.

    I had to get the latest hudi utilities bundle (as the one included in AWS EMR is an old one) and also I had to create the dfs-source.properties file manually, as the delta streamer complained that the properties file was not found. After these steps, everything worked fine.

    Once again thanks for putting this nice post.

    Have a great day!!!

    1. Can you explain the process to create that file manually, please?

  2. Chandra Thanks for the feedback! Really appreciate it. 


    >As the delta streamer complained that the properties files are not found

    That is just a warning, correct? The delta streamer should go ahead as long as the properties are overridden using the command line? 

  3. Vinoth Chandar Thank you, this has been helpful. Can you suggest what's the easiest workaround if I have to use DeltaStreamer for multiple tables, not just one table?

    1. Pratyaksh Sharma  has an open PR for this.. are you interested in trying it out?  It will go into the next release (smile) 

      1. Yes, I'd like to try that. Do you have the beta version available on any repository? When is the next release planned to be?

        1. Let me circle back here, once the PR lands in master.. Since this is a major feature, it will be targeted for the 0.6.0 release in March

          An easy way to follow what's going on and even take part in shaping it would be to engage on this PR:

          https://github.com/apache/incubator-hudi/pull/1150

          1. Vinoth Chandar This is a very helpful article.

            Does the new version 0.5.2, released in March 2020, support multiple tables?

          2. Not yet. 0.6.0 will have this feature, but it's in master already, if you want to try building it yourself. 

  4. Gave a shot at following this process through using a Postgres RDS instance instead of MySQL.  Thought I would share some feedback about the experience.  I arrived here with a fairly strong background in Java, but next to no contextual experience with Spark or Hadoop.  I possibly could have made things easier on myself by using EMR, but wanted to have more control over the environment and thought just starting with a bare EC2 instance would be more informative.  I'll use this post to comment on just the DMS setup portion...

    Source Setup

    Only thing noteworthy about the decision to use Postgres was that it made it necessary to use the most recent DMS engine version, 3.4.0.  With 3.1.4, the replication consistently failed to start with a complaint about pg_xlogical not being in the catalog.  This DMS documentation resource clued me in that my instinct to start with the next-to-most recent release had been incorrect: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html#CHAP_Source.PostgreSQL.v10

    The standard process for enabling replication on a Postgres RDS instance worked just fine–I mostly just had to create an options group and set `rds.logicalReplication=1`.

    When I had finally reached a point where the Postgres connection was good, but I had not yet dealt with S3 permissions, I did have a problem where an accumulation of WAL logs for the replication slot DMS had created for itself overflowed the minimal RDS storage I had allocated for this experiment.  I had not expected this since the database was not being used for any sort of user query workload at that point, but dropping the replication slot and re-creating the Postgres source when I was ready to use it instantly recovered my space buffer, and the WAL logs did not backlog again once I granted its DMS instance access to write into S3 and then left it consistently online.

    Target Setup

    Two things worth mentioning about the S3 Parquet destination endpoint.  First was that the interface does not contain an obvious widget for switching from the default behavior of writing CDC changes out to S3 as CSV files.  I had to do some hunting through the documentation to find that switching from the default CSV to Parquet required knowing some key-value pair assignments I needed to type manually into the "Extra Connection Attributes" field:

    dataFormat=parquet;enableStatistics=true;encodingType=rle-dictionary;parquetVersion=PARQUET_2_0

    ...at the same time I removed:

    csvDelimiter=,;csvRowDelimiter=\n

    ...and I also added use of a sub-folder as my root:

    bucketFolder=hudidb

    Second thing I had to figure out on my own was a role with permission to write to the S3 bucket since I had not left it open to the public.  I probably should not have needed to read that to remember, but it would have spared me some confusion and made for a smoother experiment if a gentle reminder had been offered all the same.

    1. This is great feedback! We do want to make the ingestion easy without even spark background.. let me process this and work it into the blog..

      Once again, thanks for taking this time to write a detailed feedback!

      1. You're most welcome.  It's a pleasure to offer something back in return for the opportunity to become more familiar with these tools.  One thing I forgot to mention but would like to add is that the s3:// filesystem prefix corresponds to a legacy version of Spark's adapter interface for remote filesystem extensions.  Newer implementations are s3n:// and s3a://, and the last of those turned out to be the only one I've been able to work with.

        Security credentials and TLS certificates were a little troublesome to configure for, but I got a lot of what I needed there from two resources:
        http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
        https://docs.cloudera.com/documentation/enterprise/5-9-x/topics/spark_s3.html


  5. My team has a fair amount of investment in CDC through Debezium by way of Kafka Connect.  I've had a rather easy time experimenting with the Schema Registry based SchemaProvider and the Kafka Topic SourceProvider used in your Docker Demo.  The AWSDmsAvroPayload and AWSDmsTransformer seem very similar in purpose to the Record and SingleMessageTransform abstractions in Apache's Kafka Connect.  Connect has a very Avro-influenced Schema concept, but it is intentionally more abstract about how its types match to concrete storage representations.  I wonder if you have any plans to provide a path to help development teams with an existing investment in Kafka Connect's Single Message Transformations somehow adapt those ingress/egress time transformations for reuse with Hudi?

    1. There are people who have done this already. DeltaStreamer can read Confluent avro data already; if you can raise a JIRA with any gaps, happy to make this work in the next release. It's a very valuable integration for the community at large as well.

  6. Vinoth Chandar Thanks for putting together this blog. We are currently trying to run a small POC with HUDI. The main question I have is how can I pass in the composite key to the DeltaStreamer. Can I just supply the keys as comma separated on this config hoodie.datasource.write.recordkey.field=ky1,ky2,k3 or do I need do something else to accomplish this.


    Thanks

    1. Syed Musa Raza Zaidi  please direct these questions to the mailing list.. where there are a lot more eyes to help you... 


      if you want a compound key, you can specify https://github.com/apache/incubator-hudi/blob/652224edc882c083ac46cff095324975e2457004/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java as the key generator..

  7. Vinoth Chandar I tried it out with EMR 5.3.0 that has hudi-utilities-bundle_2.11-0.5.2-incubating.jar and packages org.apache.spark:spark-avro_2.11:2.4.5

    I tried enabling hive sync and ended up adding the properties hive_sync.partition_extractor_class & hive_sync.partition_fields, and was able to sync the hive table as part of the history run.  On the consecutive run for the deltas, spark-submit starts complaining and fails saying Exception in thread "main" org.apache.hudi.exception.HoodieException: Please provide a valid schema provider class! 
    The moment I added --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider, it starts looking for the dfs-source.properties file, which in turn has two properties which I haven't passed on the spark-submit: 
    hoodie.deltastreamer.filebased.schemaprovider.source.schema.file & hoodie.deltastreamer.filebased.schemaprovider.target.schema.file 
    I don't understand - for processing parquet files that sit on s3, why would we need avsc files? 

    1. Bhavani Sudha cc 


      Saravanan Prabhagaran we can continue this on the mailing list, which gets faster responses for questions like these.. Not sure why you are using FilebasedSchemaProvider, given you already have s3 parquet input files? 

    2. I have the same issue, did you find any solution?

  8. Hi,

    I am facing some issues with the steps. Kindly provide some insights:

    > Error1: It expects a .hoodie folder in the target-path

    Exception in thread "main" org.apache.hudi.exception.TableNotFoundException: Hoodie table not found in path s3://dmssourceandtargettest/test/.hoodie

    I had this conception that it automatically creates the .hoodie folder when we run the spark-submit command

    > Error2: It expects a schema provider class

    I tried passing in the parameter

    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

    but it expects a hoodie.properties file and source schema file

    Exception in thread "main" java.io.IOException: Could not load schema provider class org.apache.hudi.utilities.schema.FilebasedSchemaProvider


    Note: I tried out with EMR 5.2.0 which has "hudi-utilities-bundle_2.11-0.5.2-incubating.jar" and "hudi-utilities-bundle.jar".


    Can you let us know if this is expected, and the proper steps to eliminate the error?

    1. I am seeing similar behavior. This blog talks about using Hudi 0.5.1. In fact I tested out with both Hudi 0.5.0 and 0.5.2 because Amazon EMR only supports these 2 versions, but I am seeing similar behaviour.


      For 0.5.0 all the delta streamer config files are requested :-

      https://github.com/logicalclocks/hudi/tree/master/hudi-utilities/src/test/resources/delta-streamer-config

      If these are manually created, I am getting the below exception :-

      ========

      Exception in thread "main" java.lang.NullPointerException
      at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:67)
      at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:292)
      at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:214)
      at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:120)
      at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:292)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
      at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
      at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
      at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
      at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
      at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

      ===========

      I got past the .hoodie exception by creating it manually.


      For 0.5.2 I got the same exception as you :-

      Exception in thread "main" org.apache.hudi.exception.HoodieException: Please provide a valid schema provider class!

      This exception was added in Nov 2019 :-

      https://github.com/apache/hudi/pull/995


      Vinoth Chandar

      Can you please help in resolving this error on Hudi 0.5.2/0.5.0?

    2. Hi folks.. can you please open a GitHub issue (which is the support forum) or a mailing list thread, with the steps to repro, and we can go from there?

  9. Hello.

    Can we have the same example with Spark datasource writer?

    Thank you.

  10. Hello, have you tried the experiment of RDS -> DMS -> MSK -> Hudi for processing streaming data? I have some questions about processing and deleting data. Do you have any suggestions for updating and deleting streaming data?