springboot~jgroups实现节点间的通讯
Posted 敢于对过去告一个段落,才有信心掀开新的篇章!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot~jgroups实现节点间的通讯相关的知识,希望对你有一定的参考价值。
JGroups概念
在 JGroups 中,集群(cluster)是一个由多个节点组成的逻辑实体,节点可以通过一个共享的集群名称来进行连接和通信。这个集群名称可以在配置中指定或在运行时动态创建。
JGroups 提供了多种方式来创建集群,并使节点能够加入到相同的集群中。下面是一些常见的方式:
-
静态配置:通过配置文件指定集群的名称和成员节点。你可以使用 XML 或属性文件定义一个静态的集群配置,其中包含集群名称和成员节点的信息。然后,在应用程序中加载该配置文件,节点将根据配置文件中的信息自动加入到指定的集群。
-
动态发现:使用动态发现机制,使节点能够自动发现并加入集群。这通常涉及使用一种外部的发现协议或服务来获取集群成员的信息。例如,可以使用 TCP、UDP、DNS 或者其他的发现协议来获取集群成员的 IP 地址和端口号,并将其作为 JGroups 的动态发现机制的输入,使节点能够加入到相应的集群。
-
编程方式:通过编程方式,动态创建和管理集群。你可以在应用程序中使用 JGroups 提供的 API 来创建一个新的集群,并将节点加入到该集群中。这种方式允许你根据特定的业务逻辑和需求来灵活管理集群。
在上面提到的代码示例中,channel.connect("myCluster")
是使用编程方式连接到名为 "myCluster" 的集群。这意味着节点将尝试加入到具有该名称的集群中。如果该集群不存在,则会自动创建一个新的集群,并将该节点作为第一个成员加入。
具体来说,在运行应用程序时,你可以在不同的节点上执行相同的代码,它们将尝试连接到相同的集群(使用相同的集群名称)。这样,多个节点就可以通过 JGroups 进行通信和协调。
需要注意的是,集群的配置和创建方式取决于你的具体需求和环境。你可以根据 JGroups 的文档和示例来选择合适的方法,并根据你的需求进行配置和实现。
实现
如果你希望在一个由多个 Spring Boot 实例组成的 Web 集群中,使用 JGroups 进行节点间通信,是可以实现的。
JGroups 是一个开源的 Java 库,用于构建群集通信系统。它提供了一种可靠的组播(multicast)和单播(unicast)通信机制,用于在集群中的节点之间进行通信和协调。
使用 JGroups,你可以在 Spring Boot 应用程序中集成它,实现节点间的通知和消息传递。下面是一个简单的示例代码,展示如何使用 JGroups 发送和接收消息:
首先,添加 JGroups 依赖项到你的项目中(可以使用 Maven 或 Gradle 进行依赖管理):
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
<version>4.2.6.Final</version>
</dependency>
然后,在 Spring Boot 应用程序中配置和使用 JGroups:
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class JGroupsExample
private JChannel channel;
@PostConstruct
public void init() throws Exception
channel = new JChannel(); // 创建 JGroups 通道
// 设置 ReceiverAdapter 作为消息接收器
channel.setReceiver(new ReceiverAdapter()
public void receive(Message msg)
System.out.println("Received message: " + msg.getObject());
);
channel.connect("myCluster"); // 连接到指定的集群名称
public void sendMessage(String message) throws Exception
Message msg = new Message(null, message); // 创建消息
channel.send(msg); // 发送消息
在上面的示例中,我们创建了一个名为 JGroupsExample
的 Spring 组件。在 @PostConstruct
方法中,我们创建了一个 JGroups 通道,并设置了一个 ReceiverAdapter
作为消息接收器。然后,通过调用 channel.connect("myCluster")
连接到指定的集群(使用名称 "myCluster")。最后,我们定义了一个 sendMessage()
方法来发送消息。
你可以在你的应用程序中使用 JGroupsExample
组件来发送和接收消息。通过调用 sendMessage()
方法,你可以发送消息到集群中的其他节点,并在 ReceiverAdapter
的 receive()
方法中处理接收到的消息。
请注意,JGroups 还提供了其他高级功能,如可靠性保证、分布式状态传输等。你可以根据需要进一步探索和配置 JGroups 的功能。
总结来说,使用 JGroups 可以在 Spring Boot 集群中实现节点间的通知和消息传递。它提供了一种灵活且可靠的通信机制,适用于构建分布式系统和群集应用程序。
JGroups通过xml的方式静态配置集群
通过 XML 静态配置 JGroups 集群,你可以创建一个 XML 配置文件,其中包含集群的名称、协议栈配置和成员节点信息。以下是一个示例:
<!-- jgroups-config.xml -->
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
<TCP bind_addr="localhost" bind_port="7800" />
<TCPPING initial_hosts="localhost[7800],localhost[7801],localhost[7802]" />
<MERGE3 />
<FD_SOCK />
<FD_ALL />
<VERIFY_SUSPECT />
<pbcast.NAKACK2 />
<UNICAST3 />
<pbcast.STABLE />
<pbcast.GMS />
<UFC />
<MFC />
<FRAG2 />
<SEQUENCER />
<STATE_TRANSFER />
</config>
在上述示例中,我们创建了一个名为 jgroups-config.xml
的配置文件。它使用 JGroups 的 XML 命名空间和相应的架构位置。
在 <config>
元素中,我们定义了一系列 JGroups 协议栈组件,这些组件构成了 JGroups 的通信协议栈。具体来说,示例中包含了 TCP、TCPPING、MERGE3、FD_SOCK、FD_ALL、VERIFY_SUSPECT、pbcast.NAKACK2、UNICAST3、pbcast.STABLE、pbcast.GMS、UFC、MFC、FRAG2、SEQUENCER、STATE_TRANSFER 等组件。
其中,<TCP>
元素指定了 TCP 传输协议的配置,<TCPPING>
元素定义了初始成员节点的信息,<pbcast.GMS>
元素处理成员节点的管理等等。
你可以根据需要调整和配置这些协议栈组件,以满足你的集群需求。更多有关 JGroups 配置的详细信息,可以参考 JGroups 官方文档。
在你的 Spring Boot 应用程序中,可以加载这个配置文件并应用于 JGroups:
import org.jgroups.JChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class JGroupsExample
private JChannel channel;
@PostConstruct
public void init() throws Exception
channel = new JChannel("jgroups-config.xml"); // 加载配置文件创建 JGroups 通道
channel.connect("myCluster"); // 连接到指定的集群名称
// 其他代码...
在上面的示例中,我们在 JGroupsExample
类的 init()
方法中,使用 JChannel
的构造函数加载 jgroups-config.xml
配置文件创建了 JGroups 通道。然后,通过调用 channel.connect("myCluster")
连接到指定的集群。
当应用程序启动时,将会使用指定的配置文件创建 JGroups 通道,并将节点加入到名为 "myCluster" 的集群中。
请确保
在你的项目中正确配置并放置了 jgroups-config.xml
配置文件,并根据你的需求进行适当的调整和配置。
JGroups 入门实践(转)
前言
JGroups是一个开源的纯java编写的可靠的群组通讯工具。其工作模式基于IP多播,但可以在可靠性和群组成员管理上进行扩展。其结构上设计灵活,提供了一种灵活兼容多种协议的协议栈。
JGroups 多线程的方式实现了多个协议之间的协同工作,常见工作线程有心跳检测,诊断等等。
JGroups实现多机器之间的通信一般都会包含维护群组状态、群组通信协议、群组数据可靠性传输这样的一些主题。
JGroups群组的各个节点是存在"管理节点"的,至少可以说某个节点提供了在一段时间内维护状态信息和消息可靠性检测的功能(一般是最先启动的节点)。
目前Jboss、Ecache的分布式缓存是基于Groups通信。
若JGroups通信基于Udp,则可能需要开启机器上UDP相关的设置,比如Open udp。
温馨提示:JGroups各个协议相关的配置文件都可以从JGroups-x.x.x.Final.jar中找到。
JGroups 资料
http://www.jgroups.org/tutorial/index.html(官网)
http://sourceforge.net/projects/javagroups/(JGroups工程&讨论组(Discussion))
JGroups 入门示例
1,节点通信(tcp/ip,udp)方式.
2,通道和消息传送.
3,节点状态同步.
tcp/ip与udp协议
通常我们都知道tcp和udp最大的区别在于可靠性,tcp是基于可靠连接的传输,udp则属非连接,具体可参考百度百科(http://baike.baidu.com/view/1161229.htm?fr=aladdin)。
JGroups当中,udp是比较推荐的通信方式,其特点是不需要知道另一个节点的ip,通过多播网络发现就可以“找到”相应的节点,而tcp则需要在配置文件中固定配置。
示例代码(之后的测试基于tcp,因为不同机器的测试由于udp端口的问题未成功)
tcp配置文件network-tcp.xml
<!-- TCP based stack, with flow control and message bundling. This is usually used when IP multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast). Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system properties, e.g. -Djgroups.bind_addr=192.168.5.2 and -Djgroups.tcpping.initial_hosts=192.168.5.2[7800] author: Bela Ban --> <config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.3.xsd"> <TCP bind_addr="192.168.19.112" bind_port="7800" loopback="false" recv_buf_size="${tcp.recv_buf_size:5M}" send_buf_size="${tcp.send_buf_size:640K}" max_bundle_size="64K" max_bundle_timeout="30" use_send_queues="true" sock_conn_timeout="300" timer_type="new3" timer.min_threads="4" timer.max_threads="10" timer.keep_alive_time="3000" timer.queue_max_size="500" thread_pool.enabled="true" thread_pool.min_threads="1" thread_pool.max_threads="10" thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="false" thread_pool.queue_max_size="100" thread_pool.rejection_policy="discard" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="1" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="5000" oob_thread_pool.queue_enabled="false" oob_thread_pool.queue_max_size="100" oob_thread_pool.rejection_policy="discard"/> <TCPPING timeout="3000" initial_hosts="${jgroups.tcpping.initial_hosts:192.168.19.112[7800],192.168.19.112[7801]}" port_range="1" num_initial_members="10"/> <MERGE2 min_interval="10000" max_interval="30000"/> <FD_SOCK/> <FD timeout="3000" max_tries="3" /> <VERIFY_SUSPECT timeout="1500" /> <BARRIER /> <pbcast.NAKACK2 use_mcast_xmit="false" discard_delivered_msgs="true"/> <UNICAST3 /> <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="4M"/> <pbcast.GMS print_local_addr="true" join_timeout="3000" view_bundling="true"/> <MFC max_credits="2M" min_threshold="0.4"/> <FRAG2 frag_size="60K" /> <!--RSVP resend_interval="2000" timeout="10000"/--> <pbcast.STATE_TRANSFER/> </config>
udp配置文件network-udp.xml
<config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.5.xsd"> <UDP mcast_addr="${jgroups.udp.mcast_addr:235.5.5.5}" mcast_port="${jgroups.udp.mcast_port:45588}" tos="8" ucast_recv_buf_size="20M" ucast_send_buf_size="640K" mcast_recv_buf_size="25M" mcast_send_buf_size="640K" loopback="true" max_bundle_size="64K" max_bundle_timeout="30" ip_ttl="${jgroups.udp.ip_ttl:2}" enable_diagnostics="true" thread_naming_pattern="cl" timer_type="new" timer.min_threads="4" timer.max_threads="10" timer.keep_alive_time="3000" timer.queue_max_size="500" thread_pool.enabled="true" thread_pool.min_threads="2" thread_pool.max_threads="8" thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="10000" thread_pool.rejection_policy="discard" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="1" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="5000" oob_thread_pool.queue_enabled="false" oob_thread_pool.queue_max_size="100" oob_thread_pool.rejection_policy="Run"/> <PING timeout="2000" num_initial_members="3"/> <MERGE2 max_interval="30000" min_interval="10000"/> <FD_SOCK/> <FD_ALL/> <VERIFY_SUSPECT timeout="1500" /> <BARRIER /> <pbcast.NAKACK use_mcast_xmit="true" retransmit_timeout="300,600,1200" discard_delivered_msgs="true"/> <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="4M"/> <pbcast.GMS print_local_addr="true" print_physical_addrs="true" join_timeout="3000" view_bundling="true" max_join_attempts="3"/> <UFC max_credits="2M" min_threshold="0.4"/> <MFC max_credits="2M" min_threshold="0.4"/> <FRAG2 frag_size="60K" /> <pbcast.STATE_TRANSFER /> </config>
数据节点Node.java
package org.wit.ff; import java.io.InputStream; import java.io.OutputStream; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; import org.jgroups.Address; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.ReceiverAdapter; import org.jgroups.View; import org.jgroups.util.Util; /** * * <pre> * 节点. * </pre> * * @author F.Fang * @version $Id: CacheNode.java, v 0.1 2014年10月17日 上午5:27:11 F.Fang Exp $ */ public class Node extends ReceiverAdapter { private final static Logger LOG = Logger.getLogger(Node.class); /** * 配置文件. */ private static final String CONFIG_XML = "network-tcp.xml"; /** * 集群名称. */ private static final String CLUSTER_NAME = "FF"; /** * 节点通道. */ private JChannel channel = null; /** * 以此作为节点间初始化的同步数据. */ private Map<String, String> cacheData = new HashMap<String, String>(); private ReentrantLock lock = new ReentrantLock(); public Node() { InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML); try { channel = new JChannel(is); channel.setReceiver(this); channel.connect(CLUSTER_NAME);
channel.getState(null,50000); } catch (Exception e) { LOG.error("启动节点异常!", e); // 最好是自定义RuntimeException! throw new RuntimeException("启动节点异常!", e); } } /** * * <pre> * 发送消息给目标地址. * </pre> * * @param dest * 为空表示发给所有节点. * @param textMsg * 消息. */ public void sendMsg(Address dest, Object textMsg) { Message msg = new Message(dest, null, textMsg); try { channel.send(msg); } catch (Exception e) { LOG.error("消息发送失败!", e); // 应自定异常,最好是自定义Exception类型! throw new RuntimeException("消息发送失败!", e); } } @Override public void getState(OutputStream output) throws Exception { //cacheData过大可能会造成节点的状态同步时间过长.
lock.lock();
try {
Util.objectToStream(state, new DataOutputStream(output));
}catch(Exception e){
throw e;
}finally{
lock.unlock();
}
} @Override public void receive(Message msg) { //当前节点不接收自己发送到通道当中的消息. if (msg.getSrc().equals(channel.getAddress())) { return; } LOG.info(msg.getObject()); } @Override public void setState(InputStream input) throws Exception { lock.lock(); try { @SuppressWarnings("unchecked") Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(input)); this.cacheData.putAll(cacheData); } catch (Exception e) { LOG.error("从主节点同步状态到当前节点发生异常!", e); } finally { lock.unlock(); } } @Override public void viewAccepted(View view) { LOG.info("当前成员[" + this.channel.getAddressAsString() + "]"); LOG.info(view.getCreator()); LOG.info(view.getMembers());
LOG.info("当前节点数据:" + cacheData);
}
/** * * <pre> * 提供一个简单的初始化数据的方法. * </pre> * */ public void addData(String key,String val){ if(key!=null&&!key.isEmpty()){ cacheData.put(key, val); } } }
实例节点1 Node1.java
package org.wit.ff; import java.util.Scanner; import java.util.concurrent.TimeUnit; import org.wit.ff.Node; /** * * <pre> * tcp模式下: * 如果是同一台机器测试,请注意在 * TCPPING 元素下修改 initial_hosts的配置端口: * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.100[7801]} * 如果是多台机器测试,请注意在 * TCPPING 元素下修改 initial_hosts的ip,端口随意: * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.178[7800]} * * udp模式下: * 同一台机器的不同端口(端口是动态的)可通信. * 不同机器之间的ip多播可能会受到一些因素限制而造成节点之间无法彼此发现. * </pre> * * @author F.Fang * @version $Id: Node1.java, v 0.1 2014年10月15日 上午5:31:32 F.Fang Exp $ */ public class Node1 { public static void main(String[] args) { Node node = new Node(); node.addData("hello", "world"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } // 使用控制台发送消息给Node2. Scanner scanner = new Scanner(System.in); while(true){ String text = scanner.next(); if("exit".equals(text)){ break; } node.sendMsg(null,"hello "+text+",node2!"); } } }
实例节点2 Node2.java
package org.wit.ff; import java.util.Scanner; import java.util.concurrent.TimeUnit; /** * * <pre> * tcp模式下: * 如果是同一台机器测试,请注意在 * TCPPING 元素下修改 initial_hosts的配置端口: * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.100[7801]} * 如果是多台机器测试,请注意在 * TCPPING 元素下修改 initial_hosts的ip,端口随意: * 例如:"${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.178[7800]} * * @author F.Fang * @version $Id: Node2.java, v 0.1 2014年10月15日 上午5:31:44 F.Fang Exp $ */ public class Node2 { public static void main(String[] args) { Node node = new Node(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } // 使用控制台发送消息给Node1. Scanner scanner = new Scanner(System.in); while (true) { String text = scanner.next(); if ("exit".equals(text)) { break; } node.sendMsg(null,"hello " + text + ",node1!"); } } }
测试Case
启动Node1,Node1平稳后启动Node2。
Node1运行信息如下:
DEBUG Configurator - set property TCP.diagnostics_addr to default value /ff0e:0:0:0:0:0:75:75 ------------------------------------------------------------------- GMS: address=DSH07fFang-18185, cluster=FF, physical address=192.168.19.112:7800 ------------------------------------------------------------------- DEBUG NAKACK2 - [DSH07fFang-18185 setDigest()] existing digest: [] new digest: DSH07fFang-18185: [0 (0)] resulting digest: DSH07fFang-18185: [0 (0)] DEBUG GMS - DSH07fFang-18185: installing view [DSH07fFang-18185|0] (1) [DSH07fFang-18185] DEBUG STABLE - resuming message garbage collection DEBUG FD_SOCK - VIEW_CHANGE received: [DSH07fFang-18185] INFO Node - 当前成员[DSH07fFang-18185] INFO Node - DSH07fFang-18185 INFO Node - [DSH07fFang-18185] INFO Node - 当前节点数据:{} DEBUG STABLE - resuming message garbage collection DEBUG GMS - created cluster (first member). My view is [DSH07fFang-18185|0], impl is org.jgroups.protocols.pbcast.CoordGmsImpl DEBUG STABLE - suspending message garbage collection DEBUG STABLE - DSH07fFang-18185: resume task started, max_suspend_time=33000 DEBUG GMS - DSH07fFang-18185: installing view [DSH07fFang-18185|1] (2) [DSH07fFang-18185, DSH07fFang-2882] DEBUG FD_SOCK - VIEW_CHANGE received: [DSH07fFang-18185, DSH07fFang-2882] INFO Node - 当前成员[DSH07fFang-18185] INFO Node - DSH07fFang-18185 INFO Node - [DSH07fFang-18185, DSH07fFang-2882] INFO Node - 当前节点数据:{hello=world} DEBUG FD_SOCK - ping_dest is DSH07fFang-2882, pingable_mbrs=[DSH07fFang-18185, DSH07fFang-2882] DEBUG STABLE - resuming message garbage collection DEBUG FD - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882 DEBUG FD - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882 DEBUG FD - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882 DEBUG FD - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882 DEBUG FD - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882 DEBUG FD - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
主要包括ip通信信息、状态、心跳等等。
Node2运行消息如下:
DEBUG Configurator - set property TCP.diagnostics_addr to default value /ff0e:0:0:0:0:0:75:75 ------------------------------------------------------------------- GMS: address=DSH07fFang-2882, cluster=FF, physical address=192.168.19.112:7801 ------------------------------------------------------------------- DEBUG GMS - DSH07fFang-2882: sending JOIN(DSH07fFang-2882) to DSH07fFang-18185 DEBUG NAKACK2 - [DSH07fFang-2882 setDigest()] existing digest: [] new digest: DSH07fFang-18185: [0 (0)], DSH07fFang-2882: [0 (0)] resulting digest: DSH07fFang-18185: [0 (0)], DSH07fFang-2882: [0 (0)] DEBUG GMS - DSH07fFang-2882: installing view [DSH07fFang-18185|1] (2) [DSH07fFang-18185, DSH07fFang-2882] DEBUG FD_SOCK - VIEW_CHANGE received: [DSH07fFang-18185, DSH07fFang-2882] INFO Node - 当前成员[DSH07fFang-2882] INFO Node - DSH07fFang-18185 INFO Node - [DSH07fFang-18185, DSH07fFang-2882] INFO Node - 当前节点数据:{hello=world}
DEBUG FD_SOCK - ping_dest is DSH07fFang-18185, pingable_mbrs=[DSH07fFang-18185, DSH07fFang-2882] DEBUG FD - DSH07fFang-2882: sending are-you-alive msg to DSH07fFang-18185 DEBUG FD - DSH07fFang-2882: sending are-you-alive msg to DSH07fFang-18185
节点之间存在通信和状态同步,可以通过控制台输入发送消息的命令观察节点变化。
以上是关于springboot~jgroups实现节点间的通讯的主要内容,如果未能解决你的问题,请参考以下文章