This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Application examples

heipacker edited this page Feb 13, 2014 · 6 revisions

This article helps readers quickly implement a JStorm example.

Example source

The simplest JStorm example breaks down into four steps: building the topology, writing the spout and the bolt, compiling, and submitting the jar.

Generate Topology

Map conf = new HashMap();
//all custom topology configuration goes into this Map

TopologyBuilder builder = new TopologyBuilder();
//create the topology builder

int spoutParal = get("spout.parallel", 1);
//get the spout parallelism setting

SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
                new SequenceSpout(), spoutParal);
//create the spout; new SequenceSpout() is the spout object, SequenceTopologyDef.SEQUENCE_SPOUT_NAME is the spout name (the name must not contain spaces)

int boltParal = get("bolt.parallel", 1);
//get the bolt parallelism setting

BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//create the bolt; SequenceTopologyDef.TOTAL_BOLT_NAME is the bolt name, TotalCount is the bolt object, boltParal is the bolt parallelism
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME)
//receives the data emitted by SequenceTopologyDef.SEQUENCE_SPOUT_NAME with shuffle grouping,
//i.e. each spout task sends tuples to the downstream bolt tasks in random round-robin order

int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//set the number of concurrent ackers

int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//set the number of workers the topology will use

conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//set the topology mode to distributed so the topology can run on a JStorm cluster

StormSubmitter.submitTopology(streamName, conf, builder.createTopology());
//submit the topology
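The get(...) helper used in the snippet above is not part of the JStorm API; the example assumes some way to read an int setting with a fallback default. A minimal sketch of such a helper, reading from the topology conf map (ConfHelper is a hypothetical name, not from the example source):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfHelper {
    private final Map<String, Object> conf;

    public ConfHelper(Map<String, Object> conf) {
        this.conf = conf;
    }

    // Return the configured int value for key, or defaultValue when absent.
    public int get(String key, int defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        return Integer.parseInt(value.toString());
    }
}
```

With this helper, `get("spout.parallel", 1)` returns the configured parallelism when `spout.parallel` is present in the conf map and 1 otherwise.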

IRichSpout

IRichSpout is the simplest spout interface:

IRichSpout {

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

Note:

  • A spout object must implement the Serializable interface, so all data structures inside the spout must be serializable.
  • A spout may have a constructor, but the constructor runs only once, when the topology is submitted and the spout object is created. Initialization that should happen before tasks are assigned to specific workers can therefore be done there, and its results are carried to every task (on submit the spout object is serialized to a file, and each worker deserializes the spout object from that file on startup).
  • open is the initialization hook invoked after the task starts up.
  • close is invoked when the task shuts down.
  • activate is invoked after the task is activated.
  • deactivate is invoked after the task is deactivated.
  • nextTuple is the core of a spout; it holds the logic you need to implement, and the collector emits a message when one is available.
  • ack is invoked when the task receives an ack message; see the ack mechanism for details.
  • fail is invoked when the task receives a fail message; see the ack mechanism for details.
  • declareOutputFields defines the meaning of each field emitted by the spout.
  • getComponentConfiguration is the component-configuration interface for the spout.
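The ack/fail contract described above can be illustrated without the Storm API: a reliable spout remembers each tuple by its message id when it emits, forgets it on ack, and re-queues it for replay on fail. A minimal sketch of that bookkeeping in plain Java (PendingTracker is a hypothetical name, not part of JStorm):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Tracks in-flight messages the way a reliable spout would:
// emit registers the tuple under its msgId, ack forgets it,
// fail moves it into a replay queue for a later nextTuple().
public class PendingTracker {
    private final Map<Object, String> inFlight = new HashMap<>();
    private final Queue<String> toReplay = new ArrayDeque<>();

    public void emit(Object msgId, String tuple) {
        inFlight.put(msgId, tuple);
    }

    public void ack(Object msgId) {
        inFlight.remove(msgId);      // fully processed, safe to forget
    }

    public void fail(Object msgId) {
        String tuple = inFlight.remove(msgId);
        if (tuple != null) {
            toReplay.add(tuple);     // re-emit on a later nextTuple()
        }
    }

    public String nextReplay() {
        return toReplay.poll();      // null when nothing needs replaying
    }
}
```

In a real spout, this logic would live inside the emit, ack, and fail callbacks shown in the skeleton above.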

Bolt

IRichBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

Note:

  • A bolt object must implement the Serializable interface, so all data structures inside the bolt must be serializable.
  • A bolt may have a constructor, but the constructor runs only once, when the topology is submitted and the bolt object is created. Initialization that should happen before tasks are assigned to specific workers can therefore be done there, and its results are carried to every task (on submit the bolt object is serialized to a file, and each worker deserializes the bolt object from that file on startup).
  • prepare is the initialization hook invoked after the task starts up.
  • cleanup is invoked when the task shuts down.
  • execute is the core of a bolt; it holds the logic you need to implement, and the collector may emit messages while processing one. When the bolt has successfully processed a message it should call collector.ack, and when it cannot process a message or hits an error it should call collector.fail; see the ack mechanism for details.
  • declareOutputFields defines the meaning of each field emitted by the bolt.
  • getComponentConfiguration is the component-configuration interface for the bolt.
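The topology's TotalCount bolt is not shown here; assuming it simply keeps a running total of the values it receives (a guess based on its name, the actual class in the example source may differ), its core execute logic can be sketched without the Storm API:

```java
// Core logic of a summing bolt: each call mirrors execute(Tuple input),
// which would read the value from the tuple, add it to the running
// total, and then ack the tuple.
public class RunningTotal {
    private long total = 0;

    // Called once per incoming value; returns the updated total.
    public long add(long value) {
        total += value;
        return total;
    }

    public long getTotal() {
        return total;
    }
}
```

In a real bolt, the mutable total would be initialized in prepare rather than in the constructor, since state created before serialization is shared with every task.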

Compile

Configuration in Maven

        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-client</artifactId>
            <version>0.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-client-extension</artifactId>
            <version>0.9.0</version>
            <scope>provided</scope>
        </dependency>

If Maven cannot find the jstorm-client and jstorm-client-extension packages, you can download the JStorm source code and compile them yourself; refer to the source code compilation guide. When packaging, bundle all dependencies into a single jar.

<build>
        <plugins>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>storm.starter.SequenceTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

Submit jar

jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter

  • xxxxxx.jar is the packaged jar.
  • com.alibaba.xxxx.xx is the main class of the topology (the task entry point).
  • parameter is the list of arguments passed to the topology.