📄 scsman.cxx
字号:
pInternal->CInternalSection::CInternalSection(INTERNAL_CONNECTION_PARAMETER_PACKET);
CCPSection *pCPSection = (CCPSection *)pInternal->GetNextSection();
pCPSection->CCPSection::CCPSection (uiMyWindowSize, uiStoreAckTimeout,
uiAckTimeout, 0);
int iTotalSent = 0;
while (iTotalSent < iPacketSize) {
int iSent = send (s, &__buf[iTotalSent], iPacketSize - iTotalSent, 0);
if (iSent == SOCKET_ERROR) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connection Parameters Packet send to %s failed (%d bytes transmitted)...\n", lpszHostName, iTotalSent);
#endif
return FALSE;
}
iTotalSent += iSent;
}
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connection Parameters Packet send to %s succeeded (%d bytes transmitted)...\n", lpszHostName, iTotalSent);
#endif
return TRUE;
}
int ScSession::RecvCPP (void) {
SVSUTIL_ASSERT (! gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Receiving Connection Parameters Packet from %s...\n", lpszHostName);
#endif
const int iPacketSize = sizeof(CBaseHeader) + sizeof(CInternalSection) + sizeof(CCPSection);
char __buf[iPacketSize];
int iTotalRecvd = 0;
while (iTotalRecvd < iPacketSize) {
int iRecvd = recv (s, &__buf[iTotalRecvd], iPacketSize - iTotalRecvd, 0);
if (iRecvd <= 0) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connection Parameters Packet receive from %s failed (%d bytes transmitted)...\n", lpszHostName, iTotalRecvd);
#endif
return FALSE;
}
iTotalRecvd += iRecvd;
}
CBaseHeader *pBaseHeader = (CBaseHeader *)__buf;
if (! (pBaseHeader->SignatureIsValid() && pBaseHeader->VersionIsValid())) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (bad sig or version)...\n", lpszHostName);
#endif
return FALSE;
}
if (pBaseHeader->GetType() != FALCON_INTERNAL_PACKET) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (bad packet type %d)...\n", lpszHostName, pBaseHeader->GetType());
#endif
return FALSE;
}
if (pBaseHeader->GetPacketSize() != iPacketSize) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (bad packet type %d)...\n", lpszHostName, pBaseHeader->GetType());
#endif
return FALSE;
}
CInternalSection *pInternal = (CInternalSection *)pBaseHeader->GetNextSection();
if (pInternal->GetPacketType() != INTERNAL_CONNECTION_PARAMETER_PACKET) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (bad internal packet type %d)...\n", lpszHostName, pInternal->GetPacketType());
#endif
return FALSE;
}
CCPSection *pCPSection = (CCPSection *)pInternal->GetNextSection();
if (fSessionState == SCSESSION_STATE_CONNECTED_SERVER) {
uiAckTimeout = pCPSection->GetAckTimeout();
uiStoreAckTimeout = pCPSection->GetRecoverAckTimeout();
} else if ((uiAckTimeout != pCPSection->GetAckTimeout()) ||
(uiStoreAckTimeout != pCPSection->GetRecoverAckTimeout())) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (inconsistent ack timing)...\n", lpszHostName, pInternal->GetPacketType());
#endif
return FALSE;
}
uiOtherWindowSize = pCPSection->GetWindowSize();
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s succeeded. Timeout(Store) = %d(%d). Window size = %d...\n",
lpszHostName, uiAckTimeout, uiStoreAckTimeout, uiOtherWindowSize);
#endif
return TRUE;
}
//
//  Process the session (acknowledgement) section of an incoming packet.
//
//  1. Cross-checks the peer's view of the session against ours: the peer's
//     sync numbers must equal our received counters, and the peer must not
//     acknowledge more packets than we have sent. On mismatch nothing is
//     changed and FALSE is returned.
//  2. Records the peer's window size.
//  3. Releases every express packet on pSentPackets that the acknowledge
//     number covers.
//  4. Releases the reliable packets on pSentRelPackets that are acknowledged
//     by the storage ack number and its 32-bit bitfield.
//  5. Remembers the acknowledge number in usLastAckReceived and signals
//     hEvent.
//
//  Each released packet is journaled through gQueueMan, disposed of by its
//  owning queue, and its list node is returned to gMem->pAckNodeMem.
//
//  Must be called with the gMem lock held (asserted).
//
//  Returns TRUE when the section was accepted, FALSE on a synchronization
//  mismatch.
//
int ScSession::HandleSessionSection (CSessionSection *pcSessionSection) {
    SVSUTIL_ASSERT (gMem->IsLocked());

#if defined (SC_VERBOSE)
    scerror_DebugOut (VERBOSE_MASK_SESSION, L"Handling session section for %s...\n", lpszHostName);
#endif

    unsigned short usSyncAckSequenceNo, usSyncAckRecoverNo;
    pcSessionSection->GetSyncNo(&usSyncAckSequenceNo, &usSyncAckRecoverNo);

    unsigned short usOtherReceived    = pcSessionSection->GetAcknowledgeNo();
    unsigned short usRelOtherReceived = pcSessionSection->GetStorageAckNo();
    unsigned int   uiRelBitfield      = pcSessionSection->GetStorageAckBitField();

    if ((usSyncAckSequenceNo != usPacketsReceived) ||
        (usSyncAckRecoverNo != usRelPacketsReceived) ||
        sessnum_greater (usOtherReceived, usPacketsSent) ||
        sessnum_greater (usRelOtherReceived, usRelPacketsSent)) {
#if defined (SC_VERBOSE)
        scerror_DebugOut (VERBOSE_MASK_SESSION, L"Error handling session section for %s - synchronization check failed...\n", lpszHostName);
        scerror_DebugOut (VERBOSE_MASK_SESSION, L"Our side: %d packets received, %d recoverable packets received,\n\t%d packets sent, %d reliable packets sent.\n",
                          usPacketsReceived, usRelPacketsReceived, usPacketsSent, usRelPacketsSent);
        scerror_DebugOut (VERBOSE_MASK_SESSION, L"Other side: %d packets sent, %d recoverable packets sent,\n\t%d packets received, %d reliable packets received.\n",
                          usSyncAckSequenceNo, usSyncAckRecoverNo, usOtherReceived, usRelOtherReceived);
#endif
        return FALSE;
    }

    uiOtherWindowSize = pcSessionSection->GetWindowSize();

    //
    //  Throw away acked packets. Do express first.
    //
    //  Find the first node whose session number is not after the peer's
    //  acknowledge number (16-bit wraparound-aware signed difference); that
    //  node and everything behind it gets released.
    //  NOTE(review): this presumes the list is kept newest-first - confirm
    //  against the code that inserts into pSentPackets.
    //
    SentPacket *pSentRunner = pSentPackets;
    SentPacket *pSentParent = NULL;
    while (pSentRunner) {
        if ((signed short) (usOtherReceived - pSentRunner->usNum) >= 0)
            break;

        pSentParent = pSentRunner;
        pSentRunner = pSentRunner->pNext;
    }

    if (pSentRunner) {
        // Cut the acknowledged tail off the list before walking it.
        if (pSentParent)
            pSentParent->pNext = NULL;
        else
            pSentPackets = NULL;

        while (pSentRunner) {
            // pSentParent is reused here as "node being released".
            pSentParent = pSentRunner;
            pSentRunner = pSentRunner->pNext;

#if defined (SC_VERBOSE)
            scerror_DebugOut (VERBOSE_MASK_SESSION, L"Recorded ack for express message %08x ptr %08x session number %d\n", pSentParent->pPacket->uiMessageID, pSentParent->pPacket, pSentParent->usNum);
#endif
            gQueueMan->JournalPacket (pSentParent->pPacket);

            ScQueue *pQueue = pSentParent->pPacket->pQueue;
            pQueue->DisposeOfPacket (pSentParent->pPacket);

            svsutil_FreeFixed (pSentParent, gMem->pAckNodeMem);
        }
    }

    //
    //  Now remove reliable packets...
    //
    //  Candidate numbers are visited from usRelOtherReceived + 32 down to
    //  usRelOtherReceived. For iBit in 1..32 the candidate
    //  usRelOtherReceived + iBit is only considered when bit (iBit - 1) of
    //  the bitfield is set; iBit == 0 (usRelOtherReceived itself) is always
    //  considered.
    //
    if (usRelOtherReceived) {
        pSentRunner = pSentRelPackets;
        pSentParent = NULL;

        int iBit = 32;  // Bit count starts from 1
        unsigned short usRelPackNum = usRelOtherReceived + iBit;

        for ( ; ; ) {
            // Unsigned literal: shifting a signed 1 into bit 31 is not
            // portable before C++20.
            if ((iBit > 0) && ((uiRelBitfield & (1u << (iBit - 1))) == 0)) {
                --iBit;
                --usRelPackNum;
                continue;
            }

            // Skip list nodes numbered after the current candidate.
            while (pSentRunner && sessnum_greater (pSentRunner->usNum, usRelPackNum)) {
                pSentParent = pSentRunner;
                pSentRunner = pSentRunner->pNext;
            }

            if (! pSentRunner)
                break;

            if (usRelPackNum == (int)pSentRunner->usNum) {
                // Unlink the acknowledged node; pSentParent stays put so the
                // next unlink still has the right predecessor.
                if (pSentParent)
                    pSentParent->pNext = pSentRunner->pNext;
                else
                    pSentRelPackets = pSentRunner->pNext;

                SentPacket *pSentX = pSentRunner;
                pSentRunner = pSentRunner->pNext;

#if defined (SC_VERBOSE)
                scerror_DebugOut (VERBOSE_MASK_SESSION, L"Recorded ack for reliable message %08x ptr %08x session number %d\n", pSentX->pPacket->uiMessageID, pSentX->pPacket, pSentX->usNum);
#endif
                gQueueMan->JournalPacket (pSentX->pPacket);

                ScQueue *pQueue = pSentX->pPacket->pQueue;
                pQueue->DisposeOfPacket (pSentX->pPacket);

                svsutil_FreeFixed (pSentX, gMem->pAckNodeMem);
            }

            --iBit;
            --usRelPackNum;

            // Stop once the candidate has dropped below the storage ack number.
            if (sessnum_less (usRelPackNum, usRelOtherReceived))
                break;
        }
    }

    usLastAckReceived = usOtherReceived;

    SetEvent (hEvent);

    return TRUE;
}
//
//  Handle an internal packet received on this session.
//
//  A packet that does not include a session section is refused (FALSE).
//  Otherwise the session section, located right after the internal section,
//  is passed to HandleSessionSection and its result is returned.
//
//  Must be called with the gMem lock held (asserted).
//
int ScSession::HandleInternalPacket (CBaseHeader *pBaseHeader) {
    SVSUTIL_ASSERT (gMem->IsLocked());

    if (pBaseHeader->SessionIsIncluded()) {
        SVSUTIL_ASSERT (pBaseHeader->GetType() == FALCON_INTERNAL_PACKET);

        // Section chain: base header -> internal section -> session section.
        CInternalSection *pInternal = (CInternalSection *)pBaseHeader->GetNextSection ();
        CSessionSection  *pSession  = (CSessionSection *)pInternal->GetNextSection();

        return HandleSessionSection (pSession);
    }

    return FALSE;
}
//
//  Handle a user (message) packet received on this session.
//
//  Steps, in order:
//    - parse the image into sections; a corrupted image is freed and
//      FALSE is returned;
//    - bump the received counters (zero is skipped on wraparound) and
//      (re)schedule the acknowledgement timer;
//    - if a session section came with the packet, strip it from the base
//      header and process it; on failure the image is freed and FALSE
//      is returned;
//    - look up the destination queue and hand the packet over, rejecting
//      it (and freeing the image) when that is not possible.
//
//  pImage            the received packet image.
//  pcSessionSection  session section that accompanied the packet, or NULL.
//
//  Must be called with the gMem lock held (asserted).
//
//  Returns FALSE only for a corrupted image or an unacceptable session
//  section; every outcome after the queue lookup returns TRUE, including
//  the ones where the packet is rejected.
//
int ScSession::HandleUserPacket (ScPacketImage *pImage, CSessionSection *pcSessionSection) {
    SVSUTIL_ASSERT (gMem->IsLocked());

#if defined (SC_VERBOSE)
    scerror_DebugOut (VERBOSE_MASK_SESSION, L"User packet arrived from %s\n", lpszHostName);
#endif

    if (! pImage->PopulateSections()) {
#if defined (SC_VERBOSE)
        scerror_DebugOut (VERBOSE_MASK_SESSION, L"User packet from %s is corrupted...\n", lpszHostName);
#endif
        g_funcFree (pImage, g_pvFreeData);
        return FALSE;
    }

    const int fRecoverable = (pImage->sect.pUserHeader->GetDelivery() == MQMSG_DELIVERY_RECOVERABLE);

    // Session numbers never take the value zero: step over it on wraparound.
    if (++usPacketsReceived == 0)
        ++usPacketsReceived;

    const unsigned int uiTick = GetTickCount ();
    unsigned int uiNextAck;

    if (fRecoverable) {
        if (++usRelPacketsReceived == 0)
            ++usRelPacketsReceived;

        uiNextAck = uiTick + uiStoreAckTimeout / 2;
    } else {
        uiNextAck = uiTick + uiAckTimeout / 2;
    }

    // Acknowledge right away once either counter has reached the last ack
    // we sent plus the corresponding window / bitfield size.
    if (sessnum_greater_equal (usPacketsReceived, (unsigned short)(usLastAckSent + uiOtherWindowSize)) ||
        sessnum_greater_equal (usRelPacketsReceived, (unsigned short)(usLastRelAckSent + STORED_ACK_BITFIELD_SIZE))) {
        uiNextAck = uiTick;
    }

    // Arm the timer if no ack is pending, or pull a pending one forward.
    if ((uiAckDue == 0) || time_greater (uiAckDue, uiNextAck)) {
        uiAckDue = uiNextAck;
        SVSUTIL_ASSERT (time_less_equal (uiTick, uiAckDue));
        svsutil_SetAttrTimer (gMem->pTimer, (SVSHandle)hEvent, uiAckDue - uiTick);
    }

#if defined (SC_VERBOSE)
    if (uiAckDue && time_greater (uiTick, uiAckDue + 3000))
        scerror_DebugOut (VERBOSE_MASK_WARN, L"WARNING - underrunning more than 3 secs on ACK time! now %08x, due %08x\n", uiTick, uiAckDue);
#endif

    if (pcSessionSection != NULL) {
        SVSUTIL_ASSERT (pImage->sect.pBaseHeader->SessionIsIncluded());
        pImage->sect.pBaseHeader->IncludeSession (FALSE);

        if (! HandleSessionSection (pcSessionSection)) {
#if defined (SC_VERBOSE)
            scerror_DebugOut (VERBOSE_MASK_SESSION, L"Couldn't handle session sect for packet from %s...\n", lpszHostName);
#endif
            g_funcFree (pImage, g_pvFreeData);
            return FALSE;
        }
    }

    ScQueue *pTarget = gQueueMan->FindQueueByPacketImage (pImage, FALSE);

    //
    //  No such queue: negative-acknowledge and drop.
    //
    if (pTarget == NULL) {
#if defined (SC_VERBOSE)
        scerror_DebugOut (VERBOSE_MASK_SESSION, L"User packet from %s arrived to non-existent queue...\n", lpszHostName);
#endif
        gQueueMan->ForwardTransactionalResponse (pImage, MQMSG_CLASS_NACK_BAD_DST_Q, lpszHostName, &guidDest);
        gQueueMan->RejectPacket (pImage, MQMSG_CLASS_NACK_BAD_DST_Q);
        g_funcFree (pImage, g_pvFreeData);
        return TRUE;
    }

    //
    //  Put this thing in a queue...
    //
    //  If we don't have resources, this will fail. In this case we'd
    //  better close the connection.
    //

    //
    //  Queue is not an incoming one: enforce the hop limit, then enqueue.
    //
    if (! pTarget->qp.bIsIncoming) {
        if (pImage->sect.pUserHeader->GetHopCount () > MSMQ_SC_MAX_HOP_COUNT) {
#if defined (SC_VERBOSE)
            scerror_DebugOut (VERBOSE_MASK_SESSION, L"User packet from %s exceeded hop count...\n", lpszHostName);
#endif
            gQueueMan->ForwardTransactionalResponse (pImage, MQMSG_CLASS_NACK_HOP_COUNT_EXCEEDED, lpszHostName, &guidDest);
            gQueueMan->RejectPacket (pImage, MQMSG_CLASS_NACK_HOP_COUNT_EXCEEDED);
            g_funcFree (pImage, g_pvFreeData);
            return TRUE;
        }

        pImage->sect.pUserHeader->IncHopCount ();

        if (! pTarget->MakePacket (pImage, -1, TRUE)) {
#if defined (SC_VERBOSE)
            scerror_DebugOut (VERBOSE_MASK_SESSION, L"User packet from %s cannot be put into %s...\n", lpszHostName, pTarget->lpszFormatName);
#endif
            gQueueMan->RejectPacket (pImage, MQMSG_CLASS_NACK_Q_EXCEED_QUOTA);
            g_funcFree (pImage, g_pvFreeData);
        }

        return TRUE;
    }

    //
    //  Incoming transactional queue: the queue manager stores the packet;
    //  the image is freed here only when that fails.
    //
    if (pTarget->qp.bTransactional) {
        if (! gQueueMan->StoreTransactionalPacket (pTarget, pImage, lpszHostName, &guidDest))
            g_funcFree (pImage, g_pvFreeData);

        return TRUE;
    }

    //
    //  Plain incoming queue: build the order key from the queue's receive
    //  counter with the inverted priority placed above SCPACKET_ORD_TIMEBITS.
    //
    pImage->hkOrderKey  = ++pTarget->hkReceived;
    pImage->hkOrderKey |= (pImage->sect.pBaseHeader->GetPriority() ^ MQ_MAX_PRIORITY) << SCPACKET_ORD_TIMEBITS;

    ScPacket *pStored = pTarget->MakePacket (pImage, -1, TRUE);

    if (pStored) {
        gQueueMan->AcceptPacket (pStored, MQMSG_CLASS_ACK_REACH_QUEUE, pTarget);
        return TRUE;
    }

#if defined (SC_VERBOSE)
    scerror_DebugOut (VERBOSE_MASK_SESSION, L"User packet from %s cannot be put into %s...\n", lpszHostName, pTarget->lpszFormatName);
#endif
    gQueueMan->RejectPacket (pImage, MQMSG_CLASS_NACK_Q_EXCEED_QUOTA);
    g_funcFree (pImage, g_pvFreeData);

    return TRUE;
}
//
// Get the connection to happen
//
int ScSession::Connect (void) {
sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons (gMachine->uiPort);
unsigned long ul_addr = inet_addr (lpszmbHostName);
if (ul_addr != INADDR_NONE) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connecting to %s via direct IP\n", lpszHostName);
#endif
sin.sin_addr.S_un.S_addr = ul_addr;
ipPeerAddr = sin.sin_addr;
return (connect (s, (sockaddr *)&sin, sizeof(sin)) == 0);
}
//
// Do two passes - pinging and connecting...
//
unsigned long ul_wins[MSMQ_SC_IPBUFFER];
unsigned long ul_wins_report[MSMQ_SC_IPBUFFER];
int nWins = 0;
for (int iPass = 0 ; iPass < 2 ; ++iPass) {
if (iPass == 0) {
HOSTENT *pHost = gethostbyname (lpszmbHostName);
if (pHost)
ul_addr = *(unsigned long *)pHost->h_addr_list[0];
}
if (ul_addr != INADDR_NONE) {
sin.sin_addr.S_un.S_addr = ul_addr;
ipPeerAddr = sin.sin_addr;
if ((iPass == 0) && ping (ul_addr)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connecting to %s as %d.%d.%d.%d (gethostbyname, ping)\n", lpszHostName,
sin.sin_addr.S_un.S_un_b.s_b1, sin.sin_addr.S_un.S_un_b.s_b2, sin.sin_addr.S_un.S_un_b.s_b3, sin.sin_addr.S_un.S_un_b.s_b4);
#endif
return (connect (s, (sockaddr *)&sin, sizeof(sin)) == 0);
}
if ((iPass == 1) || (! gMachine->uiPingPort)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connecting to %s as %d.%d.%d.%d (gethostbyname)\n", lpszHostName,
sin.sin_addr.S_un.S_un_b.s_b1, sin.sin_addr.S_un.S_un_b.s_b2, sin.sin_addr.S_un.S_un_b.s_b3, sin.sin_addr.S_un.S_un_b.s_b4);
#endif
if (connect (s, (sockaddr *)&sin, sizeof(sin)) == 0)
return TRUE;
}
}
if (iPass == 0)
nWins = NBGetWins (ul_wins, MSMQ_SC_IPBUFFER);
for (int i = 0 ; i < nWins ; ++i) {
if (iPass == 0) {
ul_wins_report[i] = NBQueryWins (lpszmbHostName, ul_wins[i]);
#if defined (SC_VERBOSE)
{
IN_ADDR __sin1 = *(IN_ADDR *)&ul_wins[i];
IN_ADDR __sin2 = *(IN_ADDR *)&ul_wins_report[i];
scerror_DebugOut (VERBOSE_MASK_SESSION, L"WINS %d.%d.%d.%d reports %s as %d.%d.%d.%d (gethostbyname)\n",
__sin1.S_un.S_un_b.s_b1, __sin1.S_un.S_un_b.s_b2,
__sin1.S_un.S_un_b.s_b3, __sin1.S_un.S_un_b.s_b4,
lpszHostName,
__sin2.S_un.S_un_b.s_b1, __sin2.S_un.S_un_b.s_b2,
__sin2.S_un.S_un_b.s_b3, __sin2.S_un.S_un_b.s_b4
);
}
#endif
}
int fFound = FALSE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -