Application examples
This article helps readers quickly implement a JStorm example.
The simplest JStorm example consists of four steps: build the configuration, build the topology with its spout and bolt, set the parallelism, and submit the topology, as shown below:
Map conf = new HashMap();
//all custom topology configuration is placed in this Map
TopologyBuilder builder = new TopologyBuilder();
//create the topology builder
int spoutParal = get("spout.parallel", 1);
//get the spout parallelism setting
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
        new SequenceSpout(), spoutParal);
//create the spout: new SequenceSpout() is the spout object,
//SequenceTopologyDef.SEQUENCE_SPOUT_NAME is the spout name; note that the name must not contain spaces
int boltParal = get("bolt.parallel", 1);
//get the bolt parallelism setting
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
        boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//create the bolt: SequenceTopologyDef.TOTAL_BOLT_NAME is the bolt name,
//TotalCount is the bolt object, boltParal is the bolt parallelism;
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME) means this bolt
//receives the output of SequenceTopologyDef.SEQUENCE_SPOUT_NAME by shuffle grouping,
//i.e. each spout task sends its tuples to the downstream bolt tasks in random round-robin order
int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//set the number of acker tasks
int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//number of workers the topology will use
conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//set the topology mode to distributed so the topology can run on a JStorm cluster
StormSubmitter.submitTopology(streamName, conf, builder.createTopology());
//submit the topology; streamName is the topology name
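The get(...) calls above are not part of the JStorm API; they refer to a small helper of the example that reads a setting and falls back to a default when it is absent. A minimal sketch of such a helper, assuming the settings are passed as Java system properties (an assumption, not necessarily how the original example reads them):

// hypothetical helper, not a JStorm API: reads an int setting from a system
// property, falling back to the given default when the property is not set
private static int get(String key, int defaultValue) {
    String value = System.getProperty(key);
    if (value == null) {
        return defaultValue;
    }
    return Integer.parseInt(value);
}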
IRichSpout is the simplest spout interface:
IRichSpout {
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Note:
- The spout object must implement the Serializable interface, so all data structures inside the spout must be serializable.
- A spout can have a constructor, but it is executed only once, when the topology is submitted and the spout object is created. Initialization that must happen before the task is assigned to a specific worker can therefore be done there; its results are carried to every task, because on submission the spout object is serialized to a file, and each worker deserializes the spout object from that file when it starts.
- open is the initialization action performed after the task starts up.
- close is the cleanup action performed when the task shuts down.
- activate is triggered after the task is activated.
- deactivate is triggered after the task is deactivated.
- nextTuple is the core implementation of a spout; it holds the logic you need to complete: fetch a message and emit it through the collector (a sketch follows these notes).
- ack is triggered after the task receives an ack message; see the ack mechanism for details.
- fail is triggered after the task receives a fail message; see the ack mechanism for details.
- declareOutputFields defines the meaning of each field emitted by the spout.
- getComponentConfiguration is the component configuration interface of the spout.
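As an illustration only (not the actual SequenceSpout shipped with JStorm), a minimal spout that emits an increasing sequence number and receives ack/fail callbacks might look like the following sketch; the class name and the emitted field name are assumptions:

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SimpleSequenceSpout implements IRichSpout {
    private SpoutOutputCollector collector;
    private long id = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // initialization after the task starts; keep a reference to the collector
        this.collector = collector;
    }

    @Override
    public void close() {
        // release resources when the task shuts down
    }

    @Override
    public void activate() {
        // called when the topology is activated
    }

    @Override
    public void deactivate() {
        // called when the topology is deactivated
    }

    @Override
    public void nextTuple() {
        // core spout logic: emit one tuple per call;
        // the message id (second argument) lets the ack mechanism track this tuple
        collector.emit(new Values(id), id);
        id++;
    }

    @Override
    public void ack(Object msgId) {
        // the tuple identified by msgId was fully processed downstream
    }

    @Override
    public void fail(Object msgId) {
        // the tuple identified by msgId failed or timed out; it could be re-emitted here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the meaning of the single field this spout emits
        declarer.declare(new Fields("sequence"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // no component-specific configuration
        return null;
    }
}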
IRichBolt is the corresponding bolt interface:
IRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Note:
- The bolt object must implement the Serializable interface, so all data structures inside the bolt must be serializable.
- A bolt can have a constructor, but it is executed only once, when the topology is submitted and the bolt object is created. Initialization that must happen before the task is assigned to a specific worker can therefore be done there; its results are carried to every task, because on submission the bolt object is serialized to a file, and each worker deserializes the bolt object from that file when it starts.
- prepare is the initialization action performed after the task starts up.
- cleanup is the cleanup action performed when the task shuts down.
- execute is the core implementation of a bolt; it holds the logic you need to complete: process an incoming message and, if necessary, emit new messages through the collector. When a message has been processed successfully, call collector.ack; when it cannot be processed or an error occurs, call collector.fail; see the ack mechanism for details (a sketch follows these notes).
- declareOutputFields defines the meaning of each field emitted by the bolt.
- getComponentConfiguration is the component configuration interface of the bolt.
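As an illustration only (not the actual TotalCount bolt shipped with JStorm), a minimal bolt that counts incoming tuples, emits the running total, and acks or fails each input might look like the following sketch; the class name and the emitted field name are assumptions:

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SimpleTotalCountBolt implements IRichBolt {
    private OutputCollector collector;
    private long total = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // initialization after the task starts; keep a reference to the collector
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // core bolt logic: count the incoming tuple and emit the running total,
            // anchoring the new tuple to the input so the ack mechanism can track it
            total++;
            collector.emit(input, new Values(total));
            // tell the acker the input tuple was processed successfully
            collector.ack(input);
        } catch (Exception e) {
            // tell the acker the input tuple could not be processed
            collector.fail(input);
        }
    }

    @Override
    public void cleanup() {
        // release resources when the task shuts down
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the meaning of the single field this bolt emits
        declarer.declare(new Fields("total"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // no component-specific configuration
        return null;
    }
}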
Configuration in Maven
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
If you cannot find the jstorm-client and jstorm-client-extension packages, you can download the JStorm source code and compile it yourself; please refer to the source code compilation guide. When packaging, you need to package all dependencies into a single jar:
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>storm.starter.SequenceTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
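With the assembly plugin configured as above, the all-in-one jar is produced by a normal Maven package run; the resulting jar name depends on your own project's artifactId and version, so the name in the comment below is only illustrative:

mvn clean package
# produces something like target/<your-artifactId>-<version>-jar-with-dependencies.jar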
Submit the topology to the cluster with the jstorm command:
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
- xxxxxx.jar is the packaged jar.
- com.alibaba.xxxx.xx is the entry class of the task, i.e. the class whose main method builds and submits the topology.
- parameter is the list of arguments passed to the topology.
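For example, with the packaging and main class configured as above, a submission could look like the following; the jar name and the trailing SequenceTest argument are hypothetical placeholders for your own values, while storm.starter.SequenceTopology is the mainClass from the pom above:

jstorm jar sequence-topology-1.0-jar-with-dependencies.jar storm.starter.SequenceTopology SequenceTest
# illustrative only: replace the jar name and the argument with your own values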