📄 filesender.cs
字号:
namespace Imps.Client.Core.P2P.FileTransportor
{
using Imps.Client.Core.P2P.ICE;
using NCindy;
using NCindy.Buffer;
using NCindy.Session.AIO;
using NCindy.Util.Logging;
using System;
using System.Threading;
public class FileSender : FileTransportorBase
{
private readonly object beginSendLock;
private long completedSize;
private bool handshakedAcked;
private readonly string handshakeToken;
private static readonly ILogger log = LogFactory.CreateLogger(MethodBase.GetCurrentMethod().ReflectedType);
private const int MTU = 0x200;
private readonly ManualResetEvent shutdownEvent;
private bool shutdownReceived;
public FileSender(ISession session, Peer remotePeer, TransportingFile file) : base(session, remotePeer, file)
{
this.shutdownEvent = new ManualResetEvent(false);
this.beginSendLock = new object();
if (log.IsInfoEnabled)
{
log.Info("Init completed size: " + file.CompletedSize);
}
this.handshakeToken = "HANDSHAKE:" + Guid.NewGuid();
this.completedSize = file.CompletedSize;
}
public FileSender(ISession session, Peer remotePeer, string localFilePath) : this(session, remotePeer, localFilePath, (long) 0)
{
}
public FileSender(ISession session, Peer remotePeer, string localFilePath, long completedSize) : base(session, remotePeer, localFilePath, completedSize)
{
this.shutdownEvent = new ManualResetEvent(false);
this.beginSendLock = new object();
if (log.IsInfoEnabled)
{
log.Info("Init completed size: " + completedSize);
}
this.handshakeToken = "HANDSHAKE:" + Guid.NewGuid();
this.completedSize = completedSize;
}
private void DoSend()
{
IBuffer buffer = BufferFactory.GetBuffer(0x200);
lock (base.fileInfo)
{
if (base.fileInfo.InnerFileStream.Position != base.fileInfo.CompletedSize)
{
log.Error("Position error.");
}
int num = base.fileInfo.InnerFileStream.Read(buffer.GetInnerByteArray(), 0, 0x200);
base.fileInfo.CompletedSize += num;
base.FireBlockTransported(Interlocked.Add(ref this.completedSize, (long) num));
if (log.IsInfoEnabled)
{
log.Info(string.Format("Completed size:{0}, total size:{1}, completed precent:{2:F2}%", this.completedSize, base.fileInfo.FileSize, (100 * this.completedSize) / base.fileInfo.FileSize));
}
buffer.Limit = num;
lock (base.sendSyncRoot)
{
base.tranSession.Send(buffer);
}
}
}
protected override void OnObjectReceived(object sender, SessionEventArgs e)
{
IBuffer buffer = e.Obj as IBuffer;
if (buffer != null)
{
string initOffer = FileTransportorBase.Buffer2String(buffer);
if (!string.IsNullOrEmpty(initOffer))
{
if (initOffer.StartsWith(this.handshakeToken))
{
string text2 = this.handshakeToken + P2PManager.Instace.LocalPeer.PeerToken;
if (initOffer == text2)
{
lock (this.beginSendLock)
{
this.handshakedAcked = true;
Thread.Sleep(0x7d0);
}
this.DoSend();
}
}
else if ((!this.shutdownReceived && (buffer.Limit == FileTransportorBase.ShutdownCmdLenght)) && initOffer.StartsWith("SHUTDOWN:"))
{
this.shutdownReceived = true;
base.SendACK(initOffer);
}
}
}
}
private void OnObjectSent(object sender, SessionEventArgs e)
{
if (log.IsInfoEnabled)
{
log.Info("Enter OnObjectSent. Object: " + e.Obj);
}
if ((this.handshakedAcked && this.shutdownReceived) && (base.fileInfo.FileSize == base.fileInfo.CompletedSize))
{
this.shutdownEvent.WaitOne(0x1388, true);
base.FireTransportCompleted();
}
if (!this.handshakedAcked)
{
Thread.Sleep(0x7d0);
lock (this.beginSendLock)
{
if (!this.handshakedAcked)
{
this.SendHandShake();
}
}
}
else if (!this.shutdownReceived)
{
this.ProcessFileData(e);
}
}
private void OnStateChanged(object sender, SessionEventArgs e)
{
if (((e.Session.State != SessionState.Opened) && this.shutdownReceived) && (base.fileInfo.FileSize == base.fileInfo.CompletedSize))
{
this.shutdownEvent.Set();
}
}
private void ProcessFileData(SessionEventArgs e)
{
int limit = ((IBuffer) e.Obj).Limit;
try
{
((IBuffer) e.Obj).Release();
}
catch
{
}
if (log.IsInfoEnabled)
{
log.Info("Sent size: " + limit);
}
if (base.canceled)
{
base.tranSession.Close();
base.fileInfo.Close();
}
else if (this.completedSize == base.fileInfo.FileSize)
{
if (log.IsInfoEnabled)
{
log.Info("Data transport completed.");
}
}
else
{
this.DoSend();
}
}
private void SendHandShake()
{
if (log.IsInfoEnabled)
{
log.Info("Send HandShake.");
}
if (!this.handshakedAcked)
{
lock (base.sendSyncRoot)
{
base.tranSession.Send(FileTransportorBase.String2Buffer(this.handshakeToken));
}
}
}
protected override void Start(object obj)
{
AsyncUDTSession session = (AsyncUDTSession) obj;
base.tranSession = session;
this.completedSize = base.fileInfo.CompletedSize;
base.tranSession.ObjectReceived += new EventHandler<SessionEventArgs>(this, (IntPtr) this.OnObjectReceived);
base.tranSession.ObjectSent += new EventHandler<SessionEventArgs>(this, (IntPtr) this.OnObjectSent);
base.tranSession.StateChanged += new EventHandler<SessionEventArgs>(this, (IntPtr) this.OnStateChanged);
FileDataCodec codec = new FileDataCodec();
base.tranSession.PacketDecoder = codec;
base.tranSession.PacketEncoder = codec;
this.SendHandShake();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -