📄 scqman.cxx
字号:
if (fWhat == MQMSG_CLASS_NACK_BAD_DST_Q) {
if (! (pImage->flags.fSecureSession || (gMachine->fResponseByIp && pImage->flags.fHaveIpv4Addr)) )
return;
}
CEodHeader *pEodHeader = pImage->sect.pEodHeader;
GUID guidStreamId;
GUID guidQueueName;
// 1. Extract all names, convert to GUIDs.
if (pEodHeader) {
WCHAR *pStreamURI = (WCHAR *)pEodHeader->GetPointerToStreamId ();
if (! pStreamURI) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - no stream id\n");
#endif
return;
}
if (! gSeqMan->HashStringToGUID (pStreamURI, &guidStreamId)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - stream id doesn't hash\n");
#endif
return;
}
} else // Must be binary!
guidStreamId = *(GUID *)pImage->sect.pUserHeader->GetSourceQM ();
QUEUE_FORMAT qf;
pImage->sect.pUserHeader->GetDestinationQueue (&qf);
WCHAR *lpszTransactName = scutil_QFtoString (&qf);
if (! lpszTransactName) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - destination name doesn't parse\n");
#endif
return;
}
if (! gSeqMan->HashStringToGUID (lpszTransactName, &guidQueueName)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - destination name doesn't hash\n");
#endif
g_funcFree (lpszTransactName, g_pvFreeData);
return;
}
g_funcFree (lpszTransactName, g_pvFreeData);
WCHAR *szAckQueueName = NULL;
int fFreeAckQueueName = FALSE;
ScOrderSeq *pSeq = gSeqMan->HashOrderSequence (NULL, &guidStreamId, &guidQueueName, NULL, 0);
if (! pSeq) {
if (pEodHeader && pEodHeader->GetOrderQueueSizeInBytes()) {
// SRMP message already has queue name in it.
szAckQueueName = (WCHAR*) pEodHeader->GetPointerToOrderQueue();
} else if (pEodHeader) {
szAckQueueName = NULL;
}
else {
szAckQueueName = AllocOrderQueue ((pSourceQM && (guidStreamId == *pSourceQM)) ? szHostName : NULL, &guidStreamId);
fFreeAckQueueName = TRUE;
}
} else
szAckQueueName = gSeqMan->GetStringFromGUID (&pSeq->p->guidOrderAck);
if (! szAckQueueName) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - don't have order queue name\n");
#endif
return;
}
QUEUE_FORMAT qfa;
if (! scutil_StringToQF (&qfa, szAckQueueName)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - can't parse %s\n", szAckQueueName);
#endif
if (fFreeAckQueueName)
g_funcFree (szAckQueueName, g_pvFreeData);
return;
}
if (fFreeAckQueueName)
g_funcFree (szAckQueueName, g_pvFreeData);
ScPacketImage *pAckImage = scqman_ConstructAckImage (pImage, &qfa, fWhat, FALSE);
if (! pAckImage) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - can't construct ack image\n");
#endif
return;
}
pAckImage->sect.pUserHeader->SetSourceQM (&gMachine->guid);
OBJECTID mid;
mid.Lineage = gMachine->guid;
mid.Uniquifier = gQueueMan->GetNextID();
pAckImage->sect.pUserHeader->SetMessageID (&mid);
ScQueue *pQueueAck = FindQueueByPacketImage (pAckImage, TRUE);
if (pQueueAck) {
pAckImage->hkOrderKey = ++pQueueAck->hkReceived;
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Sending to order queue %s...\n", pQueueAck->lpszFormatName);
#endif
}
if (! (pQueueAck && pQueueAck->MakePacket (pAckImage, -1, TRUE))) {
g_funcFree (pAckImage, g_pvFreeData);
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Packet creation failed...\n");
#endif
}
}
void ScQueueManager::ForwardAck (ScPacketImage *pImage) {
SVSUTIL_ASSERT (gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ACK/NACK...\n");
#endif
int fFreeNeeded = TRUE;
for ( ; ; ) {
#if defined (SC_VERBOSE)
{
OBJECTID __ooid;
pImage->sect.pPropHeader->GetCorrelationID ((unsigned char *)&__ooid);
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Processing ACK/NACK class %08x correlation ID %08x " SC_GUID_FORMAT L"\n",
pImage->sect.pPropHeader->GetClass(), __ooid.Uniquifier, SC_GUID_ELEMENTS((&__ooid.Lineage)));
}
#endif
QUEUE_FORMAT qf;
pImage->sect.pUserHeader->GetDestinationQueue (&qf);
if (qf.Suffix() != QUEUE_SUFFIX_TYPE_NONE)
break;
pImage->sect.pUserHeader->SetSourceQM (&gMachine->guid);
OBJECTID mid;
mid.Lineage = gMachine->guid;
mid.Uniquifier = gQueueMan->GetNextID();
pImage->sect.pUserHeader->SetMessageID (&mid);
ScQueue *pQueue = FindQueueByPacketImage (pImage, TRUE);
if (! pQueue) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS | VERBOSE_MASK_QUEUE, L"Cannot find/create admin queue\n");
#endif
break;
}
pImage->hkOrderKey = ++pQueue->hkReceived;
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Sending to admin queue %s...\n", pQueue->lpszFormatName);
#endif
if (pQueue->MakePacket (pImage, -1, TRUE))
fFreeNeeded = FALSE;
else {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Packet creation failed...\n");
#endif
}
break;
}
if (fFreeNeeded)
g_funcFree (pImage, g_pvFreeData);
}
void ScQueueManager::ExpirationCheck (void) {
SVSUTIL_ASSERT (gMem->IsLocked ());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Checking message expiration...\n");
#endif
//
// Garbage collection takes on two entities:
// outgoing queues and incoming queues that
// are neither journals nor DLQ
//
// First, check all outgoing queues
//
DWORD dwTimeout = INFINITE;
unsigned int uiTime = scutil_now ();
for ( ; ; ) {
SVSTNode *pNode = gMem->pTimeoutTree->Min ();
if (! pNode)
break;
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
if (did_not_expire (pPacket->tExpirationTime, uiTime)) {
dwTimeout = (pPacket->tExpirationTime - uiTime) * 1000;
if (dwTimeout == 0)
dwTimeout = 500;
break;
}
SVSUTIL_ASSERT (pPacket->pTimeoutNode == pNode);
gMem->pTimeoutTree->Delete (pNode);
pPacket->pTimeoutNode = NULL;
SVSUTIL_ASSERT ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) || ((pPacket->uiPacketState == SCPACKET_STATE_WAITORDERACK) && pPacket->pQueue->qp.bTransactional && (! pPacket->pQueue->qp.bIsIncoming)));
SVSUTIL_ASSERT (pPacket->pQueue);
SVSUTIL_ASSERT (pPacket->pNode);
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_PACKETS | VERBOSE_MASK_QUEUE, L"Packet ID%08x from queue %s has expired.\n", pPacket->uiMessageID, pPacket->pQueue->lpszFormatName);
#endif
if (pPacket->pQueue->qp.bTransactional) {
if (pPacket->pQueue->BringToMemory (pPacket))
ForwardTransactionalResponse (pPacket->pImage, MQMSG_CLASS_NACK_RECEIVE_TIMEOUT, NULL, NULL);
}
RejectPacket (pPacket, MQMSG_CLASS_NACK_RECEIVE_TIMEOUT, pPacket->pQueue);
pPacket->pQueue->DisposeOfPacket (pPacket);
}
if (dwTimeout != INFINITE)
svsutil_SetAttrTimer (gMem->pTimer, (SVSHandle)hPacketExpired, dwTimeout);
}
void ScQueueManager::PeriodicCheck (void) {
//
// Close all files - each queue will actually manage closeure automagically
//
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_INIT, L"Running periodic checks...\n");
#endif
ScQueueList *pql = pqlIncoming;
while (pql) {
pql->pQueue->sFile->Close (FALSE);
pql = pql->pqlNext;
}
unsigned int uiNow = scutil_now();
pql = pqlOutgoing;
while (pql) {
ScQueueList *pqlNext = pql->pqlNext;
ScQueue *pQueue = pql->pQueue;
pQueue->sFile->Close (FALSE);
if (time_less (pQueue->tModification + SCQMAN_OUTQUEUE_DELETION, uiNow) &&
pQueue->pPackets->IsEmpty() && (! pQueue->uiOpen) && (! pQueue->qp.bIsProtected) &&
((pQueue->pSess->fSessionState == SCSESSION_STATE_INACTIVE) || (pQueue->pSess->fSessionState == SCSESSION_STATE_WAITING)))
DeleteQueue (pQueue, TRUE);
pql = pqlNext;
}
gSeqMan->Maintain ();
}
void ScQueueManager::OrderAckCheck (void) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Checking for order messages for which order ack has not been received.\n");
#endif
ScQueueList *pql = pqlOutgoing;
while (pql) {
ScQueueList *pqlNext = pql->pqlNext;
ScQueue *pQueue = pql->pQueue;
if (pQueue->qp.bTransactional) {
SVSTNode *pNode = pQueue->pPackets->Min ();
if (pNode) {
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
if (pPacket->uiPacketState == SCPACKET_STATE_WAITORDERACK) {
if (pQueue->uiFirstUnackedSeqN && (pPacket->uiSeqN == pQueue->uiFirstUnackedSeqN)) {
//
// We should now resend the sequence...
//
while (pPacket) {
if (pPacket->uiPacketState == SCPACKET_STATE_WAITORDERACK) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Returning packet " SC_GUID_FORMAT L":%08x, queue %s to active state\n",
SC_GUID_ELEMENTS ((&pPacket->guidSourceQM)), pPacket->uiMessageID, pQueue->lpszFormatName);
#endif
pPacket->uiPacketState = SCPACKET_STATE_ALIVE;
--iPacketsWaitingOrderAck;
SVSUTIL_ASSERT (iPacketsWaitingOrderAck >= 0);
}
pNode = pQueue->pPackets->Next (pNode);
pPacket = pNode ? (ScPacket *)SVSTree::GetData (pNode) : NULL;
}
gSessionMan->PacketInserted (pQueue->pSess);
} else
pQueue->uiFirstUnackedSeqN = pPacket->uiSeqN;
} else
pQueue->uiFirstUnackedSeqN = 0;
}
}
pql = pqlNext;
}
if (iPacketsWaitingOrderAck)
svsutil_SetAttrTimer (gMem->pTimer, (SVSHandle)hOrderAckTimer, gMachine->uiOrderAckScale * SCQMAN_ORDERACKPERIODIC * 1000);
}
//
// This gets called when order ack is received for a queue...
//
void ScQueueManager::ReceiveOrderAck (void) {
for ( ; ; ) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Receiving order ack...\n");
#endif
SVSTNode *pNode = pQueueOrderAck->pPackets->Min ();
if (! pNode)
break;
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
if (pQueueOrderAck->BringToMemory (pPacket) && pPacket->pImage->sect.pPropHeader->GetClass() == MQMSG_CLASS_ORDER_ACK) {
QUEUE_FORMAT qf;
WCHAR *lpszFormatName = NULL;
ScQueue *pQueue = NULL;
if ((pPacket->pImage->sect.pPropHeader->GetBodySize() == sizeof(ScOrderAckBody)) &&
pPacket->pImage->sect.pUserHeader->GetResponseQueue (&qf) &&
(lpszFormatName = scutil_QFtoString (&qf)) &&
(pQueue = FindOutgoingByFormat (lpszFormatName))) {
ScOrderAckBody oa;
pPacket->pImage->sect.pPropHeader->GetBody ((unsigned char *)&oa, sizeof(oa));
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Received order %016I64x:%d\n", oa.m_liSeqID, oa.m_ulSeqN);
#endif
if (oa.m_liSeqID == pQueue->llSeqID) {
for ( ; ; ) {
SVSTNode *pNode2 = pQueue->pPackets->Min ();
if (! pNode2)
break;
ScPacket *pPacket2 = (ScPacket *)SVSTree::GetData (pNode2);
SVSUTIL_ASSERT (pPacket2->uiSeqN);
if (pPacket2->uiSeqN > oa.m_ulSeqN)
break;
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Order ack matches packet ptr %08x uuid %08x order no %016I64x:%d\n", pPacket2, pPacket2->uiMessageID, pQueue->llSeqID, pPacket2->uiSeqN);
#endif
gQueueMan->JournalPacket (pPacket2);
pQueue->DisposeOfPacket (pPacket2);
}
}
#if defined (SC_VERBOSE)
else {
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Order ACK seqID %016I64x does not match queue's %016I64x\n", oa.m_liSeqID, pQueue->llSeqID);
}
#endif
}
#if defined (SC_VERBOSE)
else
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Misplaced order ack - wrong sized body or unknown queue (%s)\n", lpszFormatName ? lpszFormatName : L"no queue");
#endif
if (lpszFormatName)
g_funcFree (lpszFormatName, g_pvFreeData);
}
#if defined (SC_VERBOSE)
else if (pPacket->pImage)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Transactional response (class %d) received in queue %s is ignored\n", pPacket->pImage->sect.pPropHeader->GetClass(), pQueueOrderAck->lpszFormatName);
else
scerror_DebugOut (VERBOSE_MASK_ORDER, L"No image for packet " SC_GUID_FORMAT L"::%08x in order ack queue %s\n", SC_GUID_ELEMENTS ((&pPacket->guidSourceQM)), pPacket->uiMessageID, pQueueOrderAck->lpszFormatName);
#endif
pQueueOrderAck->DisposeOfPacket (pPacket);
}
}
void ScQueueManager::MainThread(void) {
HANDLE ah[6];
gMem->Lock ();
ah[0] = gSessionMan->hBuzz;
ah[1] = hPacketExpired;
ah[2] = hOrderAckTimer;
ah[3] = pQueueOrderAck->hUpdateEvent;
ah[4] = gSeqMan->hSequencePulse;
ah[SVSUTIL_ARRLEN (ah) - 1] = ScSessionManager::hNetUP; // THIS MUST BE THE LAST ONE!!!
DWORD c = SVSUTIL_ARRLEN (ah);
if (! ah[c-1])
--c;
gMem->Unlock ();
for ( ; ; ) {
DWORD dwHowCalled = WaitForMultipleObjects (c, ah, FALSE, SCQMAN_PERIODIC);
if (dwHowCalled == WAIT_FAILED) {
SVSUTIL_ASSERT (0);
break;
}
gMem->Lock ();
if (dwHowCalled == WAIT_OBJECT_0)
gSessionMan->ConnService ();
if (dwHowCalled == (WAIT_OBJECT_0 + 1))
ExpirationCheck ();
if (dwHowCalled == (WAIT_OBJECT_0 + 2))
OrderAckCheck ();
if (dwHowCalled == (WAIT_OBJECT_0 + 3))
ReceiveOrderAck ();
if (dwHowCalled == (WAIT_OBJECT_0 + 4))
gSeqMan->SendAcks ();
if (dwHowCalled == (WAIT_OBJECT_0 + 5))
gSessionMan->ConnectToNet (SCSESSION_LANAUP_DELAY);
if (dwHowCalled == WAIT_TIMEOUT)
PeriodicCheck ();
gMem->Unlock ();
}
}
DWORD WINAPI ScQueueManager::MainThread_s (void *arg) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_INIT, L"Queue manager thread has started...\n");
#endif
ScQueueManager *pQueueMan = (ScQueueManager *)arg;
pQueueMan->MainThread();
return 0;
}
void ScQueueManager::Start (void) {
SVSUTIL_ASSERT (gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_INIT, L"Starting queue manager main thread...\n");
#endif
DWORD tid;
hMainThread = CreateThread (NULL, 0, ScQueueManager::MainThread_s, this, 0, &tid);
}
void ScQueueManager::Stop (void) {
SVSUTIL_ASSERT (gMem->IsLocked());
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_INIT, L"Stopping queue manager main thread...\n");
#endif
if (hMainThread) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_INIT, L"Terminating queue manager thread...\n");
#endif
TerminateThread (hMainThread, 0);
::CloseHandle (hMainThread);
}
hMainThread = NULL;
}
void ScQueueManager::SaveMessageID (void) {
SVSUTIL_ASSERT (gMem->IsLocked());
HKEY hKey;
LONG hr = RegOpenKeyEx (HKEY_LOCAL_MACHINE, MSMQ_SC_REGISTRY_KEY, 0, KEY_WRITE, &hKey);
if (hr != ERROR_SUCCESS)
return;
RegSetValueEx (hKey, L"MessageID", 0, REG_DWORD, (BYTE *)&uiMessageID, sizeof(DWORD));
RegCloseKey (hKey);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -