Storm DRPC
Posted 真诚的程序员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm DRPC相关的知识,希望对你有一定的参考价值。
DRPC简介
DRPC是为了实现并发的RPC架构而实现的,其中D就是Distributed,利用Storm分布式、并发的能力,实现RPC的高性能。
DRPC架构
DRPC的架构如图:
- 客户端:用来发起DRPC的调用
- DRPC Server:实现与客户端的对接,传递参数给Storm,返回结果给客户端。
- DPRCSpout: 用于连接DRPC Server和Topology,传递参数给Topology。
- Topology:实现实际的函数功能。
- ReturnResults:用于连接DRPC Server和Topology,用于返回参数给DRPC Server。
流程描述如下:
- 客户端发送函数的参数给DRPC Server
- DRPC Server生成发送函数调用的相关信息给DRPC Spout,包括请求ID,请求参数,返回结果的信息。
- DRPC Spout发送[“id”, “request”]给Topology的第一个Bolt,其中id代表请求ID,request代表请求参数。
- Topology最后一个Bolt发送[“id”, “result”]给ReturnResults,其中id代表请求ID,result代表返回结果
- ReturnResults将请求ID和结果传递给DRPC Server
- DRPC Server将结果返回给DRPC客户端。
注意:Topology中emit的tuple,第一个field必须是”id”,值为请求ID。
我们只需要实现Topology部分就可以了。
Storm DPRC API介绍
我们先看一下DRPC客户端的API:
DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");
先构建一个DRPCClient对象,代表DRPC客户端。第一个参数是主机地址,第二个参数为端口号。
然后用这个client调用execute函数,远程执行函数。execute第一个参数表示函数名称,第二个参数为函数参数。
我们再来看一下DRPC服务器端的API:
本地模式的DRPC:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
cluster.shutdown();
drpc.shutdown();
- LinearDRPCTopologyBuilder用来生成一个builder,参数用来指定函数的名字,也就是你在客户端调用execute时,指定的函数名字。用builder来构建topology
- LocalDRPC用来构建一个本地的DRPC Server。
- LocalCluster用来构建本地的集群
- builder.createLocalTopology(drpc)时需要指定一个DRPC Server,用来让Storm知道DRPC Server的信息。
远程模式的DRPC:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
远程模式的DRPC与本地模式的DRPC不同之处在于:
- 远程模式DRPC不需要模拟DRPC Server,而是通过在真实的Storm集群中配置DRPC Server来完成
- 远程模式通过调用builder的createRemoteTopology方法来构建topology。
Trident DRPC API介绍
- Trident DRPC的客户端和普通Storm的DRPC客户端一样
- Trident DRPC服务端API介绍
TridentTopology topology = new TridentTopology();
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
先用TridentTopology构造一个topology,然后调用newDRPCStream来构造一个DRPC流,参数是函数名字。后面的内容和普通Trident API一样。
以上是关于Storm DRPC的主要内容,如果未能解决你的问题,请参考以下文章