📄 scsman.cxx
字号:
pSess->hServiceThreadR = CreateThread (NULL, 0, ScSession::ServiceThreadR_s, pSess, 0, &tid);
pSess->hServiceThreadW = CreateThread (NULL, 0, ScSession::ServiceThreadW_s, pSess, 0, &tid);
return TRUE;
}
void ScSessionManager::AccThread (void) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"In listener...\n");
#endif
while (s_listen != INVALID_SOCKET) {
SVSUTIL_ASSERT (! gMem->IsLocked());
sockaddr_in addr;
int addrlen = sizeof (addr);
SOCKET s = accept (s_listen, (sockaddr *)&addr, &addrlen);
if (s == INVALID_SOCKET)
continue;
char mbbuffer[MSMQ_SC_SMALLBUFFER];
char *l_lpmbHostName;
if (! NBStatusQuery (mbbuffer, sizeof(mbbuffer), addr.sin_addr.S_un.S_addr)) {
HOSTENT *pHost = gethostbyaddr ((char *)&addr.sin_addr, sizeof(addr.sin_addr), AF_INET);
if (pHost)
l_lpmbHostName = pHost->h_name;
else {
closesocket (s);
scerror_Inform (MSMQ_SC_ERRMSG_UNNAMEDCONNECT, addr.sin_addr.S_un.S_un_b.s_b1,
addr.sin_addr.S_un.S_un_b.s_b2, addr.sin_addr.S_un.S_un_b.s_b3,
addr.sin_addr.S_un.S_un_b.s_b4);
continue;
}
} else
l_lpmbHostName = mbbuffer;
int iWStrLen = MultiByteToWideChar (CP_ACP, 0, l_lpmbHostName, -1, NULL, 0) + 1;
SVSUTIL_ASSERT (iWStrLen > 0);
WCHAR *l_lpszHostName = (WCHAR *)g_funcAlloc (iWStrLen * sizeof(WCHAR), g_pvAllocData);
if (l_lpszHostName == NULL) {
closesocket (s);
Sleep(1000); // wait and hope that we can get memory.
continue;
}
MultiByteToWideChar (CP_ACP, 0, l_lpmbHostName, -1, l_lpszHostName, iWStrLen);
gMem->Lock ();
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Got socket connection for host %s\n", l_lpszHostName);
#endif
WCHAR *lpszHashedName = svsutil_StringHashAlloc (gMem->pStringHash, l_lpszHostName);
g_funcFree (l_lpszHostName, g_pvFreeData);
if (lpszHashedName == NULL) {
// memory failure...
gMem->Unlock(); // don't hold the lock during delay.
closesocket (s);
Sleep(1000); // wait and hope that we can get memory.
continue;
}
ScSession *pSess = GetInactiveOsSession (lpszHashedName);
int fSockClose = FALSE;
if (! pSess)
fSockClose = TRUE;
else {
if (! SpinSession (pSess, s)) {
fSockClose = TRUE;
ReleaseSession (pSess);
} else
pSess->ipPeerAddr = addr.sin_addr;
}
svsutil_StringHashFree (gMem->pStringHash, lpszHashedName);
gMem->Unlock ();
if (fSockClose)
closesocket (s);
}
}
void ScSessionManager::ConnService (void) {
SVSUTIL_ASSERT (gMem->IsLocked());
//
// Why do we want to rescan it?
// 1. New message arrived to a queue which session may have been inactive (buzz)
// 2. There are sessions waiting for retry and the time has come (timeout)
// 3. We did not have the thread and now we do (buzz)
//
//
// How do we want to rescan? Check all queues for inactives and waiting with expired
// and spin them off...
//
scutil_IsLocalTCP (NULL); // Reset IP table
unsigned int uiNow = scutil_now ();
unsigned int uiNextWakeup = 0;
int iLanNum = -1;
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connection creator scan, time %08x...\n", uiNow);
#endif
ScQueueList *pql = gQueueMan->pqlOutgoing;
while (pql) {
ScSession *pSess = pql->pQueue->pSess;
//
// What do we do here?
//
// If session is waiting for retry and the time's up, retry.
// If session is inactive and the queue is not empty, retry.
//
if (! pql->pQueue->pPackets->IsEmpty()) {
if (((pSess->fSessionState == SCSESSION_STATE_WAITING) && time_less_equal (pSess->uiNextAttemptTime, uiNow)) ||
(pSess->fSessionState == SCSESSION_STATE_INACTIVE)) {
if (iLanNum == -1)
iLanNum = (gMachine->uiLanOffDelay == 0) ? 255 : scutil_GetLanNum();
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Can spin for %s, status %d requested time %d, have %d lan interfaces (255 = zero port override)...\n",
pSess->lpszHostName, pSess->fSessionState, pSess->uiNextAttemptTime, iLanNum);
#endif
if (iLanNum) {
pSess->AddRef ();
if (! SpinSession (pSess, INVALID_SOCKET))
ReleaseSession (pSess);
} else {
pSess->uiNextAttemptTime = uiNow + gMachine->uiLanOffDelay;
pSess->fSessionState = SCSESSION_STATE_WAITING;
}
} else if (pSess->fSessionState == SCSESSION_STATE_OPERATING)
SetEvent (pSess->hEvent);
}
if ((pSess->fSessionState == SCSESSION_STATE_WAITING) &&
time_greater (pSess->uiNextAttemptTime, uiNow) &&
((! uiNextWakeup) ||
time_less (pSess->uiNextAttemptTime, uiNextWakeup)))
uiNextWakeup = pSess->uiNextAttemptTime;
pql = pql->pqlNext;
}
if (! uiNextWakeup)
svsutil_ClearAttrTimer (gMem->pTimer, (SVSHandle)hBuzz);
else {
SVSUTIL_ASSERT (time_greater (uiNextWakeup, uiNow));
svsutil_SetAttrTimer (gMem->pTimer, (SVSHandle)hBuzz, (uiNextWakeup - uiNow) * 1000);
}
}
void ScSessionManager::ConnectToNet (unsigned int uiDelta) {
gMem->Lock ();
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Reconnecting!\n");
#endif
unsigned int uiNowS = scutil_now ();
ScSession *pSess = gSessionMan->pSessList;
while (pSess) {
if (pSess->fSessionState == SCSESSION_STATE_WAITING)
pSess->uiNextAttemptTime = uiNowS + uiDelta;
pSess = pSess->pNext;
}
SetEvent (gSessionMan->hBuzz);
scutil_IsLocalTCP (NULL);
gMem->Unlock ();
}
DWORD WINAPI ScSessionManager::AccThread_s (void *arg) {
gSessionMan->AccThread();
return 0;
}
void ScSession::FailConnection (void) {
SVSUTIL_ASSERT (gMem->IsLocked());
int i = InterlockedIncrement((long *)&iFailures) - 1;
if (i >= (int)gMachine->uiRetrySchedule)
i = gMachine->uiRetrySchedule - 1;
ScQueueList *pQueueList = gQueueMan->pqlOutgoing;
int fFoundWaiting = FALSE;
while (pQueueList) {
if ((pQueueList->pQueue->pSess == this) && (! pQueueList->pQueue->pPackets->IsEmpty())) {
fFoundWaiting = TRUE;
break;
}
pQueueList = pQueueList->pqlNext;
}
if (! fFoundWaiting) {
fSessionState = SCSESSION_STATE_INACTIVE;
return;
}
uiNextAttemptTime = scutil_now() + gMachine->asRetrySchedule[i];
fSessionState = SCSESSION_STATE_WAITING;
SetEvent (gSessionMan->hBuzz);
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connection to %s failed. Next attempt - at %08x\n", lpszHostName, uiNextAttemptTime);
#endif
}
void ScSession::OkConnection (unsigned int uiConnectionType) {
SVSUTIL_ASSERT (gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Connection to %s succeeded\n", lpszHostName);
#endif
iFailures = 0;
fSessionState = uiConnectionType;
}
void ScSession::InitConnection (void) {
gMem->Lock ();
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Initializing connection to %s...\n", lpszHostName);
#endif
uiAckTimeout = 0;
uiStoreAckTimeout = 0;
uiMyWindowSize = MSMQ_DEFAULT_WINDOW_SIZE_PACKET;
uiOtherWindowSize = 0;
usLastAckSent = 0;
usLastRelAckSent = 0;
usPacketsSent = 0;
usRelPacketsSent = 0;
usLastAckReceived = 0;
usPacketsReceived = 0;
usRelPacketsReceived = 0;
uiAckDue = 0;
SVSUTIL_ASSERT (pSentPackets == NULL);
SVSUTIL_ASSERT (pSentRelPackets == NULL);
gMem->Unlock ();
}
int ScSession::SendECP (int fRefuseConnection) {
SVSUTIL_ASSERT (! gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Sending Establish Connection Packet to %s...\n", lpszHostName);
#endif
const int iPacketSize = sizeof(CBaseHeader) + sizeof(CInternalSection) + sizeof(CECSection);
char __buf[iPacketSize];
CBaseHeader *pBaseHeader = (CBaseHeader *)__buf;
pBaseHeader->CBaseHeader::CBaseHeader(iPacketSize);
pBaseHeader->SetType(FALCON_INTERNAL_PACKET);
CInternalSection *pInternal = (CInternalSection *)pBaseHeader->GetNextSection();
pInternal->CInternalSection::CInternalSection(INTERNAL_ESTABLISH_CONNECTION_PACKET);
if (fRefuseConnection)
pInternal->SetRefuseConnectionFlag ();
CECSection *pECSection = (CECSection *)pInternal->GetNextSection ();
if (fSessionState == SCSESSION_STATE_CONNECTED_CLIENT)
pECSection->CECSection::CECSection(&gMachine->guid, &guidDest, FALSE);
else
pECSection->CECSection::CECSection(&guidDest, &gMachine->guid, uiConnectionStamp, FALSE);
// pECSession->CheckAllowNewSession(FALSE); Do we need this???
int iTotalSent = 0;
while (iTotalSent < iPacketSize) {
int iSent = send (s, &__buf[iTotalSent], iPacketSize - iTotalSent, 0);
if (iSent == SOCKET_ERROR) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP send to %s failed (%d bytes transmitted)...\n", lpszHostName, iTotalSent);
#endif
return FALSE;
}
iTotalSent += iSent;
}
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP send to %s succeeded (%d bytes transmitted)...\n", lpszHostName, iTotalSent);
#endif
return TRUE;
}
int ScSession::RecvECP (void) {
SVSUTIL_ASSERT (! gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Receiving Establish Connection Packet from %s...\n", lpszHostName);
#endif
const int iPacketSize = sizeof(CBaseHeader) + sizeof(CInternalSection) + sizeof(CECSection);
char __buf[iPacketSize];
int iTotalRecvd = 0;
while (iTotalRecvd < iPacketSize) {
int iRecvd = recv (s, &__buf[iTotalRecvd], iPacketSize - iTotalRecvd, 0);
if (iRecvd <= 0) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (%d bytes transmitted)...\n", lpszHostName, iTotalRecvd);
#endif
return FALSE;
}
iTotalRecvd += iRecvd;
}
CBaseHeader *pBaseHeader = (CBaseHeader *)__buf;
if (! (pBaseHeader->SignatureIsValid() && pBaseHeader->VersionIsValid())) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (bad sig or version)...\n", lpszHostName);
#endif
return FALSE;
}
if (pBaseHeader->GetType() != FALCON_INTERNAL_PACKET) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (bad type %08x)...\n", lpszHostName, pBaseHeader->GetType());
#endif
return FALSE;
}
if (pBaseHeader->GetPacketSize() != iPacketSize) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (bad size %d, should be %d)...\n", lpszHostName, pBaseHeader->GetPacketSize(), iPacketSize);
#endif
return FALSE;
}
CInternalSection *pInternal = (CInternalSection *)pBaseHeader->GetNextSection();
if (pInternal->GetPacketType() != INTERNAL_ESTABLISH_CONNECTION_PACKET) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (not a connection packet, type is %08x)...\n", lpszHostName, pInternal->GetPacketType());
#endif
return FALSE;
}
CECSection *pECSection = (CECSection *)pInternal->GetNextSection ();
if (fSessionState == SCSESSION_STATE_CONNECTED_CLIENT) {
guidDest = *pECSection->GetServerQMGuid();
if (pInternal->GetRefuseConnectionFlag()) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s failed (connection refused)...\n", lpszHostName);
#endif
return FALSE;
}
uiConnectionDelta = GetTickCount() - pECSection->GetTimeStamp();
} else {
guidDest = *pECSection->GetClientQMGuid();
uiConnectionStamp = pECSection->GetTimeStamp();
}
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"ECP receive from %s succeeded (guid = " SC_GUID_FORMAT L")...\n", lpszHostName, SC_GUID_ELEMENTS((&guidDest)));
#endif
//
// Disallow loopbacks - they break all possible relationships in XACT queues.
//
if (guidDest == gMachine->guid) {
if (fSessionState == SCSESSION_STATE_CONNECTED_SERVER)
SendECP (TRUE);
return FALSE;
}
return TRUE;
}
int ScSession::SendCPP (void) {
SVSUTIL_ASSERT (! gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_SESSION, L"Sending Connection Parameters Packet to %s...\n", lpszHostName);
#endif
if (fSessionState == SCSESSION_STATE_CONNECTED_CLIENT) {
uiAckTimeout = uiConnectionDelta * 80 * 10;
if (gMachine->uiMinAckTimeout > uiAckTimeout)
uiAckTimeout = gMachine->uiMinAckTimeout;
else if (uiAckTimeout > gMachine->uiMaxAckTimeout)
uiAckTimeout = gMachine->uiMaxAckTimeout;
uiStoreAckTimeout = uiConnectionDelta * 8;
if (gMachine->uiMinStoreAckTimeout > uiStoreAckTimeout)
uiStoreAckTimeout = gMachine->uiMinStoreAckTimeout;
if (uiStoreAckTimeout > uiAckTimeout)
uiStoreAckTimeout = uiAckTimeout;
}
const int iPacketSize = sizeof(CBaseHeader) + sizeof(CInternalSection) + sizeof(CCPSection);
char __buf[iPacketSize];
CBaseHeader *pBase = (CBaseHeader *)__buf;
pBase->CBaseHeader::CBaseHeader (iPacketSize);
pBase->SetType (FALCON_INTERNAL_PACKET);
CInternalSection *pInternal = (CInternalSection *)pBase->GetNextSection ();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -