Netty + ZooKeeper 实现简单的服务注册与发现

Posted Java与Android技术栈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty + ZooKeeper 实现简单的服务注册与发现相关的知识,希望对你有一定的参考价值。


一. 背景

最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。

二. Netty 的使用

在接收到派单任务之后,通过 Netty 推送到指定门店相关的设备。在我们的系统中 Netty 实现了消息推送、长连接以及心跳机制。


2.1 Netty Server 端:

每个 Netty 服务端通过 ConcurrentHashMap 保存了客户端的 clientId 以及它连接的 SocketChannel。

服务器端向客户端发送消息时,只要获取 clientId 对应的 SocketChannel,往 SocketChannel 里写入相应的 message 即可。

 
   
   
 
  1. EventLoopGroup boss = new NioEventLoopGroup(1);

  2. EventLoopGroup worker = new NioEventLoopGroup();

  3. ServerBootstrap bootstrap = new ServerBootstrap();

  4. bootstrap.group(boss, worker)

  5. .channel(NioserverSocketChannel.class)

  6. .option(ChannelOption.SO_BACKLOG, 128)

  7. .option(ChannelOption.TCP_NODELAY, true)

  8. .childOption(ChannelOption.SO_KEEPALIVE, true)

  9. .childHandler(new ChannelInitializer() {

  10. @Override

  11. protected void initChannel(Channel channel) throws Exception {

  12. ChannelPipeline p = channel.pipeline();

  13. p.addLast(new MessageEncoder());

  14. p.addLast(new MessageDecoder());

  15. p.addLast(new PushServerHandler());

  16. }

  17. });


  18. ChannelFuture future = bootstrap.bind(host,port).sync();

  19. if (future.isSuccess()) {

  20. logger.info("server start...");

  21. }

2.2 Netty Client 端:

客户端用于接收服务端的消息,随即进行业务处理。客户端还有心跳机制,它通过 IdleEvent 事件定时向服务端放送 Ping 消息以此来检测 SocketChannel 是否中断。

 
   
   
 
  1. public PushClientBootstrap(String host, int port) throws InterruptedException {


  2. this.host = host;

  3. this.port = port;


  4. start(host,port);

  5. }


  6. private void start(String host, int port) throws InterruptedException {


  7. bootstrap = new Bootstrap();

  8. bootstrap.channel(NioSocketChannel.class)

  9. .option(ChannelOption.SO_KEEPALIVE, true)

  10. .group(workGroup)

  11. .remoteAddress(host, port)

  12. .handler(new ChannelInitializer(){


  13. @Override

  14. protected void initChannel(Channel channel) throws Exception {

  15. ChannelPipeline p = channel.pipeline();

  16. p.addLast(new IdleStateHandler(20, 10, 0)); // IdleStateHandler 用于检测心跳

  17. p.addLast(new MessageDecoder());

  18. p.addLast(new MessageEncoder());

  19. p.addLast(new PushClientHandler());

  20. }

  21. });

  22. doConnect(port, host);

  23. }


  24. /**

  25. * 建立连接,并且可以实现自动重连.

  26. * @param port port.

  27. * @param host host.

  28. * @throws InterruptedException InterruptedException.

  29. */

  30. private void doConnect(int port, String host) throws InterruptedException {


  31. if (socketChannel != null && socketChannel.isActive()) {

  32. return;

  33. }


  34. final int portConnect = port;

  35. final String hostConnect = host;


  36. ChannelFuture future = bootstrap.connect(host, port);


  37. future.addListener(new ChannelFutureListener() {


  38. @Override

  39. public void operationComplete(ChannelFuture futureListener) throws Exception {

  40. if (futureListener.isSuccess()) {

  41. socketChannel = (SocketChannel) futureListener.channel();

  42. logger.info("Connect to server successfully!");

  43. } else {

  44. logger.info("Failed to connect to server, try connect after 10s");


  45. futureListener.channel().eventLoop().schedule(new Runnable() {

  46. @Override

  47. public void run() {

  48. try {

  49. doConnect(portConnect, hostConnect);

  50. } catch (InterruptedException e) {

  51. e.printStackTrace();

  52. }

  53. }

  54. }, 10, TimeUnit.SECONDS);

  55. }

  56. }

  57. }).sync();

  58. }

三. 借助 ZooKeeper 实现简单的服务注册与发现

3.1 服务注册

在我们项目中采用了 ZooKeeper 实现服务注册。

 
   
   
 
  1. public class ServiceRegistry {


  2. private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);


  3. private CountDownLatch latch = new CountDownLatch(1);


  4. private String registryAddress;


  5. public ServiceRegistry(String registryAddress) {

  6. this.registryAddress = registryAddress;

  7. }


  8. public void register(String data) {

  9. if (data != null) {

  10. ZooKeeper zk = connectServer();

  11. if (zk != null) {

  12. createNode(zk, data);

  13. }

  14. }

  15. }


  16. /**

  17. * 连接 zookeeper 服务器

  18. * @return

  19. */

  20. private ZooKeeper connectServer() {

  21. ZooKeeper zk = null;

  22. try {

  23. zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {

  24. @Override

  25. public void process(WatchedEvent event) {

  26. if (event.getState() == Event.KeeperState.SyncConnected) {

  27. latch.countDown();

  28. }

  29. }

  30. });

  31. latch.await();

  32. } catch (IOException | InterruptedException e) {

  33. logger.error("", e);

  34. }

  35. return zk;

  36. }


  37. /**

  38. * 创建节点

  39. * @param zk

  40. * @param data

  41. */

  42. private void createNode(ZooKeeper zk, String data) {

  43. try {

  44. byte[] bytes = data.getBytes();

  45. String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

  46. logger.debug("create zookeeper node ({} => {})", path, data);

  47. } catch (KeeperException | InterruptedException e) {

  48. logger.error("", e);

  49. }

  50. }

  51. }

有了服务注册,在 Netty 服务端启动之后,将 Netty 服务端的 ip 和 port 注册到 ZooKeeper。

 
   
   
 
  1. EventLoopGroup boss = new NioEventLoopGroup(1);

  2. EventLoopGroup worker = new NioEventLoopGroup();

  3. ServerBootstrap bootstrap = new ServerBootstrap();

  4. bootstrap.group(boss, worker)

  5. .channel(NioServerSocketChannel.class)

  6. .option(ChannelOption.SO_BACKLOG, 128)

  7. .option(ChannelOption.TCP_NODELAY, true)

  8. .childOption(ChannelOption.SO_KEEPALIVE, true)

  9. .childHandler(new ChannelInitializer() {

  10. @Override

  11. protected void initChannel(Channel channel) throws Exception {

  12. ChannelPipeline p = channel.pipeline();

  13. p.addLast(new MessageEncoder());

  14. p.addLast(new MessageDecoder());

  15. p.addLast(new PushServerHandler());

  16. }

  17. });


  18. ChannelFuture future = bootstrap.bind(host,port).sync();

  19. if (future.isSuccess()) {

  20. logger.info("server start...");

  21. }


  22. if (serviceRegistry != null) {

  23. serviceRegistry.register(host + ":" + port);

  24. }

3.2 服务发现

这里我们采用的是客户端的服务发现,即服务发现机制由客户端实现。

 
   
   
 
  1. public class ServiceDiscovery {


  2. private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);


  3. private CountDownLatch latch = new CountDownLatch(1);


  4. private volatile List<String> serviceAddressList = new ArrayList<>();



  5. public ServiceDiscovery(String registryAddress) {

  6. this.registryAddress = registryAddress;


  7. ZooKeeper zk = connectServer();

  8. if (zk != null) {

  9. watchNode(zk);

  10. }

  11. }


  12. /**

  13. * @return

  14. */

  15. public String discover() {

  16. String data = null;

  17. int size = serviceAddressList.size();

  18. if (size > 0) {

  19. if (size == 1) { //只有一个服务提供方

  20. data = serviceAddressList.get(0);

  21. logger.info("unique service address : {}", data);

  22. } else { //使用随机分配法。简单的负载均衡法

  23. data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));

  24. logger.info("choose an address : {}", data);

  25. }

  26. }

  27. return data;

  28. }


  29. /**

  30. * 连接 zookeeper

  31. * @return

  32. */

  33. private ZooKeeper connectServer() {


  34. ZooKeeper zk = null;

  35. try {

  36. zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {

  37. @Override

  38. public void process(WatchedEvent event) {

  39. if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {

  40. latch.countDown();

  41. }

  42. }

  43. });

  44. latch.await();

  45. } catch (IOException | InterruptedException e) {

  46. logger.error("", e);

  47. }

  48. return zk;

  49. }


  50. /**

  51. * @param zk

  52. */

  53. private void watchNode(final ZooKeeper zk) {


  54. try {

  55. //获取子节点列表

  56. List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {

  57. @Override

  58. public void process(WatchedEvent event) {

  59. if (event.getType() == Event.EventType.NodeChildrenChanged) {

  60. watchNode(zk);

  61. }

  62. }

  63. });

  64. List<String> dataList = new ArrayList<>();

  65. for (String node : nodeList) {

  66. byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);

  67. dataList.add(new String(bytes));

  68. }

  69. logger.debug("node data: {}", dataList);

  70. this.serviceAddressList = dataList;

  71. } catch (KeeperException | InterruptedException e) {

  72. logger.error("", e);

  73. }

  74. }

  75. }

Netty 客户端启动之后,通过服务发现获取 Netty 服务端的 ip 和 port。

 
   
   
 
  1. /**

  2. * 支持通过服务发现来获取 Socket 服务端的 host、port

  3. * @param discoveryAddress

  4. * @throws InterruptedException

  5. */

  6. public PushClientBootstrap(String discoveryAddress) throws InterruptedException {


  7. serviceDiscovery = new ServiceDiscovery(discoveryAddress);

  8. serverAddress = serviceDiscovery.discover();


  9. if (serverAddress!=null) {

  10. String[] array = serverAddress.split(":");

  11. if (array!=null && array.length==2) {


  12. String host = array[0];

  13. int port = Integer.parseInt(array[1]);


  14. start(host,port);

  15. }

  16. }

  17. }

四. 总结

服务注册和发现一直是分布式的核心组件。本文介绍了借助 ZooKeeper 做注册中心,如何实现一个简单的服务注册和发现。其实,注册中心的选择有很多,例如 Etcd、Eureka 等等。选择符合我们业务需求的才是最重要的。


关注【Java与Android技术栈】

更多精彩内容请关注扫码


以上是关于Netty + ZooKeeper 实现简单的服务注册与发现的主要内容,如果未能解决你的问题,请参考以下文章

Netty入门介绍

轻量级RPC设计与实现第二版

7.6 服务远程暴露 - 注册服务到zookeeper

阿里云服务器部署zookeeper集群+kafka集群

NIO Netty 视频教程免费下载

实现分布式服务注册及简易的netty聊天