Hadoop requires a client that can be used to interact remotely with the services provided by a Hadoop cluster.
This will also be true when using the Apache Knox Gateway to provide perimeter security and centralized access for these services.
The two primary existing clients for Hadoop are the CLI (i.e. Command Line Interface, hadoop) and HUE (i.e. Hadoop User Environment).
For several reasons, however, neither of these clients can currently be used to access Hadoop services via the Apache Knox Gateway.
This led to thinking about a very simple client that could help people use and evaluate the gateway.
The list below outlines the general requirements for such a client.
The result is a very simple DSL (Domain Specific Language) of sorts that is used via Groovy scripts.
Here is an example of a command that copies a file from the local file system to HDFS.
Note: The variables session, localFile and remoteFile are assumed to be defined.
Hdfs.put( session ).file( localFile ).to( remoteFile ).now() |
This work is very early in development but is also very useful in its current state.
We are very interested in receiving feedback about how to improve this feature and the DSL in particular.
A note of thanks to REST-assured which provides a Fluent interface style DSL for testing REST services.
It served as the initial inspiration for the creation of this DSL.
This document assumes a few things about your environment in order to simplify the examples.
The DSL requires a shell to interpret the Groovy script.
The shell can either be used interactively or to execute a script file.
To simplify use, the distribution contains an embedded version of the Groovy shell.
The shell can be run interactively. Use the command exit to exit.
java -jar bin/shell.jar |
When running interactively it may be helpful to reduce some of the output generated by the shell console.
Use the following command in the interactive shell to reduce that output.
This only needs to be done once as these preferences are persisted.
set verbosity QUIET
set show-last-result false |
Also, when running interactively, use the exit command to terminate the shell.
Using ^C to exit can sometimes leave the parent shell in a problematic state.
The shell can also be used to execute a script by passing a single filename argument.
java -jar bin/shell.jar samples/ExamplePutFile.groovy |
Once the shell can be launched the DSL can be used to interact with the gateway and Hadoop.
Below is a very simple example of an interactive shell session to upload a file to HDFS.
java -jar bin/shell.jar
knox:000> hadoop = Hadoop.login( "https://localhost:8443/gateway/sample", "bob", "bob-password" )
knox:000> Hdfs.put( hadoop ).file( "README" ).to( "/tmp/example/README" ).now() |
The knox:000> in the example above is the prompt from the embedded Groovy console.
If your output doesn't look like this, you may need to set the verbosity and show-last-result preferences as described above in the Usage section.
If you receive an error HTTP/1.1 403 Forbidden, it may be because that file already exists.
Try deleting it with the following command and then try again.
knox:000> Hdfs.rm(hadoop).file("/tmp/example/README").now() |
Without using some other tool to browse HDFS, it is hard to tell that this command did anything.
Execute this to get a bit more feedback.
knox:000> println "Status=" + Hdfs.put( hadoop ).file( "README" ).to( "/tmp/example/README2" ).now().statusCode
Status=201 |
Notice that a different filename is used for the destination.
Without this an error would have resulted.
Of course the DSL also provides a command to list the contents of a directory.
knox:000> println Hdfs.ls( hadoop ).dir( "/tmp/example" ).now().string
{"FileStatuses":{"FileStatus":[{"accessTime":1363711366977,"blockSize":134217728,"group":"hdfs","length":19395,"modificationTime":1363711366977,"owner":"bob","pathSuffix":"README","permission":"644","replication":1,"type":"FILE"},{"accessTime":1363711375617,"blockSize":134217728,"group":"hdfs","length":19395,"modificationTime":1363711375617,"owner":"bob","pathSuffix":"README2","permission":"644","replication":1,"type":"FILE"}]}} |
It is a design decision of the DSL to not provide type safe classes for various request and response payloads.
Doing so would provide an undesirable coupling between the DSL and the service implementation.
It also would make adding new commands much more difficult.
See the Groovy section below for a variety of capabilities and tools for working with JSON and XML that make this easy.
The example below shows the use of JsonSlurper and GPath to extract content from a JSON response.
knox:000> import groovy.json.JsonSlurper
knox:000> text = Hdfs.ls( hadoop ).dir( "/tmp/example" ).now().string
knox:000> json = (new JsonSlurper()).parseText( text )
knox:000> println json.FileStatuses.FileStatus.pathSuffix
[README, README2] |
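The JsonSlurper and GPath steps can also be tried without a running gateway. The sketch below assumes a hand-written JSON string shaped like the directory listing shown earlier (trimmed to a few fields); it is not output captured from a real cluster.

```groovy
import groovy.json.JsonSlurper

// Hand-written sample shaped like a directory listing response (assumption: trimmed fields).
def text = '''{"FileStatuses":{"FileStatus":[
  {"pathSuffix":"README","length":19395,"type":"FILE"},
  {"pathSuffix":"README2","length":19395,"type":"FILE"}
]}}'''

def json = new JsonSlurper().parseText( text )

// GPath: asking a list for a property collects that property from each element.
def names = json.FileStatuses.FileStatus.pathSuffix
println names

// The parsed result is made of ordinary lists and maps, so collection methods apply.
def files = json.FileStatuses.FileStatus.findAll { it.type == "FILE" }
println files.size()
```

Because the slurped result is plain lists and maps, no payload-specific classes are needed to work with it.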
In the future, "built-in" methods to slurp JSON and XML may be added to make this a bit easier.
This would allow for this type of single-line interaction.
println Hdfs.ls(hadoop).dir("/tmp").now().json().FileStatuses.FileStatus.pathSuffix
A shell session should always be ended by shutting down the session.
The examples above do not touch on it, but the DSL supports the simple execution of commands asynchronously.
The shutdown command attempts to ensure that all asynchronous commands have completed before exiting the shell.
knox:000> hadoop.shutdown()
knox:000> exit |
All of the commands above could have been combined into a script file and executed as a single line.
java -jar bin/shell.jar samples/ExamplePutFile.groovy |
This script file is available in the distribution but for convenience, this is the content.
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.hadoop.gateway.shell.hdfs.Hdfs
import groovy.json.JsonSlurper

gateway = "https://localhost:8443/gateway/sample"
username = "bob"
password = "bob-password"
dataFile = "README"

hadoop = Hadoop.login( gateway, username, password )
Hdfs.rm( hadoop ).file( "/tmp/example" ).recursive().now()
Hdfs.put( hadoop ).file( dataFile ).to( "/tmp/example/README" ).now()
text = Hdfs.ls( hadoop ).dir( "/tmp/example" ).now().string
json = (new JsonSlurper()).parseText( text )
println json.FileStatuses.FileStatus.pathSuffix
hadoop.shutdown()
exit |
Notice the Hdfs.rm command. This is included simply to ensure that the script can be rerun.
Without this an error would result the second time it is run.
The DSL supports the ability to invoke commands asynchronously via the later() invocation method.
The object returned from the later() method is a java.util.concurrent.Future parametrized with the response type of the command.
This is an example of how to asynchronously put a file to HDFS.
future = Hdfs.put(hadoop).file("README").to("/tmp/example/README").later()
println future.get().statusCode |
The future.get() method will block until the asynchronous command is complete.
To illustrate the usefulness of this, however, multiple concurrent commands are required.
readmeFuture = Hdfs.put(hadoop).file("README").to("/tmp/example/README").later()
licenseFuture = Hdfs.put(hadoop).file("LICENSE").to("/tmp/example/LICENSE").later()
hadoop.waitFor( readmeFuture, licenseFuture )
println readmeFuture.get().statusCode
println licenseFuture.get().statusCode |
The hadoop.waitFor() method will wait for one or more asynchronous commands to complete.
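The behavior of the returned java.util.concurrent.Future objects can be explored with plain JDK classes. The sketch below is not the DSL implementation: fakePut is a hypothetical stand-in for a command, and the thread pool stands in for the one managed by the session.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for a DSL command: pauses briefly, then returns a status code.
def fakePut = { long millis ->
    return { Thread.sleep( millis ); return 201 } as Callable
}

def pool = Executors.newFixedThreadPool( 2 )
def statuses = []
try {
    // submit() returns a Future immediately; both tasks now run concurrently.
    def readmeFuture = pool.submit( fakePut( 200 ) )
    def licenseFuture = pool.submit( fakePut( 200 ) )

    // get() blocks until the corresponding task has completed.
    statuses = [ readmeFuture.get(), licenseFuture.get() ]
    println statuses
} finally {
    // Shut the pool down so that the JVM can exit.
    pool.shutdown()
}
```

The final pool.shutdown() plays the same role as the session shutdown described earlier: it releases the threads used for asynchronous work.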
Futures alone only provide asynchronous invocation of the command.
What if some processing should also occur asynchronously once the command is complete?
Support for this is provided by closures.
Closures are blocks of code that are passed into the later() invocation method.
In Groovy these are contained within {} immediately after a method.
These blocks of code are executed once the asynchronous command is complete.
Hdfs.put(hadoop).file("README").to("/tmp/example/README").later(){ println it.statusCode } |
In this example the put() command is executed on a separate thread and, once complete, the println it.statusCode block is executed on that thread.
The it variable is automatically populated by Groovy and is a reference to the result that is returned from the future or now() method.
The future example above can be rewritten to illustrate the use of closures.
readmeFuture = Hdfs.put(hadoop).file("README").to("/tmp/example/README").later() { println it.statusCode }
licenseFuture = Hdfs.put(hadoop).file("LICENSE").to("/tmp/example/LICENSE").later() { println it.statusCode }
hadoop.waitFor( readmeFuture, licenseFuture ) |
Again, the hadoop.waitFor() method will wait for one or more asynchronous commands to complete.
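To make the closure mechanics concrete, here is a minimal sketch of how a later() method could accept a trailing closure and run it on the worker thread once the result is available. SketchCommand and its fields are hypothetical; the real DSL classes may be structured quite differently.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Hypothetical command used only to illustrate the calling convention.
class SketchCommand {
    ExecutorService pool
    int status

    // Runs the "command" on a pool thread and then passes the result to the
    // optional closure on that same thread, before the future completes.
    Future later( Closure callback = null ) {
        return pool.submit( {
            if( callback != null ) { callback.call( status ) }
            return status
        } as Callable )
    }
}

def pool = Executors.newSingleThreadExecutor()
def seen = Collections.synchronizedList( [] )

// The block after later() is passed as the closure argument; "it" is the result.
def future = new SketchCommand( pool: pool, status: 201 ).later() { seen << it }
def result = future.get()
pool.shutdown()
println seen
```

In this sketch the closure has already run by the time future.get() returns, because it is invoked inside the submitted task.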
To understand the DSL, there are three primary constructs that need to be understood: the Hadoop session, services, and commands.
The Hadoop construct encapsulates the client-side session state that is shared between all command invocations.
In particular, it simplifies the management of any tokens that need to be presented with each command invocation.
It also manages a thread pool that is used by all asynchronous commands, which is why it is important to call one of the shutdown methods.
The syntax associated with this is expected to change. We expect that credentials will not need to be provided to the gateway.
Rather, it is expected that some form of access token will be used to initialize the session.
Services are the primary extension point for adding new suites of commands.
The built-in examples are: Hdfs, Job and Workflow.
The desire for extensibility is the reason for the slightly awkward Hdfs.ls(hadoop) syntax.
Certainly something more like hadoop.hdfs().ls() would have been preferred but this would prevent adding new commands easily.
At a minimum it would result in extension commands with a different syntax from the "built-in" commands.
The service objects essentially function as a factory for a suite of commands.
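The factory arrangement can be sketched with a few hypothetical classes. None of the names below (SketchSession, SketchListCommand, SketchHdfs) come from the shell itself, and now() here only builds the URL that might be requested instead of performing any HTTP call.

```groovy
// Hypothetical session; the real one also manages tokens and a thread pool.
class SketchSession {
    String base
}

// A command gathers request settings through chained methods that return this.
class SketchListCommand {
    SketchSession session
    String dir = "/"

    SketchListCommand dir( String dir ) { this.dir = dir; return this }

    // Stand-in for the invocation method: returns the URL instead of calling it.
    String now() { return session.base + "/namenode/api/v1" + dir + "?op=LISTSTATUS" }
}

// The service is only a factory that binds new commands to a session.
class SketchHdfs {
    static SketchListCommand ls( SketchSession session ) {
        return new SketchListCommand( session: session )
    }
}

def session = new SketchSession( base: "https://localhost:8443/gateway/sample" )
def url = SketchHdfs.ls( session ).dir( "/tmp/example" ).now()
println url
```

Because the session is passed in rather than owning the services, another class with static factory methods can be added later and used with exactly the same syntax.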
Commands provide the behavior of the DSL.
They typically follow a Fluent interface style in order to allow for single line commands.
There are really three parts to each command: Request, Invocation, Response
The request is populated by all of the methods between the "verb" method and the "invoke" method.
For example, in Hdfs.rm(hadoop).file(dir).now() the request is populated between the "verb" method rm() and the "invoke" method now().
The invocation method controls how the request is invoked.
Synchronous and asynchronous invocation are currently supported.
The now() method executes the request and returns the result immediately.
The later() method submits the request to be executed later and returns a future from which the result can be retrieved.
In addition, the later() invocation method can optionally be provided a closure to execute when the request is complete.
See the Futures and Closures sections above for additional detail and examples.
The response contains the results of the invocation of the request.
In most cases the response is a thin wrapper over the HTTP response.
In fact many commands will share a single BasicResponse type that only provides a few simple methods.
public int getStatusCode()
public long getContentLength()
public String getContentType()
public String getContentEncoding()
public InputStream getStream()
public String getString()
public byte[] getBytes()
public void close(); |
Thanks to Groovy these methods can be accessed as attributes.
In some of the examples above, the statusCode was retrieved this way, for example.
println Hdfs.rm(hadoop).file(dir).now().statusCode |
Groovy will invoke the getStatusCode method to retrieve the statusCode attribute.
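This getter-as-property convention is a general Groovy feature and can be checked with any class. SketchResponse below is a hypothetical class written only for this illustration.

```groovy
// Hypothetical response type exposing only a getter, as a Java class would.
class SketchResponse {
    private final int code

    SketchResponse( int code ) { this.code = code }

    int getStatusCode() { return code }
}

def response = new SketchResponse( 201 )

// Property syntax: Groovy resolves statusCode by calling getStatusCode().
def viaProperty = response.statusCode
def viaGetter = response.getStatusCode()
println "Status=" + viaProperty
```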
The three methods getStream(), getBytes() and getString() deserve special attention.
Care must be taken that the HTTP body is read only once.
Therefore one of these methods (and only one) must be called once and only once.
Calling one of these more than once will cause an error.
Failing to call one of these methods once will result in lingering open HTTP connections.
The close() method may be used if the caller is not interested in reading the result body.
Most commands that do not expect a response body will call close implicitly.
If the body is retrieved via getBytes() or getString(), the close() method need not be called.
When using getStream(), care must be taken to consume the entire body otherwise lingering open HTTP connections will result.
The close() method may be called after reading the body partially to discard the remainder of the body.
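The read-once rule can be illustrated with a small, hypothetical wrapper around an InputStream. OneShotBody is not part of the DSL, and the exact error raised by the real response classes is not specified here; the sketch only shows why a second read cannot succeed once the underlying stream has been consumed and closed.

```groovy
// Hypothetical wrapper: the body may be read exactly once, after which it is closed.
class OneShotBody {
    private InputStream stream

    OneShotBody( byte[] data ) { stream = new ByteArrayInputStream( data ) }

    String getString() {
        if( stream == null ) { throw new IllegalStateException( "body already consumed" ) }
        // Read the whole body, then release the stream whether or not reading succeeded.
        try { return stream.getText( "UTF-8" ) } finally { close() }
    }

    // Discards anything unread and marks the body as consumed.
    void close() { stream?.close(); stream = null }
}

def body = new OneShotBody( "hello".getBytes( "UTF-8" ) )
def first = body.string

def secondFailed = false
try { body.string } catch( IllegalStateException e ) { secondFailed = true }
println "first=${first} secondFailed=${secondFailed}"
```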
Three basic DSL services and their commands are bundled with the shell.
The Hdfs service provides basic HDFS commands.
Using these DSL commands requires that WebHDFS be running in the Hadoop cluster.
The Job service provides basic job submission and status commands.
Using these DSL commands requires that Templeton/WebHCat be running in the Hadoop cluster.
The Workflow service provides basic workflow submission and status commands.
Using these DSL commands requires that Oozie be running in the Hadoop cluster.
Hdfs.ls(hadoop).dir("/").now()
Hdfs.rm(hadoop).file("/tmp/example").recursive().now()
Hdfs.put(hadoop).file("localFile").to("/tmp/example/remoteFile").now()
Hdfs.get(hadoop).file("localFile").from("/tmp/example/remoteFile").now()
Hdfs.mkdir(hadoop).dir("/tmp/example").perm("777").now()
Job.submitJava(hadoop).jar(remoteJarName).app(appName).input(remoteInputDir).output(remoteOutputDir).now().jobId
Job.submitPig(hadoop).file(remotePigFileName).arg("-v").statusDir(remoteStatusDir).now()
Job.submitHive(hadoop).file(remoteHiveFileName).arg("-v").statusDir(remoteStatusDir).now()
Job.queryQueue(hadoop).now().string
Job.queryStatus(hadoop).jobId(jobId).now().string
Workflow.submit(hadoop).file(localFile).action("start").now()
Workflow.status(hadoop).jobId(jobId).now().string
Extensibility is a key design goal of the KnoxShell and DSL.
There are two ways to provide extended functionality for use with the shell.
The first is to simply create Groovy scripts that use the DSL to perform a useful task.
The second is to add new services and commands.
In order to add new services and commands, new classes must be written in either Groovy or Java and added to the classpath of the shell.
Fortunately there is a very simple way to add classes and JARs to the shell classpath.
The first time the shell is executed it will create a configuration file in the same directory as the JAR with the same base name and a .cfg extension.
bin/shell.jar
bin/shell.cfg |
That file contains both the main class for the shell as well as a definition of the classpath.
Currently that file will by default contain the following.
main.class=org.apache.hadoop.gateway.shell.Shell
class.path=../lib; ../lib/*.jar; ../ext; ../ext/*.jar |
Therefore, to extend the shell, copy any new service and command classes to the ext directory or, if they are packaged within a JAR, copy the JAR to the ext directory.
The lib directory is reserved for JARs that may be delivered with the product.
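To see the individual classpath entries defined by the default configuration, the file content can be read as standard Java properties. This is only a sketch for inspecting the entries; how the shell itself parses the file is an assumption not confirmed here.

```groovy
// Default content of the configuration file, copied from the text above.
def cfgText = '''main.class=org.apache.hadoop.gateway.shell.Shell
class.path=../lib; ../lib/*.jar; ../ext; ../ext/*.jar
'''

def props = new Properties()
props.load( new StringReader( cfgText ) )

// Split the class.path value on ";" and trim the surrounding spaces.
def entries = props.getProperty( "class.path" ).split( ";" ).collect { it.trim() }
println entries
```

The ext entries show why compiled classes or JARs copied into the ext directory become visible to the shell without editing the file.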
Below are samples for the service and command classes that would need to be written to add new commands to the shell.
These happen to be Groovy source files but could with very minor changes be Java files.
The easiest way to add these to the shell is to compile them directly into the ext directory.
Note: This command depends upon having the Groovy compiler installed and available on the execution path.
groovyc -d ext -cp bin/shell.jar samples/SampleService.groovy samples/SampleSimpleCommand.groovy samples/SampleComplexCommand.groovy |
These source files are available in the samples directory of the distribution but these are included here for convenience.
import org.apache.hadoop.gateway.shell.Hadoop

class SampleService {

  static String PATH = "/namenode/api/v1"

  static SimpleCommand simple( Hadoop hadoop ) {
    return new SimpleCommand( hadoop )
  }

  static ComplexCommand.Request complex( Hadoop hadoop ) {
    return new ComplexCommand.Request( hadoop )
  }

} |
import org.apache.hadoop.gateway.shell.AbstractRequest
import org.apache.hadoop.gateway.shell.BasicResponse
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.utils.URIBuilder

import java.util.concurrent.Callable

class SimpleCommand extends AbstractRequest<BasicResponse> {

  SimpleCommand( Hadoop hadoop ) {
    super( hadoop )
  }

  private String param

  SimpleCommand param( String param ) {
    this.param = param
    return this
  }

  @Override
  protected Callable<BasicResponse> callable() {
    return new Callable<BasicResponse>() {
      @Override
      BasicResponse call() {
        URIBuilder uri = uri( SampleService.PATH, param )
        addQueryParam( uri, "op", "LISTSTATUS" )
        HttpGet get = new HttpGet( uri.build() )
        return new BasicResponse( execute( get ) )
      }
    }
  }

} |
import com.jayway.jsonpath.JsonPath
import org.apache.hadoop.gateway.shell.AbstractRequest
import org.apache.hadoop.gateway.shell.BasicResponse
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.http.HttpResponse
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.utils.URIBuilder

import java.util.concurrent.Callable

class ComplexCommand {

  static class Request extends AbstractRequest<Response> {

    Request( Hadoop hadoop ) {
      super( hadoop )
    }

    private String param;

    Request param( String param ) {
      this.param = param;
      return this;
    }

    @Override
    protected Callable<Response> callable() {
      return new Callable<Response>() {
        @Override
        Response call() {
          URIBuilder uri = uri( SampleService.PATH, param )
          addQueryParam( uri, "op", "LISTSTATUS" )
          HttpGet get = new HttpGet( uri.build() )
          return new Response( execute( get ) )
        }
      }
    }

  }

  static class Response extends BasicResponse {

    Response(HttpResponse response) {
      super(response)
    }

    public List<String> getNames() {
      return JsonPath.read( string, "\$.FileStatuses.FileStatus[*].pathSuffix" )
    }

  }

} |
The shell included in the distribution is basically an unmodified packaging of the Groovy shell.
The distribution does, however, provide a wrapper that makes it very easy to set up the class path for the shell.
In fact the JARs required to execute the DSL are included on the class path by default.
Therefore, these commands are functionally equivalent if you have Groovy installed.
See below for a description of what is required for {JARs required by the DSL from lib and dep}.
java -jar bin/shell.jar samples/ExamplePutFile.groovy
groovy -classpath {JARs required by the DSL from lib and dep} samples/ExamplePutFile.groovy |
The interactive shell isn't exactly equivalent.
However the only difference is that the shell.jar automatically executes some additional imports that are useful for the KnoxShell DSL.
So these two sets of commands should be functionally equivalent.
However, there is currently a class loading issue that prevents the groovysh command from working properly.
java -jar bin/shell.jar

groovysh -classpath {JARs required by the DSL from lib and dep}
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.hadoop.gateway.shell.hdfs.Hdfs
import org.apache.hadoop.gateway.shell.job.Job
import org.apache.hadoop.gateway.shell.workflow.Workflow
import java.util.concurrent.TimeUnit |
Alternatively, you can use the Groovy Console which does not appear to have the same class loading issue.
groovyConsole -classpath {JARs required by the DSL from lib and dep}
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.hadoop.gateway.shell.hdfs.Hdfs
import org.apache.hadoop.gateway.shell.job.Job
import org.apache.hadoop.gateway.shell.workflow.Workflow
import java.util.concurrent.TimeUnit |
The list of JARs currently required by the DSL is
lib/gateway-shell-${gateway-version}.jar
dep/httpclient-4.2.3.jar
dep/httpcore-4.2.2.jar
dep/commons-lang3-3.1.jar
dep/commons-codec-1.7.jar |
So on Linux/MacOS you would need this command
groovy -cp lib/gateway-shell-0.2.0-SNAPSHOT.jar:dep/httpclient-4.2.3.jar:dep/httpcore-4.2.2.jar:dep/commons-lang3-3.1.jar:dep/commons-codec-1.7.jar samples/ExamplePutFile.groovy |
and on Windows you would need this command
groovy -cp lib/gateway-shell-0.2.0-SNAPSHOT.jar;dep/httpclient-4.2.3.jar;dep/httpcore-4.2.2.jar;dep/commons-lang3-3.1.jar;dep/commons-codec-1.7.jar samples/ExamplePutFile.groovy |
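The only difference between the two commands is the classpath separator. As a small sketch, the platform-appropriate command line can be assembled from the JAR list with File.pathSeparator; the JAR names are copied from the list above and will differ in other releases.

```groovy
// JARs taken from the list above; the exact names vary from release to release.
def jars = [
    "lib/gateway-shell-0.2.0-SNAPSHOT.jar",
    "dep/httpclient-4.2.3.jar",
    "dep/httpcore-4.2.2.jar",
    "dep/commons-lang3-3.1.jar",
    "dep/commons-codec-1.7.jar"
]

// File.pathSeparator is ":" on Linux/macOS and ";" on Windows.
def classpath = jars.join( File.pathSeparator )
def command = "groovy -cp ${classpath} samples/ExamplePutFile.groovy"
println command
```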
The exact list of required JARs is likely to change from release to release, so it is recommended that you utilize the wrapper bin/shell.jar.
In addition, because the DSL can be used via standard Groovy, the Groovy integrations in many popular IDEs (e.g. IntelliJ, Eclipse) can also be used.
This makes it particularly nice to develop and execute scripts to interact with Hadoop.
The code-completion feature in particular provides immense value.
All that is required is to add the shell-0.2.0.jar to the project's class path.
There are a variety of Groovy tools that make it very easy to work with the standard interchange formats (i.e. JSON and XML).
In Groovy the creation of XML or JSON is typically done via a "builder" and parsing done via a "slurper".
In addition, once JSON or XML is "slurped", GPath, an XPath-like feature built into Groovy, can be used to access data.
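As a brief sketch of the builder and slurper pairing for JSON, the example below creates a small document with groovy.json.JsonBuilder, parses it back with groovy.json.JsonSlurper, and reads it with GPath. The field names are borrowed from the directory listing shown earlier purely for illustration.

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// Builder: nested method calls and closures describe the JSON structure.
def builder = new JsonBuilder()
builder.FileStatus {
    pathSuffix "README"
    type "FILE"
}
def text = builder.toString()
println text

// Slurper: parse the text back into maps and lists.
def json = new JsonSlurper().parseText( text )

// GPath: navigate the parsed structure with property syntax.
println json.FileStatus.pathSuffix
```

Groovy offers comparable builder and slurper classes for XML, which follow the same pattern.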