basesocketconnectionhost.cs
来自「ActiveSync数据同步」· CS 代码 · 共 1,663 行 · 第 1/4 页
CS
1,663 行
}
#endregion
#region FireOnDisconnected
/// <summary>
/// Forwards a disconnection notification to the socket service.
/// </summary>
/// <param name="e">Event data handed unchanged to the service's OnDisconnected method.</param>
private void FireOnDisconnected(ConnectionEventArgs e)
{
    // No filtering happens here: every call is relayed to the service.
    FSocketService.OnDisconnected(e);
}
#endregion
#region FireOnException
/// <summary>
/// Reports an exception to the socket service through FSocketService.OnException.
/// </summary>
/// <param name="connection">
/// Connection the exception relates to. May be null; a null connection is treated as
/// not active, so the event is raised for it only when <paramref name="forceEvent"/> is true.
/// </param>
/// <param name="ex">The exception to report.</param>
/// <param name="forceEvent">
/// When true the event is raised regardless of the connection state; when false it is
/// raised only if the connection is active.
/// </param>
internal void FireOnException(BaseSocketConnection connection, Exception ex, bool forceEvent)
{
    // Callback-processing methods in this class start with a null connection local and
    // pass it here from their catch blocks. Without the null check, a failure that happens
    // before the connection is resolved would raise a NullReferenceException out of the
    // caller's catch handler instead of being reported.
    if (forceEvent || (connection != null && connection.Active))
    {
        FSocketService.OnException(new ExceptionEventArgs(connection, ex));
    }
}
/// <summary>
/// Reports an exception to the socket service without forcing the event.
/// </summary>
/// <param name="connection">Connection the exception relates to.</param>
/// <param name="ex">The exception to report.</param>
internal void FireOnException(BaseSocketConnection connection, Exception ex)
{
    // Delegates to the three-argument overload with forceEvent switched off.
    const bool forceEvent = false;
    FireOnException(connection, ex, forceEvent);
}
#endregion
#endregion
#region Begin Methods
#region BeginSend
/// <summary>
/// Sends a message on the connection, or queues it when another send is already in progress.
/// </summary>
/// <param name="connection">Target connection; nothing is sent unless it is active.</param>
/// <param name="buffer">Message payload; its length may not exceed FMessageBufferSize.</param>
/// <param name="sentByServer">Value stored on the outgoing message's SentByServer property.</param>
/// <remarks>
/// Nothing is done once the host is disposed. Any exception raised inside the send path,
/// including the MessageLengthException for an oversized payload, is caught here and
/// handed to FireOnException instead of being propagated to the caller.
/// </remarks>
internal void BeginSend(BaseSocketConnection connection, byte[] buffer, bool sentByServer)
{
    if (Disposed)
    {
        return;
    }

    try
    {
        if (!connection.Active)
        {
            return;
        }

        if (buffer.Length > FMessageBufferSize)
        {
            throw new MessageLengthException("Message length is greater than Host maximum message length.");
        }

        connection.LastAction = DateTime.Now;

        MessageBuffer outgoing = MessageBuffer.GetPacketMessage(connection, buffer);
        outgoing.SentByServer = sentByServer;

        lock (connection.WriteQueue)
        {
            if (connection.WriteQueueHasItems)
            {
                //----- A send is already in flight: park the message in the queue.
                connection.WriteQueue.Enqueue(outgoing);
                return;
            }

            //----- Nothing in flight: flag the connection as sending and start now.
            connection.WriteQueueHasItems = true;

            if (connection.Stream != null)
            {
                //----- Ssl stream.
                connection.Stream.BeginWrite(
                    outgoing.PacketBuffer,
                    outgoing.PacketOffSet,
                    outgoing.PacketRemaining,
                    new AsyncCallback(BeginSendCallback),
                    new CallbackData(connection, outgoing));
            }
            else
            {
                //----- Plain socket.
                connection.Socket.BeginSend(
                    outgoing.PacketBuffer,
                    outgoing.PacketOffSet,
                    outgoing.PacketRemaining,
                    SocketFlags.None,
                    new AsyncCallback(BeginSendCallback),
                    new CallbackData(connection, outgoing));
            }
        }
    }
    catch (Exception sendError)
    {
        FireOnException(connection, sendError);
    }
}
#endregion
#region BeginSendCallback
/// <summary>
/// Asynchronous write completion callback; hands the result to a thread-pool work item.
/// </summary>
/// <param name="ar">Result of the asynchronous write that has completed.</param>
private void BeginSendCallback(IAsyncResult ar)
{
    if (Disposed)
    {
        return;
    }

    // The completion is handled in BeginSendCallbackProcessing on a pool thread.
    WaitCallback processSend = new WaitCallback(BeginSendCallbackProcessing);
    ThreadPool.QueueUserWorkItem(processSend, ar);
}
#endregion
#region BeginSendCallbackProcessing
/// <summary>
/// Completes a pending write, raises the sent notification once the whole message has
/// gone out, and then starts the next queued message, if any.
/// </summary>
/// <param name="state">The IAsyncResult of the completed write, boxed as object.</param>
/// <remarks>
/// Nothing is done once the host is disposed or when the connection is no longer active.
/// Exceptions raised while processing are caught and handed to FireOnException.
/// </remarks>
private void BeginSendCallbackProcessing(object state)
{
    if (Disposed)
    {
        return;
    }

    IAsyncResult ar = (IAsyncResult) state;
    BaseSocketConnection connection = null;

    try
    {
        CallbackData callbackData = (CallbackData) ar.AsyncState;
        MessageBuffer sentMessage = callbackData.Buffer;
        connection = callbackData.Connection;

        if (!connection.Active)
        {
            return;
        }

        if (connection.Stream != null)
        {
            //----- Ssl stream: finishing the write means the message is done.
            connection.Stream.EndWrite(ar);
        }
        else
        {
            //----- Plain socket: the send may have been partial.
            int bytesWritten = connection.Socket.EndSend(ar);

            if (bytesWritten < sentMessage.PacketRemaining)
            {
                //----- Advance past what was written and send the rest with the same state.
                sentMessage.PacketOffSet += bytesWritten;
                connection.Socket.BeginSend(
                    sentMessage.PacketBuffer,
                    sentMessage.PacketOffSet,
                    sentMessage.PacketRemaining,
                    SocketFlags.None,
                    new AsyncCallback(BeginSendCallback),
                    callbackData);

                // Message still in flight: no notification and no queue check yet.
                return;
            }
        }

        //----- Whole message sent.
        FireOnSent(connection, sentMessage.RawBuffer, sentMessage.SentByServer);

        //----- Check the queue for the next message.
        lock (connection.WriteQueue)
        {
            if (connection.WriteQueue.Count > 0)
            {
                MessageBuffer nextMessage = connection.WriteQueue.Dequeue();

                if (connection.Stream != null)
                {
                    //----- Ssl stream.
                    connection.Stream.BeginWrite(
                        nextMessage.PacketBuffer,
                        nextMessage.PacketOffSet,
                        nextMessage.PacketRemaining,
                        new AsyncCallback(BeginSendCallback),
                        new CallbackData(connection, nextMessage));
                }
                else
                {
                    //----- Plain socket.
                    connection.Socket.BeginSend(
                        nextMessage.PacketBuffer,
                        nextMessage.PacketOffSet,
                        nextMessage.PacketRemaining,
                        SocketFlags.None,
                        new AsyncCallback(BeginSendCallback),
                        new CallbackData(connection, nextMessage));
                }
            }
            else
            {
                //----- Queue drained: the next BeginSend may write directly.
                connection.WriteQueueHasItems = false;
            }
        }
    }
    catch (Exception sendError)
    {
        FireOnException(connection, sendError);
    }
}
#endregion
#region BeginReceive
/// <summary>
/// Requests a receive on the connection, starting an asynchronous read when none is pending.
/// </summary>
/// <param name="connection">Connection to read from; nothing happens unless it is active.</param>
/// <remarks>
/// Nothing is done once the host is disposed. While holding the SyncReadCount lock, and only
/// if ReadCanEnqueue is set, a read is started when ReadCount is zero and ReadCount is then
/// incremented. Exceptions are caught and handed to FireOnException.
/// </remarks>
internal void BeginReceive(BaseSocketConnection connection)
{
    if (Disposed)
    {
        return;
    }

    try
    {
        if (!connection.Active)
        {
            return;
        }

        lock (connection.SyncReadCount)
        {
            if (!connection.ReadCanEnqueue)
            {
                return;
            }

            if (connection.ReadCount == 0)
            {
                //----- No receive pending: start one with a fresh buffer.
                MessageBuffer incoming = new MessageBuffer(FSocketBufferSize);

                if (connection.Stream != null)
                {
                    //----- Ssl stream.
                    connection.Stream.BeginRead(
                        incoming.PacketBuffer,
                        incoming.PacketOffSet,
                        incoming.PacketRemaining,
                        new AsyncCallback(BeginReadCallback),
                        new CallbackData(connection, incoming));
                }
                else
                {
                    //----- Plain socket.
                    connection.Socket.BeginReceive(
                        incoming.PacketBuffer,
                        incoming.PacketOffSet,
                        incoming.PacketRemaining,
                        SocketFlags.None,
                        new AsyncCallback(BeginReadCallback),
                        new CallbackData(connection, incoming));
                }
            }

            //----- Count this request whether or not a read was started.
            connection.ReadCount++;
        }
    }
    catch (Exception receiveError)
    {
        FireOnException(connection, receiveError);
    }
}
#endregion
#region BeginReadCallback
/// <summary>
/// Asynchronous read completion callback; hands the result to a thread-pool work item.
/// </summary>
/// <param name="ar">Result of the asynchronous read that has completed.</param>
private void BeginReadCallback(IAsyncResult ar)
{
    if (Disposed)
    {
        return;
    }

    // The completion is handled in BeginReadCallbackProcessing on a pool thread.
    WaitCallback processRead = new WaitCallback(BeginReadCallbackProcessing);
    ThreadPool.QueueUserWorkItem(processRead, ar);
}
#endregion
#region BeginReadCallbackProcessing
/// <summary>
/// Completes a pending read and either processes the received bytes or starts a disconnect.
/// </summary>
/// <param name="state">The IAsyncResult of the completed read, boxed as object.</param>
/// <remarks>
/// Nothing is done once the host is disposed or when the connection is no longer active.
/// Exceptions raised while processing are caught and handed to FireOnException.
/// </remarks>
private void BeginReadCallbackProcessing(object state)
{
    if (Disposed)
    {
        return;
    }

    IAsyncResult ar = (IAsyncResult)state;
    BaseSocketConnection connection = null;

    try
    {
        CallbackData callbackData = (CallbackData)ar.AsyncState;
        connection = callbackData.Connection;

        if (!connection.Active)
        {
            return;
        }

        //----- Finish the read on the Ssl stream when present, otherwise on the socket.
        int bytesReceived = (connection.Stream != null)
            ? connection.Stream.EndRead(ar)
            : connection.Socket.EndReceive(ar);

        if (bytesReceived <= 0)
        {
            //----- No data to read means the connection has been terminated.
            connection.BeginDisconnect();
            return;
        }

        ReadBytesFromConnection(callbackData, bytesReceived);
    }
    catch (Exception readError)
    {
        FireOnException(connection, readError);
    }
}
#endregion
#region ReadBytesFromConnection
private void ReadBytesFromConnection(CallbackData callbackData, int readBytes)
{
BaseSocketConnection connection = callbackData.Connection;
MessageBuffer readMessage = callbackData.Buffer;
//----- Has bytes!
connection.LastAction = DateTime.Now;
byte[] rawBuffer = null;
bool socketWasRead = false;
readMessage.PacketOffSet += readBytes;
switch (connection.DelimiterType)
{
case DelimiterType.dtNone:
//----- Message with no delimiter!
rawBuffer = readMessage.GetRawBuffer(readBytes, 0);
break;
case DelimiterType.dtPacketHeader:
//----- Message with packet header!
rawBuffer = ReadMessageWithPacketHeader(connection.Delimiter, callbackData, ref socketWasRead);
break;
case DelimiterType.dtMessageTailExcludeOnReceive:
case DelimiterType.dtMessageTailIncludeOnReceive:
//----- Message with tail!
rawBuffer = ReadMessageWithMessageTail(connection.Delimiter, callbackData, ref socketWasRead);
break;
}
if (rawBuffer != null)
{
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?