📄 asyncsocketsession.cs
字号:
namespace NCindy.Session.AIO
{
using NCindy;
using NCindy.Buffer;
using NCindy.Session;
using NCindy.Util;
using NCindy.Util.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
public abstract class AsyncSocketSession : AbstractSession
{
/// <summary>Buffer size 0x2000 (8192); SocketBeginReceive requests a buffer of this size for non-UDP sessions.</summary>
protected const int DEFAULT_BUFFER_SIZE = 0x2000;
/// <summary>The socket this session wraps; assigned once in the constructors.</summary>
protected readonly Socket innerSocket;
/// <summary>Cached local endpoint; filled in by the LocalEndPoint property.</summary>
protected IPEndPoint localEndPoint;
// typeof(...) yields the same Type that MethodBase.GetCurrentMethod().ReflectedType
// returns inside this class's static initializer, without requiring System.Reflection
// (which this file does not import).
private static readonly ILogger log = LogFactory.CreateLogger(typeof(AsyncSocketSession));
/// <summary>
/// Receive operation that Close() waits on before closing. Never assigned in this
/// class - presumably set by derived classes in DoBeginReceive (TODO confirm).
/// </summary>
protected IAsyncResult pendingReceiveResult;
/// <summary>Not read or written in this class; available to derived classes.</summary>
protected long receivedCounter;
/// <summary>Not read or written in this class; available to derived classes.</summary>
protected Queue<IPacket> receiveQueue;
/// <summary>Remote endpoint; set by the (SessionType, IPEndPoint) constructor and required by Open().</summary>
protected IPEndPoint remoteEndPoint;
/// <summary>Not read or written in this class; available to derived classes.</summary>
protected bool reuseSocket;
/// <summary>Pending sends in FIFO order. The queue object itself is used as the lock guarding it.</summary>
protected readonly Queue<AsyncFuture> sendQueue;
/// <summary>Value 0x3e8 (1000). Not referenced by name in this class.</summary>
protected const int SOCKET_TIMEOUT_MICROSECONDS = 0x3e8;
/// <summary>Value 1. Not referenced by name in this class.</summary>
protected const int SOCKET_TIMEOUT_SECONDS = 1;
/// <summary>Delegate bound to SocketEndReceive by InitializeAsyncCallbacks.</summary>
protected AsyncCallback socketEndRecvCallback;
/// <summary>Delegate bound to SocketEndSend by InitializeAsyncCallbacks.</summary>
protected AsyncCallback socketEndSendCallback;
/// <summary>Lock object used by Open(), Close() and the LocalEndPoint getter.</summary>
protected readonly object syncRoot;
/// <summary>Buffer size 0x800 (2048); SocketBeginReceive requests a buffer of this size for UDP sessions.</summary>
protected const int UDP_DEFAULT_BUFFER_SIZE = 0x800;
/// <summary>
/// Creates a session around an existing socket. The session type is Tcp when the
/// socket's protocol type is Tcp, and Udp for every other protocol type.
/// </summary>
/// <param name="sock">The socket to wrap; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="sock"/> is null.</exception>
public AsyncSocketSession(Socket sock) : base(ResolveSessionType(sock))
{
    this.sendQueue = new Queue<AsyncFuture>();
    this.syncRoot = new object();
    this.innerSocket = sock;
    this.InitializeAsyncCallbacks();
}

/// <summary>
/// Validates the socket and maps its protocol type to a session type. This runs
/// inside the base-constructor initializer, so the null check happens before
/// <c>sock.ProtocolType</c> is dereferenced.
/// </summary>
private static SessionType ResolveSessionType(Socket sock)
{
    if (sock == null)
    {
        throw new ArgumentNullException("sock");
    }
    return (sock.ProtocolType == ProtocolType.Tcp) ? SessionType.Tcp : SessionType.Udp;
}
/// <summary>
/// Creates a session of the given type with a socket obtained from
/// <c>SocketFactory.CreateSocket</c>, and records the remote endpoint.
/// </summary>
/// <param name="sessionType">Session type passed to the base class and to the socket factory.</param>
/// <param name="remoteIPEndPoint">
/// Remote endpoint to store. It is not validated here; Open() rejects a null value.
/// </param>
public AsyncSocketSession(SessionType sessionType, IPEndPoint remoteIPEndPoint) : base(sessionType)
{
    sendQueue = new Queue<AsyncFuture>();
    syncRoot = new object();
    innerSocket = SocketFactory.CreateSocket(sessionType);
    remoteEndPoint = remoteIPEndPoint;
    InitializeAsyncCallbacks();
}
/// <summary>
/// Closes the session: marks it Closing, waits up to 1000 ms for the pending
/// receive, calls DoClose, clears the filters and raises SessionClosed.
/// The state is set to Closed whether or not the close sequence succeeds.
/// </summary>
/// <returns>
/// A completed future. When the session is not open on entry, the future is
/// completed with IsSucceeded set to false and nothing else happens.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The session was found not open after the lock was taken.
/// </exception>
public override IFuture Close()
{
    if (log.IsInfoEnabled)
    {
        log.Info("Enter Close.");
    }

    // Nothing to close: hand back an already-failed future instead of throwing.
    if (!base.IsOpened)
    {
        AsyncFuture notOpened = new AsyncFuture(this);
        notOpened.IsCompleted = true;
        notOpened.IsSucceeded = false;
        return notOpened;
    }

    lock (syncRoot)
    {
        // Re-check under the lock; the state may have changed since the test above.
        if (!base.IsOpened)
        {
            throw new InvalidOperationException("This session not started.");
        }
        base.State = SessionState.Closing;
    }

    try
    {
        AsyncFuture closeFuture = new AsyncFuture(this);

        // Give an in-flight receive a bounded chance to finish before tearing down.
        if ((this.pendingReceiveResult != null) && (this.pendingReceiveResult.AsyncWaitHandle != null))
        {
            this.pendingReceiveResult.AsyncWaitHandle.WaitOne(1000, false);
        }

        DoClose();

        if (base.filters != null)
        {
            base.filters.Clear();
        }

        base.SessionClosed();
        closeFuture.IsCompleted = true;
        return closeFuture;
    }
    catch (Exception closeError)
    {
        // Make sure the socket is released, report the failure, then let it propagate.
        if (innerSocket != null)
        {
            innerSocket.Close(1000);
        }
        base.SessionCaughtException(closeError);
        throw;
    }
    finally
    {
        base.State = SessionState.Closed;
    }
}
/// <summary>
/// Starts a receive into <paramref name="buffer"/>. Called by SocketBeginReceive,
/// which reports any exception thrown here through SessionCaughtException instead
/// of propagating it.
/// </summary>
protected abstract void DoBeginReceive(IBuffer buffer);
/// <summary>
/// Starts a send of <paramref name="size"/> bytes of <paramref name="content"/>
/// beginning at <paramref name="offset"/>. SocketBeginSend calls this with the
/// packet content's inner byte array, Position and Remaining.
/// </summary>
/// <param name="state">
/// Caller-supplied state object. SocketEndSend reads <c>IAsyncResult.AsyncState</c>
/// as an AsyncFuture, so implementations presumably pass this through as the
/// async state (TODO confirm against derived classes).
/// </param>
protected abstract IAsyncResult DoBeginSend(byte[] content, int offset, int size, object state);
/// <summary>
/// Performs the transport-specific close. Called by Close() after the state has
/// been set to Closing and after the bounded wait on the pending receive.
/// </summary>
protected abstract void DoClose();
/// <summary>
/// Completes a receive. Called by SocketEndReceive: a null return means no packet
/// is dispatched to the filter chain, and a RemoteSocketClosedException thrown
/// here makes SocketEndReceive close the session.
/// </summary>
protected abstract IPacket DoEndReceive(IAsyncResult asyncResult);
/// <summary>
/// Binds the send and receive completion delegates to SocketEndSend and
/// SocketEndReceive. Both constructors call this.
/// </summary>
protected void InitializeAsyncCallbacks()
{
    socketEndSendCallback = SocketEndSend;
    socketEndRecvCallback = SocketEndReceive;
}
/// <summary>
/// Performs the transport-specific open. Open() calls this while holding
/// <c>syncRoot</c>, passing the future it will return to its caller; an exception
/// thrown here is reported through SessionCaughtException and rethrown by Open().
/// </summary>
/// <param name="future">
/// The future returned from Open(). Presumably the implementation is responsible
/// for completing it (TODO confirm against derived classes).
/// </param>
protected abstract void InternalOpen(AsyncFuture future);
/// <summary>
/// Opens the session by delegating to InternalOpen under <c>syncRoot</c>.
/// </summary>
/// <returns>The future that was handed to InternalOpen.</returns>
/// <exception cref="InvalidOperationException">
/// The session is not in the Initial state, is already opened, or has no remote endpoint.
/// </exception>
/// <remarks>
/// Any exception from InternalOpen is reported through SessionCaughtException and
/// rethrown. Before rethrowing, the socket is shut down in both directions; if the
/// shutdown fails for a reason other than the socket being disposed, the socket is closed.
/// </remarks>
public override IFuture Open()
{
    if (base.State != SessionState.Initial)
    {
        throw new InvalidOperationException("This session already started.");
    }
    if (remoteEndPoint == null)
    {
        throw new InvalidOperationException("Remote endpoint could not be null.");
    }

    lock (syncRoot)
    {
        if (base.IsOpened)
        {
            throw new InvalidOperationException("This session already started.");
        }

        try
        {
            AsyncFuture openFuture = new AsyncFuture(this);
            InternalOpen(openFuture);
            return openFuture;
        }
        catch (Exception openError)
        {
            base.SessionCaughtException(openError);

            if (innerSocket != null)
            {
                try
                {
                    innerSocket.Shutdown(SocketShutdown.Both);
                }
                catch (ObjectDisposedException)
                {
                    // Socket is already gone; nothing left to release.
                }
                catch
                {
                    // Shutdown failed for another reason: close the socket instead.
                    innerSocket.Close(1);
                }
            }

            throw;
        }
    }
}
/// <summary>
/// Queues a packet for sending and, when it is the only entry in the queue,
/// starts sending it immediately. Later entries are started by SocketEndSend
/// once the entry ahead of them completes.
/// </summary>
/// <param name="obj">Object stored on the returned future as AssociatedObject.</param>
/// <param name="packet">Packet stored on the returned future as AssociatedPacket.</param>
/// <param name="priority">Not used by this implementation.</param>
/// <returns>The future tracking this send.</returns>
protected override IFuture Send(object obj, IPacket packet, int priority)
{
    AsyncFuture future = new AsyncFuture(this);
    future.AssociatedObject = obj;
    future.AssociatedPacket = packet;
    lock (this.sendQueue)
    {
        this.sendQueue.Enqueue(future);
        // Count is at least 1 after the enqueue; exactly 1 means no send is in
        // flight, so this call has to start the pump itself.
        if (this.sendQueue.Count == 1)
        {
            this.SendFirstOfQueue();
        }
    }
    return future;
}
/// <summary>
/// Starts sending the entry at the head of the send queue, if there is one, by
/// pushing its packet through the Send filter chain ending in a SendOperateFilter.
/// The entry stays in the queue; SocketEndSend dequeues it on completion.
/// </summary>
/// <remarks>
/// Any exception is reported through SessionCaughtException and then rethrown.
/// </remarks>
private void SendFirstOfQueue()
{
    try
    {
        AsyncFuture head;
        // The queue is only inspected under its lock; Queue<T> is not safe for
        // concurrent readers and writers.
        lock (this.sendQueue)
        {
            if (this.sendQueue.Count == 0)
            {
                return;
            }
            head = this.sendQueue.Peek();
        }
        if (head != null)
        {
            IPacket associatedPacket = head.AssociatedPacket;
            base.GetSessionFilterChain(new SendOperateFilter(this, head), FilterChainMode.Send).PacketSend(associatedPacket);
        }
    }
    catch (Exception exception)
    {
        base.SessionCaughtException(exception);
        throw;
    }
}
/// <summary>
/// Obtains a receive buffer from BufferFactory (UDP_DEFAULT_BUFFER_SIZE for UDP
/// sessions, DEFAULT_BUFFER_SIZE otherwise) and passes it to DoBeginReceive.
/// </summary>
/// <remarks>
/// Exceptions are reported through SessionCaughtException and are not propagated.
/// </remarks>
internal void SocketBeginReceive()
{
    if (log.IsInfoEnabled)
    {
        log.Info("Enter SocketBeginReceive");
    }
    try
    {
        int bufferSize = (base.sessionType == SessionType.Udp) ? UDP_DEFAULT_BUFFER_SIZE : DEFAULT_BUFFER_SIZE;
        IBuffer buffer = BufferFactory.GetBuffer(bufferSize);
        this.DoBeginReceive(buffer);
    }
    catch (Exception exception)
    {
        // A single handler suffices: a disposed socket and any other failure were
        // reported the same way.
        base.SessionCaughtException(exception);
    }
}
/// <summary>
/// Starts sending the unsent part of a packet: the content's inner byte array
/// from its current Position for Remaining bytes, via DoBeginSend.
/// </summary>
/// <param name="packet">Packet whose content is sent.</param>
/// <param name="state">State object forwarded unchanged to DoBeginSend.</param>
/// <returns>The IAsyncResult returned by DoBeginSend.</returns>
protected virtual IAsyncResult SocketBeginSend(IPacket packet, object state)
{
    byte[] bytes = packet.Content.GetInnerByteArray();
    int offset = packet.Content.Position;
    int count = packet.Content.Remaining;
    return DoBeginSend(bytes, offset, count, state);
}
/// <summary>
/// Receive-completion callback. Completes the receive through DoEndReceive, starts
/// the next receive while the session is still open, then dispatches the received
/// packet (if any) to the Receive filter chain.
/// </summary>
/// <param name="asyncResult">The completed receive operation.</param>
/// <remarks>
/// Does nothing except log when the session is not open. A
/// RemoteSocketClosedException closes the session; every other exception is
/// reported through SessionCaughtException. No exception is allowed to leave this
/// method, because it runs as an asynchronous callback with no caller to handle it.
/// </remarks>
protected void SocketEndReceive(IAsyncResult asyncResult)
{
    if (log.IsInfoEnabled)
    {
        log.Info("Enter SocketEndReceive");
    }
    if (!base.IsOpened)
    {
        if (log.IsWarnEnabled)
        {
            log.Warn("Session has not been started.");
        }
        return;
    }
    try
    {
        IPacket packet = this.DoEndReceive(asyncResult);
        // The next receive is posted before the packet is dispatched.
        if (base.IsOpened)
        {
            this.SocketBeginReceive();
        }
        if (packet != null)
        {
            base.GetSessionFilterChain(FilterChainMode.Receive).PacketReceived(packet);
        }
    }
    catch (RemoteSocketClosedException exception)
    {
        if (base.IsOpened)
        {
            log.Warn("Remote Socket Was Closed.", exception);
        }
        try
        {
            this.Close();
        }
        catch (Exception closeError)
        {
            // Close() rethrows its failures (after reporting those from its close
            // sequence through SessionCaughtException). Letting one escape here
            // would leave it unhandled on the callback thread, so log and stop.
            if (log.IsWarnEnabled)
            {
                log.Warn("Close failed after remote socket was closed.", closeError);
            }
        }
    }
    catch (Exception exception2)
    {
        base.SessionCaughtException(exception2);
    }
}
/// <summary>
/// Send-completion callback. Ends the socket send, advances the packet content's
/// Position by the number of bytes sent, and then either sends the remainder
/// (non-UDP sessions with bytes left) or dequeues the finished entry, marks its
/// future succeeded and starts the next queued send.
/// </summary>
/// <param name="asyncResult">
/// The completed send; its AsyncState is expected to be the AsyncFuture of the send.
/// </param>
/// <remarks>
/// Returns without doing anything when the session is not open or the async state
/// is not an AsyncFuture. On failure the future is marked not succeeded and the
/// exception is reported through SessionCaughtException.
/// NOTE(review): the finally block raises PacketSent and ObjectSent on every
/// completion that reaches the try block, including partial sends and failures -
/// confirm this is intended.
/// </remarks>
protected virtual void SocketEndSend(IAsyncResult asyncResult)
{
    if (!base.IsOpened)
    {
        return;
    }
    AsyncFuture state = asyncResult.AsyncState as AsyncFuture;
    if (state == null)
    {
        return;
    }
    IPacket associatedPacket = state.AssociatedPacket;
    try
    {
        int num = this.innerSocket.EndSend(asyncResult);
        try
        {
            IBuffer content = associatedPacket.Content;
            content.Position += num;
        }
        catch (Exception exception)
        {
            if (log.IsWarnEnabled)
            {
                log.Warn("Change packet content postion failed.", exception);
            }
        }
        if ((associatedPacket.Content.Remaining > 0) && (base.sessionType != SessionType.Udp))
        {
            // Partial send: push the rest of the same packet.
            state.AsyncResult = this.SocketBeginSend(associatedPacket, state);
        }
        else
        {
            lock (this.sendQueue)
            {
                this.sendQueue.Dequeue();
            }
            state.IsSucceeded = true;
            this.SendFirstOfQueue();
        }
    }
    catch (Exception exception2)
    {
        // This is the send path; the message previously said "received".
        log.Fatal("End send failed.", exception2);
        state.IsSucceeded = false;
        base.SessionCaughtException(exception2);
    }
    finally
    {
        base.GetSessionFilterChain(FilterChainMode.Send).PacketSent(state.AssociatedPacket);
        base.GetSessionFilterChain(FilterChainMode.Send).ObjectSent(state.AssociatedObject);
    }
}
/// <summary>Gets the socket wrapped by this session.</summary>
internal Socket InnerSocket
{
    get { return innerSocket; }
}
/// <summary>
/// Gets or sets the local endpoint of the session.
/// </summary>
/// <remarks>
/// The getter returns the cached value when one exists. Otherwise, under
/// <c>syncRoot</c>, it reads the socket's local endpoint:
/// if that is not an IPEndPoint, a new (IPAddress.Any, 0) endpoint is returned uncached;
/// if its address is not IPAddress.Any, it is cached and returned;
/// if there is no remote endpoint or the remote address is IPAddress.Any, it is returned uncached;
/// otherwise the result of <c>WinSock2Wrapper.QueryRoutingInterface</c> is cached and returned.
/// </remarks>
/// <exception cref="InvalidOperationException">The setter is used while the session is opened.</exception>
public IPEndPoint LocalEndPoint
{
    get
    {
        // Unlocked fast path for the already-resolved case.
        if (this.localEndPoint != null)
        {
            return this.localEndPoint;
        }

        lock (syncRoot)
        {
            if (this.localEndPoint != null)
            {
                return this.localEndPoint;
            }

            IPEndPoint bound = innerSocket.LocalEndPoint as IPEndPoint;
            if (bound == null)
            {
                return new IPEndPoint(IPAddress.Any, 0);
            }

            bool boundToAny = IPAddress.Any.Equals(bound.Address);
            if (!boundToAny)
            {
                this.localEndPoint = bound;
                return bound;
            }

            bool remoteIsSpecific = (remoteEndPoint != null) && !IPAddress.Any.Equals(remoteEndPoint.Address);
            if (!remoteIsSpecific)
            {
                return bound;
            }

            this.localEndPoint = WinSock2Wrapper.QueryRoutingInterface(innerSocket, remoteEndPoint);
            return this.localEndPoint;
        }
    }
    set
    {
        if (base.IsOpened)
        {
            throw new InvalidOperationException("Session already opened.");
        }
        localEndPoint = value;
    }
}
/// <summary>
/// Gets the port of the socket's local endpoint, or 0 when there is no socket or
/// the socket has no local endpoint.
/// </summary>
public int LocalPort
{
    get
    {
        if ((innerSocket == null) || (innerSocket.LocalEndPoint == null))
        {
            return 0;
        }
        IPEndPoint bound = (IPEndPoint) innerSocket.LocalEndPoint;
        return bound.Port;
    }
}
/// <summary>
/// Gets the remote endpoint of the session. Setting it is not supported by this
/// class; derived classes may override.
/// </summary>
/// <exception cref="NotSupportedException">The setter is called.</exception>
public virtual IPEndPoint RemoteEndPoint
{
    get { return remoteEndPoint; }
    set { throw new NotSupportedException(); }
}
/// <summary>
/// Filter that performs the actual socket send for one queued future. It is
/// passed to GetSessionFilterChain for the Send chain by SendFirstOfQueue.
/// </summary>
protected class SendOperateFilter : SessionFilterAdapter
{
    /// <summary>Future of the send this filter performs.</summary>
    protected readonly AsyncFuture future;
    /// <summary>Session whose SocketBeginSend is used to send the packet.</summary>
    protected readonly AsyncSocketSession parentSession;

    /// <summary>Creates a filter bound to a session and the future of one send.</summary>
    public SendOperateFilter(AsyncSocketSession session, AsyncFuture future)
    {
        this.parentSession = session;
        this.future = future;
    }

    /// <summary>
    /// Starts the socket send when the packet still has content remaining, stores
    /// the resulting IAsyncResult on the future, and then forwards to the base
    /// implementation.
    /// </summary>
    public override void PacketSend(ISessionFilterChain filterChain, IPacket packet)
    {
        bool hasPayload = packet.Content.Remaining > 0;
        if (hasPayload)
        {
            future.AsyncResult = parentSession.SocketBeginSend(packet, future);
        }
        base.PacketSend(filterChain, packet);
    }
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -