分布式远程过程调用
所谓过程调用就是指方法调用, 讲解分布式远程过程调用前先看一看什么是普通方法调用和远程过程调用.
- 普通方法调用
以一个普通Java类中的方法为例:
// 方法定义
public String exclamation(String arg) {
return arg + "!";
}
// 方法调用
instance.exclamation("hi"); // hi!
- 远程过程调用
RPC = Remote Procedure Call(远程过程调用)
RPC即在一台机器以远程的方式调用另外一台机器中的应用中的功能(方法或函数)
Storm中使用的Thrift就是一个RPC框架, Thrift由C++编写,但可以支持很多语言。要创建一个Thrift服务,必须写一些Thrift文件来描述它,为目标语言生成代码,并且写一些代码来启动服务器及从客户端调用它。其实Topology 的定义就是一个 Thrift 结构,并且 Nimbus 就是一个 Thrift 服务,你可以提交任务语言创建的Topology到Nimbus。
- 分布式远程过程调用
DRPC, 即分布式RPC(distributed RPC,DRPC),即远程调用的结果是通过集群中的并行计算返回的.
Storm中的DRPC只是通过以下基本元素streams,spout,bolt,topology衍生的一个模式, 算不上Storm的主要特性。Storm的DRPC可以单独作为一个独立于Storm的库发布,但由于其重要性还是和Storm捆绑在了一起。
Storm的DRPC就是每一次函数调用都是通过DRPC客户端以请求的形式提交给集群中的DRPC服务器, 然后在提交的DRPC拓扑中寻找到对应的函数在集群中进行并行计算, DRPC拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去,最终返回给DRPC客户端。
总体概述
DRPC通过DRPC Server来实现,DRPC Server的整体工作过程如下:
- 接收到一个RPC调用请求
- 发送请求到Storm上的拓扑
- 从Storm上接收计算结果
- 将计算结果返回给客户端

- Client向DRPC Server发送被调用执行的DRPC函数名称及参数
- Storm上的topology通过DRPCSpout实现这一函数,从DPRC Server接收到函数调用流
- DRPC Server会为每次函数调用生成唯一的id
- Storm上运行的topology开始计算结果,最后通过一个ReturnResults的Bolt连接到DRPC Server,发送指定id的计算结果
- DRPC Server通过使用之前为每个函数调用生成的id,将结果关联到对应的发起调用的client,将计算结果返回给client
LinearDRPCTopologyBuilder
Storm提供了一个topology builder——LinearDRPCTopologyBuilder,它可以自动完成几乎所有的DRPC步骤。包括:
- 构建spout;
- 向DRPC Server返回结果;
- 为Bolt提供函数用于对tuples进行聚集。
下面是一个简单的例子,这个DRPC拓扑只是简单的在输入参数后追加"!"后返回:
public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
// ...
}
由上述例子可见,我们只需很少的工作即可完成拓扑。当创建LinearDRPCTopologyBuilder的时候,需要指定拓扑中DRPC函数的名称exclamation。一个DRPC Server可以协调多个函数,每个函数有不同的函数名称。拓扑中的第一个bolt的输入是两个字段:第一个是请求的id;第二个是请求的参数。
LinearDRPCTopologyBuilder同时需要最后一个bolt发射一个包含两个字段的输出流:第一个字段是请求id;第二个字段是计算结果。因此,所有的中间tuple必须包含请求作为第一个字段。id
例子中,ExclaimBolt在输入tuple的第二个字段后面追加"!",LinearDRPCTopologyBuilder负责处理其余的协调工作:与DRPC Server建立连接,发送结果给DRPC Server。
Local Mode DRPC
DRPC可以以本地模式运行,下面的代码是如何在本地模式运行上面的例子:
package storm.example.drpc;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* @Author: deLucia
* @Version: 1.0
* @Description: Locally DRPC
* Program arguments, eg.: drpc drpc-fun 1 2 3 one two three
*/
public class BasicDRPCTopologyLocally {
// id : request id
// result: request parameter
public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
Object id = tuple.getValue(0);
String input = tuple.getString(1);
collector.emit(new Values(id, input + "!"));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
String topoName = "DRPCExample";
String function = "exclamation";
if (args != null) {
if (args.length > 0) {
topoName = args[0];
}
if (args.length > 1) {
function = args[1];
}
}
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(function);
builder.addBolt(new ExclaimBolt(), 2);
conf.setNumWorkers(1);
// LOCAL DRPC
LocalDRPC localDRPC = new LocalDRPC();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topoName, conf, builder.createLocalTopology(localDRPC));
if (args != null && args.length > 2) {
// DRPC Client
for (int i = 2; i < args.length; i++) {
String word = args[i];
System.out.println("Result for \"" + word + "\": " + localDRPC.execute(function, word));
}
}
localCluster.shutdown();
localDRPC.shutdown();
}
}
Remote Mode DRPC
在实际的Storm集群上运行DRPC也一样很简单。只需完成以下步骤:
-
配置DRPC Server(s)地址:
drpc.servers: - "hadoop000" drpc.http.port: 8081 #storm.thrift.transport: "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin" nimbus.thrift.port: 16627注意
nimbus.thrift.port的默认端口为6627, 这里改成了16627 -
启动DRPC Server(s)
$ bin/storm drpc -
向Storm集群提交DRPC拓扑。
$ bin/storm jar ...
首先,通过storm脚本启动DRPC Server:
然后,在Storm集群中配置DRPC Server地址,这就是DRPCSpout读取函数调用请求的地方。这一步的配置可以通过storm.yaml文件或者拓扑的配置来完成。通过storm.yaml文件的配置方式如下:`
drpc.servers:
- "drpc1.foo.com"
- "drpc2.foo.com"
最后,通过StormSubmitter启动拓扑。为了以远程模式运行上面的例子,代码如下:DRPC
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology被用于为Storm集群创建合适的拓扑。
package storm.example.drpc;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.DRPCClient;
/**
* @Author: deLucia
* @Version: 1.0
* @Description: Upload to Storm Cluster
*/
public class BasicDRPCTopologyRemotely {
public static void main(String[] args) throws Exception {
Config conf = new Config();
String topoName = "DRPCExample";
String function = "exclamation";
if (args != null) {
if (args.length > 0) {
topoName = args[0];
}
if (args.length > 1) {
function = args[1];
}
}
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(function);
builder.addBolt(new ExclaimBolt(), 2);
conf.setNumWorkers(1);
// REMOTE DPRC
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, builder.createRemoteTopology());
if (args != null && args.length > 2) {
// REMOTE DRPC
try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) {
for (int i = 2; i < args.length; i++) {
String word = args[i];
System.out.println("Result for \"" + word + "\": " + drpc.execute(function, word));
}
}
}
}
public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
}
发起DRPC请求的三种方式
要向DRPC服务器提交请求有三种方式:
- 服务器上通过 shell 命令调用
语法:bin/storm drpc-client function-name function-args - 通过http的方式发起Get请求 http://hostname:drpc-http-port/drpc/function-name/function-args, 如果是在浏览器的地址栏输入则结果将直接显示在页面上(注意一些特殊字符需要urlencode)
- 客户端程序远程调用(3772为drpc server默认的端口号):
DRPCClient client = new DRPCClient("hostname", 3772);
Views: 383
