Note: these instructions were initially written for the S4-22 branch. They are now valid for the piper branch!
Install S4
There is currently no distribution package as such. So you need to download the source and build the platform.
- Download the sources from the Apache git repository (follow the instructions here) and check out the piper branch.
- Compile and install S4 in the local Maven repository (you can also let the tests run, but this currently takes quite long because we are not yet using mocks):
Code Block
S4:incubator-s4$ ./gradlew install -DskipTests
.... verbose logs ...
- Build the startup scripts:
Code Block
S4:incubator-s4$ ./gradlew s4-tools:installApp
.... verbose logs ...
:s4-tools:installApp
Note: if you work with NFS, you may run into issues when building the apps. The ./s4 deploy command may currently not work properly, depending on the file locking settings of your cluster. You can still build applications, deploy them (as s4r archives, see a later section) and run them. However, you may have to tell the build tool (gradle) to use the local file system for caches and repositories by appending the following options to gradle commands:
Code Block
-g /<local-dir>/.gradle/ --project-cache-dir /<local-dir>/s4
Start a new application
S4 provides some scripts in order to simplify development and testing of applications. Let's see how to create a new project and start a sample application.
Create a new project
- Create a new application template (here, we create it in the /tmp directory):
Code Block
S4:incubator-s4$ ./s4 newApp myApp -parentDir=/tmp
... some instructions on how to start ...
- This creates a sample application in the specified directory, with the following structure:
Code Block
build.gradle --> the template build file, that you'll need to customize
gradlew      --> references the gradlew script from the S4 installation
s4           --> references the s4 script from the S4 installation, and adds an "adapter" task
src/         --> sources (maven-like structure)
Have a look at the sample project content
The src/main/java/hello directory contains 3 files:
- HelloPE.java: a very simple PE that prints the name contained in incoming events
Code Block
// ProcessingElement provides integration with the S4 platform
public class HelloPE extends ProcessingElement {

    // you should define downstream streams here and inject them in the app definition

    // PEs can maintain some state
    boolean seen = false;

    // This method is called upon a new Event on an incoming stream.
    // You may overload it for handling instances of your own specialized subclasses of Event
    public void onEvent(Event event) {
        System.out.println("Hello " + (seen ? "again " : "") + event.get("name") + "!");
        seen = true;
    }

    // skipped remaining methods
}
- HelloApp.java: defines a simple application that exposes an input stream ("names") connected to the HelloPE
Code Block
// App parent class provides integration with the S4 platform
public class HelloApp extends App {

    @Override
    protected void onStart() {
    }

    @Override
    protected void onInit() {
        // That's where we define PEs and streams
        // create a prototype
        HelloPE helloPE = createPE(HelloPE.class);
        // Create a stream that listens to the "names" stream and passes events to the helloPE instance.
        createInputStream("names", new KeyFinder<Event>() {
            // the KeyFinder is used to identify keys
            @Override
            public List<String> get(Event event) {
                return Arrays.asList(new String[] { event.get("name") });
            }
        }, helloPE);
    }

    // skipped remaining methods
}
- HelloInputAdapter.java: a simple adapter that reads lines of characters from a socket, converts them into events, and sends the events to interested S4 apps through the "names" stream
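The following self-contained model shows how the KeyFinder in HelloApp and the `seen` state in HelloPE work together. It uses no S4 classes. It assumes, based on our reading of the platform, two things: the PE created with createPE acts as a prototype, and a separate PE instance is created for each distinct key that the KeyFinder returns. Under that assumption, `seen` is tracked per name within a node. `SimpleKeyFinder`, `GreeterPE` and `KeyedDispatcher` are hypothetical names, not part of the S4 API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for S4's KeyFinder: extracts key values from an event.
interface SimpleKeyFinder<E> {
    List<String> get(E event);
}

// Hypothetical stand-in for HelloPE: same greeting logic and "seen" state.
class GreeterPE {
    private boolean seen = false;

    String onEvent(Map<String, String> event) {
        String greeting = "Hello " + (seen ? "again " : "") + event.get("name") + "!";
        seen = true;
        return greeting;
    }
}

// Hypothetical dispatcher: routes each event to a PE instance created lazily
// per key, mimicking the assumed "one instance per key" behavior on one node.
class KeyedDispatcher {
    private final SimpleKeyFinder<Map<String, String>> keyFinder;
    private final Supplier<GreeterPE> prototype;
    private final Map<String, GreeterPE> instances = new HashMap<>();

    KeyedDispatcher(SimpleKeyFinder<Map<String, String>> keyFinder, Supplier<GreeterPE> prototype) {
        this.keyFinder = keyFinder;
        this.prototype = prototype;
    }

    String dispatch(Map<String, String> event) {
        // HelloApp's KeyFinder returns a single key (the "name" field).
        String key = keyFinder.get(event).get(0);
        return instances.computeIfAbsent(key, k -> prototype.get()).onEvent(event);
    }

    int instanceCount() {
        return instances.size();
    }

    static Map<String, String> nameEvent(String name) {
        return Collections.singletonMap("name", name);
    }

    public static void main(String[] args) {
        KeyedDispatcher dispatcher = new KeyedDispatcher(
                event -> Collections.singletonList(event.get("name")), GreeterPE::new);
        for (String name : new String[] {"Bob", "Alice", "Bob"}) {
            System.out.println(dispatcher.dispatch(nameEvent(name)));
        }
    }
}
```

Running main prints "Hello Bob!", then "Hello Alice!", then "Hello again Bob!". The second Bob event reaches the instance created for the first one, while Alice gets a fresh instance.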
Run the sample app
In order to run an S4 application, you need to:
- set up a cluster: provision a cluster and start S4 nodes for that cluster
- package the app
- publish the app on the cluster
- Start a Zookeeper server instance (log4j warnings come from Zookeeper and can be ignored here):
Code Block
S4:incubator-s4$ ./s4 zkServer
S4:myApp$ calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
[main] INFO org.apache.s4.tools.ZKServer - Starting zookeeper server on port [2181]
[main] INFO org.apache.s4.tools.ZKServer - cleaning existing data in [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/data] and [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/log]
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkServer).
log4j:WARN Please initialize the log4j system properly.
- Define a new cluster, say a cluster named "cluster1" with 2 partitions and nodes listening on ports starting from 12000:
Code Block
S4:myApp$ ./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000
calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
[main] INFO org.apache.s4.tools.DefineCluster - preparing new cluster [cluster1] with [2] node(s)
[main] INFO org.apache.s4.tools.DefineCluster - New cluster configuration uploaded into zookeeper
- Start 2 S4 nodes with the default configuration and attach them to cluster "cluster1" (you may start the second S4 node in a different console):
Code Block
S4:myApp$ ./s4 node -c=cluster1
calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
15:50:18.996 [main] INFO org.apache.s4.core.Main - Initializing S4 node with :
- comm module class [org.apache.s4.comm.DefaultCommModule]
- comm configuration file [default.s4.comm.properties from classpath]
- core module class [org.apache.s4.core.DefaultCoreModule]
- core configuration file[default.s4.core.properties from classpath]
-extra modules: []
[main] INFO org.apache.s4.core.Main - Starting S4 node. This node will automatically download applications published for the cluster it belongs to
- Build, package and publish the app to cluster1:
- You may do that in a single step. Currently, you must use the name of the current project, and you need to specify the gradle build file with a complete path. Specifying the app class is optional, but it avoids issues when the script tries to guess the app class automatically:
Code Block
S4:myApp$ ./s4 deploy -appName=myApp -c=cluster1 -b=`pwd`/build.gradle -a=hello.HelloApp
.... verbose logs for compiling, building the package, and publishing it to Zookeeper...
15:46:16.486 [main] INFO org.apache.s4.tools.Deploy - uploaded application [myApp] to cluster [cluster1], using zookeeper znode [/s4/clusters/cluster1/apps/myApp]
- You may also do that in 2 separate steps:
- Create an s4r archive. The following command creates an archive named myApp.s4r in build/libs (you may choose any name). Again, specifying the app class is optional:
Code Block
./s4 s4r -a=hello.HelloApp -b=`pwd`/build.gradle myApp
- Publish the s4r archive (you may first copy it to a more suitable place). The name of the app is arbitrary:
Code Block
./s4 deploy -s4r=`pwd`/build/libs/myApp.s4r -c=cluster1 -appName=myApp
- S4 nodes will detect the new application, download it, load it and start it. You will get something like:
Code Block
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.d.DistributedDeploymentManager - Detected new application(s) to deploy {}[myApp]
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.Server - Local app deployment: using s4r file name [myApp] as application name
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.Server - App class name is: hello.HelloApp
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]} from null
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Adding topology change listener:org.apache.s4.comm.tcp.TCPEmitter@79b2591c
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.comm.topology.AssignmentFromZK - New session:87684175268872203; state is : SyncConnected
[ZkClient-EventThread-19-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]} from { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]}
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.comm.topology.AssignmentFromZK - Successfully acquired task:Task-1 by myMachine.myNetwork
[ZkClient-EventThread-19-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]}
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872205
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClustersFromZK - Detected new stream [names]
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872206
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from null
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.Server - Loaded application from file /tmp/deploy-test/cluster1/myApp.s4r
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.d.DistributedDeploymentManager - Successfully installed application myApp
[ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s.c.g.OverloadDispatcherGenerator - Dumping generated overload dispatcher class for PE of class [class hello.HelloPE]
[ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s4.comm.topology.ClustersFromZK - Adding input stream [names] for app [-1] in cluster [cluster1]
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.App - Init prototype [hello.HelloPE].
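The topology lines in the log show how the newCluster parameters map onto tasks. With -nbTasks=2 and -flp=12000, partition 0 (Task-0) listens on port 12000 and partition 1 (Task-1) on port 12001. The sketch below reproduces that arithmetic. It is an illustrative model inferred from the log output, not code taken from S4, and `ClusterLayoutSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the task layout seen in the topology logs:
// task i gets partition i and listens on firstListeningPort + i.
class ClusterLayoutSketch {
    static final class Task {
        final String taskId;
        final int partition;
        final int port;

        Task(String taskId, int partition, int port) {
            this.taskId = taskId;
            this.partition = partition;
            this.port = port;
        }

        @Override
        public String toString() {
            return "{partition=" + partition + ",port=" + port + ",taskId=" + taskId + "}";
        }
    }

    static List<Task> layout(int nbTasks, int firstListeningPort) {
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < nbTasks; i++) {
            tasks.add(new Task("Task-" + i, i, firstListeningPort + i));
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Same values as "./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000"
        for (Task task : layout(2, 12000)) {
            System.out.println(task);
        }
    }
}
```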
Great! The application is now deployed on 2 S4 nodes. It still needs some input, though.
We can get input through an adapter, i.e. an S4 app that converts an external stream into S4 events and injects those events into S4 clusters. In the sample application, the adapter is a very basic class that extends App and listens to an input socket. It converts each received line of characters into a generic S4 event, keeping the line data in a "name" field. We specify:
- the adapter class
- the name of the output stream
- the cluster on which to deploy this app
For easy testing, we provide a facility to start a node with an adapter app without having to package and deploy the adapter app.
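The adapter's core job is to read lines from a socket and turn each one into an event whose "name" field holds the line. That logic can be sketched in plain Java. This is a hypothetical illustration, not HelloInputAdapter's source and not the S4 adapter API. The event is modeled as a Map, and the `Consumer` stands in for the "names" output stream. The default port 15000 is the one used with nc later on this page.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of a line-based adapter; not the S4 adapter API.
class LineAdapterSketch {
    // Converts one line of characters into a generic event with a "name" field.
    static Map<String, String> toEvent(String line) {
        return Collections.singletonMap("name", line.trim());
    }

    // Handles one client connection: every non-blank line becomes one event.
    static void serveOnce(ServerSocket server, Consumer<Map<String, String>> outputStream)
            throws IOException {
        try (Socket client = server.accept();
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    outputStream.accept(toEvent(line));
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 15000;
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                // In S4 the event would be emitted on the "names" stream; here we just print it.
                serveOnce(server, event -> System.out.println("names <- " + event));
            }
        }
    }
}
```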
- First, we need to define a new S4 subcluster for that app:
Code Block
S4:myApp$ ./s4 newCluster -c=cluster2 -nbTasks=1 -flp=13000
- Then we can start the adapter, using "names" to identify the output stream (this is the same name that the myApp app uses for its input):
Code Block
./s4 adapter -appClass=hello.HelloInputAdapter -c=cluster2 -namedStringParameters=adapter.output.stream:names
- Now let's just provide some data to the external stream:
Code Block
S4:~$ echo "Bob" | nc localhost 15000
- One of the nodes should output in its console:
Code Block
Hello Bob!
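If nc is not available, the same input can be sent from Java. This minimal client is a hypothetical helper, not part of S4. It opens a TCP connection to the adapter's port (15000, as in the nc command), writes one newline-terminated line, and closes the connection.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical Java equivalent of: echo "Bob" | nc localhost 15000
class SendName {
    static void sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            out.write(line);
            out.write('\n'); // the adapter reads newline-terminated lines
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        String name = args.length > 0 ? args[0] : "Bob";
        sendLine("localhost", 15000, name);
    }
}
```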
If you keep sending messages, the nodes will take turns displaying the "hello" messages. By default, the adapter app sends keyless events on the "names" stream in a round-robin fashion.
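Round-robin dispatch of keyless events simply cycles through the available nodes. The following sketch models that policy for the two cluster1 nodes. `RoundRobinSketch` is a hypothetical class and does not reproduce S4's internal emitter code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of round-robin dispatch for keyless events.
class RoundRobinSketch {
    private final List<String> nodes;
    private int next = 0;

    RoundRobinSketch(List<String> nodes) {
        this.nodes = nodes;
    }

    // Returns the node for the next event, then advances, wrapping around.
    String pick() {
        String node = nodes.get(next);
        next = (next + 1) % nodes.size();
        return node;
    }

    public static void main(String[] args) {
        RoundRobinSketch dispatcher = new RoundRobinSketch(Arrays.asList("Task-0", "Task-1"));
        for (String name : new String[] {"Bob", "Bob", "Alice"}) {
            System.out.println(name + " -> " + dispatcher.pick());
        }
    }
}
```

In this model, consecutive events alternate between the nodes regardless of their content, which is consistent with the alternating console output described above.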
What happened?
The following figures illustrate the various steps we have taken. The local file system is used as the S4 application repository in our example.
Run the Twitter trending example
Let's have a look at another application, which computes trending Twitter topics by listening to the spritzer stream from the Twitter API. This application was adapted from a previous example in S4 0.3.
Overview
This application is divided into:
- twitter-counter, in test-apps/twitter-counter/: extracts topics from tweets and maintains a count of the most popular ones, which is periodically dumped to disk
- twitter-adapter, in test-apps/twitter-adapter/: listens to the feed from Twitter, converts status text into S4 events, and passes them to the "RawStatus" stream
Have a look at the code in these directories. You'll note that:
- the build.gradle file must be tailored to include new dependencies (twitter4j libs in twitter-adapter)
- events are partitioned through various keys
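Re-keying can be illustrated with a small, self-contained sketch. A raw status is split into topics, and each topic becomes the key of a new event, so all occurrences of a topic can be counted by the same PE instance. Treating hashtags as topics is an assumption made for illustration; check the twitter-counter sources for the actual extraction logic. `TopicKeyingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: hashtags are ASSUMED to be the "topics" of a status.
class TopicKeyingSketch {
    // Extracts lower-cased hashtags, dropping trailing punctuation ("#S4," -> "s4").
    static List<String> extractTopics(String statusText) {
        List<String> topics = new ArrayList<>();
        for (String token : statusText.split("\\s+")) {
            if (token.length() > 1 && token.startsWith("#")) {
                String topic = token.substring(1).replaceAll("[^\\p{L}\\p{N}_]+$", "");
                if (!topic.isEmpty()) {
                    topics.add(topic.toLowerCase(Locale.ROOT));
                }
            }
        }
        return topics;
    }

    // Simulates keyed counting: each topic is a key, and in S4 the count for a
    // key would live in the PE instance responsible for that key.
    static Map<String, Integer> countByTopic(List<String> statuses) {
        Map<String, Integer> counts = new HashMap<>();
        for (String status : statuses) {
            for (String topic : extractTopics(status)) {
                counts.merge(topic, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> statuses = new ArrayList<>();
        statuses.add("Trying #S4 today");
        statuses.add("#s4, #streaming and more #S4!");
        System.out.println(countByTopic(statuses));
    }
}
```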
Run it!
Note: you need a twitter4j.properties file in your home directory with the following content (debug is optional):
Code Block
debug=true
user=<a twitter username>
password=<matching password>
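Before starting the adapter, it can help to check that this file exists and holds credentials. The following is a minimal sketch using java.util.Properties. `Twitter4jConfigCheck` is a hypothetical helper, and it only checks the keys listed above, in the home-directory location this page mentions.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical sanity check for ~/twitter4j.properties (not part of S4 or twitter4j).
class Twitter4jConfigCheck {
    static Properties load(Path file) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        return props;
    }

    // True when both "user" and "password" are present and non-empty.
    static boolean hasCredentials(Properties props) {
        return !props.getProperty("user", "").isEmpty()
                && !props.getProperty("password", "").isEmpty();
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get(System.getProperty("user.home"), "twitter4j.properties");
        Properties props = load(file);
        System.out.println(hasCredentials(props) ? "credentials found" : "missing user/password");
    }
}
```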
- Start a Zookeeper instance. From the S4 base directory, do:
Code Block
./s4 zkServer
- Define 2 clusters: 1 for deploying the twitter-counter app, and 1 for the adapter app:
Code Block
./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000
./s4 newCluster -c=cluster2 -nbTasks=1 -flp=13000
- Start 2 app nodes (you may want to start each node in a separate console):
Code Block
./s4 node -c=cluster1
- Start 1 node for the adapter app:
Code Block
./s4 node -c=cluster2 -namedStringParameters=adapter.output.stream:RawStatus
- Deploy the twitter-counter app (you may also first build the s4r and then publish it, as described in the previous section):
Code Block
./s4 deploy -appName=twitter-counter -c=cluster1 -b=`pwd`/test-apps/twitter-counter/build.gradle
- Deploy the twitter-adapter app. In this example, we don't specify the adapter's app class directly. Instead, we use the same deployment approach as for other apps (remember, the adapter is also an app):
Code Block
./s4 deploy -appName=twitter-adapter -c=cluster2 -b=`pwd`/test-apps/twitter-adapter/build.gradle
- Observe the current 10 most popular topics in file TopNTopics.txt. The file gets updated at regular intervals and only lists topics with at least 10 occurrences, so you may have to wait a little before it is updated:
Code Block
tail -f TopNTopics.txt
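The selection rule above keeps the 10 most popular topics, but only those seen at least 10 times. The sketch below is a hypothetical illustration of that rule over a map of counts, not the twitter-counter implementation. Ties are broken alphabetically, an arbitrary choice made here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical top-N selection with a minimum-occurrence threshold.
class TopNSketch {
    static List<String> topN(Map<String, Integer> counts, int n, int minCount) {
        return counts.entrySet().stream()
                .filter(e -> e.getValue() >= minCount)             // drop rare topics
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                .limit(n)                                           // keep the N most frequent
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("s4", 42);
        counts.put("streaming", 12);
        counts.put("java", 7);
        // Top 10 with a minimum of 10 occurrences; prints [s4=42, streaming=12]
        System.out.println(topN(counts, 10, 10));
    }
}
```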
What next?
You have now seen some basic applications, and you know how to run them and how to get events into the system. You may now try to code your own apps with your own data.
This page will help you specify your own dependencies.
There are more parameters available for the scripts (typing the name of the task will list the options). In particular, if you want distributed deployments, you'll need to pass the Zookeeper connection strings when you start the nodes.
You may also customize the communication and the core layers of S4 by tweaking configuration files and modules.
In conclusion, the edges are still a bit rough, more aspects need to be documented, and this is not a final version. Still, this should get you started, and we're happy to help!