An initial goal was a concise mechanism that lets users write simple configurations or macros that expand to a specification of low-level policy decorators and sinks. It needed to support recursive chaining and tree-like fanout (for failover and fanout handlers). A declarative language with a manipulable abstract syntax tree (AST) mapped well to these requirements, so the AST was chosen as the data structure in which transformations and macros are handled. The generated ASTs are used to instantiate all of the Sources and potentially long chains of Sinks.

Processing a specification is broken into a few phases: lexing, parsing to an AST, translating the AST, and then instantiating a Flume source or sink chain. While some prefer the concise syntax, a more explicit syntax could be created as long as the generated AST is the same.
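The phases can be sketched end to end for a tiny subset of the language: a whitespace-separated chain of bare identifiers, with no arguments or brackets. This is a hand-written illustration, not the ANTLR-generated parser or FlumeBuilder; the class names PhaseNode and PhaseSketch and the identifiers decoA and decoB are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Minimal AST node: a token label plus ordered children (hypothetical, not a Flume class). */
final class PhaseNode {
    final String label;
    final List<PhaseNode> children = new ArrayList<>();

    PhaseNode(String label, PhaseNode... kids) {
        this.label = label;
        children.addAll(Arrays.asList(kids));
    }

    /** Renders the tree in a LISP-like form, e.g. (DECO (SINK a)). */
    String toStringTree() {
        if (children.isEmpty()) return label;
        StringBuilder sb = new StringBuilder("(").append(label);
        for (PhaseNode c : children) sb.append(' ').append(c.toStringTree());
        return sb.append(')').toString();
    }
}

final class PhaseSketch {
    /** Phase 1, lexing: split the spec into identifier tokens. */
    static List<String> lex(String spec) {
        return Arrays.asList(spec.trim().split("\\s+"));
    }

    /** Phase 2, parsing: mirrors "singleSink simpleSink?" -> ^(DECO singleSink simpleSink?). */
    static PhaseNode parse(List<String> tokens, int pos) {
        PhaseNode sink = new PhaseNode("SINK", new PhaseNode(tokens.get(pos)));
        PhaseNode deco = new PhaseNode("DECO", sink);
        if (pos + 1 < tokens.size()) deco.children.add(parse(tokens, pos + 1));
        return deco;
    }

    /** Phase 3, translation: a no-op here; macro expansion would rewrite the tree. */
    static PhaseNode translate(PhaseNode ast) {
        return ast;
    }

    /** Phase 4, instantiation: walk the AST and build a nested description of the chain. */
    static String instantiate(PhaseNode deco) {
        String name = deco.children.get(0).children.get(0).label;
        if (deco.children.size() == 1) return name;
        return name + "(" + instantiate(deco.children.get(1)) + ")";
    }
}
```

For the input "decoA decoB console", the parse step yields (DECO (SINK decoA) (DECO (SINK decoB) (DECO (SINK console)))) and the instantiate step returns decoA(decoB(console)). Because instantiation only sees the AST, a different surface syntax that produced the same tree would instantiate identically.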

The core classes responsible for this include:

  • FlumeBuilder (responsible for calling ANTLR-generated code and for AST instantiation)
  • FlumeSpecGen (responsible for converting ASTs back to the Flume config language)
  • FlumePatterns (AST tree-pattern matching library useful for AST transformations)
  • SinkFactory / SinkFactoryImpl (calls the parsing functions to instantiate sinks and decos from strings; the main extension point for new sinks and decos)
  • SourceFactory / SourceFactoryImpl (calls the parsing functions to instantiate sources from strings; the main extension point for new sources)
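The "extension point" role of the factories can be pictured as a registry that maps the identifier used in the config language to a builder. The sketch below is only an illustration of that idea: SketchSink and SketchSinkRegistry are hypothetical names, and the method signatures are assumptions rather than the actual SinkFactory / SinkFactoryImpl API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a sink; not Flume's own sink interface. */
interface SketchSink {
    String describe();
}

/** Hypothetical name-to-builder registry illustrating the extension-point idea. */
final class SketchSinkRegistry {
    private final Map<String, Function<String[], SketchSink>> builders = new HashMap<>();

    /** Registers a builder under the identifier that appears in a SINK node. */
    void register(String name, Function<String[], SketchSink> builder) {
        builders.put(name, builder);
    }

    /** Looks up the builder for an identifier and applies the parsed arguments. */
    SketchSink build(String name, String... argv) {
        Function<String[], SketchSink> builder = builders.get(name);
        if (builder == null) {
            throw new IllegalArgumentException("unknown sink: " + name);
        }
        return builder.apply(argv);
    }
}
```

Under this assumption, adding a new sink means registering one more builder; the grammar and the AST stay unchanged because a SINK node only carries an identifier and its arguments.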

Here is the core ANTLR3-specific parser grammar for the v0.9.4 language. The main difference from standard grammars is the -> ^(TOKEN arg1 arg2) constructs. These are ANTLR-specific rewrite mechanisms that restructure concrete syntax trees into abstract syntax trees as the parse occurs. The whole file lives in ./flume-core/src/main/antlr3/com/cloudera/flume/conf/FlumeDeploy.g

def     :       host ':' source '|' sink  ';' -> ^(NODE host source sink);

host: Identifier | IPLiteral ;

// This is currently a placeholder for nodes that have multiple
// independent source/sink pairs.
connection
        :        source '|' sink -> ^(NODE BLANK source sink);

source          :       singleSource            -> singleSource ;
sourceEof       :       source EOF              -> source;
singleSource    :       Identifier args?        -> ^(SOURCE Identifier args?);
multiSource     :       singleSource (',' singleSource)* -> singleSource+ ;

        
sink            :       simpleSink -> simpleSink;

singleSink      :       Identifier args?        -> ^(SINK Identifier args?);

sinkEof         :       simpleSink EOF;

simpleSink      :       '[' multiSink ']'       -> ^(MULTI multiSink)
                |       singleSink simpleSink?  -> ^(DECO singleSink simpleSink?)
                |       '{' decoratedSink '}'   -> ^(DECO decoratedSink)
                |       '<' failoverSink '>'    -> ^(BACKUP failoverSink)
                |       rollSink                -> rollSink
                |       genCollectorSink        -> genCollectorSink
                ;
                        

decoratedSink   :  singleSink '=>' sink                 -> singleSink sink;
multiSink       :  simpleSink (',' simpleSink)*         -> simpleSink* ;
failoverSink    :  simpleSink ('?' simpleSink)+ -> simpleSink+; 
rollSink        :  'roll' args '{' simpleSink '}'
                                  -> ^(ROLL simpleSink args);
genCollectorSink       :  'collector' args '{' simpleSink '}'
                                  -> ^(GEN 'collector' simpleSink args?);

function: Identifier args? -> ^(FUNC Identifier args?);

arg     : literal
        | function
        ;

args    : '(' ( arglist (',' kwarglist)?  ) ')' -> arglist kwarglist?
        | '(' kwarglist ')' -> kwarglist?
        | '(' ')' ->
        ;

arglist :       arg (',' arg)* -> arg+ ;

kwarglist : kwarg (',' kwarg)* -> kwarg+;

kwarg   :   Identifier '=' arg -> ^(KWARG Identifier arg)  ;

// Basic Java-style literals  (taken from Java grammar)
literal
    :   integerLiteral
    |   StringLiteral           -> ^(STRING StringLiteral)
    |   booleanLiteral
    |   FloatingPointLiteral    -> ^(FLOAT FloatingPointLiteral)
    ;
integerLiteral
    :   HexLiteral      -> ^(HEX HexLiteral)
    |   OctalLiteral    -> ^(OCT OctalLiteral)
    |   DecimalLiteral  -> ^(DEC DecimalLiteral)    
    ;

booleanLiteral
    :   'true'          -> ^(BOOL 'true')
    |   'false'         -> ^(BOOL 'false')
    ;
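
To make the rewrite rules concrete, the sketch below hand-builds the trees that the grammar above should produce for three small inputs and prints them in LISP-like form. The shapes are derived by reading the rewrite rules, not by running the generated parser; the class AstShape and the identifiers host1, src, a, b and d are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Tiny tree node used only to show AST shapes (hypothetical, not a Flume or ANTLR class). */
final class AstShape {
    final String label;
    final List<AstShape> kids;

    AstShape(String label, AstShape... kids) {
        this.label = label;
        this.kids = new ArrayList<>(Arrays.asList(kids));
    }

    static AstShape n(String label, AstShape... kids) {
        return new AstShape(label, kids);
    }

    /** (DECO (SINK name)): the shape of a lone singleSink inside simpleSink. */
    static AstShape sink(String name) {
        return n("DECO", n("SINK", n(name)));
    }

    /** Wraps a sink tree in the NODE produced by the def rule. */
    static AstShape node(AstShape sinkTree) {
        return n("NODE", n("host1"), n("SOURCE", n("src")), sinkTree);
    }

    /** Shape for:  host1 : src | [ a, b ] ;  (fanout) */
    static AstShape fanout() {
        return node(n("MULTI", sink("a"), sink("b")));
    }

    /** Shape for:  host1 : src | < a ? b > ;  (failover) */
    static AstShape failover() {
        return node(n("BACKUP", sink("a"), sink("b")));
    }

    /** Shape for:  host1 : src | { d => a } ;  (decorator d wrapping sink a) */
    static AstShape decorated() {
        return node(n("DECO", n("SINK", n("d")), sink("a")));
    }

    String toStringTree() {
        if (kids.isEmpty()) return label;
        StringBuilder sb = new StringBuilder("(").append(label);
        for (AstShape k : kids) sb.append(' ').append(k.toStringTree());
        return sb.append(')').toString();
    }
}
```

Calling AstShape.fanout().toStringTree() gives (NODE host1 (SOURCE src) (MULTI (DECO (SINK a)) (DECO (SINK b)))); the failover case differs only in using BACKUP instead of MULTI, and the decorated case gives (NODE host1 (SOURCE src) (DECO (SINK d) (DECO (SINK a)))).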

There are currently a few points of "hackery" in the BNF.

  • All sinks are wrapped with a DECO token AST node, so we always get (DECO (SINK foo)) instead of just (SINK foo)
  • The special collector construct and roll construct should probably be consolidated
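
As an illustration of the first point, the redundant wrapper could in principle be removed by a small tree transformation that replaces any DECO node with exactly one child by that child. This is a hypothetical normalization pass, not something the source says Flume performs; code that consumes the AST may well rely on the DECO wrapper being present. The class DecoNode is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Minimal tree node for the sketch (hypothetical, not a Flume or ANTLR class). */
final class DecoNode {
    final String label;
    final List<DecoNode> kids;

    DecoNode(String label, DecoNode... kids) {
        this.label = label;
        this.kids = new ArrayList<>(Arrays.asList(kids));
    }

    String toStringTree() {
        if (kids.isEmpty()) return label;
        StringBuilder sb = new StringBuilder("(").append(label);
        for (DecoNode k : kids) sb.append(' ').append(k.toStringTree());
        return sb.append(')').toString();
    }

    /** Returns a copy in which every DECO with exactly one child is replaced by that child. */
    static DecoNode unwrap(DecoNode node) {
        List<DecoNode> rewritten = new ArrayList<>();
        for (DecoNode k : node.kids) rewritten.add(unwrap(k));
        if (node.label.equals("DECO") && rewritten.size() == 1) {
            return rewritten.get(0);
        }
        return new DecoNode(node.label, rewritten.toArray(new DecoNode[0]));
    }
}
```

Applied to (DECO (SINK foo)) this yields (SINK foo), while a genuine decorator such as (DECO (SINK d) (DECO (SINK a))) keeps its outer DECO and becomes (DECO (SINK d) (SINK a)).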