📄 cmpp.cs
字号:
/// <summary>
/// 记录最后接收到网关数据的时间,否则退出
/// </summary>
private void RegisterLastRecvISMGMsgTime()
{
this.RWL_LastRecvISMGTime.AcquireWriterLock(-1);
this.LastRecvISMGTime=DateTime.Now;
this.RWL_LastRecvISMGTime.ReleaseWriterLock();
Thread.Sleep(1);
}
/// <summary>
/// 记录最后发送数据时间避免重复发送
/// </summary>
private void RegisnterLastSentISMGMsgTime()
{
this.RWL_LastSentISMGTime.AcquireWriterLock(-1);
this.LastSentISMGTime=DateTime.Now;
this.RWL_LastSentISMGTime.ReleaseWriterLock();
Thread.Sleep(1);
}
/// <summary>
/// 检查当前TCP是否有用
/// </summary>
/// <returns></returns>
private bool IfTCPCanUse()
{
this.RWL_LastRecvISMGTime.AcquireReaderLock(-1);
TimeSpan ts=DateTime.Now-this.LastRecvISMGTime;
this.RWL_LastRecvISMGTime.ReleaseReaderLock();
Thread.Sleep(1);
if(ts.TotalSeconds>=180)
{
return false;
}
else
{
return true;
}
}
private bool IfPingTime()
{
this.RWL_LastSentISMGTime.AcquireReaderLock(-1);
TimeSpan ts=DateTime.Now-this.LastSentISMGTime;
this.RWL_LastSentISMGTime.ReleaseReaderLock();
Thread.Sleep(1);
if(ts.TotalSeconds>=10)
{
return true;
}
else
{
return false;
}
}
/// <summary>
/// 使用线程将缓冲队列里的数据重新发送
/// </summary>
private void SetTempQueueItemToOutQueue()
{
int i=0;
while(this.GetIfCanAddItemToOutQueue())
{
try
{
System.Messaging.Message msg=this.MSMQ_CMPP3GateWayService_TempQueue.Receive(TimeSpan.FromMilliseconds(300));
if(msg!=null)
{
i=0;
try
{
QueueItem q=(QueueItem)msg.Body;
uint seq=this.getNextSequence();
MSG.CMPP_Submit submit=(MSG.CMPP_Submit)q.Msg;
submit.SetNewSequence(seq);
q.MsgSequence=seq;
q.MsgType=(uint)CMPP3.Client.MSG.CMPP_COMMAND_ID.CMPP_SUBMIT;
this.AddItemToOutQueueToGateWay(q);
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
Client.ErrorEventArgs ea=new ErrorEventArgs();
ea.MyMessage="将缓冲队列中数据倒入待发送队列时失败,函数SetBufferQueueItemToOutQueue()";
ea.ErrorMessage=ex.Message;
this.onErrorHandler(this,ea);
}
}
}
else
{
i++;
}
}
catch
{
i++;
}
if(i>=3)
{
break;
}
}
}
private void AddItemToOutQueueToGateWay(QueueItem q)
{
this.RWL_OutQueue.AcquireWriterLock(-1);
this.MSMQ_CMPP3GateWayService_OutQueue.Send(q);
this.RWL_OutQueue.ReleaseWriterLock();
Thread.Sleep(1);
}
private void DeamonThread()
{
Thread thSetBufQueueToOutQueue=new Thread(new ThreadStart(this.SetTempQueueItemToOutQueue));
thSetBufQueueToOutQueue.Start();
//Thread.Sleep(10000);
int Tcount=0;
this.StartRecvAndSendThread();
this.State=CMPPState.Started;//启动成功
while(this.State==CMPPState.Started)
{
Tcount++;
if(this.IfTCPCanUse())
{
if(this.IfPingTime())
{
this.Ping();
this.RegisnterLastSentISMGMsgTime();
}
if(Tcount>=50)
{
Tcount=0;
this.CheckResend();
}
}
else
{
//TCP失效
ErrorEventArgs ea=new ErrorEventArgs();
ea.ErrorMessage="TCP已经失效";
ea.MyMessage="已经超过180S没有接收到网关的任何信息,TCP断开连接,函数DeamonThread()";
if(this.onErrorHandler!=null)
{
this.onErrorHandler(this,ea);
}
CMPP3.Client.SocketClosedEventArgs e=new SocketClosedEventArgs(this.OutQueue,this.WaitQueue);
if(this.onSocketCloseHandler!=null)
{
this.onSocketCloseHandler(this,e);
}
this._Stop(2);
}
Thread.Sleep(1000);
}
}
/// <summary>
/// 停止服务
/// </summary>
/// <param name="StopType">1-外部停止,2-内部异常停止,3-网关请求停止,4-收到网关回复自动停止</param>
private void _Stop(int StopType)
{
switch(StopType)
{
case 1://外部手动停止
{
uint seq=this.getNextSequence();
MSG.CMPP_Terminate terminate=new CMPP3.Client.MSG.CMPP_Terminate(seq);
QueueItem q=new QueueItem();
q.InQueueTime=DateTime.Now;
q.Msg=terminate;
q.MsgSequence=seq;
q.MsgType=(uint)CMPP3.Client.MSG.CMPP_COMMAND_ID.CMPP_TERMINATE;
//不走队列,直接发送
byte[] bs=terminate.ToBytes();
this.RWL_ServerSocket.AcquireWriterLock(-1);
try
{
this.SeverSocket.BeginSend(bs,0,bs.Length,SocketFlags.None,new AsyncCallback(this.SentISMGMsgCallBack),q);
}
catch
{}
this.RWL_ServerSocket.ReleaseWriterLock();
Thread.Sleep(1);
}
break;
case 2://内部异常停止
{
this.State=CMPPState.Stopping;
try
{
this.thRecvMsg.Abort();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="停止接收线程时出现异常,函数:_Stop();Type=2";
}
}
// try
// {
// this.thSendMsg.Abort();
// }
// catch(Exception ex)
// {
// if(this.onErrorHandler!=null)
// {
// ErrorEventArgs e=new ErrorEventArgs();
// e.ErrorMessage=ex.Message;
// e.ExceptionObject=ex;
// e.MyMessage="停止发送线程时出现异常,函数:_Stop();Type=2";
// }
// }
// try
// {
// this.thDeamon.Abort();
// }
// catch(Exception ex)
// {
// if(this.onErrorHandler!=null)
// {
// ErrorEventArgs e=new ErrorEventArgs();
// e.ErrorMessage=ex.Message;
// e.ExceptionObject=ex;
// e.MyMessage="停止监控线程时出现异常,函数:_Stop();Type=2";
// }
// }
try
{
this.SeverSocket.Close();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="关闭套接字时出现异常,函数:_Stop();Type=2";
}
}
this.SetIfCanProc(false);//停止处理数据
this.SetIfCanRecvItemFromOutQueue(false);//停止从待发送队列读取数据
this.SetIfCanAddItemToOutQueue(false);//停止向待发送队列发送数据
this.SetOutQueueItemToTempQueue();//将待发送队列里的数据放入缓冲队列
this.State=CMPPState.Stopped;
this.onRequestRestartHandler(this,null);
}
break;
case 3://网关请求停止
{
this.State=CMPPState.Stopping;
try
{
this.thRecvMsg.Abort();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="停止接收线程时出现异常,函数:_Stop();Type=3";
}
}
try
{
this.thSendMsg.Abort();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="停止发送线程时出现异常,函数:_Stop();Type=3";
}
}
try
{
this.thDeamon.Abort();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="停止监控线程时出现异常,函数:_Stop();Type=3";
}
}
try
{
this.SeverSocket.Close();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="关闭套接字时出现异常,函数:_Stop();Type=3";
}
}
this.SetIfCanProc(false);//停止处理数据
this.SetIfCanRecvItemFromOutQueue(false);//停止从待发送队列读取数据
this.SetIfCanAddItemToOutQueue(false);//停止向待发送队列发送数据
this.SetOutQueueItemToTempQueue();//将待发送队列里的数据放入缓冲队列
this.State=CMPPState.Stopped;
this.onRequestRestartHandler(this,null);
}
break;
case 4://接收到网关回复,自动停止
{
this.State=CMPPState.Stopping;
try
{
this.thRecvMsg.Abort();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="停止接收线程时出现异常,函数:_Stop();Type=4";
}
}
try
{
this.thSendMsg.Abort();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="停止发送线程时出现异常,函数:_Stop();Type=3";
}
}
try
{
this.thDeamon.Abort();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="停止监控线程时出现异常,函数:_Stop();Type=3";
}
}
try
{
this.SeverSocket.Close();
}
catch(Exception ex)
{
if(this.onErrorHandler!=null)
{
ErrorEventArgs e=new ErrorEventArgs();
e.ErrorMessage=ex.Message;
e.ExceptionObject=ex;
e.MyMessage="关闭套接字时出现异常,函数:_Stop();Type=3";
}
}
this.SetIfCanProc(false);//停止处理数据
this.SetIfCanRecvItemFromOutQueue(false);//停止从待发送队列读取数据
this.SetIfCanAddItemToOutQueue(false);//停止向待发送队列发送数据
this.SetOutQueueItemToTempQueue();//将待发送队列里的数据放入缓冲队列
this.State=CMPPState.Stopped;
}
break;
}
}
private void SentISMGMsgCallBack(System.IAsyncResult ar)
{
QueueItem q=(QueueItem)ar.AsyncState;
if(!ar.IsCompleted)
{
//异步操作未完成,退出
this._Stop(2);
return;
}
try
{
this.RWL_ServerSocket.AcquireWriterLock(-1);
this.SeverSocket.EndSend(ar);
}
catch
{}
this.RWL_ServerSocket.ReleaseWriterLock();
Thread.Sleep(1);
if(q!=null)
{
switch(q.MsgType)
{
case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST:
{
q.InQueueTime=DateTime.Now;
this.AddWaitQueueItem(q);
}
break;
case (uint)MSG.CMPP_COMMAND_ID.CMPP_SUBMIT:
{
q.InQueueTime=DateTime.Now;
this.AddWaitQueueItem(q);
}
break;
case (uint)MSG.CMPP_COMMAND_ID.CMPP_DELIVER_RESP:
{}
break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -