使用 H2 数据库服务器如何通知客户端更改(JMS 消息传递)

Posted

技术标签:

【中文标题】使用 H2 数据库服务器如何通知客户端更改(JMS 消息传递)【英文标题】:Using H2 database server how to notify changes to clients (JMS messaging) 【发布时间】:2013-10-10 08:51:29 【问题描述】:

我在 AUTO_SERVER 模式下成功地使用了 H2 数据库,以便在网络上的多个桌面客户端之间透明地共享数据库文件。 这样,在客户端和从 tcp 服务器读取的所有其他客户端中选出了一个服务器。

我缺少的是客户端或服务器如何通知所有其他桌面客户端数据库中的某些内容已更改。 现在我正在使用 JGroups 通道让所有客户端相互通信,但这是另一个故障点,也是另一个与 H2 并行运行的领导者选举算法。

没有其他方法吗? 我已经阅读了一些数据库支持的 JMS(Java 消息服务 Java API)。 H2有什么提示吗?

谢谢

编辑

以下代码是对当前答案的改编,如果我先启动 Sender(将 args 设置为“sender”),他作为服务器连接到 H2 数据库,然后我执行 Receiver(将 args 设置为“receiver”)远程机器,它们作为客户端连接。

然而只有服务器收到通知,客户端什么也没有收到。

从我目前所知道的情况来看,这是有道理的:仅在服务器上调用触发器,在客户端或服务器上调用从客户端或服务器调用的用户定义函数,但不会跨连接到客户端的所有客户端(和服务器)调用数据库。

那么有没有办法调整以下内容以通知所有连接的实例数据库发生变化?

import java.io.File;
import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDB2


    public static void main(String[] args) throws Exception
    
        //final String url = "jdbc:h2:mem:test;multi_threaded=true";
        final String url = "jdbc:h2:" + File.separator + "mnt/testdir/PlanIGS" + File.separator
                + "persondb;create=true;AUTO_SERVER=TRUE;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();

        boolean isSender = false;
        args = new String[]
        
            "sender"
        ;
        for (String arg : args)
        
            if (arg.contains("receiver"))
            
                System.out.println("receiver starting");
                isSender = false;
            
            else if (arg.contains("sender"))
            
                System.out.println("sender starting");
                isSender = true;
            
        

        if (isSender)
        
            stat.execute("create alias wait_for_change for \""
                    + TestSimpleDB2.class.getName()
                    + ".waitForChange\"");
            stat.execute("create table test(id identity)");
            stat.execute("create trigger notifier "
                    + "before insert, update, delete, rollback "
                    + "on test call \""
                    + TestSimpleDB2.Notifier.class.getName() + "\"");

            Thread.sleep(1000);
            for (int i = 0; i < 10; i++)
            
                System.out.println("Sender: I change something...");
                stat.execute("insert into test values(null)");
                Thread.sleep(2000);
            
        
        else
        
            new Thread()
            
                public void run()
                
                    try
                    
                        Connection conn = DriverManager.getConnection(url);
                        for (int i = 0; i < 10; i++)
                        
                            conn.createStatement().execute(
                                    "call wait_for_change(100000)");
                            System.out.println("Receiver: event received");
                        
                    
                    catch (Exception e)
                    
                        e.printStackTrace();
                    
                
            .start();
        
        conn.close();
    

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis)
    
        synchronized (modCount)
        
            try
            
                modCount.wait(maxWaitMillis);
            
            catch (InterruptedException e)
            
                // ignore
            
        
    

    public static class Notifier extends TriggerAdapter
    

        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException
        
            modCount.incrementAndGet();
            synchronized (modCount)
            
                modCount.notifyAll();
            
        
    

【问题讨论】:

您确定所有客户端都连接到同一个数据库吗?我可能会用create=false 替换create=true 以确保这一点。另外,您能检查一下文件persondb.lock.db 中的内容吗?它是一个纯文本文件,应该包含服务器地址和一个随机密钥(以及一些其他数据)。 【参考方案1】:

H2 没有实现 JMS(事实上我不知道有哪个数据库可以实现)。但是,您可以在 H2 中构建一个简单的通知机制,使用触发器和用户定义的函数,如下所示。请注意,这需要 H2 中的多线程模式,该模式尚未完全测试。因此,使用单独的数据库进行消息传递而不是用于数据的数据库可能更有意义。

import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDb 

    public static void main(String[] args) throws Exception 
        final String url = "jdbc:h2:mem:test;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();
        stat.execute("create alias wait_for_change for \"" + 
                TestSimpleDb.class.getName() + 
                ".waitForChange\"");
        stat.execute("create table test(id identity)");
        stat.execute("create trigger notifier " + 
                "before insert, update, delete, rollback " +
                "on test call \"" + 
                TestSimpleDb.Notifier.class.getName() + "\"");
        new Thread() 
            public void run() 
                try 
                    Connection conn = DriverManager.getConnection(url);
                    for (int i = 0; i < 10; i++) 
                        conn.createStatement().execute(
                                "call wait_for_change(10000)");
                        System.out.println("Receiver: event received");
                    
                 catch (Exception e) 
                    e.printStackTrace();
                
            
        .start();
        Thread.sleep(500);
        for (int i = 0; i < 10; i++) 
            System.out.println("Sender: I change something...");
            stat.execute("insert into test values(null)");
            Thread.sleep(1000);
        
        conn.close();
    

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis) 
        synchronized (modCount) 
            try 
                modCount.wait(maxWaitMillis);
             catch (InterruptedException e) 
                // ignore
            
        
    

    public static class Notifier extends TriggerAdapter 
        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException 
            modCount.incrementAndGet();
            synchronized (modCount) 
                modCount.notifyAll();
            
        
    


【讨论】:

您的代码看起来特别酷,但是 AtomicLong modCount 上的同步让我认为它只能在同一个 JVM 内工作,而不能跨在不同计算机上运行的 JVM 共享网络驱动器上的数据库。你能确认或否认吗?谢谢 你是对的,这只适用于同一个JVM。但是,这不是问题,因为数据库和触发器以及用户定义的函数 都在同一个 JVM 中运行。如果使用AUTO_SERVER模式,那么触发器和用户定义的函数都运行在服务器中(在首先打开数据库的进程中)。 嘿等等..但你是 H2 数据库的创建者!!既然你写信给我,我觉得自己是个摇滚明星:) 无论如何,我无法弄清楚为什么在远程客户端上没有调用 Notifier.fire() 上面的代码(我想使用 newRow 和 oldRow 来交换消息) .我为此 ow.ly/pIOMN 创建了一个单独的问题。谢谢

以上是关于使用 H2 数据库服务器如何通知客户端更改(JMS 消息传递)的主要内容,如果未能解决你的问题,请参考以下文章

HornetQ JMS 主题到主题桥

如何创建一个通知服务器,在数据库更改时通知 Delphi 应用程序?

如何使用 redis PUBLISH/SUBSCRIBE 和 nodejs 在数据值更改时通知客户端?

如何使用 redis PUBLISH/SUBSCRIBE 和 nodejs 在数据值更改时通知客户端?

如何检测 JMS 主题连接丢失

H2数据库用户