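* HelloPE is the processing element of the HelloApp application: it receives events from the "names" stream and prints a greeting for each of them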
{gliffy:name=HelloApp|align=left|size=L|version=4}
* HelloInputAdapter is a simple adapter that reads lines of characters from a socket, converts them into events, and sends the events to interested S4 apps through the "names" stream
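Here is a plain-Java sketch of what HelloPE does with each event it receives. It does not use the S4 API: the event is modelled as a hypothetical {{Map}} of fields, and the class name {{HelloGreetingSketch}} is made up for illustration. The "name" field and the "Hello Bob!" output come from the walkthrough below.

{code:java}
import java.util.Map;

/**
 * Sketch of HelloPE's per-event logic. NOT the S4 ProcessingElement API:
 * the event is a hypothetical Map of fields filled in by the adapter.
 */
final class HelloGreetingSketch {

    /** Builds the greeting for one event, reading the "name" field set by the adapter. */
    static String greet(Map<String, String> event) {
        String name = event.get("name");
        if (name == null) {
            throw new IllegalArgumentException("event has no \"name\" field");
        }
        return "Hello " + name + "!";
    }

    public static void main(String[] args) {
        // An event as the adapter would build it from the input line "Bob".
        Map<String, String> event = Map.of("name", "Bob");
        System.out.println(greet(event)); // prints "Hello Bob!"
    }
}
{code}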
h2. Run the sample project
To run an S4 application, you need to:
* set up a cluster: provision a cluster and start S4 nodes for it
* package the app
* publish the app on the cluster
# Start a ZooKeeper server instance (the log4j warnings come from ZooKeeper and can be ignored here):
{code}S4:myApp$ ./s4 zkServer &
[1] 68981
S4:myApp$ calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
[main] INFO org.apache.s4.tools.ZKServer - Starting zookeeper server on port [2181]
[main] INFO org.apache.s4.tools.ZKServer - cleaning existing data in [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/data] and [/var/folders/8V/8VdgKWU3HCiy2yV4dzFpDk+++TI/-Tmp-/tmp/zookeeper/log]
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkServer).
log4j:WARN Please initialize the log4j system properly.{code}
# Define a new cluster, for example a cluster named "cluster1" with 2 partitions, whose nodes listen on ports starting from 12000:
{code}S4:myApp$ ./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000
calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
[main] INFO org.apache.s4.tools.DefineCluster - preparing new cluster [cluster1] with [2] node(s)
[main] INFO org.apache.s4.tools.DefineCluster - New cluster configuration uploaded into zookeeper
{code}
# Start 2 S4 nodes with the default configuration and attach them to cluster "cluster1" (run the command below once per node, for example in two separate terminals):
{code}S4:myApp$ ./s4 node -c=cluster1
calling referenced s4 script : /Users/S4/tmp/s4-22/incubator-s4/s4
15:50:18.996 [main] INFO org.apache.s4.core.Main - Initializing S4 node with :
- comm module class [org.apache.s4.comm.DefaultCommModule]
- comm configuration file [default.s4.comm.properties from classpath]
- core module class [org.apache.s4.core.DefaultCoreModule]
- core configuration file[default.s4.core.properties from classpath]
-extra modules: []
[main] INFO org.apache.s4.core.Main - Starting S4 node. This node will automatically download applications published for the cluster it belongs to
{code}
# Build, package, and publish the app to cluster1 (currently, the app name must be the name of the current project, and the Gradle build file must be given with its full path):
{code}S4:myApp$ ./s4 deploy -appName=myApp -c=cluster1 -b=`pwd`/build.gradle
.... verbose logs for compiling, building the package, and publishing it to Zookeeper...
15:46:16.486 [main] INFO org.apache.s4.tools.Deploy - uploaded application [myApp] to cluster [cluster1], using zookeeper znode [/s4/clusters/cluster1/apps/myApp]
{code}
# The S4 nodes detect the new application, download it, load it, and start it. You should see output similar to:
{code}[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.d.DistributedDeploymentManager - Detected new application(s) to deploy {}[myApp]
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.Server - Local app deployment: using s4r file name [myApp] as application name
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.Server - App class name is: hello.HelloApp
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]} from null
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Adding topology change listener:org.apache.s4.comm.tcp.TCPEmitter@79b2591c
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.comm.topology.AssignmentFromZK - New session:87684175268872203; state is : SyncConnected
[ZkClient-EventThread-19-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]} from { nbNodes=0,name=unknown,mode=unicast,type=,nodes=[]}
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.comm.topology.AssignmentFromZK - Successfully acquired task:Task-1 by myMachine.myNetwork
[ZkClient-EventThread-19-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from { nbNodes=1,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}]}
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872205
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClustersFromZK - Detected new stream [names]
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClustersFromZK - New session:87684175268872206
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s4.comm.topology.ClusterFromZK - Changing cluster topology to { nbNodes=2,name=cluster1,mode=unicast,type=,nodes=[{partition=0,port=12000,machineName=myMachine.myNetwork,taskId=Task-0}, {partition=1,port=12001,machineName=myMachine.myNetwork,taskId=Task-1}]} from null
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.Server - Loaded application from file /tmp/deploy-test/cluster1/myApp.s4r
[ZkClient-EventThread-15-localhost:2181] INFO o.a.s.d.DistributedDeploymentManager - Successfully installed application myApp
[ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s.c.g.OverloadDispatcherGenerator - Dumping generated overload dispatcher class for PE of class [class hello.HelloPE]
[ZkClient-EventThread-15-localhost:2181] DEBUG o.a.s4.comm.topology.ClustersFromZK - Adding input stream [names] for app [-1] in cluster [cluster1]
[ZkClient-EventThread-15-localhost:2181] INFO org.apache.s4.core.App - Init prototype [hello.HelloPE].
{code}
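The topology lines in the logs above suggest how {{-nbTasks}} and {{-flp}} map to nodes: partition 0 runs as Task-0 on port 12000, and partition 1 as Task-1 on port 12001. The following sketch reproduces that layout under the assumption that each port is simply the first listening port plus the partition number. This rule is inferred from these logs, not taken from S4's code, and {{ClusterLayoutSketch}} is a hypothetical name.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the node layout suggested by the topology logs.
 * Assumption (inferred from the logs, not S4 code): port = firstListeningPort + partition.
 */
final class ClusterLayoutSketch {

    /** One node slot, with the fields shown in the "Changing cluster topology" log lines. */
    static final class NodeSlot {
        final int partition;
        final String taskId;
        final int port;

        NodeSlot(int partition, String taskId, int port) {
            this.partition = partition;
            this.taskId = taskId;
            this.port = port;
        }

        @Override
        public String toString() {
            return "{partition=" + partition + ",port=" + port + ",taskId=" + taskId + "}";
        }
    }

    /** Lays out nbTasks slots numbered from 0, each listening on its own port. */
    static List<NodeSlot> layout(int nbTasks, int firstListeningPort) {
        if (nbTasks <= 0) {
            throw new IllegalArgumentException("nbTasks must be positive");
        }
        List<NodeSlot> slots = new ArrayList<>();
        for (int partition = 0; partition < nbTasks; partition++) {
            slots.add(new NodeSlot(partition, "Task-" + partition, firstListeningPort + partition));
        }
        return slots;
    }

    public static void main(String[] args) {
        // Same values as: ./s4 newCluster -c=cluster1 -nbTasks=2 -flp=12000
        // Prints {partition=0,port=12000,taskId=Task-0} then {partition=1,port=12001,taskId=Task-1}
        for (NodeSlot slot : layout(2, 12000)) {
            System.out.println(slot);
        }
    }
}
{code}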
Great, the application is now deployed on 2 S4 nodes. It still needs some input, though.
We can get input through an adapter, i.e. an S4 app that converts an external stream into S4 events and injects these events into S4 clusters. In the sample application, the adapter is a very basic class that extends App, listens to an input socket, and converts each received line of characters into a generic S4 event, in which the line data is kept in a "name" field. When starting the adapter, we specify:
* the adapter class
* the name of the output stream
* the cluster where the adapter app is deployed
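A rough plain-Java sketch of the adapter's line-to-event conversion follows. It is not HelloInputAdapter's actual code and does not use the S4 API: events are hypothetical {{Map}}s with a single "name" field, and they are collected into a list instead of being emitted on the "names" stream. The class name {{LineAdapterSketch}} is made up for illustration.

{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of the adapter logic: one line of input becomes one event with a "name" field. */
final class LineAdapterSketch {

    /** Converts every line read from the input into an event whose "name" field holds the line. */
    static List<Map<String, String>> toEvents(BufferedReader in) throws IOException {
        List<Map<String, String>> events = new ArrayList<>();
        String line;
        while ((line = in.readLine()) != null) {
            events.add(Map.of("name", line));
        }
        return events;
    }

    /** Accepts one client on the given port and converts its lines (blocks until the client disconnects). */
    static List<Map<String, String>> acceptOnce(int port) throws IOException {
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
            return toEvents(in);
        }
    }

    public static void main(String[] args) throws IOException {
        // Offline demo: two input lines become two events.
        for (Map<String, String> event : toEvents(new BufferedReader(new StringReader("Bob\nAlice\n")))) {
            System.out.println(event); // {name=Bob}, then {name=Alice}
        }
    }
}
{code}

A real adapter would emit each event as soon as its line arrives rather than waiting for the client to disconnect.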
For easy testing, the s4 script can start a node running an adapter app without packaging and deploying that app first.
# First, we need to define a new S4 subcluster for that app:
{code}S4:myApp$ ./s4 newCluster -c=cluster2 -nbTasks=1 -flp=13000{code}
# Then start the adapter, using "names" to identify the output stream (this is the same name that the myApp app uses as input):
{code}./s4 adapter -appClass=hello.HelloInputAdapter -c=cluster2 -namedStringParameters=adapter.output.stream:names{code}
# Now provide some data to the external stream by sending a line to port 15000, where the sample adapter listens:
{code}S4:~$ echo "Bob" | nc localhost 15000{code}
# The S4 nodes should print in their console:
{code}Hello Bob!{code}
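If {{nc}} is not available, the same input can be sent from Java. This minimal sketch uses only {{java.net.Socket}} to write one newline-terminated line to the adapter's socket, like {{echo "Bob" | nc localhost 15000}}; the class and method names are hypothetical.

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Sends a single line to a TCP endpoint, similar to: echo "Bob" | nc localhost 15000 */
final class SendNameSketch {

    /** Opens a TCP connection, writes one newline-terminated line, then closes the connection. */
    static void sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            // echo appends "\n"; write it explicitly rather than the platform line separator.
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 15000 is the one used with nc above; the name defaults to "Bob".
        sendLine("localhost", 15000, args.length > 0 ? args[0] : "Bob");
    }
}
{code}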