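Installing Maven
The official examples are in the storm-starter directory under examples/ in the Storm installation folder.
Install Maven 3.6 (3.5 also works).
Configure the environment variables: vi ~/.bash_profile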
#MAVEN
export M2_HOME=/home/hadoop/app/apache-maven-3.6.3
export PATH=$M2_HOME/bin:$PATH
Then reload the profile with source ~/.bash_profile and check the installation with mvn -v.
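Edit the configuration file $M2_HOME/conf/settings.xml:
<settings>
  <localRepository>/home/hadoop/app/apache-maven-repo/</localRepository>
  <pluginGroups>
    <pluginGroup>org.mortbay.jetty</pluginGroup>
  </pluginGroups>
  <mirrors>
    <mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>
    </mirror>
    <mirror>
      <id>clojars</id>
      <name>clojar-maven</name>
      <url>https://mirrors.tuna.tsinghua.edu.cn/clojars/</url>
      <mirrorOf>clojars</mirrorOf>
    </mirror>
  </mirrors>
  <profiles>
    <profile>
      <id>jdk-1.4</id>
      <activation>
        <jdk>1.4</jdk>
      </activation>
      <repositories>
        <repository>
          <id>alimaven</id>
          <name>aliyun maven</name>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>clojars</id>
          <url>https://mirrors.tuna.tsinghua.edu.cn/clojars/</url>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>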
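Each <mirror> entry only redirects requests for repositories whose id matches its <mirrorOf> value: alimaven replaces central, and the Tsinghua mirror replaces clojars. As a rough illustration, the Java sketch below mimics that matching for exact ids, the "*" wildcard, comma-separated lists and "!id" exclusions. The class and method names are hypothetical and this is not Maven's implementation; Maven's own selector also handles forms such as external:* and gives an exact-id match priority over wildcard patterns.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of how a mirrorOf pattern selects repositories. */
final class MirrorOfMatcher {

    /** True if repoId is covered by the pattern: an exact id, "*", a comma list, or "!id" exclusions. */
    static boolean matches(String mirrorOf, String repoId) {
        boolean matched = false;
        for (String raw : mirrorOf.split(",")) {
            String pattern = raw.trim();
            if (pattern.startsWith("!")) {
                if (pattern.substring(1).equals(repoId)) {
                    return false; // an explicit exclusion always wins
                }
            } else if (pattern.equals("*") || pattern.equals(repoId)) {
                matched = true;
            }
        }
        return matched;
    }

    /** Returns the id of the first mirror (in declaration order) covering repoId, or null. */
    static String pickMirror(Map<String, String> mirrorIdToMirrorOf, String repoId) {
        for (Map.Entry<String, String> e : mirrorIdToMirrorOf.entrySet()) {
            if (matches(e.getValue(), repoId)) {
                return e.getKey();
            }
        }
        return null; // no mirror applies: the repository's own URL is used
    }

    public static void main(String[] args) {
        // The two <mirror> entries from settings.xml above: mirror id -> mirrorOf
        Map<String, String> mirrors = new LinkedHashMap<>();
        mirrors.put("alimaven", "central");
        mirrors.put("clojars", "clojars");

        for (String repo : Arrays.asList("central", "clojars", "some-other-repo")) {
            System.out.println(repo + " -> " + pickMirror(mirrors, repo));
        }
        // prints, one per line: central -> alimaven, clojars -> clojars, some-other-repo -> null
    }
}
```

A repository that no pattern covers (some-other-repo above) is downloaded from its own URL, which is why only central and clojars traffic goes through the domestic mirrors.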
Note:
The default local repository (localRepository) is ~/.m2/repository/.
You can also point it to any other suitable location, as done above.
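Building and packaging the starter project
Before building, if your VM has less than 4 GB of free memory, first lower the parallelism settings in the following file so the VM does not run out of resources:
<starter project root>/src/jvm/org/apache/storm/starter/WordCountTopology.java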

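In the storm-starter directory, run
mvn clean
On the first run Maven starts downloading plugins and dependencies; depending on your network this can take anywhere from ten-odd minutes to half an hour.
mvn package -Dmaven.test.skip=true
Be sure to skip the tests; otherwise the build will fail.
In target/, find the fat jar produced by the build (the largest one).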
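If you want to script this step, a small Java sketch like the following picks the largest .jar directly inside the build output directory. The class name FatJarFinder is hypothetical, and the default path assumes you run it from storm-starter, where Maven writes its output to target/ by default.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

/** Hypothetical helper: returns the largest *.jar directly inside a build output directory. */
final class FatJarFinder {

    static Optional<Path> largestJar(Path targetDir) throws IOException {
        try (Stream<Path> files = Files.list(targetDir)) {
            return files
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .filter(Files::isRegularFile)
                    .max(Comparator.comparingLong(FatJarFinder::sizeOf)); // biggest file wins
        }
    }

    private static long sizeOf(Path p) {
        try {
            return Files.size(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed layout: run from storm-starter, jars in ./target (or pass another directory)
        Path target = Paths.get(args.length > 0 ? args[0] : "target");
        System.out.println(largestJar(target).map(Path::toString).orElse("no jar found"));
    }
}
```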

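Running the project
First start the cluster and make sure all the required daemons (e.g. nimbus, supervisor, ui) are running.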

Running in local mode
[hadoop@hadoop00 target]$ storm local original-storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology WCTopology

Submitting to the cluster
[hadoop@hadoop00 target]$ storm jar original-storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology WCTopology

After a short while the output reports that the upload succeeded.


Viewing the UI

Because logviewer is running, clicking a worker's port number opens that worker's log (if logviewer is not running, the link cannot be reached).

If logviewer is stopped, the Nimbus log cannot be viewed from the UI either.
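Log analysis
Points to note:
Worker processes allocated: the slots shown in the log are the worker processes, 2 here.
Executor threads allocated: each executor carries two numbers [m,n], meaning that thread runs n-m+1 tasks, with task IDs m through n.
One of them runs 6 task threads; as the later log entries show, their task IDs are 51-56.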

There is only one spout executor, with task ID 11.

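The bolts have 10 tasks in total:
split runs 2 executor threads with 4 tasks in total, so each thread is assigned 2 tasks
count runs 3 executor threads with 6 tasks in total, so each thread is assigned 2 tasks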
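The [m,n] arithmetic above can be sketched in plain Java. The helper below is a hypothetical illustration, not Storm's scheduler code: it spreads a component's tasks over its executors as contiguous, near-equal ranges, which reproduces the split (2 executors, 4 tasks) and count (3 executors, 6 tasks) layouts described above. The starting task ID is an arbitrary input here; in a real topology Storm assigns IDs across all components.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: spread a component's tasks over its executors as contiguous [m, n] ranges. */
final class ExecutorTaskRanges {

    /**
     * Splits task ids firstTaskId .. firstTaskId + numTasks - 1 over numExecutors executors,
     * as evenly as possible. Each element is {m, n}; that executor runs n - m + 1 tasks.
     */
    static List<int[]> assign(int firstTaskId, int numTasks, int numExecutors) {
        if (numExecutors <= 0 || numTasks < numExecutors) {
            throw new IllegalArgumentException("need 0 < numExecutors <= numTasks");
        }
        List<int[]> ranges = new ArrayList<>();
        int base = numTasks / numExecutors;  // minimum tasks per executor
        int extra = numTasks % numExecutors; // the first 'extra' executors get one more task
        int next = firstTaskId;
        for (int i = 0; i < numExecutors; i++) {
            int size = base + (i < extra ? 1 : 0);
            ranges.add(new int[] {next, next + size - 1});
            next += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // split: 2 executors, 4 tasks; task ids starting at 1 are just an example
        for (int[] r : assign(1, 4, 2)) {
            System.out.println("[" + r[0] + "," + r[1] + "] -> " + (r[1] - r[0] + 1) + " tasks");
        }
        // prints "[1,2] -> 2 tasks" and then "[3,4] -> 2 tasks"
    }
}
```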

For example, here is the log of the split (word-splitting) bolt:

Log of the count (word-counting) bolt:

Code analysis
package org.apache.storm.starter;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.starter.spout.RandomSentenceSpout;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.ConfigurableTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology extends ConfigurableTopology {
    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    @Override
    protected int run(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Spout emitting random sentences, parallelism hint 5
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        // Sentences are distributed randomly (shuffle grouping) over the split tasks
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Tuples with the same "word" always reach the same count task (fields grouping)
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        conf.setDebug(true);

        String topologyName = "word-count";

        conf.setNumWorkers(3);

        // The first command-line argument, if given, overrides the topology name
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

    // Bolt implemented in Python: runs splitsentence.py as a multilang subprocess
    public static class SplitSentence extends ShellBolt implements IRichBolt {
        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    // Keeps an in-memory count per word and emits the updated (word, count) pair
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}
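Each WordCount task keeps its own counts map, so the topology relies on fieldsGrouping("split", new Fields("word")) to send every occurrence of a word to the same count task; with shuffle grouping the counts for one word would be split across tasks. The standalone simulation below illustrates this. It is a hypothetical sketch: the routing rule floorMod(word.hashCode(), numTasks) is an assumption in the same spirit as Storm's hash-based fields grouping, not its exact code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of fields grouping on "word" feeding several WordCount tasks. */
final class FieldsGroupingDemo {

    /** Assumed routing rule: the same word always maps to the same task index. */
    static int chooseTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    /** Routes each word to a task and counts it there, one map per simulated count task. */
    static List<Map<String, Integer>> route(List<String> words, int numTasks) {
        List<Map<String, Integer>> perTaskCounts = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTaskCounts.add(new HashMap<>());
        }
        for (String word : words) {
            Map<String, Integer> counts = perTaskCounts.get(chooseTask(word, numTasks));
            counts.merge(word, 1, Integer::sum); // same effect as get/increment/put in WordCount.execute
        }
        return perTaskCounts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cow", "jumped", "over", "the", "moon", "the");
        List<Map<String, Integer>> tasks = route(words, 3);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("count task " + i + ": " + tasks.get(i));
        }
    }
}
```

Whichever task "the" lands on, all three occurrences are counted in that one map, which is what keeps the running totals emitted by WordCount consistent.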

Troubleshooting
Running the WordCountTopology example from storm-starter fails with the following error:
java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
Serializer Exception:
Traceback (most recent call last):
  File "splitsentence.py", line 16, in <module>
    import storm
ImportError: No module named storm
Solution
Download storm.py and put it in /apache-storm-2.1.0/examples/storm-starter/multilang/resources/, then change the main method of WordCountTopology.java as follows:
// Extra imports needed at the top of WordCountTopology.java for this version of main:
// import org.apache.storm.Config;
// import org.apache.storm.LocalCluster;
// import org.apache.storm.StormSubmitter;
public static void main(String[] args) throws Exception {
    SplitSentence pythonSplit = new SplitSentence();
    // Make storm.py importable by the Python subprocess that ShellBolt starts
    Map<String, String> env = new HashMap<String, String>();
    env.put("PYTHONPATH", "/apache-storm-2.1.0/examples/storm-starter/multilang/resources/");
    pythonSplit.setEnv(env);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", pythonSplit, 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        // A topology name was given: submit to the cluster
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
        // No arguments: run in an in-process local cluster for 10 minutes, then shut it down
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(600000);
        cluster.shutdown();
    }
}
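This fix works because ShellBolt runs splitsentence.py in a separate python process, and import storm is resolved against that process's module search path; setEnv adds PYTHONPATH to the subprocess environment. The sketch below shows the same idea with plain java.lang.ProcessBuilder. It is an analogy, not ShellBolt's internal code, the class name is hypothetical, and it only configures the process without launching Python.

```java
import java.io.File;
import java.util.Map;

/** Hypothetical illustration: give a Python child process a PYTHONPATH that contains storm.py. */
final class PythonEnvDemo {

    static ProcessBuilder pythonWithPath(String script, String moduleDir) {
        ProcessBuilder pb = new ProcessBuilder("python", script);
        Map<String, String> env = pb.environment(); // starts as a copy of this JVM's environment
        String existing = env.get("PYTHONPATH");
        // Prepend the directory holding storm.py, keeping any entries that were already set
        env.put("PYTHONPATH", existing == null || existing.isEmpty()
                ? moduleDir
                : moduleDir + File.pathSeparator + existing);
        return pb; // pb.start() would launch the script with this environment
    }

    public static void main(String[] args) {
        ProcessBuilder pb = pythonWithPath("splitsentence.py",
                "/apache-storm-2.1.0/examples/storm-starter/multilang/resources/");
        System.out.println(pb.command() + " with PYTHONPATH=" + pb.environment().get("PYTHONPATH"));
    }
}
```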
After modifying the file above, the Maven build fails with:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.0.0:check (validate) on project storm-starter: You have 5 Checkstyle violations. -> [Help 1]
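Solution:
Edit pom.xml and raise the violation limit of the checkstyle (code-style check) plugin:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-checkstyle-plugin</artifactId>
    <configuration>
        <maxAllowedViolations>100</maxAllowedViolations>
    </configuration>
</plugin>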
