📄 queue.cs
字号:
_cmdQueue = queue;
_sendList = new TimeoutSendList(_maxWaitAnswerCommand, _answerTimeout + 1000);
_serverIP = serverIP;
_serverPort = serverPort;
_bindBody = bindBody;
}
/// <summary>
/// Whether this object currently considers itself to be sending data.
/// Anyone may read the flag; only this class may change it.
/// </summary>
public bool IsSendingData
{
    get { return _isSendingData; }
    // Plain write to the backing field. The original carried a commented-out
    // experiment that wrapped a separate _isSendData counter in Interlocked
    // calls; it was never active and is not reproduced here.
    private set { _isSendingData = value; }
}
/// <summary>
/// Starts taking commands out of the queue and transmitting them to the server.
/// Does nothing when the queue is empty. (Re)connects and binds first if there
/// is no usable socket, and only starts a send chain when none is marked as running.
/// </summary>
public void StartSendData()
{
    lock (this)
    {
        // Nothing queued: leave the connection state untouched.
        if (_cmdQueue.IsQueueEmpty())
        {
            return;
        }

        // Discard a socket that exists but is no longer connected.
        bool socketIsDead = (_socket != null) && !SgipHelper.IsSocketConnected(_socket);
        if (socketIsDead)
        {
            SgipHelper.ShutdownAndCloseSocket(_socket);
            _socket = null;
        }

        if (_socket == null)
        {
            _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            bool bound = ConnectToServer();
            if (!bound)
            {
                SgipHelper.ShutdownAndCloseSocket(_socket);
                _socket = null;
                return;
            }

            // The receive loop keeps re-arming itself for as long as the socket stays open.
            ReceiveData(new ReceiveStateObject(_socket));

            // If the previous socket broke while sending, the flag may still be true; clear it.
            IsSendingData = false;
        }

        // A send chain is already marked as running; do not start a second one.
        if (IsSendingData)
        {
            return;
        }

        IsSendingData = true;
        SendData(new SendStateObject(_socket));
    }
}
/// <summary>
/// Connects <c>_socket</c> to the configured server, sends a bind command built from
/// <c>_bindBody</c>, and waits synchronously for the reply.
/// </summary>
/// <returns>
/// true only when a reply was received and it parses as a Bind_Resp whose Result is 0;
/// false on any failure (connect/send/receive error, timeout, connection closed by the
/// peer, or an unexpected reply).
/// </returns>
private bool ConnectToServer()
{
    try
    {
        _socket.Connect(_serverIP, _serverPort);

        Command bindCommand = Command.CreateCommand(_bindBody);
        _socket.Send(bindCommand.GetBytes());

        byte[] receiveBuffer = new byte[SgipConfig.MaxBufferCount];
        // Socket.ReceiveTimeout is expressed in milliseconds.
        _socket.ReceiveTimeout = _answerTimeout;

        // Receive returns 0 when the remote side has closed the connection; in that
        // case the buffer holds no reply and must not be parsed.
        int bytesReceived = _socket.Receive(receiveBuffer);
        if (bytesReceived <= 0)
        {
            return false;
        }

        Command bindRespCommand = Command.ToCommand(receiveBuffer, 0);
        // A reply of the wrong type (or a null command) throws here and is reported
        // as a failed bind by the catch below.
        return (((Bind_Resp)bindRespCommand.Body).Result == 0);
    }
    catch (Exception e)
    {
        // Log like the other methods of this class instead of swallowing silently.
        System.Diagnostics.Debug.WriteLine(e);
        return false;
    }
}
/// <summary>
/// Arms one asynchronous receive on the state object's socket, filling its buffer from
/// offset 0; completion is delivered to OnRecieveDataCallBack. If arming fails, the
/// exception is logged and the socket is shut down and closed.
/// </summary>
/// <param name="obj">Receive state carrying the socket and the buffer to fill.</param>
private void ReceiveData(ReceiveStateObject obj)
{
    AsyncCallback onReceived = new AsyncCallback(OnRecieveDataCallBack);
    try
    {
        obj.WorkSocket.BeginReceive(
            obj.Buffer,
            0,
            obj.Buffer.Length,
            SocketFlags.None,
            onReceived,
            obj);
    }
    catch (Exception ex)
    {
        System.Diagnostics.Debug.WriteLine(ex);
        SgipHelper.ShutdownAndCloseSocket(obj.WorkSocket);
    }
}
/// <summary>
/// Completion callback for the asynchronous receive started by ReceiveData.
/// Closes the socket on disconnect or on an SGIP unbind request; otherwise forwards
/// the server's answer to the socket recorded on the matching pending item and
/// re-arms the receive.
/// </summary>
/// <param name="ar">Async result whose AsyncState is the ReceiveStateObject.</param>
private void OnRecieveDataCallBack(IAsyncResult ar)
{
    ReceiveStateObject state = (ReceiveStateObject)ar.AsyncState;

    int received;
    try
    {
        received = state.WorkSocket.EndReceive(ar);
    }
    catch (Exception ex)
    {
        System.Diagnostics.Debug.WriteLine(ex);
        received = 0; // treated exactly like a disconnect below
    }

    // Zero (or fewer) bytes: the remote host has disconnected.
    if (received <= 0)
    {
        SgipHelper.ShutdownAndCloseSocket(state.WorkSocket);
        return;
    }

    // NOTE(review): the byte count is used only for the disconnect check above;
    // parsing always starts at offset 0 of the buffer, so a partial command or
    // several commands in one read are not handled here - confirm against Command.ToCommand.
    Command answer;
    try
    {
        answer = Command.ToCommand(state.Buffer, 0);
    }
    catch (Exception ex)
    {
        System.Diagnostics.Debug.WriteLine(ex);
        answer = null;
    }

    // Unrecognised command: ignore it and keep listening.
    if (answer == null)
    {
        ReceiveData(state);
        return;
    }

    // Only an incoming sgip_unbind is handled; unbind_resp is not considered because
    // this program never initiates a disconnect itself.
    if (answer.Head.CommandID == Command.SGIP_UNBIND)
    {
        try
        {
            byte[] unbindResp = Command.CreateCommand(new Unbind_Resp()).GetBytes();
            state.WorkSocket.Send(unbindResp);
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine(ex);
        }
        SgipHelper.ShutdownAndCloseSocket(state.WorkSocket);
        return;
    }

    // An answer from the server. Consider recording the successful command in the database here.
    QueueItem pending = _sendList.FindItem(answer.Head.SequenceNumber);
    if (pending != null)
    {
        _sendList.Remove(pending);
        try
        {
            // Pass the answer on to the socket stored on the pending item.
            pending.SourceSocket.Send(answer.GetBytes());
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine(ex);
        }
    }

    ReceiveData(state);
}
/// <summary>
/// Failure path of the send chain: drops the in-flight item from the pending-answer
/// list, shuts down and closes the socket the send was using, and marks this object
/// as no longer sending.
/// </summary>
/// <param name="obj">Send state holding the failed item and the socket it was sent on.</param>
private void RemoveSendingItemAndShutdownSocket(SendStateObject obj)
{
    _sendList.Remove(obj.Item);
    SgipHelper.ShutdownAndCloseSocket(obj.WorkSocket);

    // The send chain ends here, so the flag must not stay true: Transmit.SendData only
    // prompts sockets whose IsSendingData is false, and a stale true would leave this
    // object idle. Same lock as StartSendData, and only when the failed socket is still
    // the current one, so a late failure from an old socket cannot clear the flag of a
    // chain that StartSendData has already restarted on a new socket.
    lock (this)
    {
        if (object.ReferenceEquals(obj.WorkSocket, _socket))
        {
            IsSendingData = false;
        }
    }
}
/// <summary>
/// Sends the next queued command asynchronously on the state object's socket.
/// Blocks (polling every 2 seconds) while the pending-answer list is full. When the
/// queue is empty, clears IsSendingData and returns without sending.
/// </summary>
/// <param name="obj">Send state; its Item is overwritten with the dequeued command.</param>
private void SendData(SendStateObject obj)
{
    // Wait until the pending-answer list has room.
    // NOTE(review): this sleeps on the calling thread, which is the send-completion
    // callback thread when invoked from OnSendDataCallBack.
    while (_sendList.IsFull())
    {
        Thread.Sleep(2000);
    }

    // The emptiness check and the dequeue must be atomic; otherwise another thread
    // could take the last command in between and the dequeue would fail.
    bool dequeued;
    lock (_cmdQueue)
    {
        dequeued = !_cmdQueue.IsQueueEmpty();
        if (dequeued)
        {
            obj.Item = _cmdQueue.Dequeue();
        }
        else
        {
            IsSendingData = false;
        }
    }
    if (!dequeued)
    {
        return;
    }

    // Register the item as awaiting an answer before it goes on the wire.
    _sendList.Add(obj.Item);
    byte[] payload = obj.Item.SourceCommand.GetBytes();

    AsyncCallback onSent = new AsyncCallback(OnSendDataCallBack);
    try
    {
        obj.WorkSocket.BeginSend(payload, 0, payload.Length, SocketFlags.None, onSent, obj);
    }
    catch (Exception ex)
    {
        System.Diagnostics.Debug.WriteLine(ex);
        RemoveSendingItemAndShutdownSocket(obj);
    }
}
/// <summary>
/// Completion callback for the asynchronous send started by SendData. On success it
/// continues the chain with the next queued command; on any failure it logs the
/// exception and calls RemoveSendingItemAndShutdownSocket.
/// </summary>
/// <param name="ar">Async result whose AsyncState is the SendStateObject.</param>
private void OnSendDataCallBack(IAsyncResult ar)
{
    SendStateObject state = (SendStateObject)ar.AsyncState;
    try
    {
        state.WorkSocket.EndSend(ar);

        // Trace commands get no answer, so they must not linger in the pending list.
        bool awaitsAnswer = !(state.Item.SourceCommand.Body is Trace);
        if (!awaitsAnswer)
        {
            _sendList.Remove(state.Item);
        }

        // Move on to the next command in the queue.
        SendData(state);
    }
    catch (Exception ex)
    {
        System.Diagnostics.Debug.WriteLine(ex);
        // Note: the item was NOT delivered to the server; submission-failure handling belongs here.
        RemoveSendingItemAndShutdownSocket(state);
    }
}
}
/// <summary>
/// Owns a set of MiddleClientSocket instances and, on request, prompts one that is
/// not currently sending to start transmitting queued commands.
/// </summary>
public class Transmit
{
    private List<MiddleClientSocket> _sendSocket;

    /// <summary>
    /// Creates <paramref name="maxConnection"/> client sockets, all configured identically.
    /// </summary>
    /// <param name="serverIP">IP address of the server.</param>
    /// <param name="serverPort">Port the server listens on.</param>
    /// <param name="maxConnection">Maximum number of sockets to establish towards the server.</param>
    /// <param name="nMaxWaitAnswerCommand">Maximum number of commands awaiting an answer on one socket.</param>
    /// <param name="nAnswerTimeout">
    /// Maximum interval allowed for a command's answer.
    /// NOTE(review): the original comment gave the unit as seconds, but elsewhere in this
    /// file an _answerTimeout field is assigned to Socket.ReceiveTimeout, which is in
    /// milliseconds - confirm which unit callers are expected to pass.
    /// </param>
    /// <param name="bindBody">Bind message body used to log in to the server.</param>
    /// <param name="queue">Command queue from which commands are taken and sent to the server.</param>
    public Transmit(IPAddress serverIP, int serverPort, int maxConnection, int nMaxWaitAnswerCommand, int nAnswerTimeout,
        Bind bindBody, SocketCommandQueue queue)
    {
        _sendSocket = new List<MiddleClientSocket>(maxConnection);
        while (_sendSocket.Count < maxConnection)
        {
            MiddleClientSocket client = new MiddleClientSocket(
                serverIP, serverPort, nMaxWaitAnswerCommand, nAnswerTimeout, bindBody, queue);
            _sendSocket.Add(client);
        }
    }

