Apache KNOX provides a single gateway to many services in your Hadoop cluster. You can leverage the KNOX shell DSL interface to interact with services such as WebHDFS, WebHCat (Templeton), Oozie, HBase, etc. For example, using Groovy and the DSL you can submit Hive queries via WebHCat (Templeton) as simply as:
println "[Hive.groovy] Copy Hive query file to HDFS" Hdfs.put(session).text( hive_query ).to( jobDir + "/input/query.hive" ).now() jobId = Job.submitHive(session) \ .file("${jobDir}/input/query.hive") \ .arg("-v").arg("--hiveconf").arg("TABLE_NAME=${tmpTableName}") \ .statusDir("${jobDir}/output") \ .now().jobId
submitSqoop Job API
With Apache KNOX 0.10.0, you can now write applications using the KNOX DSL for Apache Sqoop and easily submit Sqoop jobs. The WebHCat Job class in the DSL now supports submitSqoop() as follows:
Job.submitSqoop(session)
   .command("import --connect jdbc:mysql://hostname:3306/dbname ... ")
   .statusDir(remoteStatusDir)
   .now().jobId
The submitSqoop request takes the following arguments:
- command (String) - The Sqoop command string to execute.
- files (String) - Comma-separated files to be copied to the Templeton controller job.
- optionsfile (String) - The remote file that contains the Sqoop command to run.
- libdir (String) - The remote directory containing the JDBC jar to include with the Sqoop lib.
- statusDir (String) - The remote directory to store status output.
The request returns the jobId in its response. For illustration, a fuller invocation is sketched below.
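The following is a minimal sketch, not a definitive API reference: it assumes each argument in the list above maps to a same-named builder method, and remoteFiles, remoteLibDir and remoteStatusDir are placeholder HDFS paths you would replace with your own locations.

// Sketch only: placeholder paths; assumes each request argument has a
// same-named builder method. optionsfile(...) would typically be used
// in place of command(...) when the Sqoop command lives in a remote file.
jobId = Job.submitSqoop(session) \
           .command("import --connect jdbc:mysql://hostname:3306/dbname --table mytable") \
           .files(remoteFiles) \
           .libdir(remoteLibDir) \
           .statusDir(remoteStatusDir) \
           .now().jobId
println "Submitted Sqoop job: " + jobId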
Simple example
In this example we will run a simple Sqoop job to extract the scBlastTab table to HDFS from the public genome database (MySQL) at UCSC.
First, import the following packages:
import com.jayway.jsonpath.JsonPath
import groovy.json.JsonSlurper
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.hadoop.gateway.shell.hdfs.Hdfs
import org.apache.hadoop.gateway.shell.job.Job
import static java.util.concurrent.TimeUnit.SECONDS
Next, establish a connection to the KNOX gateway with Hadoop.login:
// Get gatewayUrl and credentials from environment
def env = System.getenv()
gatewayUrl = env.gateway
username = env.username
password = env.password
jobDir = "/user/" + username + "/sqoop"

session = Hadoop.login( gatewayUrl, username, password )

println "[Sqoop.groovy] Delete " + jobDir + ": " + Hdfs.rm( session ).file( jobDir ).recursive().now().statusCode
println "[Sqoop.groovy] Mkdir " + jobDir + ": " + Hdfs.mkdir( session ).dir( jobDir ).now().statusCode
Define your Sqoop job (assuming Sqoop is already configured with the MySQL driver):
// Database connection information
db = [ driver:"com.mysql.jdbc.Driver",
       url:"jdbc:mysql://genome-mysql.cse.ucsc.edu/hg38",
       user:"genome",
       password:"",
       name:"hg38",
       table:"scBlastTab",
       split:"query" ]

targetdir = jobDir + "/" + db.table

sqoop_command = "import --driver ${db.driver} --connect ${db.url} --username ${db.user} --password ${db.password} --table ${db.table} --split-by ${db.split} --target-dir ${targetdir}"
You can now submit the sqoop_command to the cluster with submitSqoop:
jobId = Job.submitSqoop(session) \
           .command(sqoop_command) \
           .statusDir("${jobDir}/output") \
           .now().jobId
println "[Sqoop.groovy] Submitted job: " + jobId
You can then check job status and output as usual:
println "[Sqoop.groovy] Polling up to 60s for job completion..." done = false count = 0 while( !done && count++ < 60 ) { sleep( 1000 ) json = Job.queryStatus(session).jobId(jobId).now().string done = JsonPath.read( json, "\$.status.jobComplete" ) print "."; System.out.flush(); } println "" println "[Sqoop.groovy] Job status: " + done // Check output directory text = Hdfs.ls( session ).dir( jobDir + "/output" ).now().string json = (new JsonSlurper()).parseText( text ) println json.FileStatuses.FileStatus.pathSuffix println "\n[Sqoop.groovy] Content of stderr:" println Hdfs.get( session ).from( jobDir + "/output/stderr" ).now().string // Check table files text = Hdfs.ls( session ).dir( jobDir + "/" + db.table ).now().string json = (new JsonSlurper()).parseText( text ) println json.FileStatuses.FileStatus.pathSuffix session.shutdown()
Here is sample output of the above example against a Hadoop cluster. You need a properly configured Hadoop cluster with the Apache KNOX gateway, Apache Sqoop, and WebHCat (Templeton). This test was run against a BigInsights Hadoop cluster.
:compileJava UP-TO-DATE
:compileGroovy
:processResources UP-TO-DATE
:classes
:Sqoop
[Sqoop.groovy] Delete /user/biadmin/sqoop: 200
[Sqoop.groovy] Mkdir /user/biadmin/sqoop: 200
[Sqoop.groovy] Submitted job: job_1476266127941_0692
[Sqoop.groovy] Polling up to 60s for job completion...
............................................
[Sqoop.groovy] Job status: true
[exit, stderr, stdout]

[Sqoop.groovy] Content of stderr:
log4j:WARN custom level class [Relative to Yarn Log Dir Prefix] not found.
16/11/03 16:53:05 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6_IBM_27
16/11/03 16:53:06 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
16/11/03 16:53:06 WARN sqoop.ConnFactory: Parameter --driver is set to an explicit driver however appropriate connection manager is not being set (via --connection-manager). Sqoop is going to fall back to org.apache.sqoop.manager.GenericJdbcManager. Please specify explicitly which connection manager should be used next time.
16/11/03 16:53:06 INFO manager.SqlManager: Using default fetchSize of 1000
16/11/03 16:53:06 INFO tool.CodeGenTool: Beginning code generation
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/iop/4.2.0.0/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/iop/4.2.0.0/zookeeper/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/11/03 16:53:07 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM scBlastTab AS t WHERE 1=0
16/11/03 16:53:07 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM scBlastTab AS t WHERE 1=0
16/11/03 16:53:08 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/iop/4.2.0.0/hadoop-mapreduce
Note: /tmp/sqoop-biadmin/compile/4432005ab10742f26cc82d5438497cae/scBlastTab.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
16/11/03 16:53:09 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-biadmin/compile/4432005ab10742f26cc82d5438497cae/scBlastTab.jar
16/11/03 16:53:09 INFO mapreduce.ImportJobBase: Beginning import of scBlastTab
16/11/03 16:53:09 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
16/11/03 16:53:09 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM scBlastTab AS t WHERE 1=0
16/11/03 16:53:10 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/11/03 16:53:10 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
16/11/03 16:53:15 INFO db.DBInputFormat: Using read commited transaction isolation
16/11/03 16:53:15 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(query), MAX(query) FROM scBlastTab
16/11/03 16:53:16 WARN db.TextSplitter: Generating splits for a textual index column.
16/11/03 16:53:16 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
16/11/03 16:53:16 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: number of splits:5
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1476266127941_0693
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: mapreduce.job, Service: job_1476266127941_0692, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@6fbb4061)
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:ehaascluster, Ident: (HDFS_DELEGATION_TOKEN token 4660 for biadmin)
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: RM_DELEGATION_TOKEN, Service: 172.16.222.2:8032,172.16.222.3:8032, Ident: (owner=biadmin, renewer=mr token, realUser=HTTP/bicloud-fyre-physical-17-master-3.fyre.ibm.com@IBM.COM, issueDate=1478191971063, maxDate=1478796771063, sequenceNumber=67, masterKeyId=66)
16/11/03 16:53:16 WARN token.Token: Cannot find class for token kind kms-dt
16/11/03 16:53:16 WARN token.Token: Cannot find class for token kind kms-dt
Kind: kms-dt, Service: 172.16.222.1:16000, Ident: 00 07 62 69 61 64 6d 69 6e 04 79 61 72 6e 05 68 62 61 73 65 8a 01 58 2b 1b 7b 34 8a 01 58 4f 27 ff 34 8e 03 a4 09
16/11/03 16:53:16 INFO mapreduce.JobSubmitter: Kind: MR_DELEGATION_TOKEN, Service: 172.16.222.3:10020, Ident: (owner=biadmin, renewer=yarn, realUser=HTTP/bicloud-fyre-physical-17-master-3.fyre.ibm.com@IBM.COM, issueDate=1478191972979, maxDate=1478796772979, sequenceNumber=52, masterKeyId=49)
16/11/03 16:53:17 INFO impl.YarnClientImpl: Submitted application application_1476266127941_0693
16/11/03 16:53:17 INFO mapreduce.Job: The url to track the job: http://bicloud-fyre-physical-17-master-2.fyre.ibm.com:8088/proxy/application_1476266127941_0693/
16/11/03 16:53:17 INFO mapreduce.Job: Running job: job_1476266127941_0693
16/11/03 16:53:24 INFO mapreduce.Job: Job job_1476266127941_0693 running in uber mode : false
16/11/03 16:53:24 INFO mapreduce.Job: map 0% reduce 0%
16/11/03 16:53:32 INFO mapreduce.Job: map 20% reduce 0%
16/11/03 16:53:33 INFO mapreduce.Job: map 100% reduce 0%
16/11/03 16:53:34 INFO mapreduce.Job: Job job_1476266127941_0693 completed successfully
16/11/03 16:53:34 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=799000
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=644
        HDFS: Number of bytes written=148247
        HDFS: Number of read operations=20
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=10
    Job Counters
        Launched map tasks=5
        Other local map tasks=5
        Total time spent by all maps in occupied slots (ms)=62016
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=31008
        Total vcore-milliseconds taken by all map tasks=31008
        Total megabyte-milliseconds taken by all map tasks=190513152
    Map-Reduce Framework
        Map input records=2379
        Map output records=2379
        Input split bytes=644
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=249
        CPU time spent (ms)=6590
        Physical memory (bytes) snapshot=1758576640
        Virtual memory (bytes) snapshot=35233165312
        Total committed heap usage (bytes)=2638741504
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=148247
16/11/03 16:53:34 INFO mapreduce.ImportJobBase: Transferred 144.7725 KB in 23.9493 seconds (6.0449 KB/sec)
16/11/03 16:53:34 INFO mapreduce.ImportJobBase: Retrieved 2379 records.
[_SUCCESS, part-m-00000, part-m-00001, part-m-00002, part-m-00003, part-m-00004]

BUILD SUCCESSFUL

Total time: 1 mins 2.202 secs
From the output above you can see the job output as well as the content of the table directory on HDFS, which contains five part files (one per map task). The WebHCat (Templeton) job console output goes to stderr in this case.
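If you want to spot-check the imported rows, you can pull one of the part files back through the gateway with the same Hdfs.get call used earlier. This is a small sketch (run it before session.shutdown()); part-m-00000 is one of the files shown in the listing above.

// Sketch: fetch the first part file of the imported table and print a few rows
partFile = jobDir + "/" + db.table + "/part-m-00000"
rows = Hdfs.get( session ).from( partFile ).now().string
rows.readLines().take( 5 ).each { println it }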
To compile and run your code, make sure you have the following dependency: org.apache.knox:gateway-shell:0.10.0.
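For example, if you build with Gradle (the sample output above shows Gradle task names such as :compileGroovy), the build file might look like the following sketch; the repository and the Groovy/JsonPath versions are assumptions to adapt to your environment:

// build.gradle (sketch) - minimal setup for compiling and running the Groovy examples
apply plugin: 'groovy'

repositories {
    mavenCentral()
}

dependencies {
    // KNOX shell DSL used by the examples above
    compile 'org.apache.knox:gateway-shell:0.10.0'
    // Groovy runtime and JsonPath for the status-polling snippet (versions are illustrative)
    compile 'org.codehaus.groovy:groovy-all:2.4.7'
    compile 'com.jayway.jsonpath:json-path:2.2.0'
}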