Storm Running-Sum Topology Example

Create a Spout that emits an increasing sequence of numbers

private static class NumSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int num = 1;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

        this.collector = collector;
    }

    @Override
    public void nextTuple() {

        collector.emit(new Values(num));
        System.err.println("+ " + num);
        Utils.sleep(1000);
        num++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("num"));
    }
}

Create a Bolt that computes the running sum

    private static class SumBolt extends BaseRichBolt {

        // private OutputCollector collector;

        private int sum = 0;
        @Override
        public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) {

            // this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            //int number = input.getInteger(0);
            int number = input.getIntegerByField("num");
            sum += number;
            // collector.emit(new Values(sum));
            System.err.println("= " + sum);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

            // declarer.declare(new Fields("sum"));
        }
    }
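
To make the expected console output concrete, here is a minimal stand-alone sketch that mimics what the Spout and Bolt do together, with no Storm dependency. The class RunningSumSketch and its method runningSums are hypothetical names introduced only for illustration. A real topology passes tuples between components asynchronously rather than in a single loop, so treat this as an approximation of the arithmetic, not of the runtime behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm: replays the Spout/Bolt logic in one loop.
class RunningSumSketch {

    // Returns the running sums the Bolt would print after receiving 1..count.
    public static List<Integer> runningSums(int count) {
        List<Integer> sums = new ArrayList<>();
        int sum = 0;                                  // plays the role of SumBolt.sum
        for (int num = 1; num <= count; num++) {      // plays the role of NumSpout.num
            sum += num;
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Integer> sums = runningSums(5);
        for (int i = 0; i < sums.size(); i++) {
            System.out.println("+ " + (i + 1));       // what the Spout logs
            System.out.println("= " + sums.get(i));   // what the Bolt logs
        }
    }
}
```

After n numbers the sum equals n(n+1)/2, so once the Bolt has received 1 through 5 its last line is "= 15".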

Running locally

Complete code for local mode

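// Imports assume Storm 1.0 or later (org.apache.storm packages)
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class MySumTopology {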

    public static void main(String[] args) throws Exception {

        Config config = new Config();
        config.setNumWorkers(1);
        config.setDebug(true);

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("num-spout", new NumSpout());
        topologyBuilder.setBolt("sum-bolt", new SumBolt()).shuffleGrouping("num-spout");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("sum-topo", config, topologyBuilder.createTopology());
        // Let the topology run for 1 minute, then stop
        Utils.sleep(60000);
        localCluster.shutdown();
    }
}

Guaranteeing reliable processing

  1. When the Spout emits data in nextTuple(), it must pass a message ID along with the tuple
    @Override
    public void nextTuple() {
        collector.emit(new Values(num), num);
        ...
    }
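  2. In the Bolt's execute() method, mark whether the tuple was processed successfully
  • Success: collector.ack(input)

  • Failure: collector.fail(input)

    Note: ack() and fail() must be called with the input tuple that was emitted to this Bolt, so that the result is tied to that tuple.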

    private static class SumBolt extends BaseRichBolt {

        private OutputCollector collector; // must be assigned in prepare(): this.collector = collector;
        private Random rand = new Random(); // java.util.Random

        private int sum = 0;

        @Override
        public void execute(Tuple input) {

            if (rand.nextInt(10) < 2) {
                collector.fail(input); // fail with 20% probability
                System.err.println("! failed");
                return;
            }

            int number = input.getIntegerByField("num");
            sum += number;
            // collector.emit(new Values(sum));
            System.err.println("= " + sum);
            collector.ack(input); // processed successfully: acknowledge
        }
    ...
  3. Storm invokes the Spout's fail() callback for each tuple that failed

    Here the failed number is emitted again

    @Override
    public void fail(Object msgId) {
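        // msgId is the Integer passed to emit() in nextTuple(), so it identifies the failed number
        int oldNum = (Integer) msgId;
        collector.emit(new Values(oldNum), msgId);
        System.out.println("retry + " + oldNum);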
        Utils.sleep(1000);
    }
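
The replay step can be sketched without Storm. The minimal simulation below assumes the Spout keeps every un-acked value in a map keyed by message ID. It illustrates why a replay is safer when it looks up the failed value by msgId instead of deriving it from the current counter: by the time fail() is called, the counter may already have moved on. ReplaySketch, pending, and emitted are hypothetical names that only mirror the shape of the Spout callbacks; they are not Storm classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a reliable Spout; it does not use the Storm API.
class ReplaySketch {
    private final Map<Object, Integer> pending = new HashMap<>(); // msgId -> value awaiting ack
    private final List<Integer> emitted = new ArrayList<>();      // everything sent downstream, in order
    private int num = 1;

    // Mirrors nextTuple(): emit the next number and remember it until it is acked.
    public void nextTuple() {
        pending.put(num, num);
        emitted.add(num);
        num++;
    }

    // Mirrors ack(Object msgId): the tuple was fully processed, so forget it.
    public void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Mirrors fail(Object msgId): re-emit exactly the value that failed.
    public void fail(Object msgId) {
        Integer value = pending.get(msgId);
        if (value != null) {
            emitted.add(value);
        }
    }

    public List<Integer> emitted() { return emitted; }

    public int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        ReplaySketch spout = new ReplaySketch();
        spout.nextTuple();   // emits 1
        spout.nextTuple();   // emits 2
        spout.nextTuple();   // emits 3
        spout.fail(1);       // replays 1 although the counter is already at 4
        spout.ack(1);
        spout.ack(2);
        spout.ack(3);
        System.out.println(spout.emitted()); // prints [1, 2, 3, 1]
    }
}
```

Keeping the pending map also bounds what must be remembered: an entry is dropped as soon as its tuple is acked.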

Remote mode

To submit the topology to a cluster, call submitTopology(..) on StormSubmitter instead of on a new LocalCluster() instance.

// local mode - run with LocalCluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("sum-topo", config, builder.createTopology());

// remote mode - run on a production cluster
// StormSubmitter.submitTopology("sum-topo", config, builder.createTopology());

Then package the project as a jar and upload it to the cluster to run.
