Generally, application data is stored in several data silos according to its purpose. These data silos usually have different storage types such as HDFS, S3, or HBase, each of which is optimized for its target applications. In addition, data can be stored in various file formats, e.g., CSV, Parquet, or ORC, even within a single storage type. Thus, it is important to provide a way to easily handle data stored on different storage types in various file formats.

Tablespaces allow users to define the locations in several storage systems where the files or data objects representing database objects can be stored. They are useful when managing several storage types because they abstract away the details of the underlying data sources.

This page describes how to add a new tablespace to Tajo.

Tablespace Overview

Relationship of tablespaces, databases, and tables

Conceptually, tablespaces and databases represent the physical space and the logical namespace, respectively. That is, while a database contains logical information about tables, a tablespace contains physical information about them, such as how to access data sources, authorization information, and storage-specific readers and writers. As seen in the above figure, the relationship between data sources, tablespaces, databases, and tables is as follows.

For Tajo to read data from a data source, users need to register at least one tablespace for that data source. The tablespace manager of Tajo is responsible for managing the metadata of tablespaces. This metadata includes storage-specific information like table URIs, table sizes, and physical locations of table data in distributed file systems. When a query is submitted, the Tajo master and the query master ask the tablespace manager for metadata about the target tables, and then send the information required for I/O operations, such as data locations, to the Tajo workers.

How to add a new Tablespace

To add a new tablespace, you need to implement the following classes and interfaces.

Fragment Interface

A Fragment is similar to a split in MapReduce. For distributed processing of a single large table, it describes which part of the data will be processed by each task.

Tajo provides an abstract Fragment class. You need to implement a new class that inherits from the Fragment class for the new tablespace. The following methods are provided by the abstract Fragment class, so you don't have to implement them yourself.

Fragment
public abstract class Fragment<T> {
  public final String getKind() { .. }
  public final URI getUri() { .. }
  public final String getInputSourceId() { .. }
  public final T getStartKey() { .. }
  public final T getEndKey() { .. }
  public final long getLength() { .. }
  public final ImmutableList<String> getHostNames() { .. }
}
  • getKind(): returns the fragment type. A fragment type should match a tablespace.
  • getUri(): returns a unique URI of the fragment.
  • getInputSourceId(): returns the input source id.
  • getStartKey(): returns the start key of the data range.
  • getEndKey(): returns the end key of the data range.
  • getLength(): returns the length of the data to be read.
  • getHostNames(): returns the hosts that hold the data.
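
As a rough illustration of what a tablespace-specific subclass might look like, the following self-contained sketch models the getters listed above with a stand-in base class. SketchFragment, its constructor, ByteRangeFragment, and the "byte-range" kind are all assumptions made for this example; they are not Tajo's actual classes or constructor signatures.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for Tajo's abstract Fragment class, reduced to the getters listed above.
// The constructor shape is an assumption made for this sketch.
abstract class SketchFragment<T> {
  private final String kind;
  private final URI uri;
  private final String inputSourceId;
  private final T startKey;
  private final T endKey;
  private final long length;
  private final List<String> hostNames;

  protected SketchFragment(String kind, URI uri, String inputSourceId,
                           T startKey, T endKey, long length, List<String> hostNames) {
    this.kind = kind;
    this.uri = uri;
    this.inputSourceId = inputSourceId;
    this.startKey = startKey;
    this.endKey = endKey;
    this.length = length;
    // Defensive copy so the host list cannot be changed after construction.
    this.hostNames = Collections.unmodifiableList(new ArrayList<>(hostNames));
  }

  public final String getKind() { return kind; }
  public final URI getUri() { return uri; }
  public final String getInputSourceId() { return inputSourceId; }
  public final T getStartKey() { return startKey; }
  public final T getEndKey() { return endKey; }
  public final long getLength() { return length; }
  public final List<String> getHostNames() { return hostNames; }
}

// Hypothetical fragment for a byte-addressed store: start and end keys are byte offsets.
final class ByteRangeFragment extends SketchFragment<Long> {
  static final String KIND = "byte-range"; // hypothetical fragment kind

  ByteRangeFragment(URI uri, String inputSourceId, long start, long length, List<String> hosts) {
    // The end key is exclusive: start + length.
    super(KIND, uri, inputSourceId, start, start + length, length, hosts);
  }
}
```

In this sketch the subclass only decides how keys are typed and how the key range is derived; everything else is inherited from the base class, which mirrors the division of work described above.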

FragmentSerde Interface

FragmentSerde is responsible for serializing a Fragment instance to, and deserializing it from, a corresponding Protocol Buffer message. So, you also need to define a Protocol Buffer message for the new Fragment.

FragmentSerde
public interface FragmentSerde<F extends Fragment, P extends Message> {
  Builder newBuilder();
  P serialize(F fragment);
  F deserialize(P proto);
}
  • newBuilder(): returns a builder for the Protocol Buffer message.
  • serialize(): returns a serialized Protocol Buffer message instance.
  • deserialize(): returns a deserialized Fragment instance.

Once you implement a new Fragment and a corresponding FragmentSerde, you need to add them to the ${TAJO_HOME}/conf/storage-site.xml file for Tajo to recognize them.

Here is an example configuration for FileFragment.

Fragment Configuration
<property>
  <name>tajo.storage.fragment.kind.file</name>
  <value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
  <name>tajo.storage.fragment.serde.file</name>
  <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
</property>

Tablespace Interface

The Tablespace class has several important methods for generating splits and performing the storage-specific work needed to handle table data.

Tablespace
public abstract class Tablespace {
	public URI getUri() { .. }
	public abstract URI getTableUri(String databaseName, String tableName);
	public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter);
	public abstract List<Fragment> getSplits(String inputSourceId, TableDesc tableDesc, @Nullable EvalNode filterCondition) throws IOException, TajoException;
 
	public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws TajoException, IOException;
	public abstract void purgeTable(TableDesc tableDesc) throws IOException, TajoException;
	public abstract void prepareTable(LogicalNode node) throws IOException, TajoException;
	public abstract Path commitTable(OverridableConf queryContext,
	                                 ExecutionBlockId finalEbId,
	                                 LogicalPlan plan, Schema schema,
	                                 TableDesc tableDesc) throws IOException;
}
  • getUri(): Every tablespace has a unique URI. This method returns the URI of the tablespace.
  • getTableUri(): returns the URI of a table.
  • getTableVolume(): returns the total physical size of a table.
  • getSplits(): generates splits for distributed processing of a table. If filter conditions are given, it should create only the splits that satisfy those conditions.
  • createTable(): This method is called after executing a "CREATE TABLE" statement. It does the storage-specific work needed to store a table. For example, for file systems, it creates the directory that will contain the table data.
  • purgeTable(): This method is called after executing a "DROP TABLE" statement with the 'PURGE' option. It does the storage-specific work needed to remove a table. For example, for file systems, it removes the directory that contains the table data.
  • prepareTable(): This method is called before executing 'INSERT' or 'CREATE TABLE AS SELECT (CTAS)'. In general, Tajo creates the target table after the final sub-query of a CTAS finishes. In special cases such as HBase, however, an INSERT or CTAS query uses the target table information during execution. Such a storage should implement its table-creation logic in this method.
  • commitTable(): Tajo stores result data in the staging directory. If the query fails, the staging directory is cleaned up. If the query succeeds, the result data is moved from the staging directory to the final directory.
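
To make the idea behind getSplits() more concrete, here is a minimal sketch that cuts a table of a known physical size into fixed-size byte ranges, one per task. SplitSketch, Range, and the maxSplitSize parameter are hypothetical names introduced for this example; a real implementation would return Fragment instances, attach host names, and prune splits using the filter condition, all of which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitSketch {
  // A (start, length) byte range; stand-in for a Fragment in this sketch.
  static final class Range {
    final long start;
    final long length;

    Range(long start, long length) {
      this.start = start;
      this.length = length;
    }
  }

  // Divides [0, tableVolume) into consecutive ranges of at most maxSplitSize bytes.
  static List<Range> getSplits(long tableVolume, long maxSplitSize) {
    if (tableVolume < 0 || maxSplitSize <= 0) {
      throw new IllegalArgumentException("tableVolume must be >= 0 and maxSplitSize > 0");
    }
    List<Range> splits = new ArrayList<>();
    for (long start = 0; start < tableVolume; start += maxSplitSize) {
      // The last range may be shorter than maxSplitSize.
      long length = Math.min(maxSplitSize, tableVolume - start);
      splits.add(new Range(start, length));
    }
    return splits;
  }
}
```

Under these assumptions, a 250-byte table with a 100-byte split size yields three ranges, the last one covering the remaining 50 bytes, and an empty table yields no splits.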

Scanner Interface

The Scanner interface reads data from underlying data sources.

Scanner
public interface Scanner {
	void init() throws IOException;
	Tuple next() throws IOException;
	void close() throws IOException;
}
  • init(): It initializes the scanner.
  • next(): It returns one tuple at each call. If there is no more data to scan, it returns null.
  • close(): It closes the scanner. This method must be called if the scanner has been initialized.
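
The calling pattern implied by this contract can be sketched as follows: initialize, read until next() returns null, and close in a finally block. SketchScanner is a local copy of the three methods above with String[] standing in for Tajo's Tuple, and InMemoryScanner and ScanLoop are hypothetical helpers written only for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;

// Local copy of the three-method contract; String[] stands in for Tajo's Tuple.
interface SketchScanner {
  void init() throws IOException;
  String[] next() throws IOException;
  void close() throws IOException;
}

// Hypothetical scanner over rows that are already in memory.
final class InMemoryScanner implements SketchScanner {
  private final List<String[]> rows;
  private Iterator<String[]> it;
  boolean closed = false;

  InMemoryScanner(List<String[]> rows) {
    this.rows = rows;
  }

  @Override
  public void init() {
    it = rows.iterator();
  }

  @Override
  public String[] next() {
    if (it == null) {
      throw new IllegalStateException("init() was not called");
    }
    // null signals that there is no more data to scan.
    return it.hasNext() ? it.next() : null;
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class ScanLoop {
  // Counts rows while guaranteeing close() once init() has succeeded.
  static int countRows(SketchScanner scanner) {
    try {
      scanner.init();
      try {
        int count = 0;
        while (scanner.next() != null) {
          count++;
        }
        return count;
      } finally {
        scanner.close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Placing close() in the finally block is one way to honor the rule that an initialized scanner must always be closed, even when next() throws partway through a scan.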

Appender Interface

The Appender interface writes data to underlying data sources.

Appender
public interface Appender {
	void init() throws IOException;
	void addTuple(Tuple t) throws IOException;
	void flush() throws IOException;
	void close() throws IOException;
}
  • init(): It initializes the appender.
  • addTuple(): This method writes the given tuple to the storage system. Tuples may be buffered.
  • flush(): This method flushes buffered tuples if any exist.
  • close(): It closes the appender. This method must be called if the appender has been initialized.
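
The buffering behavior described for addTuple() and flush() might look like the sketch below. SketchAppender is a local copy of the four methods above with String[] standing in for Tajo's Tuple; BufferedListAppender, its bufferSize parameter, and the in-memory sink list are hypothetical. Flushing from close() is an assumption of this sketch, not something the interface description guarantees.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Local copy of the four-method contract; String[] stands in for Tajo's Tuple.
interface SketchAppender {
  void init() throws IOException;
  void addTuple(String[] t) throws IOException;
  void flush() throws IOException;
  void close() throws IOException;
}

// Hypothetical appender that buffers tuples and writes them out in batches.
final class BufferedListAppender implements SketchAppender {
  private final int bufferSize;
  private final List<String[]> buffer = new ArrayList<>();
  // Stand-in for the storage system: flushed tuples end up here.
  final List<String[]> sink = new ArrayList<>();

  BufferedListAppender(int bufferSize) {
    if (bufferSize <= 0) {
      throw new IllegalArgumentException("bufferSize must be positive");
    }
    this.bufferSize = bufferSize;
  }

  @Override
  public void init() {
    buffer.clear();
  }

  @Override
  public void addTuple(String[] t) {
    buffer.add(t);
    // Write out a full batch as soon as the buffer reaches its limit.
    if (buffer.size() >= bufferSize) {
      flush();
    }
  }

  @Override
  public void flush() {
    sink.addAll(buffer);
    buffer.clear();
  }

  @Override
  public void close() {
    // Assumption of this sketch: closing also flushes whatever is still buffered.
    flush();
  }
}
```

With a buffer size of 2, adding three tuples leaves two in the sink and one in the buffer until flush() or close() is called, which is why callers should not assume a tuple is durable right after addTuple() returns.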

Data Source and Tablespace Configurations

Once all the modules described above are ready, you need to configure Tajo to properly recognize your custom data source. For configuration, please refer to the following links.
