basesocketconnectionhost.cs

来自「ActiveSync数据同步」· CS 代码 · 共 1,663 行 · 第 1/4 页

CS
1,663
字号
        }

        #endregion

        #region FireOnDisconnected

        private void FireOnDisconnected(ConnectionEventArgs e)
        {
            
          FSocketService.OnDisconnected(e);

        }

        #endregion

        #region FireOnException

        internal void FireOnException(BaseSocketConnection connection, Exception ex, bool forceEvent)
        {
            
          if (forceEvent || connection.Active)
          {
            FSocketService.OnException(new ExceptionEventArgs(connection, ex));
          }

        }

        internal void FireOnException(BaseSocketConnection connection, Exception ex)
        {
          FireOnException(connection, ex, false);
        }

        #endregion

        #endregion

        #region Begin Methods

        #region BeginSend

        /// <summary>
        /// Begin send the data.
        /// </summary>
        internal void BeginSend(BaseSocketConnection connection, byte[] buffer, bool sentByServer)
        {

            if (!Disposed)
            {

                try
                {

                    if (connection.Active)
                    {

                        if (buffer.Length > FMessageBufferSize) 
                        {
                            throw new MessageLengthException("Message length is greater than Host maximum message length.");
                        }

                        connection.LastAction = DateTime.Now;

                        MessageBuffer writeMessage = MessageBuffer.GetPacketMessage(connection, buffer);
                        writeMessage.SentByServer = sentByServer;

                        lock (connection.WriteQueue)
                        {

                            if (connection.WriteQueueHasItems)
                            {
                                //----- If the connection is sending, enqueue the message!
                                connection.WriteQueue.Enqueue(writeMessage);
                            }
                            else
                            {

                                //----- If the connection is not sending, send the message!
                                connection.WriteQueueHasItems = true;

                                if (connection.Stream != null)
                                {
                                    //----- Ssl!
                                    connection.Stream.BeginWrite(writeMessage.PacketBuffer, writeMessage.PacketOffSet, writeMessage.PacketRemaining, new AsyncCallback(BeginSendCallback), new CallbackData(connection, writeMessage));
                                }
                                else
                                {
                                    //----- Socket!
                                    connection.Socket.BeginSend(writeMessage.PacketBuffer, writeMessage.PacketOffSet, writeMessage.PacketRemaining, SocketFlags.None, new AsyncCallback(BeginSendCallback), new CallbackData(connection, writeMessage));
                                }

                            }

                        }

                    }

                }
                catch (Exception ex)
                {
                    FireOnException(connection, ex);
                }

            }

        }

        #endregion

        #region BeginSendCallback

        /// <summary>
        /// Send Callback.
        /// </summary>
        private void BeginSendCallback(IAsyncResult ar)
        {

            if (!Disposed)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(BeginSendCallbackProcessing), ar);
            }

        }

        #endregion

        #region BeginSendCallbackProcessing

        /// <summary>
        /// Send Callback Processing.
        /// </summary>
        private void BeginSendCallbackProcessing(object state)
        {

            if (!Disposed)
            {

                IAsyncResult ar = (IAsyncResult) state;
                BaseSocketConnection connection = null;
                MessageBuffer writeMessage = null;
                bool CanReadQueue = false;

                try
                {

                    CallbackData callbackData = (CallbackData) ar.AsyncState;

                    writeMessage = callbackData.Buffer;
                    connection = callbackData.Connection;

                    if (connection.Active)
                    {

                        if (connection.Stream != null)
                        {

                            //----- Ssl!
                            connection.Stream.EndWrite(ar);
                            FireOnSent(connection, writeMessage.RawBuffer, writeMessage.SentByServer);
                            CanReadQueue = true;

                        }
                        else
                        {

                            //----- Socket!
                            int writeBytes = connection.Socket.EndSend(ar);

                            if (writeBytes < writeMessage.PacketRemaining)
                            {
                                //----- Continue to send until all bytes are sent!
                                writeMessage.PacketOffSet += writeBytes;
                                connection.Socket.BeginSend(writeMessage.PacketBuffer, writeMessage.PacketOffSet, writeMessage.PacketRemaining, SocketFlags.None, new AsyncCallback(BeginSendCallback), callbackData);
                            }
                            else
                            {
                                
                                FireOnSent(connection, writeMessage.RawBuffer, writeMessage.SentByServer);
                                CanReadQueue = true;
                            }

                        }

                        //----- Check Queue!
                        if (CanReadQueue)
                        {

                            callbackData = null;
                            writeMessage = null;

                            lock (connection.WriteQueue)
                            {
                                if (connection.WriteQueue.Count > 0)
                                {

                                    //----- If has items, send it!
                                    MessageBuffer dequeueWriteMessage = connection.WriteQueue.Dequeue();

                                    if (connection.Stream != null)
                                    {
                                        //----- Ssl!
                                        connection.Stream.BeginWrite(dequeueWriteMessage.PacketBuffer, dequeueWriteMessage.PacketOffSet, dequeueWriteMessage.PacketRemaining, new AsyncCallback(BeginSendCallback), new CallbackData(connection, dequeueWriteMessage));
                                    }
                                    else
                                    {
                                        //----- Socket!
                                        connection.Socket.BeginSend(dequeueWriteMessage.PacketBuffer, dequeueWriteMessage.PacketOffSet, dequeueWriteMessage.PacketRemaining, SocketFlags.None, new AsyncCallback(BeginSendCallback), new CallbackData(connection, dequeueWriteMessage));
                                    }

                                }
                                else
                                {
                                    connection.WriteQueueHasItems = false;
                                }

						    }

                        }

                    }

                }
                catch (Exception ex)
                {
                    FireOnException(connection, ex);
                }

            }

        }

        #endregion

        #region BeginReceive

        /// <summary>
        /// Receive data from connetion.
        /// </summary>
        internal void BeginReceive(BaseSocketConnection connection)
        {

            if (!Disposed)
            {

                try
                {

                    if (connection.Active)
                    {

                        lock (connection.SyncReadCount)
                        {

                            if (connection.ReadCanEnqueue)
                            {

                                if (connection.ReadCount == 0)
                                {

                                    //----- if the connection is not receiving, start the receive!
                                    MessageBuffer readMessage = new MessageBuffer(FSocketBufferSize);

                                    if (connection.Stream != null)
                                    {
                                        //----- Ssl!
                                        connection.Stream.BeginRead(readMessage.PacketBuffer, readMessage.PacketOffSet, readMessage.PacketRemaining, new AsyncCallback(BeginReadCallback), new CallbackData(connection, readMessage));
                                    }
                                    else
                                    {
                                        //----- Socket!
                                        connection.Socket.BeginReceive(readMessage.PacketBuffer, readMessage.PacketOffSet, readMessage.PacketRemaining, SocketFlags.None, new AsyncCallback(BeginReadCallback), new CallbackData(connection, readMessage));
                                    }

                                }

                                //----- Increase the read count!
                                connection.ReadCount++;

                            }

                        }

                    }

                }
                catch (Exception ex)
                {
                    FireOnException(connection, ex);
                }

            }

        }

        #endregion

        #region BeginReadCallback

        private void BeginReadCallback(IAsyncResult ar)
        {

            if (!Disposed)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(BeginReadCallbackProcessing), ar);
            }

        }

        #endregion

        #region BeginReadCallbackProcessing

        private void BeginReadCallbackProcessing(object state)
        {
            
            if (!Disposed)
            {

                IAsyncResult ar = (IAsyncResult)state;

                BaseSocketConnection connection = null;

                try
                {

                    CallbackData callbackData = (CallbackData)ar.AsyncState;

                    connection = callbackData.Connection;

                    if (connection.Active)
                    {

                        int readBytes = 0;

                        if (connection.Stream != null)
                        {
                            //----- Ssl!
                            readBytes = connection.Stream.EndRead(ar);
                        }
                        else
                        {
                            //----- Socket!
                            readBytes = connection.Socket.EndReceive(ar);
                        }

                        if (readBytes > 0)
                        {

                            ReadBytesFromConnection(callbackData, readBytes);

                        }
                        else
                        {
                            //----- Is has no data to read then the connection has been terminated!
                            connection.BeginDisconnect();
                        }

                    }
                }
                catch (Exception ex)
                {
                    FireOnException(connection, ex);
                }

            }

        }

        #endregion

        #region ReadBytesFromConnection

        private void ReadBytesFromConnection(CallbackData callbackData, int readBytes)
        {

            BaseSocketConnection connection = callbackData.Connection;
            MessageBuffer readMessage = callbackData.Buffer;

            //----- Has bytes!
            connection.LastAction = DateTime.Now;

            byte[] rawBuffer = null;
            bool socketWasRead = false;
            
            readMessage.PacketOffSet += readBytes;

            switch (connection.DelimiterType)
            {

                case DelimiterType.dtNone:

                    //----- Message with no delimiter!
                    rawBuffer = readMessage.GetRawBuffer(readBytes, 0);

                    break;

                case DelimiterType.dtPacketHeader:

                    //----- Message with packet header!
                    rawBuffer = ReadMessageWithPacketHeader(connection.Delimiter, callbackData, ref socketWasRead);

                    break;

                case DelimiterType.dtMessageTailExcludeOnReceive:
                case DelimiterType.dtMessageTailIncludeOnReceive:


                    //----- Message with tail!
                    rawBuffer = ReadMessageWithMessageTail(connection.Delimiter, callbackData, ref socketWasRead);

                    break;

            }

            if (rawBuffer != null)
            {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?