从套接字入队数据,出队在其他线程中返回null
Posted
技术标签:
【中文标题】从套接字入队数据,出队在其他线程中返回null【英文标题】:Enqueue data from socket, dequeue returns null in other thread 【发布时间】:2016-12-19 16:15:26 【问题描述】:我已经尝试过调试,但我不知道为什么会发生这种情况(我也是线程新手)。大约 2/3 的出队数据显示为空,而其余数据则正常通过。任何见解将不胜感激。
using UnityEngine;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace UDPNetwork
public class NetworkManager : MonoBehaviour
struct DataPacket
public IPEndPoint destination;
public byte[] data;
public int size;
public DataPacket(IPEndPoint destination, byte[] data)
this.destination = destination;
this.data = data;
size = 0;
[SerializeField]
string SERVER_IP = "127.0.0.1";
[SerializeField]
ushort SERVER_PORT = 55566;
AsyncPriorityQueue<DataPacket> queuedReceiveData = new AsyncPriorityQueue<DataPacket>(2000, false);
Socket sck;
IPEndPoint ipEndPoint;
bool listening = true;
bool processing = true;
void Start()
sck = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
#if SERVER
ipEndPoint = new IPEndPoint(IPAddress.Any, SERVER_PORT);
sck.Bind(ipEndPoint);
#endif
new Thread(() => ListenForData()).Start();
new Thread(() => ProcessData()).Start();
void OnDestroy()
listening = false;
processing = false;
sck.Close();
void ListenForData()
EndPoint endPoint = ipEndPoint;
while (listening)
byte[] buffer = new byte[512];
try
int rec = sck.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref endPoint);
Array.Resize(ref buffer, rec);
queuedReceiveData.Enqueue(new DataPacket((IPEndPoint)endPoint, buffer) size = rec , 0);
catch (Exception e)
Debug.LogError(e.Message);
void ProcessData()
DataPacket rcv;
byte[] data;
IPEndPoint ep;
while (processing)
rcv = queuedReceiveData.Dequeue(); // blocks until queue has >1 item
data = rcv.data;
ep = rcv.destination;
if (data == null)
Debug.LogError(data); // null
Debug.LogError(rcv.size); // 0
Debug.LogError(ep); // null
Debug.LogError(rcv);
continue;
//process...
队列:
using System;
/// <summary>
/// Priority queue removes added items highest priority items first, ties broken by First-In-First-Out.
/// </summary>
/// <typeparam name="T"></typeparam>
public class PriorityQueue<T>
struct Node
public T item;
public int priority;
public CircularInt32 insertionIndex;
Node[] items;
bool _resizeable;
CircularInt32 _numItemsEverEnqueued = 0;
/// <summary>
/// How many items are currently in the queue
/// </summary>
public int Count
get;
private set;
/// <summary>
/// How many items the queue can hold. 0 == infinite.
/// </summary>
public int Capacity
get
return _resizeable ? 0 : items.Length;
/// <summary>
/// Create a new resizeable priority queue with default capacity (8)
/// </summary>
public PriorityQueue() : this(8)
/// <summary>
/// Create a new priority queue
/// </summary>
/// <param name="capacity"></param>
/// <param name="resizeable"></param>
public PriorityQueue(int capacity, bool resizeable = true)
if (capacity < 2)
throw new ArgumentException("New queue size cannot be smaller than 2", "capacity");
items = new Node[capacity];
Count = 0;
_resizeable = resizeable;
/// <summary>
/// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
/// </summary>
/// <param name="item">object to add to queue</param>
/// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
/// <returns>true if added successfully, false otherwise (queue is full)</returns>
public bool Enqueue(T item, int priority)
if (Count == items.Length)
if (_resizeable)
Array.Resize(ref items, Capacity * 3 / 2 + 1);
else
return false;
items[Count] = new Node() item = item, priority = priority, insertionIndex = _numItemsEverEnqueued++ ;
percolateUp(Count);
Count++;
return true;
void percolateUp(int index)
while (true)
if (index == 0)
break;
int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;
if (HasHigherPriority(items[parent], items[index]))
var temp = items[index];
items[index] = items[parent];
items[parent] = temp;
index = parent;
else
break;
/// <summary>
/// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
/// Returns an object's default value if the queue is empty.
/// </summary>
/// <returns></returns>
public T Dequeue()
if (Count == 0)
return default(T);
var item = items[0].item;
items[0] = new Node();
percolateDown(0);
Count--;
return item;
void percolateDown(int index)
while (true)
int left = index * 2 + 1;
if (left + 1 < Count && HasHigherPriority(items[left + 1], items[left]))
var temp = items[index];
items[index] = items[left + 1];
items[left + 1] = temp;
index = left + 1;
else if (left < Count)
var temp = items[index];
items[index] = items[left];
items[left] = temp;
index = left;
else
break;
bool HasHigherPriority(Node higher, Node lower)
return (higher.priority < lower.priority || (higher.priority == lower.priority && higher.insertionIndex < lower.insertionIndex));
异步:
using System.Threading;
/// <summary>
/// A thread-safe priority queue.
/// </summary>
/// <typeparam name="T"></typeparam>
public class AsyncPriorityQueue<T>
PriorityQueue<T> pq;
/// <summary>
/// How many items are currently in the queue
/// </summary>
public int Count
get return pq.Count;
/// <summary>
/// How many items the queue can hold. 0 == infinite.
/// </summary>
public int Capacity
get return pq.Capacity;
/// <summary>
/// Create a new resizeable async priority queue with default capacity (8)
/// </summary>
public AsyncPriorityQueue()
pq = new PriorityQueue<T>();
/// <summary>
/// Create a new priority queue
/// </summary>
/// <param name="capacity"></param>
/// <param name="resizeable"></param>
public AsyncPriorityQueue(int capacity, bool resizeable = true)
pq = new PriorityQueue<T>(capacity, resizeable);
/// <summary>
/// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
/// </summary>
/// <param name="item">object to add to queue</param>
/// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
/// <returns>true if added successfully, false otherwise (queue is full)</returns>
public bool Enqueue(T item, int priority)
lock (pq)
bool added = pq.Enqueue(item, priority);
if (pq.Count == 1)
Monitor.Pulse(pq);
return added;
/// <summary>
/// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
/// WARNING: if the queue is empty when this is called, the thread WILL BLOCK until a new item is added to the queue in another thread. If this behaviour is not wanted, be sure to check Count > 0.
/// </summary>
/// <returns></returns>
public T Dequeue()
lock (pq)
while (pq.Count == 0)
Monitor.Wait(pq);
return pq.Dequeue();
【问题讨论】:
我很好奇 Array.Resize(ref buffer, rec);陈述。您可以调试它或跟踪它并查看数据是否在调整数组大小之前存在吗?另外,您确定发送方实际上正在发送一些东西吗?这可能是另一边的问题。 我在调整数组大小后将调试行放入侦听线程并正确打印出来。是的,我确定对方正在发送。 另外,即使由于某种原因没有数据,Array.Resize 也会返回一个全为 0 的新数组,在这种情况下它仍然不应该为 null。 好的,所以接收不是问题。之后您要做的事情是擦除阵列。这条线在我看来很可疑:“Array.Resize(ref items, Capacity * 3 / 2 + 1);”。如果队列是可调整大小的(这是执行此行的必要条件之一,在我看来,数组的大小将始终为 1。 嗯,不错。我在那里将Capacity
更改为items.Length
,它似乎可以与连接的2 个客户端一起使用(每个客户端不断发送~10 个数据包/秒)。添加第三个客户端后,它会再次开始打印 null,但现在的速度约为 1/10。
【参考方案1】:
首先,当所有消息都以优先级 0 排队时,为什么要使用优先级队列尚不清楚。但我假设您的目标是最终更改某些消息的优先级。在任何情况下,由于您将所有内容都以优先级 0 排入队列,因此您已经发现了优先级队列实现中的一个严重错误。
我怀疑如果您将优先级为 1 的所有内容都排入队列,您将永远不会看到此错误。但你不应该那样做。
问题在于,当您使一个项目出队时,您正在通过向下渗透一个优先级为 0 的空节点来重新调整堆。更重要的是,它的 insertionIndex
从未设置,所以它是 0。最终将新的空节点 放在 已经在队列中的好节点之前,并且稍后添加到队列中的新节点将被添加 after 该空节点。而且因为队列中的所有内容的优先级为 0,所以新的空节点就在根节点处。
您需要更改在使项目出列时重新调整堆的方式。与其在顶部输入一个空节点并向下渗透,不如取堆中的最后一个节点,将其插入根,然后向下渗透。但是你必须改变你的percolateDown
方法。
以下是我的建议:
public T Dequeue()
if (Count == 0)
return default(T);
var item = items[0].item;
items[0] = items[Count-1];
items[Count-1] = null;
Count--;
percolateDown(0);
return item;
void percolateDown(int index)
while (true)
// The rules for adjusting on percolate down are to swap the
// node with the highest-priority child. So first we have to
// find the highest-priority child.
int hpChild = index*2+1;
if (hpChild >= Count)
break;
if (hpChild+1 < Count && HasHigherPriority(items[hpChild+1], items[hpChild]))
++hpChild;
if (HasHigherPriority(items[hpChild, items[index]))
var temp = items[index];
items[index] = items[hpChild];
items[hpChild] = temp;
else
break;
index = hpChild;
有关正确实现二叉堆的更多详细信息,请参阅http://blog.mischel.com/2013/09/29/a-better-way-to-do-it-the-heap/ 以及随后的条目。
其他几点说明:
与其自己调整数组大小,不如将items
数组转换为List<Node>
。它会为你处理所有的大小调整等等。
在您的percolateUp
中,您有:
int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;
您可以将其简化为:
int parent = (index + 1)/2;
【讨论】:
啊哈!正如您建议的那样,插入索引或其优先级不是问题,而是我的渗透实现。我有缺陷的想法是将空节点强制到数组的末尾,但我只设法将它强制到最后一行,这可能是也可能不是数组的末尾。您的实现有效,我只需要添加一个额外的行来将 items[Count - 1] 设置为一个空节点,以便可以对出队的对象进行垃圾收集。非常感谢!感谢您提供简化的数学提示。 @Tim:感谢您在我的Dequeue
方法中提及这一疏忽。我已经更正了答案中的代码。以上是关于从套接字入队数据,出队在其他线程中返回null的主要内容,如果未能解决你的问题,请参考以下文章