📄 asyncudtsession.cs
字号:
namespace NCindy.Session.AIO
{
using NCindy;
using NCindy.Buffer;
using NCindy.Packet;
using NCindy.Protocol.UDT;
using NCindy.Session;
using NCindy.Util.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class AsyncUDTSession : AbstractSession
{
// Default buffer size (0x2000 = 8192); DoBeginReceive requests the same count per receive.
protected const int DEFAULT_BUFFER_SIZE = 0x2000;
// Data socket. Created by the constructor when isSender is false; in sender mode the visible
// code never assigns it (presumably the accept callback does - confirm).
protected UDTSocket innerSocket;
// Selects the open strategy: true = bind/listen/accept on listenSocket,
// false = bind/connect on innerSocket (see InternalOpen).
private readonly bool isSender;
// Listening socket; only created when isSender is true, otherwise null.
protected readonly UDTSocket listenSocket;
// Local endpoint supplied to the constructor.
protected IPEndPoint localEndPoint;
// typeof(...) replaces MethodBase.GetCurrentMethod().ReflectedType: inside a static field
// initializer both yield this class, and typeof needs no System.Reflection import
// (which this file does not have).
private static readonly ILogger log = LogFactory.CreateLogger(typeof(AsyncUDTSession));
// Result of the most recent BeginReceive (set by DoBeginReceive); Close waits on it briefly.
protected IAsyncResult pendingReceiveResult;
// Not referenced in the visible portion of this file.
protected long receivedCounter;
// Not referenced (or initialized) in the visible portion of this file.
protected Queue<IPacket> receiveQueue;
// Created by the constructor; not otherwise used in the visible portion of this file.
private readonly object recvSyncRoot;
// Peer endpoint supplied to the constructor; Open refuses to run while it is null.
protected IPEndPoint remoteEndPoint;
// Not referenced in the visible portion of this file.
protected bool reuseSocket;
// Pending sends in FIFO order. The queue object itself is also used as its lock.
protected readonly Queue<AsyncFuture> sendQueue;
// Serializes the BeginSend call in DoBeginSend.
private readonly object sendSyncRoot;
// 0x3e8 = 1000. Not referenced in the visible portion of this file.
protected const int SOCKET_TIMEOUT_MICROSECONDS = 0x3e8;
// Not referenced in the visible portion of this file.
protected const int SOCKET_TIMEOUT_SECONDS = 1;
// Cached completion delegates, created once by InitializeAsyncCallbacks.
protected AsyncCallback socketEndRecvCallback;
protected AsyncCallback socketEndSendCallback;
// Created on each InternalOpen call and attached to the open future.
private ManualResetEvent startWaitHandle;
// Guards the state checks in Open and Close.
protected readonly object syncRoot;
// Not referenced in the visible portion of this file.
private int tryConnectTimes;
/// <summary>
/// Creates a UDT session bound to the given endpoints and allocates the socket that
/// matches the requested role.
/// </summary>
/// <param name="localIPEndPoint">Local endpoint stored for the later bind.</param>
/// <param name="remoteIPEndPoint">Peer endpoint; may be null here, but Open rejects null.</param>
/// <param name="isSender">
/// true allocates the listening socket; false allocates the data socket.
/// </param>
public AsyncUDTSession(IPEndPoint localIPEndPoint, IPEndPoint remoteIPEndPoint, bool isSender)
    : base(SessionType.Udp)
{
    // Synchronization objects and the outgoing queue.
    this.sendQueue = new Queue<AsyncFuture>();
    this.syncRoot = new object();
    this.sendSyncRoot = new object();
    this.recvSyncRoot = new object();

    // Role and addressing.
    this.isSender = isSender;
    this.localEndPoint = localIPEndPoint;
    this.remoteEndPoint = remoteIPEndPoint;

    // Exactly one of the two sockets is created, depending on the role.
    if (isSender)
    {
        this.listenSocket = new UDTSocket(AddressFamily.InterNetwork);
    }
    else
    {
        this.innerSocket = new UDTSocket(AddressFamily.InterNetwork);
    }

    this.InitializeAsyncCallbacks();
}
/// <summary>
/// Closes the session: waits briefly for a pending receive, shuts the data socket down,
/// clears the filters and raises the session-closed notification.
/// </summary>
/// <returns>
/// A completed future. When the session is not open on entry, the future is completed
/// with IsSucceeded set to false and nothing else happens.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The session stopped being open between the first check and taking the lock.
/// </exception>
public override IFuture Close()
{
    if (log.IsInfoEnabled)
    {
        log.Info("Enter Close.");
    }

    // Fast path: nothing to close.
    if (!base.IsOpened)
    {
        AsyncFuture notOpened = new AsyncFuture(this);
        notOpened.IsCompleted = true;
        notOpened.IsSucceeded = false;
        return notOpened;
    }

    lock (this.syncRoot)
    {
        // Re-check under the lock; a caller that lost the race gets an exception.
        if (!base.IsOpened)
        {
            throw new InvalidOperationException("This session not started.");
        }
        base.State = SessionState.Closing;
    }

    try
    {
        AsyncFuture closeFuture = new AsyncFuture(this);

        // Give an in-flight receive up to one second (1000 ms) to finish.
        if (this.pendingReceiveResult != null)
        {
            this.pendingReceiveResult.AsyncWaitHandle.WaitOne(1000, false);
        }

        this.DoClose();

        if (base.filters != null)
        {
            base.filters.Clear();
        }

        base.SessionClosed();

        // NOTE(review): only IsCompleted is set on success; IsSucceeded is left at whatever
        // AsyncFuture defaults to - confirm that is intended.
        closeFuture.IsCompleted = true;
        return closeFuture;
    }
    catch (Exception closeError)
    {
        // Make sure the socket is released, report the failure, then let it propagate.
        if (this.innerSocket != null)
        {
            this.innerSocket.Close();
        }
        base.SessionCaughtException(closeError);
        throw;
    }
    finally
    {
        // Once past the lock, the session always ends up Closed.
        base.State = SessionState.Closed;
    }
}
/// <summary>
/// Starts an asynchronous receive into the array backing <paramref name="buffer"/> and
/// records the resulting IAsyncResult in pendingReceiveResult.
/// </summary>
/// <param name="buffer">
/// Destination buffer; it is also passed as the async state so DoEndReceive can recover it.
/// Its backing array is assumed to hold at least DEFAULT_BUFFER_SIZE bytes - confirm with callers.
/// </param>
protected void DoBeginReceive(IBuffer buffer)
{
    // Use the named constant rather than a repeated 0x2000 literal so the receive size
    // cannot drift from DEFAULT_BUFFER_SIZE.
    this.pendingReceiveResult = this.innerSocket.BeginReceive(
        buffer.GetInnerByteArray(),
        0,
        DEFAULT_BUFFER_SIZE,
        SocketFlags.None,
        this.socketEndRecvCallback,
        buffer);
}
/// <summary>
/// Starts an asynchronous send of <paramref name="size"/> bytes of <paramref name="content"/>
/// beginning at <paramref name="offset"/>.
/// </summary>
/// <param name="content">Bytes to send.</param>
/// <param name="offset">Index of the first byte to send.</param>
/// <param name="size">Number of bytes to send.</param>
/// <param name="state">Opaque state handed to the send-completion callback.</param>
/// <returns>Whatever the socket's BeginSend returns for this operation.</returns>
protected IAsyncResult DoBeginSend(byte[] content, int offset, int size, object state)
{
    if (log.IsInfoEnabled)
    {
        log.Info(string.Format("Send content, lenght:{0}, offset:{1}, size:{2}", content.Length, offset, size));
    }

    SocketError error;
    IAsyncResult result;

    // Only one BeginSend may be issued at a time.
    lock (this.sendSyncRoot)
    {
        result = this.innerSocket.BeginSend(content, offset, size, SocketFlags.None, out error, this.socketEndSendCallback, state);
    }

    // The socket reports failure through the out parameter instead of throwing; previously
    // that value was dropped. Surface it in the log (IOPending is the normal in-progress
    // status for an asynchronous operation, so it is not treated as a failure).
    if (error != SocketError.Success && error != SocketError.IOPending)
    {
        log.Error("BeginSend reported socket error: " + error);
    }

    return result;
}
/// <summary>
/// Shuts down both directions of the data socket and then closes it.
/// </summary>
protected void DoClose()
{
    UDTSocket socket = this.innerSocket;
    socket.Shutdown(SocketShutdown.Both);
    socket.Close();
}
/// <summary>
/// Completes a receive started by DoBeginReceive and wraps the received bytes in a packet.
/// </summary>
/// <param name="asyncResult">
/// The completed receive; its AsyncState must be the IBuffer that was passed to BeginReceive.
/// </param>
/// <returns>A packet over the buffer, limited to the received length, tagged with the remote endpoint.</returns>
/// <exception cref="RemoteSocketClosedException">
/// The socket reported zero (or fewer) bytes; the buffer is released before throwing.
/// </exception>
protected IPacket DoEndReceive(IAsyncResult asyncResult)
{
    IBuffer buffer = (IBuffer) asyncResult.AsyncState;
    int bytesRead = this.innerSocket.EndReceive(asyncResult);

    if (log.IsInfoEnabled)
    {
        log.Info(string.Format("Received size: {0}", bytesRead));
    }

    // Normal case: expose exactly the bytes that arrived.
    if (bytesRead > 0)
    {
        buffer.Limit = bytesRead;
        return new DefaultPacket(buffer, this.remoteEndPoint);
    }

    // Nothing received: treated as the remote side having closed.
    if (log.IsWarnEnabled)
    {
        log.Warn("Received 0 byte from " + this.remoteEndPoint);
    }
    buffer.Release();
    throw new RemoteSocketClosedException();
}
/// <summary>
/// Creates the send- and receive-completion delegates once so they can be reused
/// for every asynchronous socket operation.
/// </summary>
protected void InitializeAsyncCallbacks()
{
    AsyncCallback onSendCompleted = this.SocketEndSend;
    AsyncCallback onReceiveCompleted = this.SocketEndReceive;

    this.socketEndSendCallback = onSendCompleted;
    this.socketEndRecvCallback = onReceiveCompleted;
}
/// <summary>
/// Starts the asynchronous open. In sender mode the listening socket is bound to the local
/// endpoint, listens with a backlog of 1 and begins accepting; otherwise the data socket is
/// bound and begins connecting to the remote endpoint.
/// </summary>
/// <param name="future">
/// Future representing the open; it receives a fresh wait handle and is passed as the
/// async state to the accept/connect callback.
/// </param>
protected void InternalOpen(AsyncFuture future)
{
    ManualResetEvent openHandle = new ManualResetEvent(false);
    this.startWaitHandle = openHandle;
    future.AddWaitHandle(openHandle);

    if (this.isSender)
    {
        // Sender role: wait for the peer to connect to us.
        this.listenSocket.Bind(this.localEndPoint);
        this.listenSocket.Listen(1);
        this.listenSocket.BeginAccept(this.SocketEndAccept, future);
        return;
    }

    // Non-sender role: connect out to the peer.
    this.innerSocket.Bind(this.LocalEndPoint);
    this.innerSocket.BeginConnect(this.remoteEndPoint, this.SocketEndConnect, future);
}
/// <summary>
/// Begins opening the session asynchronously.
/// </summary>
/// <returns>A future for the open operation started by InternalOpen.</returns>
/// <exception cref="InvalidOperationException">
/// The session is not in the Initial state, is already opened, or has no remote endpoint.
/// </exception>
/// <remarks>
/// Any exception thrown while starting the open is reported through SessionCaughtException,
/// the data socket (if one exists) is cleaned up, and the exception is rethrown.
/// </remarks>
public override IFuture Open()
{
    if (base.State != SessionState.Initial)
    {
        throw new InvalidOperationException("This session already started.");
    }
    if (this.remoteEndPoint == null)
    {
        throw new InvalidOperationException("Remote endpoint could not be null.");
    }

    lock (this.syncRoot)
    {
        if (base.IsOpened)
        {
            throw new InvalidOperationException("This session already started.");
        }

        try
        {
            AsyncFuture future = new AsyncFuture(this);
            this.InternalOpen(future);
            return future;
        }
        catch (Exception exception)
        {
            base.SessionCaughtException(exception);

            if (this.innerSocket != null)
            {
                bool alreadyDisposed = false;
                try
                {
                    this.innerSocket.Shutdown(SocketShutdown.Both);
                }
                catch (ObjectDisposedException)
                {
                    // Socket is already gone; there is nothing left to close.
                    alreadyDisposed = true;
                }
                catch
                {
                    // Best effort: Shutdown fails when the socket never connected.
                    // The socket is still closed below.
                }

                // Previously Close only ran when Shutdown threw, so a successful Shutdown
                // left the socket open. Close it on every path where it still exists.
                if (!alreadyDisposed)
                {
                    this.innerSocket.Close();
                }
            }

            throw;
        }
    }
}
/// <summary>
/// Queues a packet for sending and, if it is the only queued item, starts sending it.
/// </summary>
/// <param name="obj">Object associated with the send; stored on the returned future.</param>
/// <param name="packet">Packet to send; stored on the returned future.</param>
/// <param name="priority">Not used by this implementation.</param>
/// <returns>The future tracking this send.</returns>
protected override IFuture Send(object obj, IPacket packet, int priority)
{
    AsyncFuture future = new AsyncFuture(this);
    future.AssociatedObject = obj;
    future.AssociatedPacket = packet;

    lock (this.sendQueue)
    {
        this.sendQueue.Enqueue(future);

        // Use the Count property; the decompiled accessor call get_Count() is not valid C#.
        if (this.sendQueue.Count > 1)
        {
            // Another send is already queued ahead of this one, so this call does not start
            // it (presumably the send-completion path outside this view does - confirm).
            log.Error("Queue count > 1");
            return future;
        }

        this.SendFirstOfQueue();
    }

    return future;
}
/// <summary>
/// Pushes the packet at the head of the send queue through the session's send filter chain.
/// The item is only peeked, not removed, here. Exceptions are reported through
/// SessionCaughtException instead of propagating.
/// </summary>
private void SendFirstOfQueue()
{
    // Use the Count property; the decompiled accessor call get_Count() is not valid C#.
    if (this.sendQueue.Count <= 0)
    {
        return;
    }

    try
    {
        AsyncFuture future;
        lock (this.sendQueue)
        {
            // Re-check under the lock: the queue may have been drained since the test above.
            if (this.sendQueue.Count <= 0)
            {
                return;
            }
            future = this.sendQueue.Peek();
        }

        if (future != null)
        {
            IPacket associatedPacket = future.AssociatedPacket;
            base.GetSessionFilterChain(new SendOperateFilter(this, future), FilterChainMode.Send).PacketSend(associatedPacket);
        }
    }
    catch (Exception exception)
    {
        base.SessionCaughtException(exception);
    }
}
internal void SocketBeginReceive()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -