我可以从并发线程调用 XMPPConnection.sendPacket 吗?

Posted

技术标签:

【中文标题】我可以从并发线程调用 XMPPConnection.sendPacket 吗?【英文标题】:Can I invoke XMPPConnection.sendPacket from concurrent threads? 【发布时间】:2009-09-18 08:20:24 【问题描述】:

动机

我需要额外的眼睛来确认我可以调用这个方法 XMPPConnection.sendPacket( 数据包) 同时进行。对于我当前的代码,我正在以串行方式调用 Callables 列表(最多 3 个)。每个 Callable 在一个 XMPPConnection 上发送/接收 XMPP 数据包。我计划通过分离多个线程来并行化这些 Callables,每个 Callable 将在共享 XMPPConnection 上调用 sendPacket 而无需同步。

XMPPConnection

class XMPPConnection

    private boolean connected = false;

    public boolean isConnected() 
    
        return connected;
    

    PacketWriter packetWriter;

    public void sendPacket( Packet packet ) 
    
        if (!isConnected())
            throw new IllegalStateException("Not connected to server.");

        if (packet == null) 
            throw new NullPointerException("Packet is null.");

        packetWriter.sendPacket(packet);
    

PacketWriter

class PacketWriter

    public void sendPacket(Packet packet) 
    
        if (!done) 
            // Invoke interceptors for the new packet 
            // that is about to be sent. Interceptors
            // may modify the content of the packet.
            processInterceptors(packet);

            try 
                queue.put(packet);
            
            catch (InterruptedException ie) 
                ie.printStackTrace();
                return;
            
            synchronized (queue) 
                queue.notifyAll();
            

            // Process packet writer listeners. Note that we're 
            // using the sending thread so it's expected that 
            // listeners are fast.
            processListeners(packet);
    

    protected PacketWriter( XMPPConnection connection ) 
    
        this.queue = new ArrayBlockingQueue<Packet>(500, true);
        this.connection = connection;
        init();
    

我的结论

由于 PacketWriter 使用的是 BlockingQueue,因此我打算从多个线程调用 sendPacket 没有问题。我说的对吗?

【问题讨论】:

【参考方案1】:

是的,您可以毫无问题地从不同线程发送数据包。

Smack 阻塞队列是因为你不能让不同的线程同时写入输出流。 Smack 负责通过以每个数据包的粒度写入输出流来同步输出流。

Smack 实现的模式只是一个典型的生产者/消费者并发模式。您可能有多个生产者(您的线程)和只有一个消费者(Smack 的 PacketWriter 在它自己的线程中运行)。

问候。

【讨论】:

【参考方案2】:

你在这里没有提供足够的信息。

我们不知道以下是如何实现的:

进程拦截器 进程监听器

谁读取/写入“完成”变量?如果一个线程将其设置为 true,则所有其他线程将静默失败。

乍一看,这看起来不是线程安全的,但无法从您发布的内容中确定。

其他问题:

为什么 PacketWriter 是 XMPPConnection 的类成员,而它只用于一种方法? 为什么 PacketWriter 有一个 XMPPConnection 成员 var 而没有使用它?

【讨论】:

【参考方案3】:

如果您可以限制为 Java 5+,您可以考虑使用 BlockingQueue。

来自 Java API 文档,对使用 ArrayBlockingQueue 稍作改动:

class Producer implements Runnable 
   private final BlockingQueue queue;
   Producer(BlockingQueue q)  queue = q; 
   public void run() 
     try 
       while(true)  queue.put(produce()); 
      catch (InterruptedException ex)  ... handle ...
   
   Object produce()  ... 
 

 class Consumer implements Runnable 
   private final BlockingQueue queue;
   Consumer(BlockingQueue q)  queue = q; 
   public void run() 
     try 
       while(true)  consume(queue.take()); 
      catch (InterruptedException ex)  ... handle ...
   
   void consume(Object x)  ... 
 

 class Setup 
   void main() 
     BlockingQueue q = new ArrayBlockingQueue();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   
 

对于您的使用,您的真正发送者(实际连接的持有者)是消费者,而数据包准备者/发送者是生产者。

另一个有趣的想法是,您可以使用 PriorityBlockingQueue 来允许闪存覆盖在任何其他等待数据包之前发送的 XMPP 数据包。

此外,Glen 在设计上的观点也很好。您可能想看看 Smack API (http://www.igniterealtime.org/projects/smack/) 而不是自己创建。

【讨论】:

以上是关于我可以从并发线程调用 XMPPConnection.sendPacket 吗?的主要内容,如果未能解决你的问题,请参考以下文章

从并发队列中调用 dispatch_sync - 它是不是完全阻塞?

从命名管道并发选择

Java并发编程原理与实战二十一:线程通信wait&notify&join

Java并发编程(10):使用wait/notify/notifyAll实现线程间通信的几点重要说明

android实现xmppconnection服务

JUC并发编程 -- 线程状态转换