从套接字入队数据,出队在其他线程中返回null

Posted

技术标签:

【中文标题】从套接字入队数据,出队在其他线程中返回null【英文标题】:Enqueue data from socket, dequeue returns null in other thread 【发布时间】:2016-12-19 16:15:26 【问题描述】:

我已经尝试过调试,但我不知道为什么会发生这种情况(我也是线程新手)。大约 2/3 的出队数据显示为空,而其余数据则正常通过。任何见解将不胜感激。

using UnityEngine;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace UDPNetwork

    public class NetworkManager : MonoBehaviour
    
        struct DataPacket
        
            public IPEndPoint destination;
            public byte[] data;
            public int size;

            public DataPacket(IPEndPoint destination, byte[] data)
            
                this.destination = destination;
                this.data = data;
                size = 0;
            
        

        [SerializeField]
        string SERVER_IP = "127.0.0.1";
        [SerializeField]
        ushort SERVER_PORT = 55566;

        AsyncPriorityQueue<DataPacket> queuedReceiveData = new AsyncPriorityQueue<DataPacket>(2000, false);

        Socket sck;
        IPEndPoint ipEndPoint;
        bool listening = true;
        bool processing = true;

        void Start()
        
            sck = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
#if SERVER
            ipEndPoint = new IPEndPoint(IPAddress.Any, SERVER_PORT);
            sck.Bind(ipEndPoint);
#endif
            new Thread(() => ListenForData()).Start();
            new Thread(() => ProcessData()).Start();
        
        void OnDestroy()
        
            listening = false;
            processing = false;
            sck.Close();
        

        void ListenForData()
        
            EndPoint endPoint = ipEndPoint;
            while (listening)
            
                byte[] buffer = new byte[512];
                try
                
                    int rec = sck.ReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref endPoint);
                    Array.Resize(ref buffer, rec);

                    queuedReceiveData.Enqueue(new DataPacket((IPEndPoint)endPoint, buffer)  size = rec , 0);
                
                catch (Exception e)
                
                    Debug.LogError(e.Message);
                
            
        

        void ProcessData()
        
            DataPacket rcv;
            byte[] data;
            IPEndPoint ep;

            while (processing)
            
                rcv = queuedReceiveData.Dequeue(); // blocks until queue has >1 item
                data = rcv.data;
                ep = rcv.destination;

                if (data == null)
                
                    Debug.LogError(data); // null
                    Debug.LogError(rcv.size); // 0
                    Debug.LogError(ep); // null
                    Debug.LogError(rcv);
                    continue;
                

                //process...
            
        
    

队列:

using System;

/// <summary>
/// Priority queue removes added items highest priority items first, ties broken by First-In-First-Out.
/// </summary>
/// <typeparam name="T"></typeparam>
public class PriorityQueue<T>


    struct Node
    
        public T item;
        public int priority;
        public CircularInt32 insertionIndex;
    

    Node[] items;
    bool _resizeable;
    CircularInt32 _numItemsEverEnqueued = 0;

    /// <summary>
    /// How many items are currently in the queue
    /// </summary>
    public int Count
    
        get;
        private set;
    

    /// <summary>
    /// How many items the queue can hold. 0 == infinite.
    /// </summary>
    public int Capacity
    
        get
        
            return _resizeable ? 0 : items.Length;
        
    

    /// <summary>
    /// Create a new resizeable priority queue with default capacity (8)
    /// </summary>
    public PriorityQueue() : this(8)  

    /// <summary>
    /// Create a new priority queue
    /// </summary>
    /// <param name="capacity"></param>
    /// <param name="resizeable"></param>
    public PriorityQueue(int capacity, bool resizeable = true)
    
        if (capacity < 2)
        
            throw new ArgumentException("New queue size cannot be smaller than 2", "capacity");
        
        items = new Node[capacity];
        Count = 0;
        _resizeable = resizeable;
    

    /// <summary>
    /// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
    /// </summary>
    /// <param name="item">object to add to queue</param>
    /// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
    /// <returns>true if added successfully, false otherwise (queue is full)</returns>
    public bool Enqueue(T item, int priority)
    
        if (Count == items.Length)
        
            if (_resizeable)
            
                Array.Resize(ref items, Capacity * 3 / 2 + 1);
            
            else
            
                return false;
            
        
        items[Count] = new Node()  item = item, priority = priority, insertionIndex = _numItemsEverEnqueued++ ;
        percolateUp(Count);
        Count++;
        return true;
    

    void percolateUp(int index)
    
        while (true)
        
            if (index == 0)
            
                break;
            

            int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;

            if (HasHigherPriority(items[parent], items[index]))
            
                var temp = items[index];
                items[index] = items[parent];
                items[parent] = temp;
                index = parent;
            
            else
            
                break;
            
        
    

    /// <summary>
    /// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
    /// Returns an object's default value if the queue is empty.
    /// </summary>
    /// <returns></returns>
    public T Dequeue()
    
        if (Count == 0)
        
            return default(T);
        

        var item = items[0].item;
        items[0] = new Node();
        percolateDown(0);
        Count--;
        return item;
    

    void percolateDown(int index)
    
        while (true)
        
            int left = index * 2 + 1;

            if (left + 1 < Count && HasHigherPriority(items[left + 1], items[left]))
            
                var temp = items[index];
                items[index] = items[left + 1];
                items[left + 1] = temp;
                index = left + 1;
            
            else if (left < Count)
            
                var temp = items[index];
                items[index] = items[left];
                items[left] = temp;
                index = left;
            
            else
            
                break;
            
        

    

    bool HasHigherPriority(Node higher, Node lower)
    
        return (higher.priority < lower.priority || (higher.priority == lower.priority && higher.insertionIndex < lower.insertionIndex));
    

异步:

using System.Threading;

/// <summary>
/// A thread-safe priority queue.
/// </summary>
/// <typeparam name="T"></typeparam>
public class AsyncPriorityQueue<T>

    PriorityQueue<T> pq;

    /// <summary>
    /// How many items are currently in the queue
    /// </summary>
    public int Count
    
        get  return pq.Count; 
    

    /// <summary>
    /// How many items the queue can hold. 0 == infinite.
    /// </summary>
    public int Capacity
    
        get  return pq.Capacity; 
    

    /// <summary>
    /// Create a new resizeable async priority queue with default capacity (8)
    /// </summary>
    public AsyncPriorityQueue()
    
        pq = new PriorityQueue<T>();
    

    /// <summary>
    /// Create a new priority queue
    /// </summary>
    /// <param name="capacity"></param>
    /// <param name="resizeable"></param>
    public AsyncPriorityQueue(int capacity, bool resizeable = true)
    
        pq = new PriorityQueue<T>(capacity, resizeable);
    

    /// <summary>
    /// Add an object to the queue. If queue is full and resizeable is true, increases the capacity. If queue is full and resizeable is false, does nothing, returns false.
    /// </summary>
    /// <param name="item">object to add to queue</param>
    /// <param name="priority">object's priority, lower # = higher priority, ties are broken by FIFO</param>
    /// <returns>true if added successfully, false otherwise (queue is full)</returns>
    public bool Enqueue(T item, int priority)
    
        lock (pq)
        
            bool added = pq.Enqueue(item, priority);
            if (pq.Count == 1)
            
                Monitor.Pulse(pq);
            
            return added;
        
    

    /// <summary>
    /// Removes and returns the highest priority object in the queue. Ties are broken by FIFO.
    /// WARNING: if the queue is empty when this is called, the thread WILL BLOCK until a new item is added to the queue in another thread. If this behaviour is not wanted, be sure to check Count > 0.
    /// </summary>
    /// <returns></returns>
    public T Dequeue()
    
        lock (pq)
        
            while (pq.Count == 0)
            
                Monitor.Wait(pq);
            
            return pq.Dequeue();
        
    

【问题讨论】:

