📄 subsmgserver.cs
字号:
private Transmit _transmit;
/// <summary>
/// Builds the per-sub-SP forwarding state: a command queue and the transmitter that drains it.
/// </summary>
/// <param name="sp">Sub-SP whose IP, deliver port, connection limit, pending-answer limit and answer timeout configure the transmitter.</param>
public DictData(SubSP sp)
{
    _queue = new SocketCommandQueue(sp.MaxWaitAnswerCommand);

    // NOTE(review): the bind credentials are hard-coded here; consider reading them from configuration.
    Bind loginBind = new Bind();
    loginBind.LoginName = "jll";
    loginBind.LoginPassword = "jll";
    loginBind.LoginType = 2;

    IPAddress deliverAddress = IPAddress.Parse(sp.IP);
    _transmit = new Transmit(
        deliverAddress,
        sp.DeliverPort,
        sp.MaxConnection,
        sp.MaxWaitAnswerCommand,
        sp.AnswerTimeout,
        loginBind,
        _queue);
}
/// <summary>
/// Gets the command queue created in the constructor; the same queue instance is handed to the transmitter.
/// </summary>
public SocketCommandQueue Queue
{
get { return _queue; }
}
/// <summary>
/// Gets the transmitter created in the constructor for this sub-SP's deliver address and port.
/// </summary>
public Transmit Transmit
{
get { return _transmit; }
}
}
private Dictionary<IPEndPoint, DictData> _spDictData = new Dictionary<IPEndPoint, DictData>();// key: a sub-SP's deliver IP and port; value: that sub-SP's command queue and transmitter
/// <summary>
/// Initializes the listen server from the SgipConfig.Deliver settings
/// (IP, port, maximum connections, answer timeout and idle timeout), which are passed to the base class.
/// </summary>
public DeliverListenServer(): base(SgipConfig.Deliver.IP, SgipConfig.Deliver.Port,
SgipConfig.Deliver.MaxConnection, SgipConfig.Deliver.AnswerTimeOut, SgipConfig.Deliver.IdleTimeOut)
{
}
/// <summary>
/// Reports whether the command queue for the given socket is full.
/// Currently always returns false: the per-sub-SP queue check below is commented out.
/// </summary>
/// <param name="obj">State of the socket being checked (unused while the check is disabled).</param>
/// <returns>Always false.</returns>
protected override bool IsSocketQueueFull(StateObject obj)
{
// Disabled check: would report "full" as soon as any sub-SP queue is full for obj.WorkSocket.
/*
Dictionary<IPEndPoint, DictData>.ValueCollection dvc = _spDictData.Values;
foreach (DictData Item in dvc)
{
if (Item.Queue.IsSocketQueueFull(obj.WorkSocket))
{
return true;
}
}
*/
return false;
}
/// <summary>
/// Decides whether a bind (login) request from a client is accepted.
/// Currently always returns true: the LoginType check below is commented out.
/// </summary>
/// <param name="client">Socket of the client that sent the bind request (currently unused).</param>
/// <param name="bindBody">Body of the bind request (currently unused).</param>
/// <returns>Always true.</returns>
protected override bool IsValidateLogin(Socket client, Bind bindBody)
{
// Disabled check: would reject any bind whose LoginType is not 2.
/*
if (bindBody.LoginType != 2)
{
return false;
}
*/
// Please do some validation checks here!
// NOTE(review): as written, every bind request is accepted without verification.
return true;
}
// Running count of SGIP_DELIVER commands received; incremented atomically in ProcessOtherCommand.
private volatile int nDeliverCount = 0;

/// <summary>
/// Dispatches a received command by its command ID.
/// SGIP_DELIVER is counted and answered directly with a Deliver_Resp; SGIP_REPORT, SGIP_USERRPT and
/// SGIP_TRACE_RESP are handed to their forwarding handlers; any other command ID is ignored.
/// </summary>
/// <param name="obj">State of the socket the command arrived on.</param>
/// <param name="cmd">The received command.</param>
protected override void ProcessOtherCommand(StateObject obj, Command cmd)
{
    // 3.7.2.6 messages from SMG to SP; only the SMG acts as the client
    switch (cmd.Head.CommandID)
    {
        case Command.SGIP_DELIVER:
            // "++" on a volatile field is still a non-atomic read-modify-write, so concurrent
            // callers could lose counts; Interlocked.Increment makes the update atomic.
            // NOTE(review): presumably commands can arrive on several socket threads - confirm.
            int received = System.Threading.Interlocked.Increment(ref nDeliverCount);
            Console.WriteLine("共收到:{0}条deliver消息", received);
            obj.WorkSocket.Send(Command.CreateCommand(new Deliver_Resp()).GetBytes());
            // Forwarding to the sub-SP is currently disabled:
            //ProcessDeliverCommand(obj, cmd);
            break;
        case Command.SGIP_REPORT:
            ProcessReportCommand(obj, cmd);
            break;
        case Command.SGIP_USERRPT:
            ProcessUserRptCommand(obj, cmd);
            break;
        case Command.SGIP_TRACE_RESP:
            ProcessTrace_RespCommand(obj, cmd);
            break;
        default:
            break;
    }
}
/// <summary>
/// Queues a command for the given sub-SP and triggers its transmitter.
/// The per-sub-SP queue/transmitter pair is created on first use and cached by deliver endpoint.
/// </summary>
/// <param name="sp">Target sub-SP; must not be null.</param>
/// <param name="obj">State of the socket the command arrived on; its WorkSocket is enqueued with the command.</param>
/// <param name="cmd">The command to forward.</param>
private void SendSubspCommand(SubSP sp, StateObject obj, Command cmd)
{
    System.Diagnostics.Debug.Assert(sp != null);
    IPEndPoint ep = new IPEndPoint(IPAddress.Parse(sp.IP), sp.DeliverPort);

    DictData data;
    // Dictionary<,> is not safe for concurrent writers, so the get-or-create step is serialized.
    // TryGetValue also replaces the original ContainsKey + two indexer lookups with a single lookup.
    lock (_spDictData)
    {
        if (!_spDictData.TryGetValue(ep, out data))
        {
            data = new DictData(sp);
            _spDictData[ep] = data;
        }
    }

    // Enqueue and send outside the lock so a slow transmitter does not block other endpoints.
    data.Queue.Enqueue(obj.WorkSocket, cmd);
    data.Transmit.SendData();
}
/// <summary>
/// Forwards a Deliver command to the sub-SP that owns the command's SPNumber.
/// If no such sub-SP is found, a debug message is written and the command is not forwarded.
/// </summary>
/// <param name="obj">State of the socket the command arrived on.</param>
/// <param name="cmd">Command whose body is a Deliver.</param>
private void ProcessDeliverCommand(StateObject obj, Command cmd)
{
    SubSP target = SubSPManager.Manager.FindBySPNumber(((Deliver)cmd.Body).SPNumber);
    if (target != null)
    {
        SendSubspCommand(target, obj, cmd);
        return;
    }

    // No matching sub-SP was found
    System.Diagnostics.Debug.WriteLine(this.GetType().FullName + ":未找到二级sp, Deliver命令无法转发!");
}
/// <summary>
/// Forwards a UserRpt command to the sub-SP that owns the command's SPNumber.
/// If no such sub-SP is found, a debug message is written and the command is not forwarded.
/// </summary>
/// <param name="obj">State of the socket the command arrived on.</param>
/// <param name="cmd">Command whose body is a UserRpt.</param>
private void ProcessUserRptCommand(StateObject obj, Command cmd)
{
    SubSP target = SubSPManager.Manager.FindBySPNumber(((UserRpt)cmd.Body).SPNumber);
    if (target != null)
    {
        SendSubspCommand(target, obj, cmd);
        return;
    }

    // No matching sub-SP was found
    System.Diagnostics.Debug.WriteLine(this.GetType().FullName + ":未找到二级sp, UserRpt命令无法转发!");
}
/// <summary>
/// Forwards a Trace_Resp or Report command. The sequence number these commands refer to must be present
/// in the given TimeoutDictionary; the stored value is the login name used to find the sub-SP to forward to.
/// On success the entry is removed from the dictionary.
/// </summary>
/// <param name="dict">Dictionary of saved command sequence numbers (per the original comment, filled by SubmitListenServer).</param>
/// <param name="strSequence">The sequence number originally saved in the dictionary. For trace commands this is
/// the command head's sequence number as a string; for Report commands it is Report.SequenceNumber
/// (the sequence number carried in the message body).</param>
/// <param name="obj">State of the current socket.</param>
/// <param name="cmd">The command to forward.</param>
/// <returns>true if the sub-SP was found and the command was queued for it; otherwise false.</returns>
private bool SendInDictCommand(TimeoutDictionary<string, string> dict, string strSequence, StateObject obj, Command cmd)
{
    if (!dict.ContainsKey(strSequence))
    {
        return false;
    }

    string loginName;
    try
    {
        loginName = dict[strSequence];
    }
    catch (System.Collections.Generic.KeyNotFoundException)
    {
        // ContainsKey and the indexer take the dictionary lock separately, so the entry can be
        // removed (by the expiry timer or another caller) in between; treat that as "not found".
        return false;
    }

    SubSP sp = SubSPManager.Manager.FindByLoginName(loginName);
    if (sp == null)
    {
        return false;
    }

    SendSubspCommand(sp, obj, cmd);
    dict.Remove(strSequence);
    return true;
}
/// <summary>
/// Forwards a Report command to the sub-SP recorded for the sequence number in the Report body.
/// Writes a debug message when the command could not be forwarded.
/// </summary>
/// <param name="obj">State of the socket the command arrived on.</param>
/// <param name="cmd">Command whose body is a Report.</param>
private void ProcessReportCommand(StateObject obj, Command cmd)
{
    bool forwarded = SendInDictCommand(s_reportDict, ((Report)(cmd.Body)).SequenceNumber.ToString(), obj, cmd);
    if (forwarded)
    {
        return;
    }

    System.Diagnostics.Debug.WriteLine(this.GetType().FullName + ":收到一条Report命令, 却无法找出二级sp, 转发失败!");
}
/// <summary>
/// Forwards a Trace_Resp command to the sub-SP recorded for the command head's sequence number.
/// Writes a debug message when the command could not be forwarded.
/// </summary>
/// <param name="obj">State of the socket the command arrived on.</param>
/// <param name="cmd">The Trace_Resp command.</param>
private void ProcessTrace_RespCommand(StateObject obj, Command cmd)
{
    bool forwarded = SendInDictCommand(s_traceDict, cmd.Head.SequenceNumber.ToString(), obj, cmd);
    if (forwarded)
    {
        return;
    }

    System.Diagnostics.Debug.WriteLine(this.GetType().FullName + ":收到一条Trace命令, 却无法找出二级sp, 转发失败!");
}
}
/// <summary>
/// A lock-protected dictionary whose entries expire: a timer periodically removes every entry
/// that has been stored for at least the configured timeout.
/// </summary>
/// <typeparam name="TKey">Key type.</typeparam>
/// <typeparam name="TValue">Value type.</typeparam>
public class TimeoutDictionary<TKey, TValue>
{
    /// <summary>A stored value together with the (UTC) time at which it was stored.</summary>
    class Data
    {
        private TValue _value;
        private DateTime _dt;

        public Data(TValue v)
        {
            _value = v;
            // UTC so that daylight-saving or time-zone changes cannot distort an entry's age.
            _dt = DateTime.UtcNow;
        }

        public TValue Value
        {
            get { return _value; }
        }

        public DateTime DateTime
        {
            get { return _dt; }
        }
    }

    // Also used as the lock object guarding every access to the entries.
    private readonly Dictionary<TKey, Data> _dict = new Dictionary<TKey, Data>(128);
    private Timer _timer;
    private long _checkIntervalSeconds; // how often the timer runs the expiry check, in seconds

    /// <summary>
    /// How long an entry may stay in the dictionary, in seconds. For example, a stored submit sequence
    /// number that has not been removed within this time (no report command was received) is considered
    /// expired and is removed by the next timer check.
    /// </summary>
    private long _timeoutSeconds;

    /// <summary>
    /// Creates the dictionary and starts the periodic expiry check.
    /// </summary>
    /// <param name="nCheckIntervalSeconds">Timer interval, in seconds.</param>
    /// <param name="nTimeoutSeconds">Age, in seconds, after which an entry is expired and may be removed.</param>
    public TimeoutDictionary(long nCheckIntervalSeconds, long nTimeoutSeconds)
    {
        _checkIntervalSeconds = nCheckIntervalSeconds;
        _timeoutSeconds = nTimeoutSeconds;
        _timer = new Timer(new TimerCallback(OnTimerInterval), null, nCheckIntervalSeconds * 1000, nCheckIntervalSeconds * 1000);
    }