    /// <summary>
    /// Uses an already-built list of client sockets as-is (the list is not copied).
    /// </summary>
    /// <param name="socketList">Client sockets to dispatch to.</param>
    public Transmit(List<MiddleClientSocket> socketList)
    {
        _sendSocket = socketList;
    }

    /// <summary>
    /// Thread-pool work item: asks the given client socket to start sending.
    /// </summary>
    /// <param name="state">The MiddleClientSocket to prompt.</param>
    private void PromptSocketSendData(object state)
    {
        ((MiddleClientSocket)state).StartSendData();
    }

    /// <summary>
    /// Transmits the queued commands to the server by prompting, on a thread-pool
    /// thread, the first client socket that is not currently sending. At most one
    /// socket is prompted per call; if all are sending, nothing happens.
    /// </summary>
    public void SendData()
    {
        MiddleClientSocket idle = null;
        foreach (MiddleClientSocket candidate in _sendSocket)
        {
            if (candidate.IsSendingData)
            {
                continue;
            }
            idle = candidate;
            break;
        }

        if (idle != null)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(PromptSocketSendData), idle);
        }
    }
}
/// <summary>
/// Records, per user name, the sockets a user holds after logging in, and enforces a
/// per-user limit on how many sockets may be bound at once.
/// </summary>
public class UserSockets
{
    // Private lock object: callers cannot interfere with (or deadlock on) our locking,
    // which was possible while the class locked on itself.
    private readonly object _syncRoot = new object();
    private readonly int _maxSocket; // maximum number of sockets one user may bind
    private readonly Dictionary<string, List<Socket>> _userSockets;

    /// <summary>
    /// Creates an empty registry.
    /// </summary>
    /// <param name="maxSocket">Maximum number of sockets one user may bind.</param>
    public UserSockets(int maxSocket)
    {
        _maxSocket = maxSocket;
        _userSockets = new Dictionary<string, List<Socket>>();
    }

    /// <summary>
    /// Whether the given user has already reached the socket limit.
    /// </summary>
    /// <param name="strUser">User (login) name; null is reported as full.</param>
    /// <returns>
    /// true if <paramref name="strUser"/> is null or the user's socket count is at or above
    /// the limit; false otherwise, including for a user with no recorded sockets.
    /// </returns>
    public bool IsUserSocketFull(string strUser)
    {
        if (strUser == null)
        {
            return true;
        }
        lock (_syncRoot)
        {
            List<Socket> sockets;
            return _userSockets.TryGetValue(strUser, out sockets) && (sockets.Count >= _maxSocket);
        }
    }

    /// <summary>
    /// Whether the sub-SP that owns the given IP address has already reached the socket limit.
    /// The owner is looked up through SubSPManager by the address's string form.
    /// </summary>
    /// <param name="ip">Client IP address; null is reported as full.</param>
    /// <returns>
    /// true if <paramref name="ip"/> is null, no sub-SP is found for it, or that sub-SP's
    /// login name is at its limit; false otherwise.
    /// </returns>
    public bool IsUserSocketFull(IPAddress ip)
    {
        if (ip == null)
        {
            return true;
        }
        // The lookup touches none of this object's state, so it runs outside the lock;
        // holding a lock across a call into another component invites deadlocks.
        SubSP sp = SubSPManager.Manager.FindByIP(ip.ToString());
        return (sp == null) || IsUserSocketFull(sp.LoginName);
    }

    /// <summary>
    /// Number of sockets currently recorded for the given user.
    /// </summary>
    /// <param name="strUser">User (login) name.</param>
    /// <returns>The recorded socket count, or 0 for null or an unknown user.</returns>
    public int GetUserSocketCount(string strUser)
    {
        if (strUser == null)
        {
            return 0;
        }
        lock (_syncRoot)
        {
            List<Socket> sockets;
            return _userSockets.TryGetValue(strUser, out sockets) ? sockets.Count : 0;
        }
    }

    /// <summary>
    /// Records a newly bound socket for the user.
    /// </summary>
    /// <param name="strUser">User (login) name.</param>
    /// <param name="socket">The socket the user logged in on.</param>
    /// <exception cref="InvalidOperationException">
    /// The user is null or has already reached the socket limit.
    /// </exception>
    public void Login(string strUser, Socket socket)
    {
        // The limit check and the insertion happen under one lock acquisition. Checking
        // first and locking afterwards let two concurrent logins both pass the check
        // and push the user past the limit.
        lock (_syncRoot)
        {
            // Monitor locks are re-entrant, so the nested lock in IsUserSocketFull is fine.
            if (IsUserSocketFull(strUser))
            {
                throw new InvalidOperationException("用户所能连接的socket已满!无法再建立连接!");
            }

            // strUser cannot be null here: null is reported as full above.
            List<Socket> sockets;
            if (!_userSockets.TryGetValue(strUser, out sockets))
            {
                sockets = new List<Socket>(_maxSocket);
                _userSockets[strUser] = sockets;
            }
            sockets.Add(socket);
        }
    }

    /// <summary>
    /// Removes a socket from the user's record. Unknown users, null users and sockets
    /// that were never recorded are ignored.
    /// </summary>
    /// <param name="strUser">User (login) name.</param>
    /// <param name="socket">The socket being released.</param>
    public void Logout(string strUser, Socket socket)
    {
        if (strUser == null)
        {
            return;
        }
        lock (_syncRoot)
        {
            List<Socket> sockets;
            if (!_userSockets.TryGetValue(strUser, out sockets))
            {
                return;
            }

            sockets.Remove(socket);

            // Once the user has no sockets left, drop the dictionary entry (and with it
            // the list) so entries do not accumulate for users who have gone away.
            if (sockets.Count == 0)
            {
                _userSockets.Remove(strUser);
            }
        }
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -