在本节中,我们创建一个名字叫WordCountApp的Topology。这个Topology的作用是统计一个名为words.txt文本中每个单词出现的次数。我们一步一步的分析,是如何根据我们需求,最终创建出Topology。
一、Topology组件分析
假设我们的words.txt中内容如下:
A Storm Cluster can run many topologies at the same time Topology consists of spouts and bolts This is the first topology example
这个Topology可能会由以下的组件组成:
WordReader(Spout):我们需要从外数据源wrods.txt中获取数据,这里的words.txt其实就是外部数据源。为了明确这个Spout的作用,我们将其称之为WordReader。
WordNormalizer(Bolt):这个Bolt作用是格式化单词。通常情况下,我们统计一个单词出现次数的时候,是不区分大小写的。所以我们,必须要有这样一个环节来转换大小写。事实上,如果我们的文本中,还出现了标点符号",","."等,我们还要将这些符号去除,这就是WordNormalizer的作用。为了简单,我们这里并没有出现符号。
WordCounter(Bolt):用于统计经过WordNormalizer处理后的单词的出现次数。
因次,我们的WordCountApp这个Topology可能如下所示:
二、API简介
Components组件API介绍
构建一个Topology有很多,根据我们需求的不同,我们可能会使用不同的API,下图列出了,本节中我们会提到的,或者在WordCountApp案例中会涉及到的API。
我们之前提过,Topology中的Spout和Bolt称之为Topology的组件(Components)。
在Storm API中,定义了一个IComponent接口表示组件,用ISpout表示一个Spout,IBolt表示一个Bolt。IRichSpout接口分别继承了IComponent和ISpout接口,意味着这个接口的实现类,既是Spout,又是Topology的组件。
IRichBolt接口集成了IComponent接口和IBolt接口,表示这个接口的实现类即是Component,又是Bolt。
因此在开发中,我们通常只要实现IRichSpout和IRichBolt即可。由于IRichBolt和IRichSpout接口中定义的方法比较多。有些方法我们可能并不想实现,因此分别提供了一个实现类,BaseRichSpout和BaseRichBolt。把一些并不是一定要用户实现的方法,提供了一个默认的实现,来简化我们的开发,这实际上就适配器设计模式。
作为第一个案例,为了尽量熟悉各个API中的方法,我们编写的WordReader会去实现IRichSpout,因此其作用就是从外部数据源获取数据,WordNormalizer 和WordCounter 会实现IRichBolt,它们的作用是分析处理数据。
三 编写Topology
准备工作,创建一个maven工程wordcountapp,pom.xml如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion> 4.0.0</modelVersion > <groupId> com.tianshouzhi</groupId > <artifactId> wordcountapp</artifactId > <version> 0.0.1-SNAPSHOT</version > <dependencies> <dependency> <groupId> org.apache.storm</groupId > <artifactId> storm-core</artifactId > <version> 0.9.2-incubating</version > </dependency> </dependencies> </project>
1、创建WordReader(Spout)
我们的WordReader要实现IRichSpout接口,这个接口中,定义的方法包括:
目前我们最关心的是,用红色框标记出的三个方法,对于其他的方法,我们只打印出一句话。
WordReader.java
package com.tianshouzhi.study.wordcountapp.spouts;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
* WordReader(Spout),用于从外部数据源words.txt中获取数据
*/
public class WordReader implements IRichSpout {
private SpoutOutputCollector collector ;
private FileReader fileReader ;
BufferedReader reader;
private boolean completed = false;
/**
* 这个方法做的惟一一件事情就是分发文件中的文本行
*/
public void nextTuple() {
/**
* 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回。
*/
if (completed ) {
try {
Thread. sleep(1000);
} catch (InterruptedException e ) {
// 什么也不做
}
return;
}
String str;
try {
int i = 0;
// 读所有文本行
while ((str = reader.readLine()) != null) {
System. out.println("WordReader.nextTuple(),emits time:" + i++);
/**
* 按行发布一个新值
*/
this.collector .emit(new Values( str), str );
}
} catch (Exception e ) {
throw new RuntimeException("Error reading tuple", e);
} finally {
completed = true ;
}
}
/**
*
* 当Spout被创建之后,这个方法会被条用
*/
public void open(Map conf, TopologyContext context , SpoutOutputCollector collector ) {
System.out.println( "WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector)");
String fileName = conf .get("fileName").toString();
InputStream inputStream=WordReader.class.getClassLoader().getResourceAsStream( fileName);
reader =new BufferedReader(new InputStreamReader(inputStream ));
this.collector = collector;
}
/**
* 声明数据格式,即输出的一个Tuple中,包含几个字段
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System. out.println("WordReader.declareOutputFields(OutputFieldsDeclarer declarer)");
declarer.declare(new Fields("line"));
}
@Override
public void activate() {
System. out.println("WordReader.activate()" );
}
@Override
public void deactivate() {
System. out.println("WordReader.deactivate()" );
}
@Override
public Map<String, Object> getComponentConfiguration() {
System. out.println("WordReader.getComponentConfiguration()" );
return null ;
}
/**
* 当一个Tuple处理成功时,会调用这个方法
*/
public void ack(Object msgId) {
System. out.println("WordReader.ack(Object msgId):" + msgId);
}
/**
* 当Topology停止时,会调用这个方法
*/
public void close() {
System. out.println("WordReader.close()" );
}
/**
* 当一个Tuple处理失败时,会调用这个方法
*/
public void fail(Object msgId) {
System. out.println("WordReader.fail(Object msgId):" + msgId);
}
}目前我们主要关注的是以下三个方法:
1、open方法
这个方法的声明如下:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector )
当Spout被实例化之后,open方法会被调用一次。通常情况下,这个方法中,我们会做一些初始化的动作。例如建立与外部数据源的链接,在本例中,就是获取到了外部数据文件words.txt的读取器。
这个方法还传递进来了3个参数。
参数1:Map conf
conf对象中维护的是一些配置信息。例如在这个方法中,我们获取words.txt文件的位置就是通过
conf .get("wordsFile")这种方式来进行的。conf对象中还维护了一些其他的默认参数,读者自己可以将这个对象中的参数打印出来进行观察。
参数2:TopologyContext context
这个参数中,包含了Topology运行时的上下文信息。要知道这个对象的作用,最简单的方式就是查看这个对象中,有哪些方法和字段,此处不做过多讲解。
参数3:SpoutOutputCollector collector
从名字中,我们就可以看出,这个对象的作用,Spout输出收集器,我们之前说过Spout需要将数据以Tuple的形式发送给Bolt,就是通过这个对象的emit方法来实现的。
2、declareOutputFields方法
方法的声明如下:
public void declareOutputFields(OutputFieldsDeclarer declarer)
这个方法的作用是,声明由这个Spout输出的流中的每一个Tuple,包含哪些字段。 之前我们提到过,在Topology中,不同的Spout和Bolt以及Bolt与Bolt之间形成数据流中,同一个Stream中Tuple的数据格式一定是相同的,不同的数据流中的Tuple的数据格式可能相同也可能不同。那么一个Tuple中的数据格式到底是怎么样的呢?就是在这个方法中定义的。
例如在这里,我们声明了一个line字段。declarer .declare(new Fields( "line"));,这就相当于宣告,每个Tuple中只能含有一个字段,这个字段的名字叫做line。如果一个Tuple中有多个字段,那么我们在这里就要声明多个字段。
3、nextTuple方法
这个方法会被不断的调用,因为Spout需要不断的从外部数据源中获取最新的数据,然后使用SpoutOutputCollector的emit方法来进行发射。
补充:SpoutOutputCollector的emit方法
SpoutOutputCollector的emit方法有很多种重载形式,比较常用的是红色框中的几种。
在本例中,我们使用的是第二个emit方法。这个方法第一个参数是List集合。,但是我们的代码中使用的却是Values这个对象。如下:
this .collector.emit( new Values(str ), str);
这是因为Values对象是ArrayList集合的子类。读者可以自行查看这个类的源码。这里主要说明,我们构造Values对象时,其构造方法接受一个可变参数,我们这里只传递了一个str,说明最终List集合中只有一个元素。而且只能是一个元素。因为我们我们已经在declareOutputFields这个方法中声明过,只有一个字段line。
对于其他的几个方法,后面我们也会详细介绍,目前只是在每句话里面打印出了一句话。
2、创建WordNormalizer(实现IRichBolt)
WordNormalizer.java
package com.tianshouzhi.study.wordcountapp.bolts;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt{
/**
*
*/
private static final long serialVersionUID = 3644849073824009317L;
private OutputCollector collector ;
/**
* *bolt*从单词文件接收到文本行,并标准化它。
* 文本行会全部转化成小写,并切分它,从中得到所有单词。
*/
public void execute(Tuple input ){
System.out.println( "WordNormalizer.execute()" );
String sentence = input .getString(0);
String[] words = sentence .split(" ");
for(String word : words){
word = word .trim();
if(!word .isEmpty()){
word=word .toLowerCase();
/*//发布这个单词*/
collector.emit(input ,new Values(word));
}
}
//对元组做出应答
collector.ack(input );
}
public void prepare(Map stormConf, TopologyContext context , OutputCollector collector ) {
System. out.println("WordNormalizer.prepare()" );
this. collector=collector ;
}
/**
* 这个*bolt*只会发布“word”域
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System. out.println("WordNormalizer.declareOutputFields()" );
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
System. out.println("WordNormalizer.getComponentConfiguration()" );
return null ;
}
public void cleanup(){
System. out.println("WordNormalizer.cleanup()" );
}
}在这个Bolt中,我们关心的方法是prepare、execute和declareOutputFields,对于其他方法,我们依然是打印出一句话。
prepare 方法:
声明如下:
public void prepare( Map stormConf, TopologyContext context, OutputCollector collector )
这个方法与Spout的open方法参数很类似,唯一不同的就是最后一个参数,Spout中的open方法传递的是SpoutOutPutCollector,这里传递的是OutputCollector。不过都是数据收集器。主要的区别是,emit方法可以多出一个anchor或者anchors的参数,这与消息的可靠性保证有关,我们将会在后面详细讲解。如下红色框中全部的部分:
declareOutputFields方法:
因为Bolt还可以发送数据到下一级Bolt,因此,我们同样要指定这个Tuple中的数据格式。如果这个Bolt不要输出的话,我们就可以不声明。通常最后一个Bolt,因为没有下一级Bolt,这个方法就可以直接返回null。如我们下面要编写的WordCounter。
execute方法:
声明如下:
public void execute(Tuple input)
每当Bolt接受到一个Tuple的时候,就会调用一次execute方法。Tuple就是Spout传递给Bolt的数据。额..等等,我们在之前刚说过,本案例中WordReader发射数据时,使用的是下面这个方法:
明明发送的是一个List集合。但是这里接受参数却是Tuple类型。实际上是没有错的,Tuple是一个接口,我们可以看看其实现类TupleImpl中的源码。
相信看到这个读者应该明白了,Tuple实际上是将Spout端发送的数据进行了一层封装,原来的List集合还在,此外还封装了一些元数据。
因为我们原来的Spout每次发送的List<Object>集合中,只有一个元素,因此,在这里,就可以通过input.getString(0);来获取。
3、编写WordCounter(实现IRichBolt)
这个Bolt将统计到的最终结果放到一个map集合中。并在Bolt销毁时,调用cleanup方法时,打印出统计结果。
WordCounter.java
package com.tianshouzhi.study.wordcountapp.bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class WordCounter implements IRichBolt{
Integer id;
String name;
Map<String,Integer> counters;
private OutputCollector collector;
/**
* 当Bolt销毁时,我们会显示单词数量
*/
@Override
public void cleanup(){
for(Map.Entry<String,Integer> entry : counters.entrySet()){
System. out.println(entry .getKey()+": "+ entry.getValue());
}
System.out.println( "WordCounter.cleanup()" );
}
/**
* 为每个单词计数
*/
@Override
public void execute(Tuple input ) {
System.out.println( "WordCounter.execute()" );
String str=input .getString(0);
/**
* 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1
*/
if(!counters .containsKey(str)){
counters.put(str ,1);
} else{
Integer c = counters.get(str ) + 1;
counters.put(str ,c );
}
//对元组作为应答
collector.ack(input );
}
/**
* 初始化
*/
@Override
public void prepare(Map stormConf, TopologyContext context , OutputCollector collector ){
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System. out.println("WordCounter.declareOutputFields()" );
}
@Override
public Map<String, Object> getComponentConfiguration() {
System. out.println("WordCounter.getComponentConfiguration()" );
return null ;}
}这个Bolt因为不需要继续输出,所以declareOutputFields方法中仅仅是打印了一句话。
4、创建WordCountApp类
前面我们已经将构建Topology的各个组件都编写好了。现在我们需要编写一个类,来将组件组合成一个真正的Topology。
WordCountApp.java
package com.tianshouzhi.study.wordcountapp;
import com.tianshouzhi.study.wordcountapp.bolts.WordCounter;
import com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer;
import com.tianshouzhi.study.wordcountapp.spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountApp {
public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
//定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader" , new WordReader());
builder.setBolt("word-normalizer" , new WordNormalizer()).shuffleGrouping("word-reader" );
builder.setBolt("word-counter" , new WordCounter()).fieldsGrouping("word-normalizer" , new Fields("word"));
StormTopology topology = builder .createTopology();
//配置
Config conf = new Config();
String fileName ="words.txt" ;
conf.put("fileName" , fileName );
conf.setDebug(false);
//运行拓扑
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Topologie" , conf , topology );
Thread. sleep(5000);
cluster.shutdown();
}
}TopologyBuilder对象用于创建Topology。在我们设置完Spout和Bolt之后,最后通过对象的createTopology()方法来创建StormTopology对象。
Config对象是Storm中提供的一个配置类,其实Map的子类,还记得我们之前在WordReader中的open方法中,是通过conf对象来获取words.txt的文件的路径,从而创建BufferedReader。
Storm支持两种运行方式,本地模式和远程模式。在本例中,我们使用LocalCluster这个类,模拟一个本地集群,从而使我们创建的代码在本地就可以运行,不用提交到Storm集群中。通常情况下,在开发的时候,我们会使用本地模式,在提测和产品环境下,我们才会将代码真正的提交到Storm集群去运行。
5、添加日志配置文件
Storm使用的日志框架是logback,默认的日志输出级别是INFO,因此在运行的时候会输出很多日志,影响我们的观察。因此,此处使用一个简单的logback.xml配置文件,将日志级别调整为ERROR。在后面,我们将会有一个章节,详细的来说明Storm集群中的日志相关问题。
logback.xml
<?xml version="1.0" encoding= "UTF-8"?> <configuration> <root level="ERROR" /> </configuration>
最终,我们的项目目录结构如下所示:
四、本地模式运行Topology
本地模式运行很简单,我们只需要右击-->Run as-->JavaApplication即可
运行后,控制台输出
WordReader.getComponentConfiguration() WordNormalizer.getComponentConfiguration() WordCounter.getComponentConfiguration() WordCounter.declareOutputFields() WordNormalizer.declareOutputFields() WordReader.declareOutputFields(OutputFieldsDeclarer declarer) filepath:/D:/git/cloud/storm/target/classes/wordsFile.dic WordNormalizer.prepare() WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector) WordReader.activate() WordReader.nextTuple(),emits time:0 WordReader.nextTuple(),emits time:1 WordReader.nextTuple(),emits time:2 WordNormalizer.execute() WordNormalizer.execute() WordNormalizer.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordReader.ack(Object msgId): WordReader.ack(Object msgId):A Storm Cluster can run many topologies at the same time WordReader.ack(Object msgId):Topology consists of spouts and bolts WordReader.ack(Object msgId):This is the first topology example a: 1 cluster: 1 storm: 1 topology: 2 bolts: 1 this: 1 run: 1 is: 1 many: 1 topologies: 1 example: 1 the: 2 can: 1 at: 1 same: 1 and: 1 of: 1 consists: 1 spouts: 1 time: 1 first: 1 WordCounter.cleanup() WordNormalizer.cleanup() WordReader.close()
从输出结果中,我们可以看到,单词个数的确被准确的统计出来了。而且从打印的日志中,我们也可以看到Spout和Bolt方法被调用的顺序。
Spout实例方法被调用的顺序为:getComponentConfiguration()-->declareOutputFields----->open-->active-->nextTuple(死循环)-->close。
我们可以类比J2EE中Servlet生命周期方法,把这个几个方法看做是一个IRichSpout实例生命周期方法。所谓声明周期方法,指的是,一个类实例从创建到销毁时,必定会执行的方法,而且有着一定的顺序。ack方法、fail方法、deactive等方法,因为不是Spout生命周期方法,所以并不一定会执行,根据设置,有些情况下会执行,有些情况下不会执行。(注意Storm官方并没有Spout生命周期这种说法,本人是为了方便读者理解,进行了对比)。
类似的,我们可以找出IRichBolt实例的生命周期方法为:getComponentConfiguration-->declareOutputFields()-->execute(执行多次)-->cleanup。prepare方法并不是生命周期方法。为什么这么说?我们可以看到在打印的日志中,WordNormalizer.prepare()执行了,但是WordCounter的prepare方法并没有执行。通常情况下,如果一个IRichBolt如果没有声明任何输出,即declareOutputFields方法返回的是null,则prepare方法不会执行。在案例中,Toplogy中数据流向的最后一个WordCounter因为不要发送数据给其他Bolt了,declareOutputFields返回的结果就是null,因此prepare方法没执行。分析这个有用吗?其实还是有点用的,因为prepare方法的名字很具有诱惑性,我们通常认为可以在这个方法中做一些初始化操作。不过我们现在已经理解了,如果是最后一个Bolt,这个代码是不会被调用的。
最后我们要注意的是,IRichSpout的active方法必须是在Spout数据流向经过的所有Bolt都实例化完成之后,才会被调用。举例来说:
对于这张图中,Spout1必须要在Bolt1、Bolt2、Bolt3、Bolt4都准备好的情况下,active方法才会被调用;而Spout2,只需要Bolt3准备好了之后,就会被调用。
在下一节中,我们将讲解如何以远程模式运行WordCountApp。