Extract Transform Load (ETL) Example

The ETL (Extract, Transform, Load) example shows how to load data into a database using Camel. In this example we will poll for files, transform them and then store them in the database via the JPA component.

Overview

This example lives in the examples/camel-example-etl directory and polls for XML files in the child src/data directory. When files are detected, they are converted to a PersonDocument class using the fallback JAXB Type Converter. This POJO is then transformed into a CustomerEntity bean by a custom Type Converter, the CustomerTransformer class. Finally, the bean is stored in the database via the JPA component.

The code for this example is as follows:

Java Class            Purpose
CustomerEntity        The JPA entity bean (i.e. a POJO with @Entity)
PersonDocument        The JAXB2 POJO used to parse the XML
CustomerTransformer   The custom Type Converter used to convert a PersonDocument into a CustomerEntity
EtlRoutes             The Camel routing DSL to define the flow from the files to the converter to the JPA endpoint

Then there is the Spring configuration file in src/resources/META-INF/services/camel-context.xml, which defines the JPA template and tells Camel to look in the org.apache.camel.example.etl package to find its routes.

Code walkthrough

Let's start with the route definition in EtlRoutes:

public class EtlRoutes extends SpringRouteBuilder {
    public void configure() throws Exception {

        from("file:src/data?noop=true")
            .convertBodyTo(PersonDocument.class)
            .to("jpa:org.apache.camel.example.etl.CustomerEntity");

        // the following will dump the database to files
        from("jpa:org.apache.camel.example.etl.CustomerEntity?consumeDelete=false&delay=3000&consumeLockEntity=false")
            .setHeader(Exchange.FILE_NAME, el("${in.body.userName}.xml"))
            .to("file:target/customers");
    }
}

The above sets up a route from the src/data directory. Notice that we're using the noop mode of the File component, so the files are not moved or deleted once they are processed. Because the file consumer then uses a memory-based Idempotent Consumer, every file is processed only once per program run; when the tool is restarted, the files are processed again. See also the noop and idempotent* options of the File component.
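The "once per program run" behaviour can be illustrated with a minimal sketch of a memory-based idempotent filter. This is not Camel's implementation; the class names InMemoryIdempotentFilter and IdempotentFilterDemo and the file names row1.xml and row2.xml are assumptions made up for the illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; not the File component's actual code.
class InMemoryIdempotentFilter {
    // Keys seen during this JVM run. The set is lost when the program exits,
    // which is why a restart processes every file again.
    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time a key is offered, false on every repeat. */
    boolean firstTime(String key) {
        return seen.add(key);
    }
}

class IdempotentFilterDemo {
    public static void main(String[] args) {
        InMemoryIdempotentFilter filter = new InMemoryIdempotentFilter();
        // Two polls of the same directory: with noop the files stay in place,
        // so the second poll sees the same names again.
        for (int poll = 1; poll <= 2; poll++) {
            for (String file : Arrays.asList("row1.xml", "row2.xml")) {
                String action = filter.firstTime(file) ? "process" : "skip";
                System.out.println("poll " + poll + ": " + action + " " + file);
            }
        }
    }
}
```

In this sketch the second poll skips both files, while a fresh filter instance (standing in for a restarted program) would accept them again.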

We're converting the body of the message to a PersonDocument. Since this POJO has an @XmlRootElement annotation from JAXB, the Type Converter will kick in and use JAXB to unmarshal the object.

Then we send the message with a PersonDocument body to the JPA endpoint. Notice how this endpoint specifies the expected type, so the Type Converter will try to convert the PersonDocument to a CustomerEntity. Here Camel will find the CustomerTransformer class, which has an @Converter method:

@Converter
public final class CustomerTransformer {

    private static final Logger LOG = LoggerFactory.getLogger(CustomerTransformer.class);

    private CustomerTransformer() {
    }

    /**
     * A transformation method to convert a person document into a customer
     * entity
     */
    @Converter
    public static CustomerEntity toCustomer(PersonDocument doc, Exchange exchange) throws Exception {
        EntityManager entityManager = exchange.getIn().getHeader(JpaConstants.ENTITYMANAGER, EntityManager.class);
        TransactionTemplate transactionTemplate = exchange.getContext().getRegistry().lookupByNameAndType("transactionTemplate", TransactionTemplate.class);

        String user = doc.getUser();
        CustomerEntity customer = findCustomerByName(transactionTemplate, entityManager, user);

        // let's convert information from the document into the entity bean
        customer.setUserName(user);
        customer.setFirstName(doc.getFirstName());
        customer.setSurname(doc.getLastName());
        customer.setCity(doc.getCity());

        LOG.info("Created object customer: {}", customer);
        return customer;
    }

    /**
     * Finds a customer for the given username
     */
    private static CustomerEntity findCustomerByName(TransactionTemplate transactionTemplate, final EntityManager entityManager, final String userName) throws Exception {
        return transactionTemplate.execute(new TransactionCallback<CustomerEntity>() {
            public CustomerEntity doInTransaction(TransactionStatus status) {
                entityManager.joinTransaction();
                List<CustomerEntity> list = entityManager.createNamedQuery("findCustomerByUsername", CustomerEntity.class).setParameter("userName", userName).getResultList();
                CustomerEntity answer;
                if (list.isEmpty()) {
                    answer = new CustomerEntity();
                    answer.setUserName(userName);
                    LOG.info("Created a new CustomerEntity {} as no matching persisted entity found.", answer);
                } else {
                    answer = list.get(0);
                    LOG.info("Found a matching CustomerEntity {} having the userName {}.", answer, userName);
                }

                return answer;
            }
        });
    }

}

This method performs the necessary conversion to an entity bean, which is then stored in the database.
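To make the idea of a type-driven converter lookup more concrete, here is a rough sketch of a registry keyed by source and target type. It is not Camel's Type Converter API; SimpleConverterRegistry, PersonDoc and CustomerRow are hypothetical stand-ins invented for this illustration, and the real converter discovery via @Converter works differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the example's PersonDocument and CustomerEntity.
class PersonDoc {
    final String user;
    PersonDoc(String user) { this.user = user; }
}

class CustomerRow {
    String userName;
}

// A toy registry keyed by (source type, target type); not Camel's real API.
class SimpleConverterRegistry {
    private final Map<String, Function<Object, Object>> converters = new HashMap<>();

    private static String key(Class<?> from, Class<?> to) {
        return from.getName() + "->" + to.getName();
    }

    /** Registers a conversion function for one exact source/target pair. */
    <F, T> void register(Class<F> from, Class<T> to, Function<F, T> fn) {
        converters.put(key(from, to), value -> fn.apply(from.cast(value)));
    }

    /** Converts value to the requested type, or fails if no converter is known. */
    <T> T convertTo(Class<T> to, Object value) {
        if (to.isInstance(value)) {
            return to.cast(value); // already the expected type, nothing to do
        }
        Function<Object, Object> fn = converters.get(key(value.getClass(), to));
        if (fn == null) {
            throw new IllegalArgumentException(
                "No converter from " + value.getClass().getName() + " to " + to.getName());
        }
        return to.cast(fn.apply(value));
    }
}

class ConverterRegistryDemo {
    public static void main(String[] args) {
        SimpleConverterRegistry registry = new SimpleConverterRegistry();
        registry.register(PersonDoc.class, CustomerRow.class, doc -> {
            CustomerRow row = new CustomerRow();
            row.userName = doc.user;
            return row;
        });
        // The caller only names the expected type; the registry picks the converter.
        CustomerRow row = registry.convertTo(CustomerRow.class, new PersonDoc("james"));
        System.out.println(row.userName);
    }
}
```

The point of the sketch is that the caller states only the type it expects, much as the JPA endpoint does in the route, and the lookup decides which conversion to apply.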

Running the example

To run the example we use the Camel Maven Plugin. For example, from the source or binary distribution the following should work:

cd examples/camel-example-etl
mvn camel:run

If you prefer, you can run the Main class directly using:

mvn compile exec:java

Please note that when you run the example for the first time, the CustomerTransformer converter will not find any entities in the database, so among the logs written to the console you should see:

...
thread #0 - file://src/data] CustomerTransformer            INFO  Created a new CustomerEntity Customer[userName: james firstName: null surname: null] as no matching persisted entity found.
...
thread #0 - file://src/data] CustomerTransformer            INFO  Created a new CustomerEntity Customer[userName: hiram firstName: null surname: null] as no matching persisted entity found.
...

However, when you run the example a second time, the entities have already been inserted into the database, so the log should now say:

...
thread #0 - file://src/data] CustomerTransformer            INFO  Found a matching CustomerEntity Customer[userName: james firstName: James surname: Strachan] having the userName james.
...
thread #0 - file://src/data] CustomerTransformer            INFO  Found a matching CustomerEntity Customer[userName: hiram firstName: Hiram surname: Chirino] having the userName hiram.
...
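The difference between the two runs comes from the find-or-create step in the converter. A simplified sketch of that pattern, with a HashMap standing in for the database, could look like the following. StoredCustomer, InMemoryCustomerStore and FindOrCreateDemo are hypothetical names used only here; the real example uses JPA, a named query and a transaction template instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the entity bean.
class StoredCustomer {
    final String userName;
    String firstName;
    String surname;
    StoredCustomer(String userName) { this.userName = userName; }
}

// A map plays the role of the database that survives between runs.
class InMemoryCustomerStore {
    private final Map<String, StoredCustomer> rows = new HashMap<>();

    boolean contains(String userName) {
        return rows.containsKey(userName);
    }

    /** Returns the stored row for userName, or a new unsaved one if none exists. */
    StoredCustomer findOrNew(String userName) {
        StoredCustomer found = rows.get(userName);
        return found != null ? found : new StoredCustomer(userName);
    }

    void save(StoredCustomer customer) {
        rows.put(customer.userName, customer);
    }
}

class FindOrCreateDemo {
    /** Loads one record and reports whether a stored row was found or created. */
    static String load(InMemoryCustomerStore store, String userName, String firstName, String surname) {
        boolean existed = store.contains(userName);
        StoredCustomer customer = store.findOrNew(userName);
        String firstNameBefore = customer.firstName; // null for a brand new row
        customer.firstName = firstName;
        customer.surname = surname;
        store.save(customer);
        return (existed ? "found " : "created ") + userName
            + " (firstName before: " + firstNameBefore + ")";
    }

    public static void main(String[] args) {
        InMemoryCustomerStore store = new InMemoryCustomerStore();
        // First "run": nothing stored yet, so a new row is created.
        System.out.println(load(store, "james", "James", "Strachan"));
        // Second "run": the row from the first run is found and updated.
        System.out.println(load(store, "james", "James", "Strachan"));
    }
}
```

As in the logs above, the first load sees a row whose name fields are still null, while the second load finds the already populated row.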

If the Maven commands do not work for you, you can run the Main class from inside your IDE instead. Follow the Building instructions to create an Eclipse or IDEA project to import.