我很好奇 Array.Resize(ref buffer, rec);陈述。您可以调试它或跟踪它并查看数据是否在调整数组大小之前存在吗?另外,您确定发送方实际上正在发送一些东西吗?这可能是另一边的问题。 我在调整数组大小后将调试行放入侦听线程并正确打印出来。是的,我确定对方正在发送。 另外,即使由于某种原因没有数据,Array.Resize 也会返回一个全为 0 的新数组,在这种情况下它仍然不应该为 null。 好的,所以接收不是问题。之后您要做的事情是擦除阵列。这条线在我看来很可疑:“Array.Resize(ref items, Capacity * 3 / 2 + 1);”。如果队列是可调整大小的(这是执行此行的必要条件之一,在我看来,数组的大小将始终为 1。 嗯,不错。我在那里将Capacity 更改为items.Length,它似乎可以与连接的2 个客户端一起使用(每个客户端不断发送~10 个数据包/秒)。添加第三个客户端后,它会再次开始打印 null,但现在的速度约为 1/10。 【参考方案1】:

首先,当所有消息都以优先级 0 排队时,为什么要使用优先级队列尚不清楚。但我假设您的目标是最终更改某些消息的优先级。在任何情况下,由于您将所有内容都以优先级 0 排入队列,因此您已经发现了优先级队列实现中的一个严重错误。

我怀疑如果您将优先级为 1 的所有内容都排入队列,您将永远不会看到此错误。但你不应该那样做。

问题在于,当您使一个项目出队时,您正在通过向下渗透一个优先级为 0 的空节点来重新调整堆。更重要的是,它的 insertionIndex 从未设置,所以它是 0。最终将新的空节点 放在 已经在队列中的好节点之前,并且稍后添加到队列中的新节点将被添加 after 该空节点。而且因为队列中的所有内容的优先级为 0,所以新的空节点就在根节点处。

您需要更改在使项目出列时重新调整堆的方式。与其在顶部输入一个空节点并向下渗透,不如取堆中的最后一个节点,将其插入根,然后向下渗透。但是你必须改变你的percolateDown 方法。

以下是我的建议:

public T Dequeue()

    if (Count == 0)
    
        return default(T);
    

    var item = items[0].item;
    items[0] = items[Count-1];
    items[Count-1] = null;
    Count--;
    percolateDown(0);
    return item;


void percolateDown(int index)

    while (true)
    
        // The rules for adjusting on percolate down are to swap the
        // node with the highest-priority child. So first we have to
        // find the highest-priority child.
        int hpChild = index*2+1;
        if (hpChild >= Count)
        
            break;
        
        if (hpChild+1 < Count && HasHigherPriority(items[hpChild+1], items[hpChild]))
        
            ++hpChild;
        
        if (HasHigherPriority(items[hpChild, items[index]))
        
            var temp = items[index];
            items[index] = items[hpChild];
            items[hpChild] = temp;
        
        else
        
            break;
        
        index = hpChild;
    

有关正确实现二叉堆的更多详细信息,请参阅http://blog.mischel.com/2013/09/29/a-better-way-to-do-it-the-heap/ 以及随后的条目。

其他几点说明:

与其自己调整数组大小,不如将items 数组转换为List&lt;Node&gt;。它会为你处理所有的大小调整等等。

在您的percolateUp 中,您有:

int parent = (index % 2 == 0) ? index / 2 - 1 : index / 2;

您可以将其简化为:

int parent = (index + 1)/2;

【讨论】:

啊哈!正如您建议的那样,插入索引或其优先级不是问题,而是我的渗透实现。我有缺陷的想法是将空节点强制到数组的末尾,但我只设法将它强制到最后一行,这可能是也可能不是数组的末尾。您的实现有效,我只需要添加一个额外的行来将 items[Count - 1] 设置为一个空节点,以便可以对出队的对象进行垃圾收集。非常感谢!感谢您提供简化的数学提示。 @Tim:感谢您在我的Dequeue 方法中提及这一疏忽。我已经更正了答案中的代码。

以上是关于从套接字入队数据,出队在其他线程中返回null的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 java 套接字将图像从 android studio 发送到 pc,filePath 返回 null

后台工作线程和异步调用

在 ICMP 套接字上接收数据

循环队列的实现(出队,入队,遍历等)

SynchronousQueue 1.8 源码解析

从套接字流读取时是不是需要线程睡眠?