Note: these instructions are currently for the S4-22 branch.

Installation

There is currently no distribution package, so you need to download the source and build the platform.

  1. Download the source from the Apache git repository (follow the instructions here) and check out branch S4-22.
  2. Compile and install S4 in the local Maven repository (you can also let the tests run, but this currently takes quite a long time):
    Code Block
    
    S4:incubator-s4$ ./gradlew install -DskipTests
    .... verbose logs ...
    

  3. Build the startup scripts:
    Code Block
    
    S4:incubator-s4$ ./gradlew s4-tools:installApp
    .... verbose logs ...
    ...:s4-tools:installApp
    

Create a new application

S4 provides some scripts in order to simplify development and testing of applications. Let's see how to create a new template project and start a sample application.

Create a new project

  1. Create a new application template (here, we create it in the /tmp directory): 
    Code Block
    
    S4:incubator-s4$ ./s4 newApp myApp -parentDir=/tmp
    
    ... some instructions on how to start ...
    

  2. This creates a sample application in the specified directory, with the following structure:
    Code Block
    
    build.gradle  --> the template build file, which you'll need to customize
         gradlew --> references the gradlew script from the S4 installation
              s4 --> references the s4 script from the S4 installation, and adds an "adapter" task
            src/ --> sources (maven-like structure)
    

Have a look at the sample project content

The src/main/java/hello directory contains 3 files:

  • HelloPE.java: a very simple PE that prints the name contained in incoming events
    Code Block
    
    package hello;

    import org.apache.s4.base.Event;
    import org.apache.s4.core.ProcessingElement;

    // ProcessingElement provides integration with the S4 platform
    public class HelloPE extends ProcessingElement {
    
        // you should define downstream streams here and inject them in the app definition
    
        // PEs can maintain some state
        boolean seen = false;
    
        // This method is called upon a new Event on an incoming stream.
        // You may overload it for handling instances of your own specialized subclasses of Event
        public void onEvent(Event event) {
            System.out.println("Hello " + (seen ? "again " : "") + event.get("name") + "!");
            seen = true;
        }

        // Lifecycle callbacks required by ProcessingElement (nothing to do in this sample)
        @Override
        protected void onCreate() {
        }

        @Override
        protected void onRemove() {
        }
    }
    

  • HelloApp.java: defines a simple application that exposes an input stream ("names") connected to the HelloPE
    Code Block
    
    package hello;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.s4.base.Event;
    import org.apache.s4.base.KeyFinder;
    import org.apache.s4.core.App;

    // App parent class provides integration with the S4 platform
    public class HelloApp extends App {
    
        @Override
        protected void onStart() {
        }
    
        @Override
        protected void onInit() {
            // That's where we define PEs and streams
            // create a prototype
            HelloPE helloPE = createPE(HelloPE.class);
            // Create a stream that listens to the "names" stream and passes events to HelloPE instances.
            createInputStream("names", new KeyFinder<Event>() {
                // the KeyFinder extracts the key (here, the "name" field) used to route each event
                @Override
                public List<String> get(Event event) {
                    return Arrays.asList(new String[] { event.get("name") });
                }
            }, helloPE);
        }

        @Override
        protected void onClose() {
        }
    }
    
  • HelloInputAdapter.java: a simple adapter that reads lines of characters from a socket, converts them into events, and sends the events to interested S4 apps through the "names" stream
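
To see why the app is built around a PE prototype and a KeyFinder, here is a plain-Java simulation of the keyed model the sample relies on: each distinct key returned by the KeyFinder gets its own PE instance, created from the prototype on first use, with its own "seen" state. This is a hypothetical sketch that does not use the S4 API: the names KeyedPeSimulation, GreetingPE and KeyedStream, the Map<String, String> standing in for an event, and the single-key Function standing in for KeyFinder (which returns a List<String>) are assumptions made for illustration. S4's real instance management and the distribution of instances across nodes are not modeled.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyedPeSimulation {

    // Hypothetical stand-in for HelloPE: per-instance state, like the "seen" flag.
    static class GreetingPE {
        private boolean seen = false;

        String onEvent(Map<String, String> event) {
            String out = "Hello " + (seen ? "again " : "") + event.get("name") + "!";
            seen = true;
            return out;
        }
    }

    // Hypothetical stand-in for a keyed input stream: one PE instance per distinct key.
    static class KeyedStream {
        private final Function<Map<String, String>, String> keyFinder;
        private final Map<String, GreetingPE> instances = new HashMap<>();

        KeyedStream(Function<Map<String, String>, String> keyFinder) {
            this.keyFinder = keyFinder;
        }

        String put(Map<String, String> event) {
            String key = keyFinder.apply(event);
            // First event for this key: create a fresh instance (S4 derives it from the prototype).
            GreetingPE pe = instances.computeIfAbsent(key, k -> new GreetingPE());
            return pe.onEvent(event);
        }

        int instanceCount() {
            return instances.size();
        }
    }

    public static void main(String[] args) {
        KeyedStream names = new KeyedStream(event -> event.get("name"));
        for (String name : List.of("Bob", "Alice", "Bob")) {
            System.out.println(names.put(Map.of("name", name)));
        }
        // prints: Hello Bob! / Hello Alice! / Hello again Bob!
    }
}
```

In this model the second "Bob" event reaches the instance created for the first one, which is why it prints "Hello again Bob!", while "Alice" gets a fresh instance.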

Run the sample project

To run an S4 application, you need to:

  • set up a cluster: provision a cluster and start S4 nodes for that cluster
  • package the app
  • publish the app on the cluster

  1. Start a ZooKeeper server instance (log4j warnings come from ZooKeeper and can be ignored here):
    Code Block
    S4:incubator-s4$ ./s4 zkServer
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    [main] INFO  org.apache.s4.tools.ZKServer - Starting zookeeper server on port [2181]
    [main] INFO  org.apache.s4.tools.ZKServer - cleaning existing data in [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/data] and [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/log]
    log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkServer).
    log4j:WARN Please initialize the log4j system properly.

  2. Define a new cluster, for instance a cluster named "cluster1" with 2 partitions, whose nodes listen on ports starting from 12000:
    Code Block
    S4:myApp$ ./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    [main] INFO  org.apache.s4.tools.DefineCluster - preparing new cluster [cluster1] with [2] node(s)
    [main] INFO  org.apache.s4.tools.DefineCluster - New cluster configuration uploaded into zookeeper
    

  3. Start 2 S4 nodes with the default configuration and attach them to cluster "cluster1" (run the following command twice, for instance in two separate terminals):
    Code Block
    S4:myApp$ ./s4 node -c=cluster1
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    15:50:18.996 [main] INFO  org.apache.s4.core.Main - Initializing S4 node with : 
    - comm module class [org.apache.s4.comm.DefaultCommModule]
    - comm configuration file [default.s4.comm.properties from classpath]
    - core module class [org.apache.s4.core.DefaultCoreModule]
    - core configuration file[default.s4.core.properties from classpath]
    -extra modules: []
    [main] INFO  org.apache.s4.core.Main - Starting S4 node. This node will automatically download applications published for the cluster it belongs to
    

  4. Build, package and publish the app to cluster1 (currently, you must use the name of the current project, and you need to specify the Gradle build file with a complete path):
    Code Block
    S4:myApp$ ./s4 deploy -appName=myApp -c=cluster1 -b=`pwd`/build.gradle
    .... verbose logs for compiling, building the package, and publishing it to Zookeeper...
    15:46:16.486 [main] INFO  org.apache.s4.tools.Deploy - uploaded application [myApp] to cluster [cluster1], using zookeeper znode [/s4/clusters/cluster1/apps/myApp]
    

  5. S4 nodes will detect the new application, download it, load it and start it. You will get something like:
    Code Block
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.d.DistributedDeploymentManager - Detected new application(s) to deploy {}[myApp]
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - Local app deployment: using s4r file name [myApp] as application name
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - App class name is: hello.HelloApp
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]} from null
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Adding topology change listener:org.apache.s4.comm.tcp.TCPEmitter@79b2591c
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.comm.topology.AssignmentFromZK - New session:87684175268872203; state is : SyncConnected
    [ZkClient-EventThread-19-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]} from { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]}
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.comm.topology.AssignmentFromZK - Successfully acquired task:Task-1 by myMachine.myNetwork
    [ZkClient-EventThread-19-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]}
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872205
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - Detected new stream [names]
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872206
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from null
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - Loaded application from file /tmp/deploy-test/cluster1/myApp.s4r
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.d.DistributedDeploymentManager - Successfully installed application myApp
    [ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s.c.g.OverloadDispatcherGenerator - Dumping generated overload dispatcher class for PE of class [class hello.HelloPE]
    [ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s4.comm.topology.ClustersFromZK - Adding input stream [names] for app [-1] in cluster [cluster1]
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.App - Init prototype [hello.HelloPE].
    

Great, this application is deployed on 2 S4 nodes. It needs some input, though...
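
The ClusterFromZK log lines above show partition 0 listening on port 12000 and partition 1 on port 12001, which suggests that ports are assigned consecutively from the -flp value. The sketch below reproduces that arithmetic and adds a key-to-partition mapping to illustrate how a keyed event ends up on a single node. The hash function (String.hashCode() modulo the number of partitions) and the class and method names are assumptions for illustration only: S4 uses its own hashing, so the partition computed here is not guaranteed to match the node that actually handles a given key.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRouting {

    // Port of the node owning a partition, assuming consecutive ports
    // starting at the -flp value (as the ClusterFromZK log lines suggest).
    static int portForPartition(int firstListeningPort, int partition) {
        return firstListeningPort + partition;
    }

    // Illustrative key-to-partition mapping (assumption: String.hashCode()
    // modulo the number of partitions; S4's actual hashing may differ).
    static int partitionForKey(String key, int nbPartitions) {
        return Math.floorMod(key.hashCode(), nbPartitions);
    }

    // Ports of all partitions of a cluster.
    static List<Integer> ports(int firstListeningPort, int nbPartitions) {
        List<Integer> ports = new ArrayList<>();
        for (int p = 0; p < nbPartitions; p++) {
            ports.add(portForPartition(firstListeningPort, p));
        }
        return ports;
    }

    public static void main(String[] args) {
        // Values from "./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000"
        System.out.println(ports(12000, 2)); // [12000, 12001]
        int p = partitionForKey("Bob", 2);
        System.out.println("key Bob -> partition " + p + " on port " + portForPartition(12000, p));
    }
}
```

Because every event with the same key maps to the same partition, all "Bob" events are handled by one node, which is where the corresponding HelloPE instance lives.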

We can get input through an adapter, i.e. an S4 app that converts an external stream into S4 events and injects the events into S4 clusters. In the sample application, the adapter is a very basic class that extends App, listens to an input socket, and converts each received line of characters into a generic S4 event, in which the line data is kept in a "name" field.
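
The line-to-event conversion described above can be sketched in plain Java. This is not the source of HelloInputAdapter and it uses no S4 classes: a Map<String, String> stands in for the generic S4 event, a List collects the events instead of emitting them on the output stream, and a StringReader replaces the socket so the example runs on its own. The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LineToEventSketch {

    // Converts one line of characters into a generic event:
    // the whole line is kept in the "name" field.
    static Map<String, String> toEvent(String line) {
        return Map.of("name", line);
    }

    // Reads lines until end of input and converts each one into an event.
    // In a real adapter the Reader would wrap a socket's input stream, and each
    // event would be emitted on the output stream instead of being collected.
    static List<Map<String, String>> readEvents(Reader input) throws IOException {
        List<Map<String, String>> events = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(input)) {
            String line;
            while ((line = reader.readLine()) != null) {
                events.add(toEvent(line));
            }
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        List<Map<String, String>> events = readEvents(new StringReader("Bob\nAlice\n"));
        System.out.println(events); // [{name=Bob}, {name=Alice}]
    }
}
```

In the sample, the events produced this way would be sent on the stream named by the adapter.output.stream parameter, which is how they reach the "names" input stream of myApp.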

When starting the adapter, we specify:

  • the adapter class (-appClass)
  • the name of the output stream (adapter.output.stream, passed through -namedStringParameters)
  • the cluster on which to deploy this app (-c)
For easy testing, we provide a facility to start a node with an adapter app without having to package and deploy the adapter app.

  1. First, we need to define a new S4 subcluster for that app:
    Code Block
    S4:myApp$ ./s4 newCluster -c=cluster2 -nbTasks=1 -flp=13000

  2. Then we can start the adapter, using "names" as the output stream name (this is the same name used as input by the myApp app):
    Code Block
    S4:myApp$ ./s4 adapter -appClass=hello.HelloInputAdapter -c=cluster2 -namedStringParameters=adapter.output.stream:names

  3. Now let's just provide some data to the external stream:
    Code Block
    S4:~$ echo "Bob" | nc localhost 15000

  4. One of the nodes (the one handling the "Bob" key) should output in its console:
    Code Block
    Hello Bob!
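
If you would rather feed the adapter from a program, or send several names at once, the nc command above can be replaced by a few lines of Java. This sketch only assumes what the text states: the adapter listens on localhost:15000 and reads newline-terminated lines. The class name SendNames is hypothetical. Given HelloPE's "seen" flag, sending "Bob" a second time should make the same node print "Hello again Bob!".

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class SendNames {

    // Sends each name as one newline-terminated line over a single connection,
    // like `echo "Bob" | nc localhost 15000` does for one name.
    static void sendLines(String host, int port, String... lines) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 15000 is the one used with nc above; names come from the command line.
        sendLines("localhost", 15000, args.length > 0 ? args : new String[] { "Bob" });
    }
}
```

For example, `java SendNames Bob Alice Bob` sends three events through one connection.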
