Create a Spout that emits an increasing sequence of numbers
```java
private static class NumSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int num = 1;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit the current number, log it, then wait one second before the next one
        collector.emit(new Values(num));
        System.err.println("+ " + num);
        Utils.sleep(1000);
        num++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("num"));
    }
}
```
Create a Bolt that computes the running sum
```java
private static class SumBolt extends BaseRichBolt {
    // private OutputCollector collector;
    private int sum = 0;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        // this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        //int number = input.getInteger(0);
        int number = input.getIntegerByField("num");
        sum += number;
        // collector.emit(new Values(sum));
        System.err.println("= " + sum);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declarer.declare(new Fields("sum"));
    }
}
```
Running locally
Complete code for a local run:
```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class MySumTopology {

    // NumSpout and SumBolt (shown above) are nested here as private static classes.

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setNumWorkers(1);
        config.setDebug(true);

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("num-spout", new NumSpout());
        topologyBuilder.setBolt("sum-bolt", new SumBolt()).shuffleGrouping("num-spout");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("sum-topo", config, topologyBuilder.createTopology());

        // Stop after running for 1 minute
        Utils.sleep(60000);
        localCluster.shutdown();
    }
}
```
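To make the expected console output concrete, here is a plain-Java sketch with no Storm dependency that reproduces only the arithmetic of the spout and bolt for the first few numbers. The class name `RunningSumSketch` and the method `runningSums` are made up for illustration. In a real run the lines are spaced by the one-second sleep and interleaved with Storm's debug logging.

```java
// Illustrative only: mimics the "+ n" / "= sum" lines printed by NumSpout and SumBolt.
class RunningSumSketch {

    /** Returns the running sums of 1..count, i.e. what SumBolt would hold after each tuple. */
    static int[] runningSums(int count) {
        int[] sums = new int[count];
        int sum = 0;
        for (int num = 1; num <= count; num++) {
            sum += num;            // same update as SumBolt.execute()
            sums[num - 1] = sum;
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] sums = runningSums(5);
        for (int i = 0; i < sums.length; i++) {
            System.err.println("+ " + (i + 1)); // what the spout logs
            System.err.println("= " + sums[i]); // what the bolt logs
        }
    }
}
```

For the first five numbers the sums are 1, 3, 6, 10 and 15.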
Guaranteeing reliable data processing
- When the Spout emits data in `nextTuple()`, it must also pass a message ID
```java
@Override
public void nextTuple() {
    collector.emit(new Values(num), num); // second argument is the message ID
    ...
}
```
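- In the Bolt's `execute()` method, mark whether each tuple was processed successfully
  - Processed successfully: `collector.ack(input)`
  - Processing failed: `collector.fail(input)`

  Note: `ack` and `fail` must be anchored to the incoming tuple, i.e. called with the same `Tuple` object that `execute()` received.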
```java
private static class SumBolt extends BaseRichBolt {
    // collector and rand must be initialized in prepare() (elided below)
    private OutputCollector collector;
    private Random rand;
    private int sum = 0;

    @Override
    public void execute(Tuple input) {
        if (rand.nextInt(10) < 2) {
            collector.fail(input); // 20% chance: processing fails
            System.err.println("! failed");
            return;
        }
        int number = input.getIntegerByField("num");
        sum += number;
        // collector.emit(new Values(sum));
        System.err.println("= " + sum);
        collector.ack(input); // processed successfully: acknowledge
    }
    ...
```
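Note that the bolt above calls `fail` and returns before it touches `sum`, so a tuple that is replayed later is never counted twice. The following plain-Java simulation (no Storm dependency) sketches why the total stays correct under that ordering. The class `FailBeforeUpdateSketch`, the method `sumWithRetries` and the retry queue are hypothetical stand-ins for the spout, the bolt and Storm's replay path.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

// Illustrative simulation: a 20% failure rate with retries still yields the exact sum.
class FailBeforeUpdateSketch {

    /** Sums 1..count, randomly "failing" about 20% of attempts and retrying them later. */
    static int sumWithRetries(int count, long seed) {
        Random rand = new Random(seed);
        Deque<Integer> queue = new ArrayDeque<>();
        for (int n = 1; n <= count; n++) {
            queue.add(n);
        }

        int sum = 0;
        while (!queue.isEmpty()) {
            int number = queue.poll();
            if (rand.nextInt(10) < 2) {
                // "fail": sum is left untouched and the number is queued for replay
                queue.add(number);
                continue;
            }
            sum += number; // "ack": the number is counted exactly once
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumWithRetries(10, 42L)); // 1 + 2 + ... + 10 = 55
    }
}
```

If the failure check came after `sum += number`, each replay would add the number again, which is why state updates are best placed after the point where the bolt may still fail.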
- In the Spout, the `fail()` callback is triggered for tuples that failed processing.
  Here the failed number is emitted again.
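```java
@Override
public void fail(Object msgId) {
    // The message ID passed to emit() in nextTuple() is the number itself,
    // so recover the failed value from it rather than from the current counter.
    int oldNum = (Integer) msgId;
    collector.emit(new Values(oldNum), msgId);
    System.out.println("retry + " + oldNum);
    Utils.sleep(1000);
}
```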
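The `fail()` callback receives only the message ID, so a spout generally needs some way to map that ID back to the value it should replay. Below is a minimal plain-Java sketch of that bookkeeping, with no Storm dependency. `PendingTracker` and its methods are hypothetical names that mirror `nextTuple()`, `ack()` and `fail()`; they are not part of the Storm API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustrative bookkeeping for a reliable spout: remember what was emitted until it is acked.
class PendingTracker {
    private final Map<Object, Integer> pending = new HashMap<>();   // msgId -> emitted value
    private final Queue<Integer> replayQueue = new ArrayDeque<>();  // values waiting for re-emission
    private int next = 1;

    /** Mirrors nextTuple(): replay failed values first, otherwise emit the next number. */
    int emit() {
        Integer replay = replayQueue.poll();
        int value = (replay != null) ? replay : next++;
        pending.put(value, value); // the value doubles as its own message ID, as in the article
        return value;
    }

    /** Mirrors ack(msgId): the tuple was fully processed, so forget it. */
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    /** Mirrors fail(msgId): look up the original value and schedule it for replay. */
    void fail(Object msgId) {
        Integer value = pending.remove(msgId);
        if (value != null) {
            replayQueue.add(value);
        }
    }

    public static void main(String[] args) {
        PendingTracker tracker = new PendingTracker();
        int a = tracker.emit(); // 1
        int b = tracker.emit(); // 2
        tracker.ack(a);
        tracker.fail(b);
        System.out.println("replayed: " + tracker.emit()); // replayed: 2
        System.out.println("next: " + tracker.emit());     // next: 3
    }
}
```

Replaying from a queue inside the emit path keeps all emission in one place, at the cost of holding every un-acked value in memory.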
Remote mode
To submit the topology to a cluster, call `submitTopology(..)` on `StormSubmitter` instead of on `new LocalCluster()`.
```java
// Local mode: LocalCluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("sum-topo", config, builder.createTopology());

// Remote mode: run on a production cluster
// StormSubmitter.submitTopology("sum-topo", config, builder.createTopology());
```
Then package the project as a jar and upload it to the cluster to run.