3 - Testing the cluster with storm-starter

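Installing Maven

The official examples are in examples/storm-starter under the Storm installation directory.

Install Maven 3.6 (3.5 also works).

Download link

Configure the environment variables: vi ~/.bash_profile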

#MAVEN
export M2_HOME=/home/hadoop/app/apache-maven-3.6.3
PATH=$M2_HOME/bin:$PATH

Edit the configuration

/conf/settings.xml



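<settings>
  <localRepository>/home/hadoop/app/apache-maven-repo/</localRepository>
  <pluginGroups>
    <pluginGroup>org.mortbay.jetty</pluginGroup>
  </pluginGroups>
  <mirrors>
    <mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>
    </mirror>
    <mirror>
      <id>clojars</id>
      <name>clojar-maven</name>
      <url>https://mirrors.tuna.tsinghua.edu.cn/clojars/</url>
      <mirrorOf>clojars</mirrorOf>
    </mirror>
  </mirrors>
  <profiles>
    <profile>
      <id>jdk-1.4</id>
      <activation>
        <jdk>1.4</jdk>
      </activation>
      <repositories>
        <repository>
          <id>alimaven</id>
          <name>aliyun maven</name>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </repository>
        <repository>
          <id>clojars</id>
          <url>https://mirrors.tuna.tsinghua.edu.cn/clojars/</url>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>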

Note:
The default local repository (localRepository) is ~/.m2/repository/.
You can also point it to any other suitable location.

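Building and packaging the starter project

Before compiling, if your virtual machine has less than 4 GB of available memory, first lower the parallelism in the following file so that the VM does not run out of resources.

(storm-starter project root)/src/jvm/org.apache.storm.starter.WordCountTopology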

Run the following in the storm-starter directory:

mvn clean

This starts the dependency download, which can take anywhere from ten-odd minutes to half an hour depending on the network.

mvn package -Dmaven.test.skip=true

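Be sure to skip the tests; otherwise the build fails with errors.

Locate the fat jar produced by the build (the largest jar).

Running the project

Start the cluster first and make sure all the required processes are running.

Running in local mode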

[hadoop@hadoop00 target]$ storm local original-storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology WCTopology

Submitting to the cluster

[hadoop@hadoop00 target]$ storm jar original-storm-starter-2.1.0.jar org.apache.storm.starter.WordCountTopology WCTopology

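After a short while the output reports that the upload succeeded.

Checking the web UI

Open the Storm UI.

Because logviewer is running, clicking a worker's port number opens that worker's log (if logviewer is not running, the link cannot be reached).

If logviewer is stopped, the nimbus log cannot be viewed from the UI either.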

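Log analysis

Points to note:

Worker processes: the number of slots shown in the log is the number of worker processes, 2 in this case.

Executor threads: each executor is labelled with two numbers [m,n], meaning that it runs n-m+1 tasks with IDs m through n.

One group has 6 tasks; the log entries that follow show that their task IDs are 51 through 56.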
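
The [m,n] notation can be made concrete with a small sketch. The class and method names below (ExecutorTaskRange, taskCount, taskIds) are hypothetical helpers written for illustration and are not part of Storm; the IDs 51 through 56 and 11 are the ones mentioned in this log analysis.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Storm class): models an executor's task range [m,n].
final class ExecutorTaskRange {

    // Number of tasks an executor labelled [m,n] runs: n - m + 1.
    static int taskCount(int m, int n) {
        if (n < m) {
            throw new IllegalArgumentException("expected m <= n");
        }
        return n - m + 1;
    }

    // The task IDs covered by [m,n], in ascending order.
    static List<Integer> taskIds(int m, int n) {
        List<Integer> ids = new ArrayList<>(taskCount(m, n));
        for (int id = m; id <= n; id++) {
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // A range covering task IDs 51 through 56.
        System.out.println(taskCount(51, 56) + " tasks: " + taskIds(51, 56));
        // A single-task range such as [11,11].
        System.out.println(taskCount(11, 11) + " task: " + taskIds(11, 11));
    }
}
```

A range whose two numbers are equal, such as [11,11], therefore describes an executor that runs exactly one task.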

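There is only 1 spout executor, with task ID 11.

The bolts run 10 tasks in total, spread over their executor threads as follows:

split starts 2 threads for 4 tasks, so each thread is assigned 2 tasks

count starts 3 threads for 6 tasks, so each thread is assigned 2 tasks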
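
The thread and task numbers above can be reproduced with a little arithmetic. The sketch below assumes that tasks are spread as evenly as possible over the executors in contiguous ID ranges. It is a simplified model, not Storm's scheduler code, and TaskPartition, partition, and the starting task ID 1 are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not Storm's scheduler): spread `tasks` tasks,
// numbered from `firstTaskId`, over `executors` threads as evenly as possible.
final class TaskPartition {

    // Returns one {m, n} range per executor; earlier executors absorb any remainder.
    static List<int[]> partition(int firstTaskId, int tasks, int executors) {
        if (executors <= 0 || tasks < executors) {
            throw new IllegalArgumentException("need 1 <= executors <= tasks");
        }
        List<int[]> ranges = new ArrayList<>();
        int base = tasks / executors;
        int extra = tasks % executors;
        int next = firstTaskId;
        for (int i = 0; i < executors; i++) {
            int size = base + (i < extra ? 1 : 0);
            ranges.add(new int[] {next, next + size - 1});
            next += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // split: 4 tasks on 2 threads; count: 6 tasks on 3 threads.
        for (int[] r : partition(1, 4, 2)) {
            System.out.println("split executor [" + r[0] + "," + r[1] + "]");
        }
        for (int[] r : partition(1, 6, 3)) {
            System.out.println("count executor [" + r[0] + "," + r[1] + "]");
        }
    }
}
```

Under this model every range produced for split and for count spans two task IDs, which matches the 2 tasks per thread worked out above.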

[Screenshot: log of the split bolt]

[Screenshot: log of the count bolt]

Code analysis

package org.apache.storm.starter;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.starter.spout.RandomSentenceSpout;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.ConfigurableTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
  * This topology demonstrates Storm's stream groupings and multilang
  * capabilities.
  */

public class WordCountTopology extends ConfigurableTopology {
    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    @Override
    protected int run(String[] args) throws Exception {
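        TopologyBuilder builder = new TopologyBuilder();
        // Spout that emits random sentences, parallelism hint 5.
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        // Split sentences into words; tuples from the spout are distributed randomly.
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Count words; tuples with the same "word" value always reach the same task.
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
        conf.setDebug(true);
        String topologyName = "word-count";
        conf.setNumWorkers(3);

        // An optional first argument overrides the default topology name.
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }

        return submit(topologyName, conf, builder);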

    }

    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    public static class WordCount extends BaseBasicBolt {
        // Running count per word, local to this task.
        Map<String, Integer> counts = new HashMap<>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}
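
Each WordCount task keeps its own counts map, which only yields correct totals because fieldsGrouping on "word" sends every occurrence of a given word to the same task. The following is a rough sketch of that idea. The hash-modulo routing is a simplified stand-in rather than Storm's actual implementation, and FieldsGroupingSketch, targetTask, and countPerTask are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of fields grouping (an assumption, not Storm's code).
final class FieldsGroupingSketch {

    // Pick a task index from the word's hash; equal words always map to the same index.
    static int targetTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Route each word to its task and let that task count it in its own private map.
    static List<Map<String, Integer>> countPerTask(List<String> words, int numTasks) {
        List<Map<String, Integer>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new HashMap<>());
        }
        for (String word : words) {
            perTask.get(targetTask(word, numTasks)).merge(word, 1, Integer::sum);
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cow", "the", "moon", "cow", "the");
        List<Map<String, Integer>> perTask = countPerTask(words, 6);
        for (int task = 0; task < perTask.size(); task++) {
            System.out.println("task " + task + " -> " + perTask.get(task));
        }
    }
}
```

Because a word never appears in more than one task's map, no merging across tasks is needed to obtain its total. With shuffleGrouping instead, occurrences of the same word would be scattered over several maps.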

Troubleshooting

Running the WordCountTopology example from the storm-starter project fails with the following error:

java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read. Serializer Exception: Traceback (most recent call last): File "splitsentence.py", line 16, in import storm ImportError: No module named storm

Solution

Download storm.py, put it in /apache-storm-2.1.0/examples/storm-starter/multilang/resources/, and change the main function of WordCountTopology.java as follows:

// Additional imports needed besides those already in the file:
// import org.apache.storm.Config;
// import org.apache.storm.LocalCluster;
// import org.apache.storm.StormSubmitter;
public static void main(String[] args) throws Exception {
    // Tell the Python subprocess where to find storm.py.
    SplitSentence pythonSplit = new SplitSentence();
    Map<String, String> env = new HashMap<>();
    env.put("PYTHONPATH", "/apache-storm-2.1.0/examples/storm-starter/multilang/resources/");
    pythonSplit.setEnv(env);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", pythonSplit, 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        // Cluster mode: args[0] is the topology name.
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
        // Local mode: run in-process for 10 minutes, then shut down.
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(600000);
        cluster.shutdown();
    }
}

After this change, the Maven build fails with:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.0.0:check (validate) on project storm-starter: You have 5 Checkstyle violations. -> [Help 1]

Solution:

Edit pom.xml and relax the limit of the code style check plugin:


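<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-checkstyle-plugin</artifactId>
  <configuration>
      <maxAllowedViolations>100</maxAllowedViolations>
  </configuration>
</plugin>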
