⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queue.cs

📁 联通的SGIP发送代码
💻 CS
📖 第 1 页 / 共 2 页
字号:

            _cmdQueue = queue;
            _sendList = new TimeoutSendList(_maxWaitAnswerCommand, _answerTimeout + 1000);
            _serverIP = serverIP;
            _serverPort = serverPort;
            _bindBody = bindBody;
        }

        /// <summary>
        /// 是否正在发送数据
        /// </summary>
        public bool IsSendingData
        {
            get
            {
                return _isSendingData;
            }
            private set
            {
                _isSendingData = value;
                /*
                Interlocked.Increment(ref _isSendData);
                _isSendData = value ? 1 : 0; 
                Interlocked.Decrement(ref _isSendData);
                 */
            }
        }

        /// <summary>
        /// 开始从队列中取出命令并传送到服务器
        /// </summary>
        public void StartSendData()
        {
            lock (this)
            {
                if (_cmdQueue.IsQueueEmpty())
                {
                    return;
                }

                if ((_socket != null) && !SgipHelper.IsSocketConnected(_socket))
                {
                    SgipHelper.ShutdownAndCloseSocket(_socket);
                    _socket = null;
                }

                if (_socket == null)
                {
                    _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                    if (!ConnectToServer())
                    {
                        SgipHelper.ShutdownAndCloseSocket(_socket);
                        _socket = null;
                        return;
                    }
                    ReceiveData(new ReceiveStateObject(_socket)); //ReceiveData()只要socket不断开就不会中断
                    IsSendingData = false; /*如果发送时socket了中断了,则有可能IsSendingData还是true*/
                }

                //发送数据
                if (!IsSendingData)
                {
                    //开始发送数据
                    IsSendingData = true;
                    SendData(new SendStateObject(_socket));
                }
            }
        }

        private bool ConnectToServer()
        {
            try
            {
                _socket.Connect(_serverIP, _serverPort);
                Command bindCommand = Command.CreateCommand(_bindBody);
                _socket.Send(bindCommand.GetBytes());
                byte[] receiveBuffer = new byte[SgipConfig.MaxBufferCount];
                _socket.ReceiveTimeout = _answerTimeout;
                _socket.Receive(receiveBuffer);
                Command bindRespCommand = Command.ToCommand(receiveBuffer, 0);
                return (((Bind_Resp)bindRespCommand.Body).Result == 0);
            }catch{
                return false;
            }
        }

        private void ReceiveData(ReceiveStateObject obj)
        {
            try
            {
                obj.WorkSocket.BeginReceive(obj.Buffer, 0, obj.Buffer.Length, SocketFlags.None, new AsyncCallback(OnRecieveDataCallBack), obj);
            }catch(Exception e){
                System.Diagnostics.Debug.WriteLine(e);
                SgipHelper.ShutdownAndCloseSocket(obj.WorkSocket);
            }
        }

        private void OnRecieveDataCallBack(IAsyncResult ar)
        {
            ReceiveStateObject obj = (ReceiveStateObject)ar.AsyncState;
            int bytesRead = 0;
            try
            {
                bytesRead = obj.WorkSocket.EndReceive(ar);
            }catch (Exception e){
                System.Diagnostics.Debug.WriteLine(e);
            }
            if (bytesRead <= 0) //远程主机已断开
            {
                SgipHelper.ShutdownAndCloseSocket(obj.WorkSocket);
                return;
            }

            Command cmdAnswer = null;
            try
            {
                cmdAnswer = Command.ToCommand(obj.Buffer, 0);
            }catch (Exception e){
                System.Diagnostics.Debug.WriteLine(e);
            }

            if (cmdAnswer == null) //无法识别的命令
            {
                ReceiveData(obj);
                return;
            }

            //只考虑收到sgip_unbind命令的情况,不考虑unbind_resp, 因为本程序不会主动断开连接
            if (cmdAnswer.Head.CommandID == Command.SGIP_UNBIND)
            {
                try
                {
                    obj.WorkSocket.Send(Command.CreateCommand(new Unbind_Resp()).GetBytes());
                }catch(Exception e){
                    System.Diagnostics.Debug.WriteLine(e);
                }
                SgipHelper.ShutdownAndCloseSocket(obj.WorkSocket);
                return;
            }

            //读取到了服务器返回的数据, 注意考虑:记录这个成功的命令到数据库
            QueueItem Item = _sendList.FindItem(cmdAnswer.Head.SequenceNumber);
            if (Item != null)
            {
                _sendList.Remove(Item);
                try
                {
                    Item.SourceSocket.Send(cmdAnswer.GetBytes());//把应答传给item中的socket
                }catch (Exception e){
                    System.Diagnostics.Debug.WriteLine(e);
                }
            }
            ReceiveData(obj);
        }

        private void RemoveSendingItemAndShutdownSocket(SendStateObject obj)
        {
            _sendList.Remove(obj.Item);
            SgipHelper.ShutdownAndCloseSocket(obj.WorkSocket);
        }

        private void SendData(SendStateObject obj)
        {
            //发送队列已满
            while (_sendList.IsFull())
            {
                Thread.Sleep(2000);
            }
            lock (_cmdQueue)   //保证判断为队列为空和从队列中取出命令是原子的,即:不能让别的线程先取走了,否则就可能发生异常
            {
                if (_cmdQueue.IsQueueEmpty())
                {
                    IsSendingData = false;
                    return;
                }
                obj.Item = _cmdQueue.Dequeue();
            }
            _sendList.Add(obj.Item);

            byte[] byteSend = obj.Item.SourceCommand.GetBytes();
            try
            {
                obj.WorkSocket.BeginSend(byteSend, 0, byteSend.Length, SocketFlags.None, new AsyncCallback(OnSendDataCallBack), obj);
            }catch (Exception e){
                System.Diagnostics.Debug.WriteLine(e);
                RemoveSendingItemAndShutdownSocket(obj);
            }
        }

        private void OnSendDataCallBack(IAsyncResult ar)
        {
            SendStateObject obj = (SendStateObject)ar.AsyncState;
            try
            {
                obj.WorkSocket.EndSend(ar);
                //Trace命令没有应答
                if (obj.Item.SourceCommand.Body is Trace)
                {
                    _sendList.Remove(obj.Item);
                }
                SendData(obj);
            }catch(Exception e){
                System.Diagnostics.Debug.WriteLine(e);
                //注意:Item并没有发送到服务器,请做提交失败的处理
                RemoveSendingItemAndShutdownSocket(obj);
            }          
        }
    }

    public class Transmit
    {
        private List<MiddleClientSocket> _sendSocket;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="serverIP">服务器的IP</param>
        /// <param name="serverPort">服务器所监听的端口</param>
        /// <param name="maxConnection">最多能向服务器建立多少个socket</param>
        /// <param name="nMaxWaitAnswerCommand">一个Socket最大的待应答命令数目</param>
        /// <param name="nAnswerTimeout">命令应答的最大间隔,以秒为单位</param>
        /// <param name="bindBody">向服务器登录的Bind消息体</param>
        /// <param name="queue">所要传送的命令队列, 将从这个命令队列中取出命令发送到服务器</param>
        public Transmit(IPAddress serverIP, int serverPort, int maxConnection, int nMaxWaitAnswerCommand, int nAnswerTimeout,
            Bind bindBody, SocketCommandQueue queue)
        {
            _sendSocket = new List<MiddleClientSocket>(maxConnection);
            for (int i = 0; i < maxConnection; ++i)
            {
                _sendSocket.Add(new MiddleClientSocket(serverIP, serverPort, nMaxWaitAnswerCommand, nAnswerTimeout, bindBody, queue));
            }
        }

        public Transmit(List<MiddleClientSocket> socketList)
        {
            _sendSocket = socketList;
        }

        private void PromptSocketSendData(object state)
        {
            MiddleClientSocket socket = (MiddleClientSocket)state;
            socket.StartSendData();
        }

        /// <summary>
        /// 将队列中的命令传送到服务器
        /// </summary>
        public void SendData()
        {
            foreach (MiddleClientSocket Item in _sendSocket)
            {
                if (!Item.IsSendingData)
                {
                    ThreadPool.QueueUserWorkItem(new WaitCallback(PromptSocketSendData), Item);
                    break;
                }
            }
        }
    }

    /// <summary>
    /// 记录一个用户所登录以后的Sokect
    /// </summary>
    public class UserSockets
    {
        private int _maxSocket; //一个用户所能bind的最多的Socket数
        private Dictionary<string, List<Socket>> _userSockets;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="maxSocket">一个用户最多能bind的socket的个数</param>
        public UserSockets(int maxSocket)
        {
            _maxSocket = maxSocket;
            _userSockets = new Dictionary<string, List<Socket>>();
        }

        /// <summary>
        /// 一个用户所bind的socket数目是否已满了
        /// </summary>
        /// <param name="strUser"></param>
        /// <returns></returns>
        public bool IsUserSocketFull(string strUser)
        {
            lock (this)
            {
                if (strUser == null)
                {
                    return true;
                }
                if (_userSockets.ContainsKey(strUser))
                {
                    return _userSockets[strUser].Count >= _maxSocket;
                }
                return false;
            }
        }

        /// <summary>
        /// 判断一个ip(由ip可以查知是哪一个二级sp)所能连接的socket是否已经满了
        /// </summary>
        /// <param name="strUser"></param>
        /// <returns></returns>
        public bool IsUserSocketFull(IPAddress ip)
        {
            lock (this)
            {
                if (ip == null)
                {
                    return true;
                }
                SubSP sp = SubSPManager.Manager.FindByIP(ip.ToString());
                return (sp == null) ? true : IsUserSocketFull(sp.LoginName);
            }
        }


        public int GetUserSocketCount(string strUser)
        {
            lock (this)
            {
                if ((strUser != null) && (_userSockets.ContainsKey(strUser)))
                {
                    return _userSockets[strUser].Count;
                }
                return 0;
            }
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="strUser"></param>
        /// <param name="socket"></param>
        /// <returns></returns>
        public void Login(string strUser, Socket socket)
        {
            if (IsUserSocketFull(strUser))
            {
                throw new InvalidOperationException("用户所能连接的socket已满!无法再建立连接!");
            }

            lock (this)
            {
                if (strUser != null)
                {
                    if (!_userSockets.ContainsKey(strUser))
                    {
                        _userSockets[strUser] = new List<Socket>(_maxSocket);
                    }
                    _userSockets[strUser].Add(socket);
                }
            }
        }

        public void Logout(string strUser, Socket socket)
        {
            lock (this)
            {
                if ((strUser != null) && (_userSockets.ContainsKey(strUser)))
                {
                    _userSockets[strUser].Remove(socket);
                    //如果用户的socket已没有了,则字典中删除之,且对应的socket队列也删除
                }
            }           
        }

    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -