我可以从并发线程调用 XMPPConnection.sendPacket 吗?
Posted
技术标签:
【中文标题】我可以从并发线程调用 XMPPConnection.sendPacket 吗?【英文标题】:Can I invoke XMPPConnection.sendPacket from concurrent threads? 【发布时间】:2009-09-18 08:20:24 【问题描述】:动机
我需要额外的眼睛来确认我可以调用这个方法 XMPPConnection.sendPacket( 数据包) 同时进行。对于我当前的代码,我正在以串行方式调用 Callables 列表(最多 3 个)。每个 Callable 在一个 XMPPConnection 上发送/接收 XMPP 数据包。我计划通过分离多个线程来并行化这些 Callables,每个 Callable 将在共享 XMPPConnection 上调用 sendPacket 而无需同步。
XMPPConnection
class XMPPConnection
private boolean connected = false;
public boolean isConnected()
return connected;
PacketWriter packetWriter;
public void sendPacket( Packet packet )
if (!isConnected())
throw new IllegalStateException("Not connected to server.");
if (packet == null)
throw new NullPointerException("Packet is null.");
packetWriter.sendPacket(packet);
PacketWriter
class PacketWriter
public void sendPacket(Packet packet)
if (!done)
// Invoke interceptors for the new packet
// that is about to be sent. Interceptors
// may modify the content of the packet.
processInterceptors(packet);
try
queue.put(packet);
catch (InterruptedException ie)
ie.printStackTrace();
return;
synchronized (queue)
queue.notifyAll();
// Process packet writer listeners. Note that we're
// using the sending thread so it's expected that
// listeners are fast.
processListeners(packet);
protected PacketWriter( XMPPConnection connection )
this.queue = new ArrayBlockingQueue<Packet>(500, true);
this.connection = connection;
init();
我的结论
由于 PacketWriter 使用的是 BlockingQueue,因此我打算从多个线程调用 sendPacket 没有问题。我说的对吗?
【问题讨论】:
【参考方案1】:是的,您可以毫无问题地从不同线程发送数据包。
Smack 阻塞队列是因为你不能让不同的线程同时写入输出流。 Smack 负责通过以每个数据包的粒度写入输出流来同步输出流。
Smack 实现的模式只是一个典型的生产者/消费者并发模式。您可能有多个生产者(您的线程)和只有一个消费者(Smack 的 PacketWriter 在它自己的线程中运行)。
问候。
【讨论】:
【参考方案2】:你在这里没有提供足够的信息。
我们不知道以下是如何实现的:
进程拦截器 进程监听器谁读取/写入“完成”变量?如果一个线程将其设置为 true,则所有其他线程将静默失败。
乍一看,这看起来不是线程安全的,但无法从您发布的内容中确定。
其他问题:
为什么 PacketWriter 是 XMPPConnection 的类成员,而它只用于一种方法? 为什么 PacketWriter 有一个 XMPPConnection 成员 var 而没有使用它?【讨论】:
【参考方案3】:如果您可以限制为 Java 5+,您可以考虑使用 BlockingQueue。
来自 Java API 文档,对使用 ArrayBlockingQueue 稍作改动:
class Producer implements Runnable
private final BlockingQueue queue;
Producer(BlockingQueue q) queue = q;
public void run()
try
while(true) queue.put(produce());
catch (InterruptedException ex) ... handle ...
Object produce() ...
class Consumer implements Runnable
private final BlockingQueue queue;
Consumer(BlockingQueue q) queue = q;
public void run()
try
while(true) consume(queue.take());
catch (InterruptedException ex) ... handle ...
void consume(Object x) ...
class Setup
void main()
BlockingQueue q = new ArrayBlockingQueue();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
对于您的使用,您的真正发送者(实际连接的持有者)是消费者,而数据包准备者/发送者是生产者。
另一个有趣的想法是,您可以使用 PriorityBlockingQueue 来允许闪存覆盖在任何其他等待数据包之前发送的 XMPP 数据包。
此外,Glen 在设计上的观点也很好。您可能想看看 Smack API (http://www.igniterealtime.org/projects/smack/) 而不是自己创建。
【讨论】:
以上是关于我可以从并发线程调用 XMPPConnection.sendPacket 吗?的主要内容,如果未能解决你的问题,请参考以下文章
从并发队列中调用 dispatch_sync - 它是不是完全阻塞?
Java并发编程原理与实战二十一:线程通信wait¬ify&join