📄 queue.cs
字号:
using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace JLL.SGIP
{
public class QueueItem
{
/// <summary>
/// 发送命令的Socket, 如果有应答则转发给这个socket
/// </summary>
public Socket SourceSocket;
/// <summary>
/// 所发送的命令,将该命令转发给服务器后
/// </summary>
public Command SourceCommand;
public Timer timer;
public QueueItem(): this(null, null, null)
{
}
public QueueItem(Socket s, Command cmd):this(s, cmd, null)
{
}
public QueueItem(Socket s, Command cmd, Timer t)
{
SourceSocket = s;
SourceCommand = cmd;
timer = t;
}
}
public class SocketCommandQueue
{
Dictionary<Socket, List<Command>> _socketWaitCommand; //一个二级sp socket所传送的submit命令(等待服务器应答), List<Command>作为一个队列
Dictionary<string, QueueItem> _cmdDict; //命令字典, 命令序列号---Item
Queue<QueueItem> _queue; //命令队列
private int _maxSocketCommand;//一个socket能发送的最大command队列
/// <summary>
/// 构造函数
/// </summary>
/// <param name="maxSocketCommand">一个socket的待应答命令最大数</param>
public SocketCommandQueue(int maxSocketCommand)
{
_maxSocketCommand = maxSocketCommand;
_socketWaitCommand = new Dictionary<Socket, List<Command>>();
_cmdDict = new Dictionary<string, QueueItem>();
_queue = new Queue<QueueItem>();
}
/// <summary>
/// 判断一个socket所发送的队列是否已经满了
/// </summary>
/// <param name="s"></param>
/// <returns></returns>
public bool IsSocketQueueFull(Socket s)
{
lock (this)
{
if (!_socketWaitCommand.ContainsKey(s))
{
return false;
}
return (_socketWaitCommand[s].Count >= _maxSocketCommand);
}
}
/// <summary>
/// 判断一个命令是否已存在于队列中(以命令的序列号为标准来判断)
/// </summary>
/// <param name="cmd"></param>
/// <returns></returns>
public bool IsCommandInQueue(Command cmd)
{
lock (_cmdDict)
{
return _cmdDict.ContainsKey(cmd.Head.SequenceNumber.ToString());
}
}
/// <summary>
/// 根据序列号查找一个QueueItem, 未找到则返回null
/// </summary>
/// <param name="sequenceNumber"></param>
/// <returns></returns>
public QueueItem FindItem(SequenceNumber sequenceNumber)
{
lock (_cmdDict)
{
if (_cmdDict.ContainsKey(sequenceNumber.ToString()))
{
return _cmdDict[sequenceNumber.ToString()];
}
return null;
}
}
/// <summary>
/// 判断队列是否为空
/// </summary>
/// <returns></returns>
public bool IsQueueEmpty()
{
lock (_queue)
{
return _queue.Count <= 0;
}
}
/// <summary>
/// 从队列中取出一个Item, 将该Item从队列中删除,并返回之
/// </summary>
/// <returns></returns>
public QueueItem Dequeue()
{
if (IsQueueEmpty())
{
throw new InvalidOperationException("队列为空!");
}
lock (this)
{
QueueItem result = _queue.Dequeue();
_cmdDict.Remove(result.SourceCommand.Head.SequenceNumber.ToString());
_socketWaitCommand[result.SourceSocket].Remove(result.SourceCommand);
if (_socketWaitCommand[result.SourceSocket].Count == 0)
{
_socketWaitCommand.Remove(result.SourceSocket);
}
return result;
}
}
/// <summary>
/// 加入一个item到队列中,请注意它会抛出异常的条件
/// </summary>
/// <param name="Item"></param>
public void Enqueue(QueueItem Item)
{
//一个socket能发送的command队列已满
if (IsSocketQueueFull(Item.SourceSocket))
{
throw new InvalidOperationException("socket的命令等待队列已满!");
}
if (IsCommandInQueue(Item.SourceCommand))
{
throw new InvalidOperationException("命令已存在于队列中!");
}
lock (this)
{
_queue.Enqueue(Item);
_cmdDict[Item.SourceCommand.Head.SequenceNumber.ToString()] = Item;
if (!_socketWaitCommand.ContainsKey(Item.SourceSocket))
{
_socketWaitCommand[Item.SourceSocket] = new List<Command>(_maxSocketCommand);
}
_socketWaitCommand[Item.SourceSocket].Add(Item.SourceCommand);
}
}
/// <summary>
/// 加入一个Submit command到队列
/// </summary>
/// <param name="cmd"></param>
public void Enqueue(Socket socket, Command cmd)
{
Enqueue(new QueueItem(socket, cmd));
}
/// <summary>
/// 返回队列中命令的个数
/// </summary>
public int Count
{
get { return _queue.Count; }
}
}
public class TimeoutSendList
{
private List<QueueItem> _cmdlist;
private Dictionary<string, QueueItem> _cmdDict;
private int _maxCommand; //最多的command的个数
private int _timeout; //超时,以毫秒为单位
/// <summary>
///
/// </summary>
/// <param name="maxCommand">列表中最多的command的个数</param>
/// <param name="timeout">以毫秒为单位,自Add(..)间隔timeout时间后,自动从列表中删除相关的项</param>
public TimeoutSendList(int maxCommand, int timeout)
{
_maxCommand = maxCommand;
_timeout = timeout;
_cmdlist = new List<QueueItem>();
_cmdDict = new Dictionary<string, QueueItem>();
}
/// <summary>
/// 列表是否已满了
/// </summary>
/// <returns></returns>
public bool IsFull()
{
lock (_cmdlist)
{
return (_cmdlist.Count >= _maxCommand);
}
}
/// <summary>
/// 定时器已超时,从列表中删除一个Item
/// </summary>
/// <param name="state"></param>
private void OnTimerInterval(object state)
{
QueueItem Item = (QueueItem)state;
Remove(Item);
}
/// <summary>
/// 新增一项到列表中, 如果超出nTimeout(毫秒)时间后还没有从列表中删除该项,则自动删除该项
/// </summary>
/// <param name="Item"></param>
public void Add(QueueItem Item, int nTimeout)
{
if (IsFull())
{
throw new InvalidOperationException("列表已满!");
}
/*
* 注意:这里没有判断是否重复, 尚不知是否存在bug
*/
Item.timer = new Timer(new TimerCallback(OnTimerInterval), Item, nTimeout, Timeout.Infinite);
lock (this)
{
_cmdlist.Add(Item);
_cmdDict[Item.SourceCommand.Head.SequenceNumber.ToString()] = Item;
}
}
/// <summary>
/// 新增一项到列表中, 如果超出构造函数所指定的时还没有删除该项,则自动删除该项
/// </summary>
/// <param name="Item"></param>
public void Add(QueueItem Item)
{
Add(Item, _timeout);
}
public void Remove(QueueItem Item)
{
lock (this)
{
_cmdlist.Remove(Item);
_cmdDict.Remove(Item.SourceCommand.Head.SequenceNumber.ToString());
if (Item.timer != null)
{
Item.timer.Dispose();
Item.timer = null;
}
}
}
public QueueItem FindItem(SequenceNumber sequenceNumber)
{
lock (_cmdDict)
{
if (_cmdDict.ContainsKey(sequenceNumber.ToString()))
{
return _cmdDict[sequenceNumber.ToString()];
}
return null;
}
}
public int Count
{
get { return _cmdlist.Count; }
}
}
public class MiddleClientSocket
{
/// <summary>
/// 用于异步接收的一个类
/// </summary>
class ReceiveStateObject
{
private Socket _workSocket;
private byte[] _buffer;
public Socket WorkSocket
{
get { return _workSocket; }
}
public byte[] Buffer
{
get { return _buffer; }
}
public ReceiveStateObject(Socket s)
{
_workSocket = s;
_buffer = new byte[SgipConfig.MaxBufferCount];
}
}
/// <summary>
/// 用于异步发送的一个类
/// </summary>
class SendStateObject
{
private Socket _workSocket;
private QueueItem _item;
public Socket WorkSocket
{
get { return _workSocket; }
}
public QueueItem Item
{
get { return _item; }
set { _item = value; }
}
public SendStateObject(Socket s)
{
_workSocket = s;
_item = null;
}
}
private SocketCommandQueue _cmdQueue;
private Socket _socket = null;
private IPAddress _serverIP;
private int _serverPort;
private Bind _bindBody;
private int _answerTimeout = 30000; //命令应答的最长时间,以毫秒为单位
private int _maxWaitAnswerCommand = 32; //最多的待应答命令数
private TimeoutSendList _sendList; //已发送的命令列表,如果在指定的时间内没有回应,则自动从列表中删除相应的项
private volatile bool _isSendingData = false; //是否正在发送数据, IsSendData属性
/// <summary>
///
/// </summary>
/// <param name="serverIP">服务器的IP</param>
/// <param name="serverPort">服务器所监听的端口</param>
/// <param name="nMaxWaitAnswerCommand">一个Socket最大的待应答命令数目</param>
/// <param name="nAnswerTimeout">命令应答的最大间隔,以秒为单位</param>
/// <param name="bindBody">向服务器登录的Bind消息体</param>
/// <param name="queue">所要传送的命令队列, 将从这个命令队列中取出命令发送到服务器</param>
public MiddleClientSocket(IPAddress serverIP, int serverPort, int nMaxWaitAnswerCommand, int nAnswerTimeout,
Bind bindBody, SocketCommandQueue queue)
{
_answerTimeout = nAnswerTimeout * 1000; //秒转为毫秒
_maxWaitAnswerCommand = nMaxWaitAnswerCommand;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -