package me.cqc.hadoop.rpc;
import org.apache.hadoop.ipc.ProtocolSignature;
import java.io.IOException;
/**
 * Concrete implementation of {@link ClientProtocol}; an instance of this class
 * is what the RPC server exposes to remote callers.
 */
public class ClientProtocolImpl implements ClientProtocol {

    /**
     * Prints the two operands to standard output and returns their sum.
     *
     * @param a first operand
     * @param b second operand
     * @return {@code a + b} (plain {@code int} arithmetic, so it wraps on overflow)
     */
    @Override
    public int add(int a, int b) {
        System.out.format("client request add method. a = %d, b = %d \n", a, b);
        final int sum = a + b;
        return sum;
    }

    /**
     * Reports the protocol version of this implementation.
     *
     * @param s unused by this implementation
     * @param l unused by this implementation
     * @return always {@link ClientProtocol#versionID}
     * @throws IOException declared by the overridden method; never thrown here
     */
    @Override
    public long getProtocolVersion(String s, long l) throws IOException {
        return ClientProtocol.versionID;
    }

    /**
     * Builds the protocol signature for this implementation.
     *
     * @param s unused by this implementation
     * @param l unused by this implementation
     * @param i unused by this implementation
     * @return a signature carrying {@link ClientProtocol#versionID} and a
     *         {@code null} second constructor argument
     * @throws IOException declared by the overridden method; never thrown here
     */
    @Override
    public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
        // No per-method information is supplied; only the version is reported.
        final int[] methods = null;
        return new ProtocolSignature(ClientProtocol.versionID, methods);
    }
}
package me.cqc.hadoop.rpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
 * Minimal RPC client: obtains a {@link ClientProtocol} proxy, invokes
 * {@code add(1, 2)} on the remote side and prints the result.
 */
public class Client {

    /**
     * Connects to the RPC server, performs one {@code add} call and releases
     * the proxy afterwards.
     *
     * @param args ignored
     * @throws IOException if the proxy cannot be created or the remote call fails
     */
    public static void main(String[] args) throws IOException {
        int port = 13073;
        // NOTE(review): this looks like a placeholder - replace it with the
        // real server host/IP before running.
        String address = "xxx.xxx.xxx.xxx";
        Configuration conf = new Configuration();

        ClientProtocol client = RPC.getProxy(
                ClientProtocol.class,
                ClientProtocol.versionID,
                new InetSocketAddress(address, port),
                conf);
        try {
            System.out.println(client.add(1, 2));
        } finally {
            // Always release the proxy, even when the remote call throws, so
            // the underlying RPC connection is not leaked.
            RPC.stopProxy(client);
        }
    }
}
package me.cqc.hadoop.rpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
import java.net.InetAddress;
/**
 * Entry point that publishes a {@link ClientProtocolImpl} instance through a
 * Hadoop RPC server bound to the local host address.
 */
public class Server {

    /**
     * Builds the RPC server, starts it and prints progress messages to
     * standard output.
     *
     * @param args ignored
     * @throws IOException if the local host address cannot be resolved or the
     *                     server cannot be built
     */
    public static void main(String[] args) throws IOException {
        final String bindHost = InetAddress.getLocalHost().getHostAddress();
        final int listenPort = 13073;
        final Configuration configuration = new Configuration();

        System.out.printf("server starting, address : %s, port : %d \n", bindHost, listenPort);

        // Configure the builder step by step: protocol interface, the object
        // that serves it, where to listen, and a single handler.
        final RPC.Builder builder = new RPC.Builder(configuration);
        builder.setProtocol(ClientProtocol.class);
        builder.setInstance(new ClientProtocolImpl());
        builder.setBindAddress(bindHost);
        builder.setPort(listenPort);
        builder.setNumHandlers(1);

        final RPC.Server rpcServer = builder.build();
        rpcServer.start();

        System.out.println("server started");
    }
}
package me.cqc.hadoop.rpc;
import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.IOException;
/**
 * RPC contract shared by the client and the server in this demo.
 * Extends {@link VersionedProtocol} and exposes a single {@code add} operation.
 */
public interface ClientProtocol extends VersionedProtocol{
long versionID = 1L; // This field name must not be changed; it is fixed by the framework internals.
/**
 * Adds two integers.
 *
 * @param a first operand
 * @param b second operand
 * @return the result of the addition
 * @throws IOException declared for callers to handle; the visible
 *                     implementation, ClientProtocolImpl, never throws it
 */
int add(int a, int b) throws IOException;
}