📄 asyncsocketsessionacceptor.cs
字号:
namespace NCindy.Session.AIO
{
using NCindy;
using NCindy.Session;
using NCindy.Util;
using NCindy.Util.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
public class AsyncSocketSessionAcceptor : AbstractSessionAcceptor
{
// ---- Names of the non-public System.Net types/members bound through reflection ----
protected const string AsyncEventBitsClassName = "System.Net.Sockets.AsyncEventBits";
protected const string FdAcceptFieldName = "FdAccept";
protected const string OSSOCKClassName = "System.Net.UnsafeNclNativeMethods+OSSOCK";
protected const string SafeCloseSocketClassName = "System.Net.SafeCloseSocket";
protected const string SafeCloseSocketHandleField = "m_Handle";
protected const string WSAEventSelectMethodName = "WSAEventSelect";

// ---- Tuning constants ----
// 120000 is the period the constructor gives checkPendingSocketsTimer.
// System.Threading.Timer periods are in milliseconds, so this is 120 s despite
// the "MICROSECONDS" in the name; the name is kept because a protected constant
// is visible to subclasses.
protected const int CONNECTED_NO_DATA_TIMEOUT_MICROSECONDS = 120000;
// Same value CheckPendingSocketsTimeout compares WinSock2Wrapper.GetConnectTime against.
protected const int CONNECTED_NO_DATA_TIMEOUT_SECONDS = 120;
// Same value PostBeginAccept passes as the receiveSize argument of BeginAccept.
protected const int FIRST_BLOCK_OF_DATA_SIZE = 0;
// Same value Start passes to PostBeginAccept for the initial batch of accepts.
protected const int INIT_ACCEPT_COUNT = 50;
// Upper bound Start applies to the configured backlog.
protected const int MAX_BACKLOG = 200;
// Milliseconds ListenProcess waits on acceptReceivedEvent per loop iteration.
protected const int SELECT_TIMEOUT = 500;

// ---- Static state ----
// typeof(...) names the same type the previous
// MethodBase.GetCurrentMethod().ReflectedType lookup produced, without reflection.
private static readonly ILogger log = LogFactory.CreateLogger(typeof(AsyncSocketSessionAcceptor));
// Both are assigned once by the static constructor.
private static readonly object FdAccept;
private static readonly MethodInfo WSAEventSelectMethod;

// ---- Instance state ----
// Completion callback passed to every BeginAccept call (bound to SocketEndAccept).
protected readonly AsyncCallback acceptCallback;
// Event whose handle Start registers with WSAEventSelect; ListenProcess waits on it.
protected readonly ManualResetEvent acceptReceivedEvent;
// Periodically runs CheckPendingSocketsTimeout.
protected readonly Timer checkPendingSocketsTimer;
// Created, bound and put into listening state by Start.
protected Socket listenerSocket;
// Runs ListenProcess; started by Start.
protected readonly Thread listenThread;
// Accept sockets handed to BeginAccept whose accept has not completed yet.
// The list instance itself is used as the monitor that guards it.
protected LinkedList<Socket> pendingSockets;
// Sockets available to be passed to BeginAccept again.
protected Queue<Socket> reuseSocketQueue;
// Lock taken by OnSessionClosed around reuseSocketQueue.
protected readonly LightLock reuseSocketQueueLock;
/// <summary>
/// Resolves, through reflection on the assembly that defines <see cref="Socket"/>,
/// the non-public FdAccept value and the static WSAEventSelect method that
/// RegisterForFdAccept uses later. Any failure is written to the trace output
/// and rethrown, so type initialization fails.
/// </summary>
static AsyncSocketSessionAcceptor()
{
    try
    {
        Assembly socketAssembly = Assembly.GetAssembly(typeof(Socket));

        // Resolve the internal types in the same order as before; any of these
        // may come back null on a runtime that does not define them.
        Type nativeMethods = socketAssembly.GetType(OSSOCKClassName);
        Type safeCloseSocket = socketAssembly.GetType(SafeCloseSocketClassName);
        Type asyncEventBits = socketAssembly.GetType(AsyncEventBitsClassName);

        // Parameter list used to select the WSAEventSelect overload.
        Type[] signature = new Type[] { safeCloseSocket, typeof(SafeHandle), asyncEventBits };

        FdAccept = GetMemberByReflection(
            socketAssembly,
            AsyncEventBitsClassName,
            FdAcceptFieldName,
            null,
            BindingFlags.GetField);

        WSAEventSelectMethod = nativeMethods.GetMethod(
            WSAEventSelectMethodName,
            BindingFlags.InvokeMethod | BindingFlags.NonPublic | BindingFlags.Static,
            null,
            signature,
            null);
    }
    catch (Exception bindingFailure)
    {
        Trace.WriteLine(bindingFailure);
        throw;
    }
}
/// <summary>
/// Builds the acceptor's internal state: the socket collections and their lock,
/// the accept event and completion callback, the listen thread (created but not
/// started) and the timer that runs CheckPendingSocketsTimeout.
/// </summary>
protected AsyncSocketSessionAcceptor()
{
    this.reuseSocketQueueLock = new LightLock();
    this.reuseSocketQueue = new Queue<Socket>();
    this.pendingSockets = new LinkedList<Socket>();
    base.sessionType = SessionType.Tcp;

    // Unsignaled to begin with.
    this.acceptReceivedEvent = new ManualResetEvent(false);
    this.acceptCallback = this.SocketEndAccept;
    this.listenThread = new Thread(this.ListenProcess);

    // Due time 0 makes the first tick fire right away; afterwards the callback
    // repeats every 120000 ms.
    this.checkPendingSocketsTimer = new Timer(
        this.CheckPendingSocketsTimeout,
        null,
        0,
        CONNECTED_NO_DATA_TIMEOUT_MICROSECONDS);
}
/// <summary>
/// Creates an acceptor whose listen end point is <see cref="IPAddress.Any"/>
/// combined with the given port.
/// </summary>
/// <param name="port">Port number used to build the listen end point.</param>
public AsyncSocketSessionAcceptor(int port)
    : this(new IPEndPoint(IPAddress.Any, port))
{
}

/// <summary>
/// Creates an acceptor and stores the given end point as its listen end point.
/// </summary>
/// <param name="listenEndPoint">End point assigned to ListenEndPoint; Start binds the listener socket to it.</param>
public AsyncSocketSessionAcceptor(IPEndPoint listenEndPoint)
    : this()
{
    base.ListenEndPoint = listenEndPoint;
}
/// <summary>
/// Timer callback: finds pending accept sockets whose connect time has reached
/// the no-data threshold, removes them from <c>pendingSockets</c>, disconnects
/// them for reuse and puts them on the reuse queue.
/// </summary>
/// <remarks>
/// The previous version removed entries from the list while a list enumerator
/// was still active. The next MoveNext then threw, the outer catch swallowed it,
/// and the socket that had just been removed was never disconnected or recycled.
/// Walking the nodes by hand avoids that.
/// </remarks>
/// <param name="arg">Timer state object; not used.</param>
protected void CheckPendingSocketsTimeout(object arg)
{
    try
    {
        List<Socket> expired = new List<Socket>();
        lock (this.pendingSockets)
        {
            LinkedListNode<Socket> node = this.pendingSockets.First;
            while (node != null)
            {
                // Read the successor before a possible Remove: a removed node no
                // longer links to the rest of the list.
                LinkedListNode<Socket> next = node.Next;
                try
                {
                    // NOTE(review): GetConnectTime presumably reports seconds (the
                    // threshold constant is named ..._SECONDS) - confirm against
                    // WinSock2Wrapper.
                    if (WinSock2Wrapper.GetConnectTime(node.Value) >= CONNECTED_NO_DATA_TIMEOUT_SECONDS)
                    {
                        expired.Add(node.Value);
                        this.pendingSockets.Remove(node);
                    }
                }
                catch
                {
                    // Best effort, as before: a socket whose connect time cannot be
                    // read simply stays pending until the next tick.
                }
                node = next;
            }
        }

        foreach (Socket socket in expired)
        {
            try
            {
                // true = leave the socket reusable after the disconnect.
                socket.Disconnect(true);
            }
            catch (Exception disconnectFailure)
            {
                log.Warn("Disconnect timed out pending socket failed.", disconnectFailure);
            }

            // The socket is queued even when Disconnect failed, as before; the queue
            // is now accessed under the same lock OnSessionClosed uses.
            this.reuseSocketQueueLock.Enter();
            try
            {
                this.reuseSocketQueue.Enqueue(socket);
            }
            finally
            {
                this.reuseSocketQueueLock.Exit();
            }
        }
    }
    catch (Exception exception)
    {
        // Nothing may escape a timer callback, but the failure is now recorded
        // instead of being dropped silently.
        log.Error("Check pending sockets timeout failed.", exception);
    }
}
/// <summary>
/// Invokes <paramref name="checkSession"/> once for every session in the
/// connected-session table while holding the table's reader lock. A callback
/// failure for one session is logged and does not stop the remaining sessions.
/// </summary>
/// <param name="checkSession">
/// Callback applied to each connected session. When null nothing is invoked;
/// the previous version swallowed the resulting NullReferenceException per
/// session, which was also a no-op.
/// </param>
public void CheckSessionStatus(CheckSessionCallBack checkSession)
{
    if (checkSession == null)
    {
        return;
    }

    // Wait up to 1000 ms for the reader lock; taken outside the try so the
    // finally never releases a lock that was not acquired.
    base.connectedSessionLock.AcquireReaderLock(1000);
    try
    {
        // foreach replaces the decompiler's get_Current()/get_Value() accessor
        // calls, which are not valid C#, and disposes the enumerator for us.
        foreach (KeyValuePair<IPEndPoint, ISession> entry in base.connectedSessions)
        {
            try
            {
                checkSession(entry.Value);
            }
            catch (Exception exception)
            {
                log.Warn("Check session status failed.", exception);
            }
        }
    }
    finally
    {
        base.connectedSessionLock.ReleaseReaderLock();
    }
}
/// <summary>
/// Stops the acceptor: disposes the pending-socket timer, clears the started
/// flag, waits up to one second for the listen thread, closes the listener
/// socket and then closes every connected session under the session table's
/// reader lock. Any failure is reported through CaughtException rather than
/// thrown to the caller.
/// </summary>
public override void Close()
{
    try
    {
        // The timer is created in the constructor and was never released.
        // Dispose is safe to call more than once.
        this.checkPendingSocketsTimer.Dispose();

        if (base.started)
        {
            base.started = false;

            // Start sets the flag before it starts the thread, so after a failed
            // Start the thread may never have run. Join on an unstarted thread
            // throws, which previously skipped closing the listener socket.
            if (this.listenThread.IsAlive)
            {
                this.listenThread.Join(1000);
            }
            if (this.listenerSocket != null)
            {
                this.listenerSocket.Close();
            }
        }

        base.connectedSessionLock.AcquireReaderLock(1000);
        try
        {
            // foreach replaces the decompiler's get_Current()/get_Value() accessor
            // calls, which are not valid C#.
            foreach (KeyValuePair<IPEndPoint, ISession> entry in base.connectedSessions)
            {
                try
                {
                    entry.Value.Close();
                }
                catch (Exception exception)
                {
                    // One session failing to close must not stop the others.
                    log.Warn("Close connected session failed.", exception);
                }
            }
        }
        finally
        {
            base.connectedSessionLock.ReleaseReaderLock();
        }
    }
    catch (Exception exception2)
    {
        this.CaughtException(exception2);
    }
}
/// <summary>
/// Invokes the named member on <paramref name="type"/> through
/// <see cref="Type.InvokeMember(string, BindingFlags, Binder, object, object[])"/>
/// with no binder and no arguments, and returns the result.
/// </summary>
/// <param name="type">Type that declares the member.</param>
/// <param name="fieldName">Name of the member to invoke.</param>
/// <param name="target">Instance to read from, or null for a static member.</param>
/// <param name="bindingFlags">Flags selecting the kind of access and the member visibility.</param>
/// <returns>The value produced by the invocation.</returns>
private static object GetMemberByReflection(Type type, string fieldName, object target, BindingFlags bindingFlags)
{
    return type.InvokeMember(fieldName, bindingFlags, null, target, null);
}

/// <summary>
/// Looks up <paramref name="typeName"/> in <paramref name="assembly"/> and then
/// invokes the named member on it.
/// </summary>
/// <param name="assembly">Assembly that defines the type.</param>
/// <param name="typeName">Full name of the type that declares the member.</param>
/// <param name="fieldName">Name of the member to invoke.</param>
/// <param name="target">Instance to read from, or null for a static member.</param>
/// <param name="bindingFlags">Flags selecting the kind of access and the member visibility.</param>
/// <returns>The value produced by the invocation.</returns>
/// <exception cref="TypeLoadException">The assembly does not define <paramref name="typeName"/>.</exception>
private static object GetMemberByReflection(Assembly assembly, string typeName, string fieldName, object target, BindingFlags bindingFlags)
{
    // throwOnError = true: a missing type now fails with a TypeLoadException that
    // names the type, instead of a NullReferenceException from the call below.
    Type declaringType = assembly.GetType(typeName, true);
    return GetMemberByReflection(declaringType, fieldName, target, bindingFlags);
}
/// <summary>
/// Body of the listen thread. While the acceptor is flagged as started it waits
/// on the accept event in slices of SELECT_TIMEOUT milliseconds; each time the
/// event is signaled it posts a further quarter of the backlog's worth of
/// accepts and resets the event. Errors are logged and the loop keeps running.
/// </summary>
protected void ListenProcess()
{
    while (base.started)
    {
        try
        {
            bool acceptSignaled = this.acceptReceivedEvent.WaitOne(SELECT_TIMEOUT, true);
            if (!acceptSignaled)
            {
                // Timed out: go round again so the started flag is re-checked.
                continue;
            }

            try
            {
                this.PostBeginAccept(base.backLog / 4);
            }
            finally
            {
                // The event is manual-reset, so it has to be cleared by hand,
                // even when posting the accepts failed.
                this.acceptReceivedEvent.Reset();
            }
        }
        catch (Exception loopFailure)
        {
            log.Error("Some thing wrong in listen loop.", loopFailure);
        }
    }
}
/// <summary>
/// Runs the base handling for a closed session and then puts the session's
/// inner socket on the reuse queue, under the reuse-queue lock.
/// </summary>
/// <param name="sender">Event source, forwarded to the base implementation.</param>
/// <param name="e">Event data; its Session is cast to AsyncSocketSession.</param>
protected override void OnSessionClosed(object sender, SessionEventArgs e)
{
    base.OnSessionClosed(sender, e);

    // Resolve the socket before taking the lock: a failing cast then never
    // touches the lock at all.
    Socket closedSocket = ((AsyncSocketSession) e.Session).InnerSocket;

    // Enter() sits outside the try so that Exit() only runs when the lock was
    // actually acquired (it used to run even if Enter() itself failed).
    this.reuseSocketQueueLock.Enter();
    try
    {
        this.reuseSocketQueue.Enqueue(closedSocket);
    }
    finally
    {
        this.reuseSocketQueueLock.Exit();
    }
}
/// <summary>
/// Posts <paramref name="count"/> asynchronous accepts on the listener socket.
/// Each accept uses a socket from the reuse queue when one is available and a
/// newly created socket otherwise, and the socket is tracked in
/// <c>pendingSockets</c> until its accept completes.
/// </summary>
/// <param name="count">Number of accepts to post; zero or less posts nothing.</param>
private void PostBeginAccept(int count)
{
    for (int i = 0; i < count; i++)
    {
        Socket socket = null;

        // This method runs on several threads (Start, the listen thread, accept
        // completions), so the queue is read under the same lock OnSessionClosed
        // uses when it enqueues.
        this.reuseSocketQueueLock.Enter();
        try
        {
            if (this.reuseSocketQueue.Count > 0)
            {
                socket = this.reuseSocketQueue.Dequeue();
            }
        }
        finally
        {
            this.reuseSocketQueueLock.Exit();
        }

        if (socket == null)
        {
            socket = SocketFactory.CreateSocket(base.sessionType);
        }

        // Track the socket BEFORE posting the accept. The completion callback
        // removes it from pendingSockets and can run before BeginAccept returns;
        // adding afterwards could leave an already-accepted socket in the list,
        // where the timeout check would later disconnect it.
        lock (this.pendingSockets)
        {
            this.pendingSockets.AddLast(socket);
        }

        try
        {
            this.listenerSocket.BeginAccept(socket, FIRST_BLOCK_OF_DATA_SIZE, this.acceptCallback, this.listenerSocket);
        }
        catch
        {
            // The accept was never posted, so the socket is not pending.
            lock (this.pendingSockets)
            {
                this.pendingSockets.Remove(socket);
            }
            throw;
        }
    }
}
/// <summary>
/// Calls the reflected WSAEventSelect method with the socket's non-public
/// handle field, the given event handle and the reflected FdAccept value.
/// </summary>
/// <param name="socket">Socket whose non-public handle field is read.</param>
/// <param name="eventHandle">Handle of the event passed to WSAEventSelect.</param>
/// <returns>The value returned by WSAEventSelect, cast to <see cref="SocketError"/>.</returns>
private static SocketError RegisterForFdAccept(Socket socket, SafeHandle eventHandle)
{
    const BindingFlags privateInstanceField =
        BindingFlags.GetField | BindingFlags.NonPublic | BindingFlags.Instance;

    object nativeHandle = GetMemberByReflection(
        socket.GetType(),
        SafeCloseSocketHandleField,
        socket,
        privateInstanceField);

    // null target: the reflected method is static.
    object result = WSAEventSelectMethod.Invoke(
        null,
        new object[] { nativeHandle, eventHandle, FdAccept });

    return (SocketError) result;
}
/// <summary>
/// Runs the base removal and then, on a best-effort basis, puts the session's
/// inner socket on the reuse queue. A failure to recycle the socket is logged
/// and never propagated.
/// </summary>
/// <param name="session">Session being removed; cast to AsyncSocketSession to reach its socket.</param>
protected override void RemoveConnectedSession(ISession session)
{
    base.RemoveConnectedSession(session);
    try
    {
        Socket removedSocket = ((AsyncSocketSession) session).InnerSocket;

        // Same lock OnSessionClosed takes around the queue; this method used to
        // enqueue without it.
        // NOTE(review): OnSessionClosed also enqueues the session's socket. If the
        // base class calls both for one session the socket is queued twice -
        // confirm against AbstractSessionAcceptor.
        this.reuseSocketQueueLock.Enter();
        try
        {
            this.reuseSocketQueue.Enqueue(removedSocket);
        }
        finally
        {
            this.reuseSocketQueueLock.Exit();
        }
    }
    catch (Exception exception)
    {
        // Still best effort, but no longer silent.
        log.Warn("Recycle socket of removed session failed.", exception);
    }
}
/// <summary>
/// Sets the ReuseAddress socket-level option of the listener socket from the
/// acceptor's reuseAddress setting.
/// </summary>
protected void SetSocketOptions()
{
    Socket listener = this.listenerSocket;
    listener.SetSocketOption(
        SocketOptionLevel.Socket,
        SocketOptionName.ReuseAddress,
        base.reuseAddress);
}
/// <summary>
/// Completion callback for BeginAccept. Finishes the accept, wraps the accepted
/// socket in an AsyncTcpSession configured with this acceptor's packet
/// decoder/encoder and event handlers, stops tracking the socket as pending,
/// starts receiving on the session and raises the accepted notification.
/// Afterwards one replacement accept is posted while the acceptor is started.
/// </summary>
/// <param name="asyncResult">Result object of the BeginAccept call being completed.</param>
protected void SocketEndAccept(IAsyncResult asyncResult)
{
    try
    {
        // EndAccept declares these as out parameters; the decompiler's "ref"
        // does not compile. Neither value is used (accepts are posted with a
        // receive size of 0).
        byte[] buffer;
        int bytesTransferred;
        Socket socket = this.listenerSocket.EndAccept(out buffer, out bytesTransferred, asyncResult);
        base.IncrementAcceptedCount();

        AsyncTcpSession session = new AsyncTcpSession(socket);
        session.PacketDecoder = base.packetDecoder;
        session.PacketEncoder = base.packetEncoder;
        SessionAcceptedEventArgs sessionAcceptedEventArgs = new SessionAcceptedEventArgs(this, session);
        session.Closed += base.sessionClosedEventHandler;
        session.ExceptionCaught += base.sessionExceptionCaughtEventHandler;

        // The accept completed, so the socket is no longer pending.
        lock (this.pendingSockets)
        {
            this.pendingSockets.Remove(socket);
        }

        session.SocketBeginReceive();
        this.AcceptedSession(sessionAcceptedEventArgs);
    }
    catch (Exception exception)
    {
        this.CaughtException(exception);
    }
    finally
    {
        // Replace the accept that just completed - but only while running. Close
        // clears the flag before it closes the listener; re-posting on a closed
        // listener can only fail and abandons the socket picked for it.
        if (base.started)
        {
            try
            {
                this.PostBeginAccept(1);
            }
            catch (Exception exception2)
            {
                this.CaughtException(exception2);
            }
        }
    }
}
/// <summary>
/// Starts accepting connections: creates the listener socket, applies socket
/// options, binds and listens (with the backlog capped at MAX_BACKLOG), posts the
/// initial batch of accepts, registers the accept event with WSAEventSelect and
/// starts the listen thread.
/// </summary>
/// <exception cref="Exception">
/// Any failure is first reported through CaughtException and then rethrown,
/// wrapped in an exception whose InnerException is the original error.
/// </exception>
public override void Start()
{
    try
    {
        base.started = true;
        this.listenerSocket = SocketFactory.CreateSocket(base.SessionType);
        this.SetSocketOptions();
        this.listenerSocket.Bind(base.ListenEndPoint);
        base.backLog = Math.Min(MAX_BACKLOG, base.backLog);
        this.listenerSocket.Listen(base.Backlog);
        this.PostBeginAccept(INIT_ACCEPT_COUNT);

        SocketError error = RegisterForFdAccept(this.listenerSocket, this.acceptReceivedEvent.SafeWaitHandle);

        // The decompiled "error != null" is always true for an enum value, so
        // Start failed on every call. The intended check is against Success (0).
        if (error != SocketError.Success)
        {
            throw new Exception("WSAEventSelect failed. Socket error: " + error);
        }

        this.listenThread.Start();
    }
    catch (Exception exception)
    {
        this.CaughtException(exception);
        throw new Exception("Async socket session acceptor start failed.", exception);
    }
}
/// <summary>
/// Callback type accepted by CheckSessionStatus, which invokes it once for each
/// connected session.
/// </summary>
/// <param name="session">The connected session being visited.</param>
public delegate void CheckSessionCallBack(ISession session);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -