...

  • HelloPE.java: a very simple PE that prints the name contained in incoming events
    Code Block
    package hello;
    
    import org.apache.s4.base.Event;
    import org.apache.s4.core.ProcessingElement;
    
    // ProcessingElement provides integration with the S4 platform
    public class HelloPE extends ProcessingElement {
    
        // you should define downstream streams here and inject them in the app definition
    
        // PEs can maintain some state
        boolean seen = false;
    
        // This method is called upon a new Event on an incoming stream.
        // You may overload it for handling instances of your own specialized subclasses of Event
        public void onEvent(Event event) {
            System.out.println("Hello " + (seen ? "again " : "") + event.get("name") + "!");
            seen = true;
        }
    
        // lifecycle callbacks required by ProcessingElement; nothing to do in this example
        @Override
        protected void onCreate() {
        }
    
        @Override
        protected void onRemove() {
        }
    }
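The greeting logic and the per-instance `seen` flag can be tried outside of S4 with a plain-Java stand-in. This is a minimal sketch, not S4 code. It assumes a `Map<String, String>` in place of S4's `Event`. The class `GreetingLogic` and its method `onName` are hypothetical names; they are not part of the S4 API.

```java
import java.util.Map;

// Hypothetical stand-in for HelloPE: same state and greeting logic, no S4 dependency.
// A Map<String, String> plays the role of the incoming Event (assumption for illustration).
class GreetingLogic {
    // Per-instance state, like the "seen" field of HelloPE
    private boolean seen = false;

    // Builds the greeting HelloPE would print, then records that an event has been seen
    String onName(Map<String, String> event) {
        String greeting = "Hello " + (seen ? "again " : "") + event.get("name") + "!";
        seen = true;
        return greeting;
    }
}
```

The first call on an instance returns `Hello Bob!` and later calls on the same instance return `Hello again ...!`. Because `seen` is an instance field, a separate instance starts over without "again".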
    

...

  1. Start a ZooKeeper server instance (the log4j warnings come from ZooKeeper and can be ignored here):
    Code Block
    S4:myApp$ ./s4 zkServer
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    [main] INFO  org.apache.s4.tools.ZKServer - Starting zookeeper server on port [2181]
    [main] INFO  org.apache.s4.tools.ZKServer - cleaning existing data in [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/data] and [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/log]
    log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkServer).
    log4j:WARN Please initialize the log4j system properly.
  2. Define a new cluster, for example a cluster named "cluster1" with 2 partitions and nodes listening on ports starting from 12000:
    Code Block
    S4:myApp$ ./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    [main] INFO  org.apache.s4.tools.DefineCluster - preparing new cluster [cluster1] with [2] node(s)
    [main] INFO  org.apache.s4.tools.DefineCluster - New cluster configuration uploaded into zookeeper
    
  3. Start 2 S4 nodes with the default configuration and attach them to cluster "cluster1" (run the following command twice, for instance in two separate terminals):
    Code Block
    S4:myApp$ ./s4 node -c=cluster1
    calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
    15:50:18.996 [main] INFO  org.apache.s4.core.Main - Initializing S4 node with : 
    - comm module class [org.apache.s4.comm.DefaultCommModule]
    - comm configuration file [default.s4.comm.properties from classpath]
    - core module class [org.apache.s4.core.DefaultCoreModule]
    - core configuration file[default.s4.core.properties from classpath]
    -extra modules: []
    [main] INFO  org.apache.s4.core.Main - Starting S4 node. This node will automatically download applications published for the cluster it belongs to
    
  4. Build, package and publish the app to cluster1:
    1. You may do that in a single step (currently, you must use the name of the current project as the app name, and you need to specify the Gradle build file with a complete path):
      Code Block
      S4:myApp$ ./s4 deploy -appName=myApp -c=cluster1 -b=`pwd`/build.gradle
      .... verbose logs for compiling, building the package, and publishing it to Zookeeper...
      15:46:16.486 [main] INFO  org.apache.s4.tools.Deploy - uploaded application [myApp] to cluster [cluster1], using zookeeper znode [/s4/clusters/cluster1/apps/myApp]
      
    2. You may also do that in 2 separate steps:
      1. Create an s4r archive:
        Code Block
        ./gradlew s4r
      2. Publish the s4r archive (you may first copy it to a more suitable location):
        Code Block
        ./s4 deploy -s4r=`pwd`/build/libs/myApp.s4r -c=cluster1 -appName=myApp
  5. S4 nodes will detect the new application, download it, load it and start it. You will get something like:
    Code Block
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.d.DistributedDeploymentManager - Detected new application(s) to deploy {}[myApp]
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - Local app deployment: using s4r file name [myApp] as application name
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - App class name is: hello.HelloApp
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]} from null
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Adding topology change listener:org.apache.s4.comm.tcp.TCPEmitter@79b2591c
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.comm.topology.AssignmentFromZK - New session:87684175268872203; state is : SyncConnected
    [ZkClient-EventThread-19-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]} from { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]}
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.comm.topology.AssignmentFromZK - Successfully acquired task:Task-1 by myMachine.myNetwork
    [ZkClient-EventThread-19-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]}
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872205
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - Detected new stream [names]
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872206
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from null
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.Server - Loaded application from file /tmp/deploy-test/cluster1/myApp.s4r
    [ZkClient-EventThread-15-localhost:2181] INFO  o.a.s.d.DistributedDeploymentManager - Successfully installed application myApp
    [ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s.c.g.OverloadDispatcherGenerator - Dumping generated overload dispatcher class for PE of class [class hello.HelloPE]
    [ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s4.comm.topology.ClustersFromZK - Adding input stream [names] for app [-1] in cluster [cluster1]
    [ZkClient-EventThread-15-localhost:2181] INFO  org.apache.s4.core.App - Init prototype [hello.HelloPE].
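The topology in these logs follows from the `newCluster` parameters. With `-nbTasks=2 -flp=12000`, partition 0 (Task-0) listens on port 12000 and partition 1 (Task-1) listens on port 12001. Each started node then acquires one of these tasks.

The sketch below models that mapping in plain Java. It is reconstructed from the log output only and is not the S4 implementation. It assumes two things:
- ports are allocated consecutively from the first listening port;
- a starting node takes the lowest-numbered free task.

`ClusterModel`, `Task` and `acquireTask` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the topology printed in the logs above (not the S4 implementation).
// Assumption: task i is partition i and listens on port firstListeningPort + i.
class ClusterModel {
    static final class Task {
        final String taskId;
        final int partition;
        final int port;

        Task(String taskId, int partition, int port) {
            this.taskId = taskId;
            this.partition = partition;
            this.port = port;
        }
    }

    // Tasks not yet acquired by any node, in creation order
    private final Deque<Task> freeTasks = new ArrayDeque<>();

    // Mirrors the newCluster parameters -nbTasks and -flp
    ClusterModel(int nbTasks, int firstListeningPort) {
        for (int i = 0; i < nbTasks; i++) {
            freeTasks.addLast(new Task("Task-" + i, i, firstListeningPort + i));
        }
    }

    // Assumption: a starting node takes the lowest-numbered free task; returns null when none is left
    Task acquireTask() {
        return freeTasks.pollFirst();
    }
}
```

With `new ClusterModel(2, 12000)`, the first node gets Task-0 on port 12000 and the second gets Task-1 on port 12001. In this model, a third node would find no free task.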
    

...

  1. First, we need to define a new S4 subcluster for that app:
    Code Block
    S4:myApp$ ./s4 newCluster -c=cluster2 -nbTasks=1 -flp=13000
  2. Then start the adapter, using "names" to identify its output stream (this is the same stream name that the myApp app uses as input):
    Code Block
    ./s4 adapter -appClass=hello.HelloInputAdapter -c=cluster2 -namedStringParameters=adapter.output.stream:names
  3. Now let's just provide some data to the external stream:
    Code Block
    S4:~$ echo "Bob" | nc localhost 15000
  4. One of the nodes should output in its console:
    Code Block
    Hello Bob!
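The `nc` command above opens a TCP connection to the adapter and writes one newline-terminated line. Where `nc` is not available, the minimal Java sketch below does the same thing using only `java.net.Socket`. `NameSender` and `send` are hypothetical names. The host and port are the ones from the `nc` command.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, intended as a Java equivalent of: echo "Bob" | nc localhost 15000
class NameSender {
    // Opens a TCP connection, writes one newline-terminated line, then closes the connection
    static void send(String host, int port, String name) {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            // echo appends a trailing newline, so the same is done here
            out.write((name + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `NameSender.send("localhost", 15000, "Bob");` sends the same bytes as the `nc` command, so the adapter sees the same input line.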

If you keep sending messages, the nodes will display them alternately, because the adapter app sends keyless events on the "names" stream in a round-robin fashion by default.
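Round-robin dispatch of keyless events can be sketched as follows. Each event goes to the next partition in turn, wrapping around after the last one. With the 2 partitions of cluster1, consecutive names therefore alternate between the two nodes.

This is a plain-Java illustration using a hypothetical `RoundRobinDispatcher` class, not the adapter's actual code. Starting at partition 0 is an arbitrary choice of this sketch.

```java
// Hypothetical illustration of round-robin dispatch for keyless events (not S4 code).
class RoundRobinDispatcher {
    private final int nbPartitions;
    private int next = 0;

    RoundRobinDispatcher(int nbPartitions) {
        if (nbPartitions <= 0) {
            throw new IllegalArgumentException("nbPartitions must be positive");
        }
        this.nbPartitions = nbPartitions;
    }

    // Returns the partition that receives the next keyless event, then advances with wrap-around
    int nextPartition() {
        int partition = next;
        next = (next + 1) % nbPartitions;
        return partition;
    }
}
```

A keyed event, by contrast, would typically be routed by a hash of its key, so that all events with the same key reach the same partition. Defining a key for the "names" stream would change the alternating behavior described above.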

What happened?

The following figures illustrate the various steps we have taken. The local file system is used as the S4 application repository in our example.

...