Storm 1.1.0 DRPC: deployment and invocation test


1. Configure the cluster's storm.yaml file and set drpc.servers
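A minimal sketch of the relevant storm.yaml entries, assuming the DRPC server runs on 192.168.0.188 (the address the client code in this post connects to) and listens on Storm's default DRPC port. Adapt the host list to your own cluster.

```yaml
# storm.yaml (fragment): an illustrative sketch, not the author's exact file
drpc.servers:
  - "192.168.0.188"   # assumed DRPC host, taken from the client code in this post
drpc.port: 3772       # Storm's default DRPC port
```

The same file has to be present on every node that runs nimbus, supervisor, or drpc, so that the topology workers know where to find the DRPC server.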


2. Deploy to Linux and start the nimbus, drpc, supervisor, and ui services

/opt/module/storm-1.1.0/bin/storm nimbus &

/opt/module/storm-1.1.0/bin/storm drpc &

/opt/module/storm-1.1.0/bin/storm supervisor &

/opt/module/storm-1.1.0/bin/storm ui &

 

3. Write the DrpcTopology program, as follows:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.eyecool.framework.olive.compute;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.utils.Utils;

import com.eyecool.framework.olive.compute.bolt.ExclamationBolt;

public class XFaceTopologyTest {

    public static void main(String[] args) throws Exception {
        run(args);
    }

    public static String spout_name = "raw-spout";

    protected static int run(String[] args) throws Exception {
        // "lookup" is the DRPC function name that clients pass to execute().
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("lookup");
        builder.addBolt(new ExclamationBolt(), 3); // parallelism hint of 3
        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(1);
        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the topology name.
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
        } else {
            // Local mode: run against an in-process DRPC server and cluster.
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("drpc-XFace", conf, builder.createLocalTopology(drpc));
            System.out.println("Results for 'hello': " + drpc.execute("lookup", "hello"));
            System.out.println("Results for 'hello12': " + drpc.execute("lookup", "hello12"));

            // Keep the local cluster alive for a long time before shutting down.
            Utils.sleep(1000000000);
            cluster.killTopology("drpc-XFace");
            cluster.shutdown();
            drpc.shutdown();
        }
        return 0;
    }
}
package com.eyecool.framework.olive.compute.bolt;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ExclamationBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "return-info"));
    }

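    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Field 0 is the DRPC request id; field 1 is the argument string from the caller.
        Object requestId = tuple.getValue(0);
        String input = tuple.getString(1);
        System.out.println("v0: " + requestId + " v1: " + input);
        // The last bolt must emit [request id, result] so the result reaches the caller.
        collector.emit(new Values(requestId, input + "!!!"));
    }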

}
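LinearDRPCTopologyBuilder hands the first bolt a tuple of [request id, arguments] and expects the last bolt to emit [request id, result]. To make the expected result concrete, here is a plain-Java sketch of the same transformation with no Storm classes involved. The class name ExclamationSketch, the method process, and the request id value are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for ExclamationBolt.execute(); no Storm classes are used.
class ExclamationSketch {

    // Mirrors the bolt: field 0 (request id) passes through unchanged,
    // field 1 (the caller's argument) gets "!!!" appended.
    static List<Object> process(Object requestId, String argument) {
        return Arrays.asList(requestId, argument + "!!!");
    }

    public static void main(String[] args) {
        // The request id 42 is an arbitrary illustrative value.
        List<Object> out = process(42L, "hello");
        System.out.println(out.get(1)); // prints "hello!!!"
    }
}
```

Under these assumptions, a DRPC call such as execute("lookup", "hello") against the topology above should return the string hello!!!.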

 

 

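4. Submit and run the topology (the trailing argument, olive, is passed as args[0] and becomes the topology name)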


/opt/module/storm-1.1.0/bin/storm jar olive-computeservice-storm-drpc-1.0.0-jar-with-dependencies.jar com.eyecool.framework.olive.compute.XFaceTopologyTest olive

 

5. The Storm UI page shows that the topology is running successfully


 

 

6. Test the call from client code. The following error is commonly encountered:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:108)
at org.apache.storm.security.auth.ThriftClient.<init>(ThriftClient.java:69)
at org.apache.storm.utils.DRPCClient.<init>(DRPCClient.java:44)
at org.apache.storm.utils.DRPCClient.<init>(DRPCClient.java:39)
at ClientTest.main(ClientTest.java:16)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.storm.security.auth.AuthUtils.GetTransportPlugin(AuthUtils.java:267)
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:89)
... 4 more
Caused by: java.lang.NullPointerException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.storm.security.auth.AuthUtils.GetTransportPlugin(AuthUtils.java:263)
... 5 more
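The innermost frames show Class.forName being called from AuthUtils.GetTransportPlugin and failing with a NullPointerException, which is consistent with a null class name, that is, the transport plugin entry being absent from the configuration passed to DRPCClient. The plain-Java sketch below reproduces that failure mode without Storm. The class TransportLookupSketch, its methods, and the key name storm.thrift.transport are illustrative assumptions, not Storm's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a plugin lookup driven by a configuration map; not Storm's code.
class TransportLookupSketch {

    // Assumed key name; in a real setup the value comes from Storm's default configuration.
    static final String TRANSPORT_KEY = "storm.thrift.transport";

    // Loads the class whose name is stored under TRANSPORT_KEY.
    static Class<?> loadPlugin(Map<String, Object> conf) throws ClassNotFoundException {
        String className = (String) conf.get(TRANSPORT_KEY); // null when the key is absent
        return Class.forName(className); // throws NullPointerException for a null name
    }

    // Returns true when the lookup fails with a NullPointerException.
    static boolean failsWithNpe(Map<String, Object> conf) {
        try {
            loadPlugin(conf);
            return false;
        } catch (NullPointerException e) {
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> empty = new HashMap<>();
        System.out.println("empty conf fails with NPE: " + failsWithNpe(empty));

        Map<String, Object> populated = new HashMap<>();
        // java.lang.String is only a placeholder class that is always loadable.
        populated.put(TRANSPORT_KEY, "java.lang.String");
        System.out.println("populated conf fails with NPE: " + failsWithNpe(populated));
    }
}
```

An empty map fails while a populated one does not, which matches the observation that a freshly constructed Config triggers the error and a configuration loaded from Storm's defaults does not.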

import java.util.Map;

import org.apache.storm.utils.DRPCClient;
import org.apache.storm.utils.Utils;

public class ClientTest {

    public static void main(String[] args) {
        try {
            // Load Storm's default configuration; an empty Config lacks the
            // Thrift transport settings and triggers the NullPointerException above.
            Map config = Utils.readDefaultConfig();
            @SuppressWarnings("resource")
            DRPCClient client = new DRPCClient(config, "192.168.0.188", 3772); // DRPC server host and port
            // Call the "lookup" DRPC function with the argument "hello world ".
            String result = client.execute("lookup", "hello world ");

            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

The fix is to load Storm's default YAML configuration and pass that map to DRPCClient instead of an empty Config:

Map config = Utils.readDefaultConfig();


 

It's that simple. Done!

 














