Install Maven
The official examples are in examples/storm-starter under the Storm installation folder.
Install Maven 3.6 (3.5 also works).
Configure the environment variables: vi ~/.bash_profile
#MAVEN
export M2_HOME=/home/hadoop/app/apache-maven-3.6.3
export PATH=$M2_HOME/bin:$PATH
Then run source ~/.bash_profile so the change takes effect.
Edit the configuration
/conf/settings.xml (under the Maven installation directory)
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <localRepository>/home/hadoop/app/apache-maven-repo/</localRepository>
    <pluginGroups>
        <pluginGroup>org.mortbay.jetty</pluginGroup>
    </pluginGroups>
    <proxies></proxies>
    <servers></servers>
    <mirrors>
        <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>
        <mirror>
            <id>clojars</id>
            <name>clojar-maven</name>
            <!--url>http://clojars.org/repo/</url-->
            <url>https://mirrors.tuna.tsinghua.edu.cn/clojars/</url>
            <mirrorOf>clojars</mirrorOf>
        </mirror>
    </mirrors>
    <profiles>
        <profile>
            <id>jdk-1.4</id>
            <activation>
                <jdk>1.4</jdk>
            </activation>
            <repositories>
                <repository>
                    <id>alimaven</id>
                    <name>aliyun maven</name>
                    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                    <releases>
                        <enabled>true</enabled>
                    </releases>
                    <snapshots>
                        <enabled>false</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <id>clojars</id>
                    <url>https://mirrors.tuna.tsinghua.edu.cn/clojars/</url>
                </repository>
            </repositories>
        </profile>
    </profiles>
</settings>
Note:
The default local repository (localRepository) is ~/.m2/repository/.
You can also put it somewhere else that suits your setup.
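Build and package the starter project
Before building, if your VM has less than 4 GB of available memory, first lower the parallelism settings in the following file so the VM does not run out of resources:
<starter project root>/src/jvm/org/apache/storm/starter/WordCountTopology.java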
Run the following in the storm-starter directory:
mvn clean
This starts downloading dependencies, which can take anywhere from ten-odd minutes to half an hour depending on your network.
mvn package -Dmaven.test.skip=true
Be sure to skip the tests; otherwise the build will fail.
Then find the fat jar that was built (the largest one) in the target directory.
Run the project
First start the cluster and make sure all the required processes are running.
Run in local mode:
[hadoop@hadoop00 target]$ storm local original-storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology WCTopology
Submit to the cluster:
[hadoop@hadoop00 target]$ storm jar original-storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology WCTopology
After a short while the output reports that the upload succeeded.
Check the UI
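Because logviewer is running, clicking a worker's port number in the UI opens that worker's log (if logviewer is not running, the link opens a page that cannot be reached).
If logviewer is stopped, the nimbus log cannot be viewed from the UI either.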
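Log analysis
Points to note:
Worker processes allocated: the slots shown in the log are the worker processes; here there are 2.
Executor threads allocated: each thread is labeled with two numbers [m,n], meaning the thread runs n-m+1 tasks, with task IDs m through n.
One of them has 6 task threads; as the later log entries show, their task IDs are 51-56.
There is only 1 spout executor, with task ID 11.
There are 10 bolt executors.
Of these:
split runs 2 threads with 4 tasks in total, so each thread is assigned 2 tasks
count runs 3 threads with 6 tasks in total, so each thread is assigned 2 tasks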
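The [m,n] notation above can be illustrated with a small standalone Java sketch. It assumes that a component's tasks receive consecutive IDs and are split into contiguous, near-equal ranges, one per executor, which is consistent with the split and count figures above. It is a simplified model, not Storm's scheduler code, and the names ExecutorRanges, assign, and taskCount are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ExecutorRanges {
    /**
     * Hypothetical model: splits task IDs firstTaskId .. firstTaskId + numTasks - 1
     * into numExecutors contiguous ranges [m, n]. When the tasks do not divide evenly,
     * the first (numTasks % numExecutors) executors get one extra task.
     */
    public static List<int[]> assign(int firstTaskId, int numTasks, int numExecutors) {
        if (numExecutors <= 0 || numTasks < numExecutors) {
            throw new IllegalArgumentException("need 1 <= numExecutors <= numTasks");
        }
        List<int[]> ranges = new ArrayList<>();
        int base = numTasks / numExecutors;
        int extra = numTasks % numExecutors;
        int next = firstTaskId;
        for (int i = 0; i < numExecutors; i++) {
            int size = base + (i < extra ? 1 : 0);
            ranges.add(new int[] {next, next + size - 1});
            next += size;
        }
        return ranges;
    }

    /** An executor labeled [m, n] runs n - m + 1 tasks. */
    public static int taskCount(int[] range) {
        return range[1] - range[0] + 1;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: a "count" bolt with 3 executors and 6 tasks starting at task ID 1
        for (int[] r : assign(1, 6, 3)) {
            System.out.println("[" + r[0] + "," + r[1] + "] -> " + taskCount(r) + " tasks");
        }
    }
}
```

With 3 executors and 6 tasks, every executor gets a range of 2 tasks, matching the count bolt described above; with 2 executors and 4 tasks, the same holds for split.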
For example, here is the log of the split (word-splitting) bolt
Log of the count bolt
Code analysis
package org.apache.storm.starter;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.starter.spout.RandomSentenceSpout;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.ConfigurableTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology extends ConfigurableTopology {
    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    @Override
    protected int run(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Parallelism hint: 5 spout executors
        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        // shuffleGrouping: sentences are spread randomly and evenly over the split tasks
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // fieldsGrouping on "word": the same word always goes to the same count task
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        conf.setDebug(true);

        String topologyName = "word-count";

        // Number of worker processes (slots) requested for this topology
        conf.setNumWorkers(3);

        // The first command-line argument, if given, becomes the topology name
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

    // Multilang bolt: the actual splitting is done by the Python script splitsentence.py
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    // Keeps an in-memory count per word and emits (word, count) for every word it receives
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}
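To see why count uses fieldsGrouping on "word", here is a standalone Java simulation with no Storm dependency. It assumes sentences are split on single spaces and routes each word with Math.floorMod(word.hashCode(), numTasks); Storm's real fields grouping also hashes the grouping field values modulo the number of tasks, but not necessarily with this exact formula. Each simulated count task keeps its own map, as WordCount's counts field does. The names FieldsGroupingSketch, chooseTask, and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldsGroupingSketch {
    /** Simplified fields grouping: the same word always maps to the same task index. */
    static int chooseTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    /** Runs sentences through split -> count, with one count map per count task. */
    public static List<Map<String, Integer>> run(List<String> sentences, int numCountTasks) {
        List<Map<String, Integer>> taskCounts = new ArrayList<>();
        for (int i = 0; i < numCountTasks; i++) {
            taskCounts.add(new HashMap<>());
        }
        for (String sentence : sentences) {
            // split bolt: one tuple per word
            for (String word : sentence.split(" ")) {
                Map<String, Integer> counts = taskCounts.get(chooseTask(word, numCountTasks));
                // Equivalent to WordCount's get / null check / increment / put
                counts.merge(word, 1, Integer::sum);
            }
        }
        return taskCounts;
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList(
                "the cow jumped over the moon",
                "the man went to the store");
        List<Map<String, Integer>> result = run(sentences, 3);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("count task " + i + ": " + result.get(i));
        }
    }
}
```

Every word ends up in exactly one task's map with its full count. With shuffle grouping instead, the same word could land on several count tasks, each holding only a partial count.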
Troubleshooting
Running the WordCountTopology example from the storm-starter project fails with the following error:
java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
Serializer Exception:
Traceback (most recent call last):
  File "splitsentence.py", line 16, in <module>
    import storm
ImportError: No module named storm
Solution
Download storm.py and put it in the directory /apache-storm-2.1.0/examples/storm-starter/multilang/resources/, then change the main method of WordCountTopology.java as follows:
// Additional imports needed in WordCountTopology.java:
// import org.apache.storm.Config;
// import org.apache.storm.LocalCluster;
// import org.apache.storm.StormSubmitter;
public static void main(String[] args) throws Exception {
    SplitSentence pythonSplit = new SplitSentence();
    // Point the Python subprocess at the directory that contains storm.py
    Map<String, String> env = new HashMap<String, String>();
    env.put("PYTHONPATH", "/apache-storm-2.1.0/examples/storm-starter/multilang/resources/");
    pythonSplit.setEnv(env);

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    builder.setBolt("split", pythonSplit, 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        // Cluster mode: args[0] is the topology name
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
        // Local mode: run in-process for 10 minutes, then shut down
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(600000);
        cluster.shutdown();
    }
}
After making the change above, the Maven build fails with:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.0.0:check (validate) on project storm-starter: You have 5 Checkstyle violations. -> [Help 1]
Solution:
Edit pom.xml and relax the code-style (checkstyle) plugin's limit by raising maxAllowedViolations:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-checkstyle-plugin</artifactId>
    <!--Note - the version would be inherited-->
    <configuration>
        <maxAllowedViolations>100</maxAllowedViolations>
    </configuration>
</plugin>