ActiveMQ KahaDB 总是锁定和等待

Posted

技术标签:

【中文标题】ActiveMQ KahaDB 总是锁定和等待【英文标题】:ActiveMQ KahaDB always locking and waiting 【发布时间】:2014-05-18 05:25:55 【问题描述】:

我正在尝试在一个相对简单的工作队列用例中使用 ActiveMQ。我有一个队列,并且有一个简单的生产者和消费者。 我的问题是我做错了什么导致数据库锁定? 这是我不断收到的消息:

14/04/05 18:14:13 INFO store.SharedFileLocker: Database activemq-data\localhost\KahaDB\lock is locked... waiting 10 seconds for the database to be unlocked. Reason: java.io.IOException: File 'activemq-data\localhost\KahaDB\lock' could not be locked.

我同时在不同的线程中运行生产者和消费者。 最初,我在类级别有连接,所以我认为这是问题所在,但即使每次调用 put 和 get 从头开始​​创建连接仍然会导致锁定。

我进行了研究,但未能找到解决方案。 我在 Windows 7 上运行 ActiveMQ 5.9.0。

以下是我通过 cmd 提示符启动它时打印出来的内容:

C:\activemq\apache-activemq-5.9.0\bin>activemq
Java Runtime: Oracle Corporation 1.7.0_40 C:\Program Files\Java\jre7
  Heap sizes: current=1005568k  free=995061k  max=1005568k
    JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalCla
vemq.jaas.GroupPrincipal -Djava.security.auth.login.config=C:\activemq\apache-activemq-5.9.0\bin\..\conf\login.config -Dactivemq.classpath=C:\activemq\apache-activemq-5.9.0\bin\..\
che-activemq-5.9.0\bin\../conf;C:\activemq\apache-activemq-5.9.0\bin\../conf; -Dactivemq.home=C:\activemq\apache-activemq-5.9.0\bin\.. -Dactivemq.base=C:\activemq\apache-activemq-5
mq.conf=C:\activemq\apache-activemq-5.9.0\bin\..\conf -Dactivemq.data=C:\activemq\apache-activemq-5.9.0\bin\..\data -Djava.io.tmpdir=C:\activemq\apache-activemq-5.9.0\bin\..\data\t
Extensions classpath:
  [C:\activemq\apache-activemq-5.9.0\bin\..\lib,C:\activemq\apache-activemq-5.9.0\bin\..\lib\camel,C:\activemq\apache-activemq-5.9.0\bin\..\lib\optional,C:\activemq\apache-activemq
b,C:\activemq\apache-activemq-5.9.0\bin\..\lib\extra]
ACTIVEMQ_HOME: C:\activemq\apache-activemq-5.9.0\bin\..
ACTIVEMQ_BASE: C:\activemq\apache-activemq-5.9.0\bin\..
ACTIVEMQ_CONF: C:\activemq\apache-activemq-5.9.0\bin\..\conf
ACTIVEMQ_DATA: C:\activemq\apache-activemq-5.9.0\bin\..\data
Loading message broker from: xbean:activemq.xml
 INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@5bf2a8f5: startup date [Sat Apr 05 17:42:42 EDT 2014]; root of context hierarchy
 INFO | PListStore:[C:\activemq\apache-activemq-5.9.0\bin\..\data\localhost\tmp_storage] started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\activemq\apache-activemq-5.9.0\bin\..\data\kahadb]
 INFO | KahaDB is version 5
 INFO | Recovering from the journal ...
 INFO | Recovery replayed 6935 operations from the journal in 0.416 seconds.
 INFO | Apache ActiveMQ 5.9.0 (localhost, ID:Owner-PC-49614-1396734165637-0:1) is starting
 INFO | Listening for connections at: tcp://Owner-PC:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector openwire started
 INFO | Listening for connections at: amqp://Owner-PC:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector amqp started
 INFO | Listening for connections at: stomp://Owner-PC:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector stomp started
 INFO | Listening for connections at: mqtt://Owner-PC:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector mqtt started
 INFO | Listening for connections at ws://Owner-PC:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector ws started
 INFO | Apache ActiveMQ 5.9.0 (localhost, ID:Owner-PC-49614-1396734165637-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 INFO | Welcome to hawtio 1.2-M23 : http://hawt.io/ : Don't cha wish your console was hawt like me? ;-)
 INFO | Starting hawtio authentication filter, JAAS realm: "activemq" authorized role: "admins" role principal classes: "org.apache.activemq.jaas.GroupPrincipal"
 INFO | Using file upload directory: C:\activemq\apache-activemq-5.9.0\bin\..\data\tmp\uploads
 INFO | jolokia-agent: Using access restrictor classpath:/jolokia-access.xml
 INFO | ActiveMQ WebConsole available at http://localhost:8161/
 INFO | Initializing Spring FrameworkServlet 'dispatcher'

这是我将队列实现为 ActiveMQ 的 java(我有一个队列接口,这是一个 Impl)

import com.google.gson.Gson;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQImpl implements Queue, ExceptionListener 

  private String host;
  private String user;
  private String pw;

  public void init() 
  

  public void close()  
  

  public Message get() 
    Message outMessage = null;
    ActiveMQConnectionFactory connectionFactory = null;
    Connection connection = null;  
    try 
      if (connection == null) 
        connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

        // Create a Connection
        connection = connectionFactory.createConnection();
      
      connection.start();

      // Create a Session
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // Create the destination (Topic or Queue)
      Destination destination = session.createQueue("work");
      // Create a MessageConsumer from the Session to the Topic or Queue
      MessageConsumer consumer = session.createConsumer(destination);

      // Wait for a message
      javax.jms.Message message = consumer.receive(1000);

      if (message instanceof TextMessage) 
        TextMessage textMessage = (TextMessage) message;
        String text = textMessage.getText();
        outMessage = new Gson().fromJson(text, Message.class);
        //   System.out.println("Received: " + text);
       else 
        // System.out.println("Received: " + message);
      

      consumer.close();
      //  session.commit();
      session.close();
      //connection.close();
     catch (Exception e) 
      System.out.println("Caught: " + e);
      e.printStackTrace();
    
    return outMessage;
  

  public void put(Message inMessage) 
    try 
      ActiveMQConnectionFactory connectionFactory = null;


      Connection prodConnection = null;
      connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

      prodConnection = connectionFactory.createConnection();

      prodConnection.start();

      Session session = prodConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      Destination destination = session.createQueue("work");

      MessageProducer producer = session.createProducer(destination);
      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
      producer.setTimeToLive(60000);
      // Create a messages
      String text = inMessage.toString();
      TextMessage message = session.createTextMessage(text);

      // Tell the producer to send the message
      System.out.println("Sent message: " + text);
      producer.send(message);
      producer.close();
      // session.commit();
      session.close();
      prodConnection.close();
     catch (Exception e) 
      System.out.println("Caught: " + e);
      onException(null);
      e.printStackTrace();
    
  

  public void onException(JMSException jmse) 
    //send this to the error channel object... 
    System.out.println(jmse);
  

  public void put(Set<Message> messages) 
    try 
  ActiveMQConnectionFactory connectionFactory=null;
  Connection connection=null;

  Connection prodConnection=null;
      if (connection == null) 
        connectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.useAsyncSend=true");

        // Create a Connection
        connection = connectionFactory.createConnection("admin", "admin");
      

      connection.start();

      // Create a Session
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // Create the destination (Topic or Queue)
      Destination destination = session.createQueue("work");
      // Create a MessageProducer from the Session to the Topic or Queue
      MessageProducer producer = session.createProducer(destination);
      producer.setDeliveryMode(DeliveryMode.PERSISTENT);

      // Create a messages
      for (Message inMessage : messages) 
        String text = inMessage.toString();
        TextMessage message = session.createTextMessage(text);

        // Tell the producer to send the message
        System.out.println("Sent message: " + text);
        producer.send(message);
      
      producer.close();
      // session.commit();
      session.close();
      //connection.close();
     catch (Exception e) 
      System.out.println("Caught: " + e);
      onException(null);
      e.printStackTrace();
    
  


这里是生产者和消费者(简单的调试类)

public class Producer 

  public static void main(String[] args) 
    Queue q = QueueFactory.create(QueueType.ACTIVEMQ);
    try 
      for (int i = 0; i < 10; i++) 

        q.put(new Message("testimpl" + i, "whatever", i));
        Thread.sleep(1000);

      
     catch (InterruptedException ex) 
      Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
    

  



public class Consumer 

  public static void main(String[] args) 

    Queue q = QueueFactory.create(QueueType.ACTIVEMQ);
    try 
      while (true) 

        Message get = q.get();
        System.out.println(get);
        Thread.sleep(1000);

      
     catch (InterruptedException ex) 
      Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
    
  

感谢任何帮助。

【问题讨论】:

这通常是当您运行主代理然后使用 KahaDB 的相同数据路径启动从属代理时的情况。因此,请确保您运行的是单个实例,确保 AMQ kahaDB 文件夹中没有损坏的锁定文件等等。 【参考方案1】:

关注

杀死活动 mq 的 java 进程,open task manager -&gt; go to process -&gt; check for java -&gt; and right click and say end process

然后去&lt;activemq_install_directory&gt;/data/kahadb

并删除生成的锁定文件

尝试再次启动activemq

对我来说,每次我遇到这个问题时都有效

【讨论】:

我试过这个,现在当我 put() 一条消息,然后立即 get() 它在一个线程中,它可以工作。但是,当我尝试在不同的线程中运行生产者和消费者时,它会立即再次进入锁定状态....想法? 您是否确保您的put() 从第一个线程完全执行?? 不,我没有……我想你明白了。它现在似乎起作用了。谢谢!我将标记为答案。 这有助于我在 MBP 上安装 JBoss Fuse。干得好。 install 不是activemq 命令的任务之一。还有其他建议吗?【参考方案2】:

有同样的问题。 没有其他 Java 服务/代理启动并运行。 这个问题很愚蠢 -

确保您使用的用户有权写入锁定文件的路径。

【讨论】:

很好的答案,但尽量解释清楚。例如,代替“确保您使用的用户有权写入锁定文件的路径”,您可以写为“当运行该进程的用户有没有写入文件的权限"【参考方案3】:

我遇到的唯一释放此锁的解决方案是-

    转到 services.msc 永远停止 ActiveMQ(让 StartUp 类型为 Automatic) 您会看到锁定文件消失了 删除数据文件夹本身 重启 apache-activemq-5.10.0-bin\apache-activemq-5.10.0\bin\win64\activemq.bat(不是 services.msc) 你很高兴

【讨论】:

【参考方案4】:

仅适用于 linux:

sudo killall java

重新开始

【讨论】:

杀死所有java也会杀死其他进程,这可能会导致破坏。 然后关闭生产服务器,因为我想释放一个锁。

以上是关于ActiveMQ KahaDB 总是锁定和等待的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ消息持久化-KahaDB

哪个套件 KahaDB 或现有的用于 activeMQ 的 JDBC?

Database activemq-data\localhost\KahaDB\lock is locked...

ActiveMQ 从不删除 kahadb .log 文件;没有通过 JSP 接口可见的待处理消息;如何检测罪魁祸首?

ActiveMQ消息持久化-LevelDB

ActiveMQ 消息存储持久化