Storm DRPC

Posted 真诚的程序员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm DRPC相关的知识,希望对你有一定的参考价值。

DRPC简介

DRPC是为了实现并发的RPC架构而实现的,其中D就是Distributed,利用Storm分布式、并发的能力,实现RPC的高性能。

DRPC架构

DRPC的架构如图:

  • 客户端:用来发起DRPC的调用
  • DRPC Server:实现与客户端的对接,传递参数给Storm,返回结果给客户端。
  • DPRCSpout: 用于连接DRPC Server和Topology,传递参数给Topology。
  • Topology:实现实际的函数功能。
  • ReturnResults:用于连接DRPC Server和Topology,用于返回参数给DRPC Server。
    流程描述如下:
    1. 客户端发送函数的参数给DRPC Server
    2. DRPC Server生成发送函数调用的相关信息给DRPC Spout,包括请求ID,请求参数,返回结果的信息。
    3. DRPC Spout发送[“id”, “request”]给Topology的第一个Bolt,其中id代表请求ID,request代表请求参数。
    4. Topology最后一个Bolt发送[“id”, “result”]给ReturnResults,其中id代表请求ID,result代表返回结果
    5. ReturnResults将请求ID和结果传递给DRPC Server
    6. DRPC Server将结果返回给DRPC客户端。

注意:Topology中emit的tuple,第一个field必须是”id”,值为请求ID。

我们只需要实现Topology部分就可以了。

Storm DPRC API介绍

我们先看一下DRPC客户端的API:

    DRPCClient client = new DRPCClient("drpc-host", 3772);
    String result = client.execute("reach", "http://twitter.com");

先构建一个DRPCClient对象,代表DRPC客户端。第一个参数是主机地址,第二个参数为端口号。
然后用这个client调用execute函数,远程执行函数。execute第一个参数表示函数名称,第二个参数为函数参数。

我们再来看一下DRPC服务器端的API:

本地模式的DRPC:
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);

    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

    System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

    cluster.shutdown();
    drpc.shutdown();
  • LinearDRPCTopologyBuilder用来生成一个builder,参数用来指定函数的名字,也就是你在客户端调用execute时,指定的函数名字。用builder来构建topology
  • LocalDRPC用来构建一个本地的DRPC Server。
  • LocalCluster用来构建本地的集群
  • builder.createLocalTopology(drpc)时需要指定一个DRPC Server,用来让Storm知道DRPC Server的信息。
远程模式的DRPC:
    StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

远程模式的DRPC与本地模式的DRPC不同之处在于:

  • 远程模式DRPC不需要模拟DRPC Server,而是通过在真实的Storm集群中配置DRPC Server来完成
  • 远程模式通过调用builder的createRemoteTopology方法来构建topology。

Trident DRPC API介绍

  • Trident DRPC的客户端和普通Storm的DRPC客户端一样
  • Trident DRPC服务端API介绍
    TridentTopology topology = new TridentTopology();
    topology.newDRPCStream("words")
      .each(new Fields("args"), new Split(), new Fields("word"))
      .groupBy(new Fields("word"))
      .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
      .each(new Fields("count"), new FilterNull())
      .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

先用TridentTopology构造一个topology,然后调用newDRPCStream来构造一个DRPC流,参数是函数名字。后面的内容和普通Trident API一样。

以上是关于Storm DRPC的主要内容,如果未能解决你的问题,请参考以下文章

storm1.1.0 drpc 部署和调用测试

如何从 PHP 调用 DRPC Storm?

Trident中的DRPC实现

风暴中的 DRPC 服务器错误

.NET Core 现已支持DRPC,同时带来Apache Thrift

Storm实践:集群搭建