...

  1. Download the source from the Apache git repository (follow the instructions here), then check out branch S4-22.
  2. Compile and install S4 in the local Maven repository. (You can also let the tests run, but that currently takes quite long: we're not yet using mocks.)
    Code Block
    S4:incubator-s4$ ./gradlew install -DskipTests
    .... verbose logs ...
    
  3. Build the startup scripts: 
    Code Block
    S4:incubator-s4$ ./gradlew s4-tools:installApp
    .... verbose logs 
    ...:s4-tools:installApp
    

...

  1. Start a Zookeeper server instance (log4j warnings come from Zookeeper and can be ignored here):
    Code Block
    S4:incubator-s4$ ./s4 zkServer
    S4:myApp$ calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    [main] INFO  org.apache.s4.tools.ZKServer - Starting zookeeper server on port [2181]
    [main] INFO  org.apache.s4.tools.ZKServer - cleaning existing data in [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/data] and [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/log]
    log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkServer).
    log4j:WARN Please initialize the log4j system properly.
  2. Define a new cluster, for example one named "cluster1" with 2 partitions, whose nodes listen on ports starting from 12000:
    Code Block
    S4:myApp$ ./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    [main] INFO  org.apache.s4.tools.DefineCluster - preparing new cluster [cluster1] with [2] node(s)
    [main] INFO  org.apache.s4.tools.DefineCluster - New cluster configuration uploaded into zookeeper
    
  3. Start 2 S4 nodes with the default configuration and attach them to cluster "cluster1". Run the following command once per node, each in its own terminal:
    Code Block
    S4:myApp$ ./s4 node -c=cluster1
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    15:50:18.996 [main] INFO  org.apache.s4.core.Main - Initializing S4 node with : 
    - comm module class [org.apache.s4.comm.DefaultCommModule]
    - comm configuration file [default.s4.comm.properties from classpath]
    - core module class [org.apache.s4.core.DefaultCoreModule]
    - core configuration file[default.s4.core.properties from classpath]
    -extra modules: []
    [main] INFO  org.apache.s4.core.Main - Starting S4 node. This node will automatically download applications published for the cluster it belongs to
    
  4. Build, package and publish the app to cluster1 (currently, you must use the name of the current project, and you need to specify the gradle build file with a complete path):
    Code Block
    S4:myApp$ ./s4 deploy -appName=myApp -c=cluster1 -b=`pwd`/build.gradle
    .... verbose logs for compiling, building the package, and publishing it to Zookeeper...
    15:46:16.486 [main] INFO  org.apache.s4.tools.Deploy - uploaded application [myApp] to cluster [cluster1], using zookeeper znode [/s4/clusters/cluster1/apps/myApp]
    
  5. S4 nodes will detect the new application, download it, load it and start it. You will get something like:
    Code Block
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.d.DistributedDeploymentManager - Detected new application(s) to deploy {}[myApp]
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - Local app deployment: using s4r file name [myApp] as application name
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - App class name is: hello.HelloApp
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]} from null
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Adding topology change listener:org.apache.s4.comm.tcp.TCPEmitter@79b2591c
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.comm.topology.AssignmentFromZK - New session:87684175268872203; state is : SyncConnected
    [ZkClient-EventThread-19-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]} from { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]}
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.comm.topology.AssignmentFromZK - Successfully acquired task:Task-1 by myMachine.myNetwork
    [ZkClient-EventThread-19-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]}
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872205
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - Detected new stream [names]
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872206
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from null
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - Loaded application from file /tmp/deploy-test/cluster1/myApp.s4r
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.d.DistributedDeploymentManager - Successfully installed application myApp
    [ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s.c.g.OverloadDispatcherGenerator - Dumping generated overload dispatcher class for PE of class [class hello.HelloPE]
    [ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s4.comm.topology.ClustersFromZK - Adding input stream [names] for app [-1] in cluster [cluster1]
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.App - Init prototype [hello.HelloPE].
    

Great! The application is now deployed on 2 S4 nodes. It needs some input though...
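As a quick sanity check on the cluster layout, the topology logs in step 5 show partition 0 on port 12000 and partition 1 on port 12001. The sketch below prints the layout you would expect for a given number of tasks and first listening port. It assumes that each partition simply gets the first port plus its partition index, which is what those logs suggest; the function name `cluster_ports` is hypothetical and not part of the S4 tools.

```shell
#!/bin/sh
# Hypothetical helper (not part of the s4 script): print the port each
# partition is expected to listen on.
# Assumption: port = first listening port + partition index, as the
# topology logs above suggest.
cluster_ports() {
    nb_tasks=$1      # same value as -nbTasks
    first_port=$2    # same value as -flp
    i=0
    while [ "$i" -lt "$nb_tasks" ]; do
        echo "partition=$i port=$((first_port + i))"
        i=$((i + 1))
    done
}
```

For example, `cluster_ports 2 12000` lists the two partitions of cluster1, which you can compare against the "Changing cluster topology" log lines.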

...

  1. First, we need to define a new S4 subcluster for that app:
    Code Block
    S4:myApp$ ./s4 newCluster -c=cluster2 -nbTasks=1 -flp=13000
  2. Then we can start the adapter. We use "names" to identify the output stream (this is the same name that the myApp app uses as input):
    Code Block
    ./s4 adapter -appClass=hello.HelloInputAdapter -c=cluster2 -namedStringParameters=adapter.output.stream:names
  3. Now let's just provide some data to the external stream:
    Code Block
    S4:~$ echo "Bob" | nc localhost 15000
  4. Nodes should output in their console:
    Code Block
    Hello Bob!
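To send more than one name, the command from step 3 can be wrapped in a small loop. This is only a sketch: the function name `send_names` is hypothetical, and it assumes the adapter accepts one name per connection on the given port, exactly as in the single `echo ... | nc` call above.

```shell
#!/bin/sh
# Hypothetical helper: send each name to the adapter over its own
# connection, mirroring `echo "Bob" | nc localhost 15000` from step 3.
send_names() {
    host=$1
    port=$2
    shift 2
    for name in "$@"; do
        # One line per name, one nc invocation per line.
        printf '%s\n' "$name" | nc "$host" "$port"
    done
}
```

For example, `send_names localhost 15000 Bob Alice` sends two names in a row; by analogy with step 4, the nodes should then print one greeting per name.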

What happened?

The following figures illustrate the various steps we have taken.
[Figure: the steps taken above]

Run the Twitter trending example

Let's have a look at another application, which computes trending Twitter topics by listening to the spritzer stream from the Twitter API. This application was adapted from a previous example in S4 0.3.

Overview

This application is divided into:

  • twitter-counter, in test-apps/twitter-counter/: extracts topics from tweets and maintains a count of the most popular ones, periodically dumped to disk
  • twitter-adapter, in test-apps/twitter-adapter/: listens to the feed from Twitter, converts status text into S4 events, and passes them to the "RawStatus" stream

Have a look at the code in these directories. You'll note that:

  • the build.gradle file must be tailored to include new dependencies (twitter4j libs in twitter-adapter)
  • events are partitioned through various keys

Run it!

Note: You need a twitter4j.properties file in your home directory with the following content (debug is optional):

Code Block

debug=true
user=<a twitter username>
password=<matching password>
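Before starting the nodes, it can help to confirm that the properties file exists and contains the two required entries. The following is a minimal sketch; the function name `check_twitter_props` is hypothetical, and it only checks that `user=` and `password=` lines are present, not that the credentials are valid.

```shell
#!/bin/sh
# Hypothetical helper: verify that a properties file exists and defines
# the required "user" and "password" keys ("debug" is optional).
check_twitter_props() {
    file=$1
    if [ ! -f "$file" ]; then
        echo "missing properties file: $file" >&2
        return 1
    fi
    for key in user password; do
        if ! grep -q "^${key}=" "$file"; then
            echo "no '${key}=' entry in $file" >&2
            return 1
        fi
    done
    return 0
}
```

Call it with the path to the properties file in your home directory; it returns a non-zero status and prints a message if something is missing.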
  1. Start a Zookeeper instance. From the S4 base directory, do:
    Code Block
    ./s4 zkServer
  2. Define 2 clusters: 1 for deploying the twitter-counter app, and 1 for the adapter app:
    Code Block
    ./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000; ./s4 newCluster -c=cluster2 -nbTasks=1 -flp=13000
  3. Start 2 app nodes (run the following command once per node, each in its own terminal):
    Code Block
    ./s4 node -c=cluster1
  4. Start 1 node for the adapter app:
    Code Block
    ./s4 node -c=cluster2 -namedStringParameters=adapter.output.stream:RawStatus
  5. Deploy the twitter-counter app:
    Code Block
    ./s4 deploy -appName=twitter-counter -c=cluster1 -b=`pwd`/test-apps/twitter-counter/build.gradle
  6. Deploy the twitter-adapter app. In this example, we don't directly specify the app class of the adapter; instead, we use the deployment approach for apps (remember, the adapter is also an app):
    Code Block
    ./s4 deploy -appName=twitter-adapter -c=cluster2 -b=`pwd`/test-apps/twitter-adapter/build.gradle
  7. Observe the current 10 most popular topics in file TopNTopics.txt:
    Code Block
    tail -f TopNTopics.txt
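Since the topic counts are only dumped to disk periodically, TopNTopics.txt may not exist yet when you reach step 7. A small polling helper avoids running `tail` too early. This is a sketch under that assumption; the function name `wait_for_file` and the timeout in seconds are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: wait until a file exists, polling once per second,
# and give up after the given number of seconds.
wait_for_file() {
    file=$1
    timeout=$2
    waited=0
    while [ ! -f "$file" ]; do
        if [ "$waited" -ge "$timeout" ]; then
            echo "gave up waiting for $file after ${timeout}s" >&2
            return 1
        fi
        sleep 1
        waited=$((waited + 1))
    done
    return 0
}
```

For example: `wait_for_file TopNTopics.txt 120 && tail -f TopNTopics.txt`.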

What next?

You have now seen some basic applications, and you know how to run them and how to get events into the system. You may now try to code your own apps with your own data.

There are more parameters available for the scripts (typing the name of the task will list the options). In particular, if you want distributed deployments, you'll need to pass the Zookeeper connection strings when you start the nodes.

You may also customize the communication and the core layers of S4 by tweaking configuration files and modules.

In conclusion, the edges are still a bit rough, more aspects need to be documented, and this is not a final version. Still, this should be enough to get you started, and we're happy to help!