    /// <summary>
    /// Timer callback: removes every entry whose age is at least the configured timeout.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private void OnTimerInterval(object state)
    {
        lock (_dict)
        {
            DateTime now = DateTime.UtcNow;

            // Collect first, remove afterwards: removing from a Dictionary while enumerating it
            // invalidates the enumerator (InvalidOperationException on .NET Framework).
            List<TKey> expiredKeys = new List<TKey>();
            foreach (KeyValuePair<TKey, Data> kvp in _dict)
            {
                // Age is "now - stored"; the reversed subtraction is never positive,
                // so with it nothing would ever expire.
                TimeSpan age = now - kvp.Value.DateTime;
                if (age.TotalSeconds >= _timeoutSeconds)
                {
                    expiredKeys.Add(kvp.Key);
                }
            }

            foreach (TKey key in expiredKeys)
            {
                _dict.Remove(key);
            }
        }
    }

    /// <summary>
    /// Gets the value stored for <paramref name="key"/>, or stores a value (restarting its timeout).
    /// </summary>
    /// <exception cref="KeyNotFoundException">On get, when the key is not present.</exception>
    public TValue this[TKey key]
    {
        get
        {
            lock (_dict)
            {
                return _dict[key].Value;
            }
        }
        set
        {
            Add(key, value);
        }
    }

    /// <summary>
    /// Stores <paramref name="v"/> under <paramref name="key"/>, replacing any existing entry
    /// and restarting its timeout.
    /// </summary>
    public void Add(TKey key, TValue v)
    {
        lock (_dict)
        {
            _dict[key] = new Data(v);
        }
    }

    /// <summary>Removes the entry for <paramref name="key"/>, if present.</summary>
    public void Remove(TKey key)
    {
        lock (_dict)
        {
            _dict.Remove(key);
        }
    }

    /// <summary>Returns whether an entry for <paramref name="key"/> is currently present.</summary>
    public bool ContainsKey(TKey key)
    {
        lock (_dict)
        {
            return _dict.ContainsKey(key);
        }
    }

    /// <summary>
    /// Looks up <paramref name="key"/> under a single lock acquisition, so the result cannot be
    /// invalidated by an expiry between a separate existence check and read.
    /// </summary>
    /// <param name="key">Key to look up.</param>
    /// <param name="value">The stored value when found; otherwise the default value of <typeparamref name="TValue"/>.</param>
    /// <returns>true if the key was present; otherwise false.</returns>
    public bool TryGetValue(TKey key, out TValue value)
    {
        lock (_dict)
        {
            Data data;
            if (_dict.TryGetValue(key, out data))
            {
                value = data.Value;
                return true;
            }
        }

        value = default(TValue);
        return false;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -