Introduction

Gobblin as it exists today is a framework that can be used to build different data integration applications, such as ingestion and replication. Each of these applications is typically configured as a separate job and executed through a scheduler like Azkaban. This document outlines an approach to evolve Gobblin from a framework into a service. Specifically, our goal is to enable self-service so that different users can automatically provision and execute various supported Gobblin applications, limiting the need for development and operations teams to be involved during the provisioning process.

An illustrative use case below should help clarify our goals: a team building an analytics application might want to ingest external data (e.g., some tables from Salesforce) and store the ingested data in NoSQL DB for consumption by online applications. Further, the ingested data might need to be analyzed in Hadoop and, after processing, sent back to another online store.

The simplified diagram below illustrates how this would actually be accomplished in practice. At a high level, at least the following needs to be done:

  1. Ingest required dataset(s) from Salesforce to NoSQL DB

  2. Ingest this data from NoSQL DB tables to HDFS

  3. Potentially, replicate the ingested data to other HDFS clusters

To accomplish provisioning of these pipelines, the sequence of events might look like:

  1. Engage various stakeholders to discuss the use case, assess feasibility, and understand the process and associated timelines (~1 week)

  2. Internalize different configuration options and learn how to construct different Gobblin flows. Write one job config for each of the tasks outlined above (~1 week)

  3. Review the work and set up a dark launch to test the system. Iterate on this process until everything works as expected (~1 week)

  4. Work towards productionizing the flows (~1 week)

  5. Setup monitoring for validation purposes (~1 week)



Optimistically, a new application will have to wait 4-5 weeks before its needs are met - not including the time spent in follow-ups, prioritization asks, etc. If the nature of the request changes in the interim, the process might take significantly longer.

Clearly, what is missing is a central orchestration system that can be relied upon to automate this entire process and minimize human involvement. We refer to this orchestration system as Gobblin as a Service (GaaS).

To understand what GaaS will need to look like, we add details to the previously outlined high-level tasks:

  • Ingest required dataset(s) from Salesforce to NoSQL DB using Gobblin Standalone Cluster

  • Ingest this data from NoSQL DB tables to HDFS using Gobblin on Azkaban (DB Ingest)

  • Replicate the ingested data to other HDFS clusters using Gobblin on Azkaban (DistcpNg)

With a specification that contains these details, GaaS can easily execute the user’s request leveraging just the existing technologies.  

  


Problem Statement

The requirements for a successful deployment of Gobblin as a Service (GaaS) are:

  • Any user can submit User Requests (for logical Flows) through a well-defined API or a UI that GaaS exposes.

  • Gobblin as a Service will have the ability to authenticate and authorize users. Flows will only be executed for permitted users.

  • GaaS will accept User Requests (for logical Flows) and convert them into physical Gobblin Jobs (<Spec Executor, Job Specification <Job Config, Job Template>>).

  • GaaS will provision these Gobblin Jobs on relevant Spec Executors (Triplet of <technology, location, communication mechanism>)

  • GaaS will provide a unified dashboard to monitor the health of different User Requests (logical Flows).

  • GaaS will schedule logical Flows for execution as requested. These logical Flows may be compilable into Gobblin Jobs right away, or at a later time when the topology changes.

Expanding on a few of the terms used in the description above:

  • User Request: This is a request to manage (create, delete, update) a logical Flow that can take the form of: “Ingest Salesforce data to NoSQL DB and then load the data onto HDFS”. 
    It can be represented as a triplet: <Flow Config, Source, List<Destination>>

  • Spec Executor: A Spec Executor is a data processing pipeline (typically a Gobblin application such as a Gobblin Standalone Cluster running in Y-Network-Fabric, or a Gobblin job running on Azkaban in X-Network-Fabric) that can process physical Gobblin Jobs. A Spec Executor typically has associated capabilities (i.e., supported sources and sinks) which GaaS can use to determine whether a Gobblin Job can run on it.
    It can be represented as a triplet <Technology, Location, Communication Mechanism>. In our example above, one Spec Executor is <Gobblin-DB-Ingest, X-Network-Fabric, REST> while another is <Gobblin Standalone Cluster, Y-Network-Fabric, Kafka>. (A Spec here is either a Flow or a Job that can be executed on an Executor instance.)

  • Gobblin Job: This can be thought of as all the configuration information required to actually execute a physical flow (also called a job) that ingests, manipulates and moves data. In our example above, a DistcpNg job executing on Hadoop-1 that copies data between Hadoop-1 and Hadoop-2 is a Gobblin Job.
    It can be represented as: Pair <Spec Executor, Job Specification <Job Config, Job Template>>

Summarizing: given these definitions, GaaS takes in a User Request (logical Flow), converts it into a series of Gobblin Jobs, and coordinates and monitors the execution of these Gobblin Jobs in a highly distributed manner. This entails several components or modules that are discussed in detail in the rest of this document.
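
For concreteness, a minimal sketch of these representations as plain Java value types follows; the class shapes and names here are illustrative only, not the actual GaaS data model (the real FlowSpec, JobSpec and TopologySpec are detailed later in this document):

// Illustrative-only Java shapes for the terms defined above.
import java.util.List;
import java.util.Map;

public class GaasModelSketch {
 /** User Request: <Flow Config, Source, List<Destination>> */
 static class UserRequest {
   Map<String, String> flowConfig;
   String source;                 // e.g., "Salesforce"
   List<String> destinations;     // e.g., ["NoSQL DB", "HDFS"]
 }

 /** Spec Executor: <Technology, Location, Communication Mechanism> */
 static class SpecExecutorRef {
   String technology;             // e.g., "Gobblin-DB-Ingest"
   String location;               // e.g., "X-Network-Fabric"
   String communicationMechanism; // e.g., "REST" or "Kafka"
 }

 /** Gobblin Job: Pair<Spec Executor, Job Specification <Job Config, Job Template>> */
 static class GobblinJob {
   SpecExecutorRef executor;
   Map<String, String> jobConfig;
   String jobTemplateUri;
 }
}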

  


Existing Solutions

At present, each Gobblin deployment runs independently and is not controlled or monitored by any central system.

So, Gobblin-as-a-Service intends to be the central orchestration and provisioning system that can manage jobs for all data integration tasks on any of the Gobblin deployment modes.



Design

Gobblin as a Service (GaaS) Overall Architecture



The figure above provides a high level architectural view of Gobblin-as-a-Service.

In the diagram above, each of the solid lines represents a unique Gobblin Job. The dotted lines are control flow messages that we will discuss later.

At a high level, Gobblin-as-a-Service exposes an API and UI for users to submit requests. These requests are then translated into well defined Gobblin jobs. The users principally interact with Gobblin-as-a-Service through the Orchestration Service. We discuss the details of each module below.

Service Cluster (Frontend / Backend)


GaaS will itself run in a cluster mode for high availability. This cluster will consist of identical nodes backed by a VIP for load balancing. Each node in the cluster will host a frontend server and a backend server.

  • FE servers: FE or Frontend servers will expose a User Interface to let users manage their requests. They will interact with the Backend servers via the REST APIs.

  • BE servers: BE or Backend servers are the core of the system and will have several sub-modules (discussed in detail below). These servers are the actual orchestrators of the user’s desired data flow. All nodes in the cluster are identical; however, only one of them will schedule the execution of flows. This node will be designated as the master node, and its election will be done using Helix/Zookeeper. Other nodes will accept User Requests but will pass them along to the master for scheduling and execution via Helix messages. The BE servers will communicate with different Spec Executors using the communication paradigms they support. At a minimum, Kafka based Spec Executors will be supported.

The important sub-components of the Backend server are discussed in the “Service Core Architecture” design section.

Modes (Standalone single instance / cluster)

GaaS can be run and deployed in multiple modes. The dependencies for each can be listed as follows:

  • Single instance mode: FE and BE deployed on the same box; no Zookeeper required

  • Cluster mode: FE and BE deployed on multiple boxes; Zookeeper required for master election of the scheduler in the BE server.

Service Core Architecture (modules running on each machine)


 

As described in the previous section, each node / machine in the cluster will run identical subcomponents.

In this section, we will zoom into a few major modules that the Backend server on each machine / node will run. A comprehensive listing of all modules and their functionalities can be found in the Appendix under the sub-sections:

  • Logical Entities

  • Key Components Overview

  • Key Components Description 

REST APIs

The Backend server will expose a minimalistic set of REST APIs to process User Requests. The key functionalities that these REST APIs will expose are:

  • Authenticate and authorize a user implicitly

  • Manage (create / delete / update) Logical Data Flow

  • Get Logical Data Flow to Gobblin Jobs mapping

  • Monitor (get history and status of) any Logical Data Flow

  • Monitor (get history and status of) any Gobblin Job

Note: The Topology module will also eventually expose REST APIs for managing topologies, but that is not going to be covered at the moment.

Technology: REST APIs will be based on RestLi and will adhere to its standards.

Manage Gobblin Flow

A user will be able to initiate any CRUD operation on a logical Flow by defining a FlowConfig.

The FlowConfig schema is defined as follows:

FlowConfig schema

{
"type" : "record",
"name" : "FlowConfig",
"namespace": "gobblin.service",
"doc" : "Configuration for a flow",
"fields" : [
  {
    "name" : "flowName",
    "type" : "string",
    "doc" : "Name of the flow"
  },
  {
    "name" : "flowGroup",
    "type" : "string",
    "doc" : "Group of the flow"
  },
  {
    "name" : "schedule",
    "type" : "string",
    "doc" : "Schedule for flow"
  },
   {
     "name" : "runImmediately",
     "type" : "boolean",
     "optional" : true,
     "doc" : "Set to true to request that job be run immediately"
  },
  {
     "name" : "templateUris",
     "type" : "string",
     "doc" : "Comma separated list of URIs for templates used in the flow"
  },
  {
    "name" : "properties",
    "type" : { "type" : "map", "values" : "string" },
    "doc" : "properties for the flow"
  } ]
}

The field flowName contains a flow name generated by the client that should be unique within the flow group specified by flowGroup. <flowGroup, flowName> is the composite key that identifies a FlowConfig. The schedule field contains the flow schedule in the Unix cron format. This field can be empty for a run-once job. The templateUris field identifies the templates that the flow properties are overlaid on top of. This field should contain URIs that reference templates in the flow catalog, such as FS:///usecase/templates/template1.template. The properties field is a map of string keys to string values. Most of the Flow specific key-value pairs are specified in properties to keep the specification generic, extensible and in sync with the Job specification within Gobblin.

The FlowConfig schema is translated into a FlowSpec internally. The FlowSpec is a Java representation that is used by other components in Gobblin-as-a-Service to represent and process a logical Flow. In other sections, we will discuss flows in terms of the FlowSpec.
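
As a rough illustration of this translation, the sketch below derives a FlowSpec URI and config from a FlowConfig's fields; the "gobblin-flow" URI scheme and the helper names are assumptions, not the final API:

// Hedged sketch of the FlowConfig -> FlowSpec translation described above.
// The URI scheme and the helper names are illustrative assumptions.
import java.net.URI;
import java.util.Map;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class FlowConfigTranslation {
 /** The composite key <flowGroup, flowName> becomes the FlowSpec URI. */
 static URI flowSpecUri(String flowGroup, String flowName) throws Exception {
   return new URI("gobblin-flow", null, "/" + flowGroup + "/" + flowName, null);
 }

 /** The properties map becomes the typesafe Config carried by the FlowSpec. */
 static Config flowSpecConfig(Map<String, String> properties) {
   return ConfigFactory.parseMap(properties);
 }
}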

Operations related to FlowConfig

Create a flow definition.

POST /flowconfigs

Curli example:

curli http://localhost:8080/flowconfigs -X POST -H 'X-RestLi-Method: create' -H 'X-RestLi-Protocol-Version: 2.0.0' --data '{"flowName" : "myflow1", "flowGroup" : "mygroup", "templateUris" : "FS:///mytemplate.template", "schedule" : "", "properties" : {"prop1" : "value1"}}'

Update a flow definition.

The flowName and flowGroup cannot be changed.

PUT /flowconfigs/(flowGroup:<groupName>,flowName:<flowName>)

Curli example:

curli "http://localhost:8080/flowconfigs/(flowGroup:mygroup,flowName:myflow1)" -X PUT -H 'X-RestLi-Method: update' -H 'X-RestLi-Protocol-Version: 2.0.0' --data '{"flowName" : "myflow1", "flowGroup" : "mygroup", "templateName" : "FS:///mytemplate.template", "schedule" : "", "properties" : {"prop1" : "value2"}}'

Delete a flow definition

DELETE /flowconfigs/(flowGroup:<groupName>,flowName:<flowName>)

Curli example:

curli "http://localhost:8080/flowconfigs/(flowGroup:mygroup,flowName:myflow1)" -X DELETE -H 'X-RestLi-Protocol-Version: 2.0.0'

Get flow definition

GET /flowconfigs/(flowGroup:<groupName>,flowName:<flowName>)

Curli example:

curli "http://localhost:8080/flowconfigs/(flowGroup:mygroup,flowName:myflow)" -X GET -H 'X-RestLi-Protocol-Version: 2.0.0'

A FlowConfigsResource will service the above REST calls and register a handle with the FlowManager to process any corresponding CRUD operations on the FlowConfigs.

(Implementation detail: Internally, this registration will be in the form of Flow monitor which is listed in the Appendix).

Monitor Gobblin Flow   

The FlowStatus schema defines the resource that represents flow status in Gobblin-as-a-Service. The flowName and flowGroup provided by the client are used to look up the flow status.

FlowStatus schema

{
  "type" : "record",
  "name" : "FlowStatus",
  "namespace": "gobblin.service",
  "doc" : "Status of a flow",
  "fields" : [
    {
      "name" : "flowName",
      "type" : "string",
      "doc" : "Name of the flow"
    },
    {
      "name" : "flowGroup",
      "type" : "string",
      "doc" : "Group of the flow. This defines the namespace for the flow."
    },
    {
      "name" : "executionStartTime",
      "type" : "long",
      "doc" : "Epoch time of when the last execution began"
    },
    {
      "name" : "executionEndTime",
      "type" : "long",
      "doc" : "Epoch time of when the last execution ended"
    },
    {
      "name" : "executionStatus",
      "type" : "string",
      "doc" : "Execution status"
    },
    {
      "name" : "message",
      "type" : "string",
      "doc" : "Error or status message"
    },
    {
      "name" : "jobStatuses",
      "type" : {"type" : "array", "items" : "JobStatus"},
      "doc" : "Status of jobs belonging to the flow"
    }
  ]
}

JobStatus schema

{
 "type" : "record",
 "name" : "JobStatus",
 "namespace": "gobblin.service",
 "doc" : "Status of a job",
 "fields" : [
   {
     "name" : "flowName",
     "type" : "string",
     "doc" : "Name of the flow"
   },
   {
     "name" : "flowGroup",
     "type" : "string",
     "doc" : "Group of the flow. This defines the namespace for the flow."
   },
   {
     "name" : "jobName",
     "type" : "string",
     "doc" : "Name of the job"
   },
   {
     "name" : "jobGroup",
     "type" : "string",
     "doc" : "Group of the job. This defines the namespace for the job."
   },
   {
     "name" : "executionStartTime",
     "type" : "long",
     "doc" : "Epoch time of when the last execution began"
   },
   {
     "name" : "executionEndTime",
     "type" : "long",
     "doc" : "Epoch time of when the last execution ended"
   },
   {
     "name" : "executionStatus",
     "type" : "string",
     "doc" : "Execution status"
   },
   {
     "name" : "message",
     "type" : "string",
     "doc" : "Error or status message"
   },
   {
     "name" : "processedCount",
     "type" : "long",
     "doc" : "Number of records processed in last execution"
   },
   {
     "name" : "lowWatermark",
     "type" : "string",
     "doc" : "Low watermark after last execution"
   },
   {
     "name" : "highWatermark",
     "type" : "string",
     "doc" : "High watermark after last execution"
   }
 ]
}

Operations related to FlowStatus

Get the last known status of a flow

GET /flowstatuses/(flowGroup:<groupName>,flowName:<flowName>)
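
Curli example (assuming the same local endpoint and resource conventions as the FlowConfig examples above):

curli "http://localhost:8080/flowstatuses/(flowGroup:mygroup,flowName:myflow1)" -X GET -H 'X-RestLi-Protocol-Version: 2.0.0'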

Security

This section will cover various security aspects of the Gobblin-as-a-Service.

Authentication / Authorization

GaaS will provide implicit and explicit mechanisms to Authenticate and Authorize users of the service:

  • Authentication: Authentication will be handled via two mechanisms:

  1. RestLi: Implicitly handled by RestLi servlet filters, which will determine the user principal by talking to the Auth server.

  2. LDAP: Communicate with the LDAP server, and determine the user principal.

  • Authorization: A default authorization mechanism will be provided that can be extended to fetch authorization information for an authenticated user. It will also support determining whether a user can proxy as a super user. This will continue to be a pluggable module that will evolve with the respective services around it:

  1. ACL Service: Extensible interfaces to authorize via different ACL services.

  2. Config Based ACLs: To start with, we can make use of ACLs hardcoded in config.

Key Encryption

While there are several ways to handle key encryption, we will keep it simple and follow the approach described below:

  • Master Key: Gobblin’s Master Key is expected to be made available to the Service via an infrastructure-dependent mechanism that is outside the scope of this document, since different KMS / security systems have different ways of dealing with it.

  • Customer Key: Customers of GaaS can request their Key to be encrypted with Gobblin’s Master Key. Thereafter with each request, the customers can pass the job property values encrypted by their key, and their key encrypted by Gobblin’s master key. Gobblin will use its master key to decrypt their key and use their decrypted key to decrypt the job property values.

Note: To avoid saving the customer’s key to Gobblin’s persisted state, Gobblin will store the master key in an ACL-protected shared FS location (NFS, HDFS, etc.) and use it to decrypt the property values at execution time.
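
A minimal sketch of the envelope decryption described above, assuming AES/GCM via the standard JCE APIs; the class and method names are illustrative, and the KMS integration that supplies the master key bytes is out of scope:

// Hedged sketch of the two-step decryption: the customer's key arrives
// encrypted with Gobblin's master key; Gobblin decrypts the customer key,
// then uses it to decrypt the job property values. AES/GCM and the helper
// names are assumptions, not the final implementation.
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class EnvelopeDecryptionSketch {
 static byte[] decrypt(byte[] ciphertext, byte[] iv, SecretKeySpec key) throws Exception {
   Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
   cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, iv));
   return cipher.doFinal(ciphertext);
 }

 static String decryptJobProperty(byte[] masterKeyBytes,
     byte[] encryptedCustomerKey, byte[] customerKeyIv,
     byte[] encryptedValue, byte[] valueIv) throws Exception {
   SecretKeySpec masterKey = new SecretKeySpec(masterKeyBytes, "AES");
   // Step 1: recover the customer key using the master key.
   byte[] customerKeyBytes = decrypt(encryptedCustomerKey, customerKeyIv, masterKey);
   SecretKeySpec customerKey = new SecretKeySpec(customerKeyBytes, "AES");
   // Step 2: recover the job property value using the customer key.
   return new String(decrypt(encryptedValue, valueIv, customerKey), "UTF-8");
 }
}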

Template Pack ACLs

Gobblin-as-a-Service introduces the concept of Template Packs, which builds on the Job Templates already available in Gobblin. Template Packs are collections of Job Templates, and will help enhance security in the following ways:

  • Various teams can deploy their own Template Packs and control ACLs for templates. Initially, templates will contain ACL information within them as a property, but support for different authorization mechanisms can be added later.

  • Any changes to templates in a Template Pack will be auditable to ensure everything running on GaaS is secure.

Communication Mechanism

Gobblin-as-a-Service will communicate with various SpecExecutors using different communication mechanisms to transmit and execute Gobblin Jobs:

  • Kafka: The messages over Kafka will be protected using Kafka security.

  • REST: The communication over REST with a SpecExecutor will be over an HTTPS channel, and GaaS will authenticate with the SpecExecutor using its certificate.

Flow Manager

FlowManager is a sub-component of Gobblin-as-a-Service that accepts User Requests for managing and processing logical data Flows, forwards them to the Scheduler, and persists them via the FlowCatalog.

FlowManager will run as a JVM Service, and will host FlowCatalog and expose functionalities to manage flows as discussed below:

  • At startup, the FlowManager will initialize FlowCatalog and register listeners for any logical Flow management requests coming in via REST API.   

  • At shutdown, the FlowManager will ensure graceful shutdown of the FlowCatalog (including waiting for any FlowSpec being modified to be persisted to FlowStore)

  • During normal execution, FlowManager will accept User Requests from the REST APIs for the creation and manipulation of logical Flows, persist them to the FlowCatalog, and delegate them to the Scheduler for execution as per the requested cron schedule.

Flow Catalog

FlowCatalog will provide a suite of functionalities around the cataloging of FlowSpecs, such as registering and notifying listeners of changes to FlowSpecs. It will also act as the accessor of the SpecStore, thus managing the persistence layer.

Implementation specific interface for FlowCatalog is listed in the Appendix.

Flow Spec

FlowSpec is the representation of the logical Flow that needs to be processed.

Implementation specific interface for FlowSpec is listed in the Appendix.

Flow Compiler

FlowCompiler is a pluggable module that the Orchestrator will use to generate a SpecExecutor-to-JobSpecs mapping from a FlowSpec. The Orchestrator will then use this information to execute the JobSpecs for a FlowSpec on the selected SpecExecutors.

Implementation specific interface for FlowCompiler is listed in the Appendix.

Flow Store

The FlowStore will be used by the FlowCatalog to persist FlowSpecs to a store (database / file-system), so that the FlowSpecs are durable across failures. After the FlowStore successfully persists a FlowSpec, it will be added to the FlowCatalog and subsequently executed by the Scheduler via the Orchestrator.

Topology Manager

The TopologyManager module will use a TopologyCatalog to manage the topology and will run as a JVM Service. The TopologyCatalog will maintain and expose relevant methods to manage the mapping of each SpecExecutor to its Capabilities (supported Sources and Destinations).

Initially, only an immutable TopologyCatalog module is planned, which will load topologies from config at startup time, but provision for a mutable TopologyCatalog will be in place with the relevant interface(s). A sketch of the startup-time loading follows below.
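
For illustration, the startup loading might look roughly like this; the config key and the buildTopologySpec helper are hypothetical, while put() is the MutableSpecCatalog method shown in the Appendix:

// Hedged sketch of immutable TopologyCatalog population at startup.
// The "gobblin.service.topologies" key and buildTopologySpec helper are
// hypothetical; put() registers the spec and notifies any listeners.
import com.typesafe.config.Config;

public class TopologyStartupLoader {
 void loadTopologiesAtStartup(Config serviceConfig, TopologyCatalog catalog) {
   for (Config topologyConfig : serviceConfig.getConfigList("gobblin.service.topologies")) {
     catalog.put(buildTopologySpec(topologyConfig));
   }
 }

 TopologySpec buildTopologySpec(Config topologyConfig) {
   // Hypothetical: parse description, capabilities and SpecExecutor details.
   throw new UnsupportedOperationException("illustrative only");
 }
}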

TopologyCatalogListener and TopologyCatalogListenersContainer will be available for Listeners to register and listen to any activity, such as the addition or modification of a Topology, and take relevant actions. In the Gobblin-as-a-Service context, the Orchestrator will listen to Topology changes and cache them for use while compiling jobs.

Implementation specific interface for TopologyCatalog is listed in the Appendix.

Implementation specific interface for TopologySpec is listed in the Appendix.

Scheduler Module

SchedulerModule will run as a JVM Service, listening for changes to the TopologyCatalog and FlowCatalog, and will either run jobs immediately (if schedule or topology changes warrant it) or schedule the FlowSpecs for execution at a later time via the Orchestrator.

Trigger Scenario: SchedulerModule will act as following for various activities:

  1. FlowCatalog activity: The addition, update, or deletion of a FlowSpec indicates a user-triggered change to a Flow. On addition or update with runImmediately set, the Scheduler will invoke the Orchestrator immediately for execution; on addition or update without a request for immediate execution, it will schedule the FlowSpec for execution at a later time; on deletion, it will un-schedule the FlowSpec.

  2. TopologyCatalog activity: If any TopologySpec is added, updated or deleted, it indicates a change in our data infrastructure, and the Scheduler will try to execute any pending FlowSpecs if a new topology becomes available that can execute them.

The Scheduler module will run in master mode on only one node in the Gobblin-as-a-Service cluster. The rest of the nodes will run in slave mode and redirect scheduling requests to the master via Helix messages. This ensures that only one node in the service cluster schedules FlowSpecs and coordinates their execution.

In the initial version, during Scheduler failover or redeployment, while a new Scheduler master is taking over scheduling responsibilities, any schedules that come due in the interim will be skipped. In future versions, however, we will keep track of everything that should have been scheduled and executed, and reactively schedule and execute a flow should it be skipped for such reasons.

In the future, we could distribute the set of FlowSpecs being executed across all the nodes using a better resource / partitioning recipe in Helix, but that would be overkill at the moment since the scheduler is a fairly lightweight module. The master election will use the standard Helix master-slave state-model and Zookeeper.
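
As a minimal sketch of this master election, assuming a ZooKeeper quorum at zkAddr and a Helix cluster named "gaas" (both names are assumptions):

// Hedged sketch of Helix-based leader election for the scheduler master;
// cluster name, instance name and ZooKeeper address are assumptions.
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;

public class SchedulerLeaderElectionSketch {
 public static void main(String[] args) throws Exception {
   String zkAddr = "localhost:2181"; // assumption: local ZooKeeper
   HelixManager manager = HelixManagerFactory.getZKHelixManager(
       "gaas", "node-1", InstanceType.CONTROLLER, zkAddr);
   manager.connect();
   // Only the elected leader schedules flows; the other nodes forward
   // scheduling requests to the leader via Helix messages.
   if (manager.isLeader()) {
     System.out.println("This node is the scheduler master");
   }
   manager.disconnect();
 }
}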

Orchestrator Module

OrchestratorModule will run as an on-demand module and will be invoked by SchedulerModule to compile and orchestrate a FlowSpec.

The compilation of a FlowSpec into a JobSpecs-to-SpecExecutors mapping will be performed by the SpecCompiler using information about the available topologies.

After compiling the JobSpecs, the OrchestratorModule will execute them via the SpecExecutors.

Runtime guarantees for orchestration via Kafka based SpecExecutor:

  • at-least-once: We will provide an at-least-once guarantee because the Orchestrator will ensure that the Kafka message is produced (written and acknowledged) when the schedule comes due for a flow. Should all attempts to produce to Kafka fail, relevant alerts will be raised. Each scheduled execution has an associated execution id, so if the consumer (a data pipeline like Gobblin Cluster) that executes the job for a flow de-dupes by execution id, we will also ensure exactly-once (provided that persistence of state and execution launch is transactional on the consumer side). This, however, has caveats around Scheduler failover in its initial days, and around Kafka reliability. A minimal producer sketch follows below.
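
A hedged sketch of the acknowledged produce described above, using the plain Kafka producer API; the topic name, serialization and execution-id format are assumptions:

// Hedged sketch: produce the serialized JobSpec keyed by execution id and
// block on the broker acknowledgment; a failure here raises an alert
// rather than being silently dropped.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JobSpecProducerSketch {
 public static void main(String[] args) throws Exception {
   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092"); // assumption
   props.put("acks", "all");                         // wait for full acknowledgment
   props.put("retries", 3);                          // retry transient failures
   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
     String executionId = "mygroup.myflow1-1490000000"; // consumers de-dupe on this
     String serializedJobSpec = "{...}";                // serialized JobSpec payload
     // get() blocks until the broker acknowledges the write.
     producer.send(new ProducerRecord<>("gobblin-jobspecs", executionId, serializedJobSpec)).get();
   }
 }
}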

Implementation specific interface for SpecCompiler is listed in the Appendix.

Monitoring

The monitoring effort can be divided into two parts: Internal Monitoring (implemented and supported by Gobblin-as-a-Service) and External Monitoring (external systems and tools that can be provisioned to build more robust monitoring).

Internal Monitoring

The Monitoring module will run as a JVM Service and have the following sub-components:  

  1. Metrics Consumption (initially Kafka based, with future possibilities of REST, etc.), which will consume GobblinMetricsEvents and write them into the MetricsStore (a database such as MySQL or NoSQL DB, or an OLAP store such as Pinot). This can be a simple Gobblin job that runs as a Gobblin application. The metrics emitted will include tags about the flow execution, debug information (such as links to the execution and logs) and execution metrics.

  2. MetricAnalyzer that will analyze these metrics on the fly and aggregate them by Flows.

  3. SLAMonitor (for Flows), which will run a producer that computes the expected Flow completions (using the list of available Flows and their schedules) AND a consumer that takes them out of the queue whenever the MetricAnalyzer sees that an expected Flow execution has happened and finished. If the SLA is breached, i.e., the expected Flow completion has not happened by its timeout or a Flow execution has failed, the SLAMonitor will trigger an alert (such as PagerDuty). A sketch of this idea follows after this list.

  4. SLAMonitor (for Datasets) will also run as described in #3, but will further filter out based on Datasets for a more granular tracking.

  5. SpecExecutor Health Monitor: We will also run a SpecExecutor health monitor to check the current state of each SpecExecutor and notify via alerts if it is unhealthy. Eventually, when we make use of a mutable TopologyCatalog, we can also make corresponding topology changes if we have an alternative SpecExecutor that can execute the same JobSpec.
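
The SLAMonitor idea from item 3 above, as a hedged, self-contained sketch; the class and method names are illustrative:

// Expected flow completions are enqueued from schedules and removed when
// the MetricAnalyzer observes the corresponding execution finish; anything
// still pending past its deadline raises an alert.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SlaMonitorSketch {
 // Expected completion deadline (epoch millis) keyed by flow execution id.
 private final Map<String, Long> pending = new ConcurrentHashMap<>();

 void expectCompletion(String executionId, long deadlineMillis) {
   pending.put(executionId, deadlineMillis);
 }

 void onFlowFinished(String executionId) {
   pending.remove(executionId); // MetricAnalyzer observed the completion
 }

 void checkSlas(long nowMillis) {
   pending.forEach((executionId, deadline) -> {
     if (nowMillis > deadline) {
       alert(executionId); // e.g., page the on-call
     }
   });
 }

 private void alert(String executionId) {
   System.err.println("SLA breached for execution " + executionId);
 }
}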

External Monitoring

ELK Dashboards: For various other Gobblin pipelines, we have made use of ELK based dashboards; given their usefulness and ease of setup, they are highly recommended as a good alternative for monitoring the Gobblin-as-a-Service pipelines.

Core Services 

The CoreServices module is the application entry point and the module that starts the other modules.

It is a simple wrapper that ensures the other JVM Services are running and handles the shutdown notification gracefully when triggered.

Provisioner

Provisioner is a planned future module, which will also be a JVM Service started by the CoreServices module. It will expose relevant APIs and handle the launch and management of various SpecExecutors (such as Gobblin Standalone Clusters) from within Gobblin-as-a-Service.

Launch SpecExecutor: When the creation of a new SpecExecutor is requested with a verbose SpecExecutorSpec, the Provisioner will instantiate the relevant SpecExecutorLauncher to launch and set up the SpecExecutor. Upon launch, the SpecExecutor and its capabilities will be registered in the TopologyCatalog.

Parts of this SpecExecutor provisioning have already been worked into Gobblin’s existing code base via gobblin-aws and gobblin-yarn, where either whole (in the case of gobblin-aws) or partial (in the case of gobblin-yarn) clusters are launched by the Gobblin code itself. This needs to be abstracted out and made generic.

Failure / Failover

Node Failure: Since all the nodes in the Gobblin-as-a-Service cluster are identical (except for the Scheduler module), failure of any node does not cause a fatal failure. The Service continues to work as long as at least one node is up and available (provided that it can sustain the number of requests coming its way).

Scheduler module: If the node that fails is the master for scheduling, the remaining nodes will elect a new master and the Scheduler will resume on that node.

Cluster Failure: If the Gobblin-as-a-Service cluster fails completely, the provisioned Jobs on the respective SpecExecutors will continue to run.

Cluster Restart: If we restart the Gobblin-as-a-Service cluster, either after maintenance or after a catastrophic cluster failure, the Service will read all the pre-provisioned FlowSpecs from the store (provided the FlowStore did not die as well) and will orchestrate the materialized JobSpecs on all the SpecExecutors. The SpecExecutors will mostly ignore this re-provisioning of duplicate jobs since they would already be running the same jobs; thus Gobblin-as-a-Service will rebuild its state without any side effects.

Network Split: In case of a network split of the Gobblin-as-a-Service cluster, the individual nodes can still continue to process user requests without any side effects.

FlowStore Failure: If the FlowStore fails as well, Gobblin-as-a-Service will be restricted to read-only mode, since we will not be able to persist any new FlowSpecs.

Throttling / Quota

Initially, there will not be any throttling or quota management for the Flows created using Gobblin-as-a-Service; however, these can be added in the future. The following changes will be required to support each:

Quota: Quotas (e.g., number of jobs per second, number of containers, number of cores, memory limit, etc.) can be associated with queues, and each Flow designated a queue. Only an authorized user would be able to associate a Flow with a queue. Once designated, this information will be broadcast to all the SpecExecutors, and they will redesignate the respective Jobs to these queues. Quota limitations will be applied at the queue level.

It is also noteworthy that while we will have a host of quota criteria, not all of them will be applicable to all types of data pipelines / executors. Initially we will make a best effort; later on, we will provide options to let operators specify whether an executor should still be considered when not all quota criteria are enforceable.

Throttling: Throttling will stem from quota usage; in a separate design, we are working towards building a global throttling system which can be leveraged by Gobblin-as-a-Service once ready.

Frontend UI

The Frontend UI will be a basic wrapper over the REST APIs discussed above, providing easy UI-based access to the various functionalities supported by Gobblin-as-a-Service.

Frontend UI will consist of the following main pages:

  • Authentication (LDAP)

  • Job Management (create, schedule, edit, delete)

  • In future: Topology Management (add, edit, delete) [for Admins]

  • Status (job, dataset)

Appendix

Logical Entities

A brief description of logical entities:

  • FlowSpec: Data model representation that describes a Logical Data Flow.  

  • JobSpec: Data model representation that describes a physical Gobblin Job.   

  • TopologySpec: Data model representation that describes a topology, i.e., a tuple of a SpecExecutor and its capabilities (SpecExecutor is described later).

  • GobblinMetricsEvents: Standard events that gobblin-metrics emits to help track various execution events and metrics.

  • Job Templates: Pre-configured files with properties that can be overridden for various possible physical Gobblin Jobs.

  • Template Packs: Packaged archives that contain Gobblin Job Templates. Each use-case / team can have its own template pack. Template Packs are collections of ACL-controlled templates which are available only to the restricted list of principals whitelisted by the template pack owners.

Key Components Overview

A brief overview of key components:

  • SpecExecutor: A SpecExecutor is a data processing pipeline (typically a Gobblin application such as a Gobblin Standalone Cluster running in Y-Network-Fabric, or a Gobblin job running on Azkaban in X-Network-Fabric) that can process physical Gobblin Jobs. A SpecExecutor typically has associated capabilities (i.e., supported sources and sinks) which can be used to determine if a Gobblin Job can run on it.

  • FlowManager: A sub-component of Gobblin-as-a-Service that accepts a User Request for managing and processing a Flow and forwards it to the Scheduler.

  • TopologyManager: A sub-component of Gobblin-as-a-Service that discovers the available topology of applications (typically Gobblin applications such as a Gobblin Standalone Cluster running in Y-Network-Fabric, or a Gobblin job running on Azkaban in X-Network-Fabric) and any changes to the topology.

  • Scheduler: A sub-component of Gobblin-as-a-Service that schedules and executes Flows. At execution time, the Scheduler executes the flow via the Orchestrator.

  • OrchestratorModule: A sub-component of Gobblin-as-a-Service which, when invoked by the Scheduler, compiles the Flow being executed into physical Gobblin Jobs using the FlowCompiler and then orchestrates these Jobs on the respective SpecExecutors.

  • FlowCompiler: A module that takes in a logical Flow and compiles it down to physical Gobblin Jobs that can be executed on SpecExecutors. The compilation process takes into account the various available SpecExecutors and their processing capabilities (i.e., supported sources and sinks).

Key Components Description

Implementation oriented description of various key components:

  • SpecExecutor: A technology, location, communication mechanism triplet representative of a data pipeline or system that can process physical Gobblin Jobs. Examples are:  <Gobblin-DB-Ingest, X-Network-Fabric, REST>, <Gobblin Standalone Cluster, Y-Network-Fabric, Kafka>

  • SpecExecutor Capabilities: List of tuples of source and destination that a given SpecExecutor supports, e.g., List<Pair<SFDC, NoSQL DB>, Pair<NoSQL DB, HDFS>>.

  • SimpleKafkaSpecExecutor: A SpecExecutor that supports communicating over Kafka.

  • SimpleRESTSpecExecutor: A SpecExecutor that supports communicating over REST.

  • FlowManager:  A sub-component of Gobblin-as-a-Service that runs as a JVM Service in BE and manages FlowCatalog.

  • FlowCatalog: A catalog of all the FlowSpecs that are currently known to Gobblin-as-a-Service.

  • FlowCatalogListener: A hook to listen to any FlowSpec changes such as addition, deletion or update of any FlowSpec.

  • FlowStore: A persistence layer where FlowSpecs are persisted for durability.

  • FlowCompiler: A logical module that takes in a FlowSpec and converts it into JobSpecs that are runnable on a specific SpecExecutor. It is worth noting that users do not need to care about which SpecExecutor their jobs will be resolved to run on; GaaS will decide this implicitly and transparently relocate jobs to a different SpecExecutor should the need arise at a later time.

  • TopologyManager: A sub-component of Gobblin-as-a-Service that runs as a JVM Service in BE and hosts TopologyCatalog.

  • TopologyCatalog: A catalog of all the TopologySpecs that are currently known to Gobblin-as-a-Service.

  • TopologyCatalogListener: A hook to listen to any TopologySpec changes such as addition, deletion or update of any TopologySpec.

  • Scheduler: A sub-component of Gobblin-as-a-Service that runs as a JVM Service in the BE and is a FlowCatalogListener and TopologyCatalogListener. If any FlowSpec or TopologySpec is added, removed or updated, the Scheduler schedules, un-schedules, or runs the affected FlowSpec using the OrchestratorModule.

  • OrchestratorModule: A sub-component of Gobblin-as-a-Service that when called by Scheduler to execute a FlowSpec, invokes FlowCompiler to determine executable JobSpecs for a FlowSpec and provisions / runs them via respective SpecExecutors.

  • JobCompiler: A logical module that takes in a JobSpec and creates customized jobs that can be understood by respective SpecExecutor such as SimpleKafkaSpecExecutor and SimpleRESTSpecExecutor using available topologies and templates.  

  • Monitoring Module: A sub-component of Gobblin-as-a-Service that runs as a JVM Service in BE and tracks completion of FlowSpec and related metrics and SLAs.

  • Metrics Consumer Job: A Gobblin Job that consumes metrics from Kafka and writes them to the MetricsStore for consumption via the Monitoring Module. (We generally emit GobblinMetricsEvents to Kafka from all SpecExecutors.)

  • MetricsAnalyzer: A logical module used by Monitoring to analyze consumed metrics and aggregate them by FlowSpec.

  • SLAMonitor: A logical module used by Monitoring that uses the analysis performed by MetricsAnalyzer to guard against SLA violations.

  • MetricsStore: A persistence layer where metrics are persisted for durability (such as NoSQL DB).

  • CoreServices: A sub-component of Gobblin-as-a-Service that is the application entry point for BE and starts other sub-components as JVM Services.

  • Provisioner: A planned future sub-component of Gobblin-as-a-Service that will run as a JVM Service and provision, i.e., launch and set up, SpecExecutors if required.

  • JobMonitor: A component that is a part of SpecExecutor on the consumer side (Gobblin Standalone Cluster / Azkaban) that listens for JobSpec being shipped by the Orchestrator module in Gobblin-as-a-Service.

  • KafkaJobMonitor: A Kafka JobMonitor that listens to Kafka for JobSpecs.

Key Interfaces

GobblinRestFlowMonitor

A representative class for GobblinRestFlowMonitor:

public class GobblinRestFlowMonitor implements FlowSpecMonitor {
  private final MutableFlowCatalog flowCatalog;

  GobblinRestFlowMonitor(MutableFlowCatalog flowCatalog);

  /** Create a logical Flow from the given template, schedule and properties. */
  void createFlow(URI flowUri, String templateName, String schedule, Map<String, String> properties);

  /** Delete the logical Flow identified by the given URI. */
  void deleteFlow(URI flowUri);

  /** Fetch the configuration of the logical Flow identified by the given URI. */
  Map<String, String> getFlow(URI flowURI);

  /** Update the template, schedule or properties of an existing logical Flow. */
  void updateFlow(URI flowURI, String templateName, String schedule, Map<String, String> properties);
}

FlowCatalog

FlowCatalog will be an extension of a new interface called SpecCatalog in Gobblin. The interface will look like:

public interface SpecCatalog extends SpecCatalogListenersContainer,   Instrumentable {
 /** Returns an immutable {@link Collection} of {@link Spec}s that are known to the catalog. */
 Collection<Spec> getSpecs();
 
 /** Metrics for the spec catalog; null if
  * ({@link #isInstrumentationEnabled()}) is false. */
 SpecCatalog.StandardMetrics getMetrics();
 
 /**
  * Get a {@link Spec} by uri.
  * @throws SpecNotFoundException if no such Spec exists
  **/
 Spec getSpec(URI uri) throws SpecNotFoundException;
}

In general, Gobblin follows a pattern of applying mutability as a behavior inherited via a mutable interface. Following a similar model, we will make FlowCatalog implement MutableSpecCatalog. The interface will look like:

public interface MutableSpecCatalog extends SpecCatalog {
 /**
  * Registers a new Spec. If a Spec with the same {@link Spec#getUri()} exists,
  * it will be replaced.
  * */
 public void put(Spec flowSpec);
 
 /**
  * Removes an existing Spec with the given URI. A no-op if such Spec does not exist.
  */
 void remove(URI uri);
}

The FlowCatalog would thus look like (representative key fields):  

public class FlowCatalog extends AbstractIdleService implements MutableSpecCatalog {
 
 protected final SpecCatalogListenersList listeners;
 protected final Logger log;
 protected final MetricContext metricContext;
 protected final FlowCatalog.StandardMetrics metrics;
 protected final SpecStore specStore;
..
}

User Request Delegation: Any request coming in for the creation, deletion, or update of a logical Flow will thus be delegated to the put(), getSpec() and remove() methods of the above FlowCatalog.

FlowCatalogListener and FlowCatalogListenersContainer will be available for Listeners to register and listen to any activity, such as the addition or modification of FlowSpecs in the FlowCatalog, and take relevant actions. In the Gobblin-as-a-Service context, the Scheduler will listen to FlowSpec changes and schedule / re-schedule jobs.

FlowSpec

FlowSpec will be an implementation of the existing Spec interface. The Spec interface looks like:

public interface Spec extends Serializable {
 URI getUri();
 String getVersion();
 String getDescription();
}

The FlowSpec will look like:

public class FlowSpec implements Configurable, Spec {
 /** A URI identifying the flow. */
 final URI uri;
 
 /** The implementation-defined version of this spec. */
 final String version;
 
 /** Human-readable description of the flow / job spec */
 final String description;
 
 /** Flow config as a typesafe config object*/
 final Config config;
 
 /** Flow config as a properties collection for backwards compatibility */
 // Note that this property is not strictly necessary as it can be generated from the typesafe
 // config. We use it as a cache until typesafe config is more widely adopted in Gobblin.
 final Properties configAsProperties;
 
 /** URI of {@link gobblin.runtime.api.JobTemplate}s to use. */
 final Optional<Set<URI>> templateURIs;
 
 /** Child {@link Spec}s to this {@link FlowSpec} **/
 // Note that a FlowSpec can be materialized into a hierarchy of multiple FlowSpecs or JobSpecs
 final Optional<List<Spec>> childSpecs;
}

This representation gives us the ability to achieve a few things with little surgery:

  1. Enable a FlowSpec to materialize into a JobSpec, since a JobSpec is also a Spec. Thus, if today A -> C is a Flow that needs to be materialized as A -> B and B -> C Jobs run on systems X1 and X2, and tomorrow we add a new system X3 that can handle data movement from A -> C directly, the same Flow can simply be used as a Job instead.

  2. A Flow can materialize into more Flows, which can then eventually materialize into Jobs.

  3. Help Gobblin-as-a-Service plug and play in existing deployments of Gobblin.

FlowCompiler

FlowCompiler will implement the SpecCompiler interface, which takes in a logical Spec (a FlowSpec) and produces the materialized physical Specs along with the SpecExecutors they can run on. The interface is listed in the SpecCompiler section below.

TopologyCatalog

The basic TopologyCatalog, which is a MutableSpecCatalog (discussed earlier with FlowCatalog), would look like (representative key fields):

public class TopologyCatalog extends AbstractIdleService implements MutableSpecCatalog {
 protected final SpecCatalogListenersList listeners;
 protected final Logger log;
 protected final MetricContext metricContext;
 protected final TopologyCatalog.StandardMetrics metrics;
 protected final SpecStore specStore;
..
}

TopologySpec

The TopologySpec class will look like:

public class TopologySpec implements Configurable, Spec {
 
 /** A URI identifying the topology. */
 final URI uri;
 
 /** Human-readable description of the topology spec */
 final String description;
 
 /** Other config associated with the Topology as a typesafe config object*/
 final Config config;
 
 /** Other config associated with the Topology as a properties collection for backwards compatibility */
 // Note that this property is not strictly necessary as it can be generated from the typesafe
 // config. We use it as a cache until typesafe config is more widely adopted in Gobblin.
 final Properties configAsProperties;
 
 /** {@link SpecExecutor} that will execute the flows / jobs submitted via this topology. */
 final SpecExecutor specExecutor;
}

SpecCompiler

The interface for SpecCompiler will look like:

public interface SpecCompiler {
 /***
  * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s
  * and the mapping to {@link SpecExecutor} that they can be run on.
  * @param spec {@link Spec} to compile.
  * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}.
  */
 Map<Spec, SpecExecutor> compileFlow(Spec spec);
 
 /***
  * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler}
  * is aware of.
  * @return Map of {@link Spec} URI and {@link TopologySpec}
  */
 Map<URI, TopologySpec> getTopologySpecMap();
}

SpecExecutor

The interface for the SpecExecutor will look like:

public interface SpecExecutor<V> {
 /** A URI identifying the SpecExecutor. */
 URI getUri();
 
 /** Human-readable description of the SpecExecutor .*/
 Future<String> getDescription();
 
 /** SpecExecutor config as a typesafe config object. */
 Future<Config> getConfig();
 
 /** Health of SpecExecutor. */
 Future<String> getHealth();
 
 /** Source : Destination processing capabilities of SpecExecutor. */
 Future<? extends Map<String, String>> getCapabilities();
 
 /** Add a serialized {@link JobSpec} for execution on 
  * {@link SpecExecutor}. */
 Future<?> addJob(V addedJob);
 
 /** Update a serialized {@link JobSpec} being executed on 
  * {@link SpecExecutor}. */
 Future<?> updateJob(V updatedJob);
 
 /** Delete a {@link JobSpec} being executed on 
  * {@link SpecExecutor}. */
 Future<?> deleteJob(URI deletedJobURI);
 
 /** List all {@link JobSpec} being executed on 
  * {@link SpecExecutor}. */
 Future<? extends List<V>> listJobs();
}
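
To tie these pieces together, a hedged sketch of how the Orchestrator might hand compiled jobs to their SpecExecutors through the interface above; here the serialized form V is assumed to be a String, and a map of serialized JobSpecs to executors stands in for the compiler's output:

// Hedged sketch of job hand-off via the SpecExecutor interface above.
import java.util.Map;
import java.util.concurrent.Future;

public class OrchestrationSketch {
 static void orchestrate(Map<String, SpecExecutor<String>> compiledJobs) throws Exception {
   for (Map.Entry<String, SpecExecutor<String>> entry : compiledJobs.entrySet()) {
     // Hand the serialized JobSpec to its executor (e.g., over Kafka or REST)...
     Future<?> ack = entry.getValue().addJob(entry.getKey());
     // ...and block until the executor acknowledges the hand-off.
     ack.get();
   }
 }
}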

