创建Spout发送递增数字数列
private static class NumSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int num = 1;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
collector.emit(new Values(num));
System.err.println("+ " + num);
Utils.sleep(1000);
num++;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
}
创建Bolt负责计算累加结果
private static class SumBolt extends BaseRichBolt {
// private OutputCollector collector;
private int sum = 0;
@Override
public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) {
// this.collector = collector;
}
@Override
public void execute(Tuple input) {
//int number = input.getInteger(0);
int number = input.getIntegerByField("num");
sum += number;
// collector.emit(new Values(sum));
System.err.println("= " + sum);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declarer.declare(new Fields("sum"));
}
}
本地运行
本地运行完整代码
public class MySumTopology {
public static void main throws Exception (String[] args) {
Config config = new Config();
config.setNumWorkers(1);
config.setDebug(true);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("num-spout", new NumSpout());
topologyBuilder.setBolt("sum-bolt", new SumBolt()).shuffleGrouping("num-spout");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("sum-topo", config, topologyBuilder.createTopology());
// 运行1分钟停止
Utils.sleep(60000);
localCluster.shutdown();
}
}
保证数据可靠处理
- Spout在使用nextTuple()方法发送数据时需要传入消息ID
@Override
public void nextTuple() {
collector.emit(new Values(num), num);
...
}
- Bolt中execute()方法中标记tuple是否处理成功
-
处理成功
collector.ack(input) -
处理失败
collector.fail(input)注意:
ack和fail方法需要锚定到发射过来的tuple上.
private static class SumBolt extends BaseRichBolt {
private OutputCollector collector;
private Random rand;
private int sum = 0;
@Override
public void execute(Tuple input) {
if (rand.nextInt(10) < 2) {
collector.fail(input); // 20%几率 处理失败
System.err.println("!搞错了");
return;
}
int number = input.getIntegerByField("num");
sum += number;
// collector.emit(new Values(sum));
System.err.println("= " + sum);
collector.ack(input); // 处理成功: Acknowledge
}
...
-
Spout中对处理失败的元组触发回调
这里把处理失败的数字重新发射出去
@Override
public void fail(Object msgId) {
int oldNum = num - 1;
collector.emit(new Values(oldNum), msgId);
System.out.println("重来 + " + oldNum);
Utils.sleep(1000);
}
远程运行模式
如果是需要提交到集群, 需要使用StormSubmitter替换new LocalCluster()来调用submitTopology(..)方法
// local mode运行 - LocalCluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("sum-topo", config, builder.createTopology());
// remote mode - run on a production cluster
// StormSubmitter.submitTopology("sum-topo",config,builder.createTopology());
然后打成jar包上传到集群运行
Views: 206
