⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scqman.cxx

📁 Windows CE 6.0 Server 源码
💻 CXX
📖 第 1 页 / 共 4 页
字号:
	if (fWhat == MQMSG_CLASS_NACK_BAD_DST_Q) {
		if (! (pImage->flags.fSecureSession || (gMachine->fResponseByIp && pImage->flags.fHaveIpv4Addr)) )
			return;
	}

	CEodHeader *pEodHeader = pImage->sect.pEodHeader;

	GUID guidStreamId;
	GUID guidQueueName;

	// 1. Extract all names, convert to GUIDs.

	if (pEodHeader) {
		WCHAR *pStreamURI = (WCHAR *)pEodHeader->GetPointerToStreamId ();
		if (! pStreamURI) {
#if defined (SC_VERBOSE)
			scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - no stream id\n");
#endif
			return;
		}

		if (! gSeqMan->HashStringToGUID (pStreamURI, &guidStreamId)) {
#if defined (SC_VERBOSE)
			scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - stream id doesn't hash\n");
#endif
			return;
		}
	} else	// Must be binary!
		guidStreamId = *(GUID *)pImage->sect.pUserHeader->GetSourceQM ();

	QUEUE_FORMAT qf;
	pImage->sect.pUserHeader->GetDestinationQueue (&qf);

	WCHAR *lpszTransactName = scutil_QFtoString (&qf);

	if (! lpszTransactName) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - destination name doesn't parse\n");
#endif
		return;
	}

	if (! gSeqMan->HashStringToGUID (lpszTransactName, &guidQueueName)) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - destination name doesn't hash\n");
#endif
		g_funcFree (lpszTransactName, g_pvFreeData);
		return;
	}

	g_funcFree (lpszTransactName, g_pvFreeData);

	WCHAR *szAckQueueName = NULL;
	int   fFreeAckQueueName = FALSE;

	ScOrderSeq *pSeq = gSeqMan->HashOrderSequence (NULL, &guidStreamId, &guidQueueName, NULL, 0);
	if (! pSeq) {
		if (pEodHeader && pEodHeader->GetOrderQueueSizeInBytes()) {
			// SRMP message already has queue name in it.
			szAckQueueName = (WCHAR*) pEodHeader->GetPointerToOrderQueue();
		} else if (pEodHeader) {
			szAckQueueName = NULL;
		}
		else {
			szAckQueueName = AllocOrderQueue ((pSourceQM && (guidStreamId == *pSourceQM)) ? szHostName : NULL, &guidStreamId);
			fFreeAckQueueName = TRUE;
		}
	} else
		szAckQueueName = gSeqMan->GetStringFromGUID (&pSeq->p->guidOrderAck);

	if (! szAckQueueName) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - don't have order queue name\n");
#endif
		return;
	}

	QUEUE_FORMAT qfa;
	if (! scutil_StringToQF (&qfa, szAckQueueName)) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - can't parse %s\n", szAckQueueName);
#endif
		if (fFreeAckQueueName)
			g_funcFree (szAckQueueName, g_pvFreeData);

		return;
	}

	if (fFreeAckQueueName)
		g_funcFree (szAckQueueName, g_pvFreeData);

	ScPacketImage *pAckImage = scqman_ConstructAckImage (pImage, &qfa, fWhat, FALSE);
	if (! pAckImage) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ack - can't construct ack image\n");
#endif
		return;
	}

	pAckImage->sect.pUserHeader->SetSourceQM (&gMachine->guid);

	OBJECTID mid;
	mid.Lineage    = gMachine->guid;
	mid.Uniquifier = gQueueMan->GetNextID();
	pAckImage->sect.pUserHeader->SetMessageID (&mid);

	ScQueue *pQueueAck = FindQueueByPacketImage (pAckImage, TRUE);

	if (pQueueAck) {
		pAckImage->hkOrderKey = ++pQueueAck->hkReceived;

#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_ORDER, L"Sending to order queue %s...\n", pQueueAck->lpszFormatName);
#endif
	}

	if (! (pQueueAck && pQueueAck->MakePacket (pAckImage, -1, TRUE))) {
		g_funcFree (pAckImage, g_pvFreeData);
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Packet creation failed...\n");
#endif
	}
}

void ScQueueManager::ForwardAck (ScPacketImage *pImage) {
	SVSUTIL_ASSERT (gMem->IsLocked());

#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Forwarding ACK/NACK...\n");
#endif

	int fFreeNeeded = TRUE;

	for ( ; ; ) {
#if defined (SC_VERBOSE)
		{
			OBJECTID __ooid;
			pImage->sect.pPropHeader->GetCorrelationID ((unsigned char *)&__ooid);
			scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Processing ACK/NACK class %08x correlation ID %08x " SC_GUID_FORMAT L"\n",
				pImage->sect.pPropHeader->GetClass(), __ooid.Uniquifier, SC_GUID_ELEMENTS((&__ooid.Lineage)));
		}
#endif
		QUEUE_FORMAT qf;

		pImage->sect.pUserHeader->GetDestinationQueue (&qf);

		if (qf.Suffix() != QUEUE_SUFFIX_TYPE_NONE)
			break;

		pImage->sect.pUserHeader->SetSourceQM (&gMachine->guid);

		OBJECTID mid;
		mid.Lineage    = gMachine->guid;
		mid.Uniquifier = gQueueMan->GetNextID();
		pImage->sect.pUserHeader->SetMessageID (&mid);

		ScQueue *pQueue = FindQueueByPacketImage (pImage, TRUE);

		if (! pQueue) {
#if defined (SC_VERBOSE)
			scerror_DebugOut (VERBOSE_MASK_PACKETS | VERBOSE_MASK_QUEUE, L"Cannot find/create admin queue\n");
#endif
			break;
		}

		pImage->hkOrderKey = ++pQueue->hkReceived;

#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Sending to admin queue %s...\n", pQueue->lpszFormatName);
#endif

		if (pQueue->MakePacket (pImage, -1, TRUE))
			fFreeNeeded = FALSE;
		else {
#if defined (SC_VERBOSE)
			scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Packet creation failed...\n");
#endif
		}

		break;
	}

	if (fFreeNeeded)
		g_funcFree (pImage, g_pvFreeData);
}

void ScQueueManager::ExpirationCheck (void) {
	SVSUTIL_ASSERT (gMem->IsLocked ());

#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_PACKETS, L"Checking message expiration...\n");
#endif
	//
	//	Garbage collection takes on two entities:
	//	outgoing queues and incoming queues that
	//	are neither journals nor DLQ
	//
	//	First, check all outgoing queues
	//
	DWORD dwTimeout = INFINITE;
	unsigned int	uiTime = scutil_now ();

	for ( ; ; ) {
		SVSTNode *pNode = gMem->pTimeoutTree->Min ();
		if (! pNode)
			break;

		ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
		if (did_not_expire (pPacket->tExpirationTime, uiTime)) {
			dwTimeout = (pPacket->tExpirationTime - uiTime) * 1000;
			if (dwTimeout == 0)
				dwTimeout = 500;

			break;
		}

		SVSUTIL_ASSERT (pPacket->pTimeoutNode == pNode);
		gMem->pTimeoutTree->Delete (pNode);
		pPacket->pTimeoutNode = NULL;

		SVSUTIL_ASSERT ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) || ((pPacket->uiPacketState == SCPACKET_STATE_WAITORDERACK) && pPacket->pQueue->qp.bTransactional && (! pPacket->pQueue->qp.bIsIncoming)));
		SVSUTIL_ASSERT (pPacket->pQueue);
		SVSUTIL_ASSERT (pPacket->pNode);

#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_PACKETS | VERBOSE_MASK_QUEUE, L"Packet ID%08x from queue %s has expired.\n", pPacket->uiMessageID, pPacket->pQueue->lpszFormatName);
#endif

		if (pPacket->pQueue->qp.bTransactional) {
			if (pPacket->pQueue->BringToMemory (pPacket))
				ForwardTransactionalResponse (pPacket->pImage, MQMSG_CLASS_NACK_RECEIVE_TIMEOUT, NULL, NULL);
		}

		RejectPacket (pPacket, MQMSG_CLASS_NACK_RECEIVE_TIMEOUT, pPacket->pQueue);
		pPacket->pQueue->DisposeOfPacket (pPacket);
	}

	if (dwTimeout != INFINITE)
		svsutil_SetAttrTimer (gMem->pTimer, (SVSHandle)hPacketExpired, dwTimeout);
}

void ScQueueManager::PeriodicCheck (void) {
	//
	//	Close all files - each queue will actually manage closeure automagically
	//
#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_INIT, L"Running periodic checks...\n");
#endif

	ScQueueList *pql = pqlIncoming;
	while (pql) {
		pql->pQueue->sFile->Close (FALSE);
		pql = pql->pqlNext;
	}

	unsigned int uiNow = scutil_now();
	pql = pqlOutgoing;
	while (pql) {
		ScQueueList *pqlNext = pql->pqlNext;
		ScQueue *pQueue = pql->pQueue;

		pQueue->sFile->Close (FALSE);
		if (time_less (pQueue->tModification + SCQMAN_OUTQUEUE_DELETION, uiNow) &&
			pQueue->pPackets->IsEmpty() && (! pQueue->uiOpen) && (! pQueue->qp.bIsProtected) &&
			((pQueue->pSess->fSessionState == SCSESSION_STATE_INACTIVE) || (pQueue->pSess->fSessionState == SCSESSION_STATE_WAITING)))
			DeleteQueue (pQueue, TRUE);

		pql = pqlNext;
	}

	gSeqMan->Maintain ();
}

void ScQueueManager::OrderAckCheck (void) {
#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_ORDER, L"Checking for order messages for which order ack has not been received.\n");
#endif

	ScQueueList *pql = pqlOutgoing;
	while (pql) {
		ScQueueList *pqlNext = pql->pqlNext;
		ScQueue *pQueue = pql->pQueue;

		if (pQueue->qp.bTransactional) {
			SVSTNode *pNode = pQueue->pPackets->Min ();
			if (pNode) {
				ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
				if (pPacket->uiPacketState == SCPACKET_STATE_WAITORDERACK) {
					if (pQueue->uiFirstUnackedSeqN && (pPacket->uiSeqN == pQueue->uiFirstUnackedSeqN)) {
						//
						//	We should now resend the sequence...
						//
						while (pPacket) {
							if (pPacket->uiPacketState == SCPACKET_STATE_WAITORDERACK) {
#if defined (SC_VERBOSE)
								scerror_DebugOut (VERBOSE_MASK_ORDER, L"Returning packet " SC_GUID_FORMAT L":%08x, queue %s to active state\n", 
									SC_GUID_ELEMENTS ((&pPacket->guidSourceQM)), pPacket->uiMessageID, pQueue->lpszFormatName);
#endif
								pPacket->uiPacketState = SCPACKET_STATE_ALIVE;

								--iPacketsWaitingOrderAck;

								SVSUTIL_ASSERT (iPacketsWaitingOrderAck >= 0);
							}

							pNode = pQueue->pPackets->Next (pNode);
							pPacket = pNode ? (ScPacket *)SVSTree::GetData (pNode) : NULL;
						}

						gSessionMan->PacketInserted (pQueue->pSess);
					} else
						pQueue->uiFirstUnackedSeqN = pPacket->uiSeqN;
				} else
					pQueue->uiFirstUnackedSeqN = 0;
			}
		}

		pql = pqlNext;
	}

	if (iPacketsWaitingOrderAck)
		svsutil_SetAttrTimer (gMem->pTimer, (SVSHandle)hOrderAckTimer, gMachine->uiOrderAckScale * SCQMAN_ORDERACKPERIODIC * 1000);
}

//
//	This gets called when order ack is received for a queue...
//
void ScQueueManager::ReceiveOrderAck (void) {
	for ( ; ; ) {
#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_ORDER, L"Receiving order ack...\n");
#endif

		SVSTNode *pNode = pQueueOrderAck->pPackets->Min ();
		if (! pNode)
			break;

		ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
		if (pQueueOrderAck->BringToMemory (pPacket) && pPacket->pImage->sect.pPropHeader->GetClass() == MQMSG_CLASS_ORDER_ACK) {
			QUEUE_FORMAT qf;
			WCHAR *lpszFormatName = NULL;
			ScQueue *pQueue = NULL;
			if ((pPacket->pImage->sect.pPropHeader->GetBodySize() == sizeof(ScOrderAckBody)) &&
				pPacket->pImage->sect.pUserHeader->GetResponseQueue (&qf) &&
				(lpszFormatName = scutil_QFtoString (&qf)) &&
				(pQueue = FindOutgoingByFormat (lpszFormatName))) {
				ScOrderAckBody oa;
				pPacket->pImage->sect.pPropHeader->GetBody ((unsigned char *)&oa, sizeof(oa));
#if defined (SC_VERBOSE)
				scerror_DebugOut (VERBOSE_MASK_ORDER, L"Received order %016I64x:%d\n", oa.m_liSeqID, oa.m_ulSeqN);
#endif
				if (oa.m_liSeqID == pQueue->llSeqID) {
					for ( ; ; ) {
						SVSTNode *pNode2   = pQueue->pPackets->Min ();
						if (! pNode2)
							break;

						ScPacket *pPacket2 = (ScPacket *)SVSTree::GetData (pNode2);
						SVSUTIL_ASSERT (pPacket2->uiSeqN);
						if (pPacket2->uiSeqN > oa.m_ulSeqN)
							break;
#if defined (SC_VERBOSE)
						scerror_DebugOut (VERBOSE_MASK_ORDER, L"Order ack matches packet ptr %08x uuid %08x order no %016I64x:%d\n", pPacket2, pPacket2->uiMessageID, pQueue->llSeqID, pPacket2->uiSeqN);
#endif
						gQueueMan->JournalPacket (pPacket2);
						pQueue->DisposeOfPacket (pPacket2);
					}
				}
#if defined (SC_VERBOSE)
				else {
					scerror_DebugOut (VERBOSE_MASK_ORDER, L"Order ACK seqID %016I64x does not match queue's %016I64x\n", oa.m_liSeqID, pQueue->llSeqID);
				}
#endif
			}
#if defined (SC_VERBOSE)
			else
				scerror_DebugOut (VERBOSE_MASK_ORDER, L"Misplaced order ack - wrong sized body or unknown queue (%s)\n", lpszFormatName ? lpszFormatName : L"no queue");
#endif
			if (lpszFormatName)
				g_funcFree (lpszFormatName, g_pvFreeData);
		}
#if defined (SC_VERBOSE)
		else if (pPacket->pImage)
			scerror_DebugOut (VERBOSE_MASK_ORDER, L"Transactional response (class %d) received in queue %s is ignored\n", pPacket->pImage->sect.pPropHeader->GetClass(), pQueueOrderAck->lpszFormatName);
		else
			scerror_DebugOut (VERBOSE_MASK_ORDER, L"No image for packet " SC_GUID_FORMAT L"::%08x in order ack queue %s\n", SC_GUID_ELEMENTS ((&pPacket->guidSourceQM)), pPacket->uiMessageID, pQueueOrderAck->lpszFormatName);
#endif
		pQueueOrderAck->DisposeOfPacket (pPacket);
	}
}

void ScQueueManager::MainThread(void) {
	HANDLE ah[6];

	gMem->Lock ();
	ah[0] = gSessionMan->hBuzz;
	ah[1] = hPacketExpired;
	ah[2] = hOrderAckTimer;
	ah[3] = pQueueOrderAck->hUpdateEvent;
	ah[4] = gSeqMan->hSequencePulse;

	ah[SVSUTIL_ARRLEN (ah) - 1] = ScSessionManager::hNetUP;	// THIS MUST BE THE LAST ONE!!!

	DWORD c = SVSUTIL_ARRLEN (ah);
	if (! ah[c-1])
		--c;

	gMem->Unlock ();

	for ( ; ; ) {
		DWORD dwHowCalled = WaitForMultipleObjects (c, ah, FALSE, SCQMAN_PERIODIC);

		if (dwHowCalled == WAIT_FAILED) {
			SVSUTIL_ASSERT (0);
			break;
		}

		gMem->Lock ();

		if (dwHowCalled == WAIT_OBJECT_0)
			gSessionMan->ConnService ();

		if (dwHowCalled == (WAIT_OBJECT_0 + 1))
			ExpirationCheck ();

		if (dwHowCalled == (WAIT_OBJECT_0 + 2))
			OrderAckCheck ();

		if (dwHowCalled == (WAIT_OBJECT_0 + 3))
			ReceiveOrderAck ();

		if (dwHowCalled == (WAIT_OBJECT_0 + 4))
			gSeqMan->SendAcks ();

		if (dwHowCalled == (WAIT_OBJECT_0 + 5))
			gSessionMan->ConnectToNet (SCSESSION_LANAUP_DELAY);

		if (dwHowCalled == WAIT_TIMEOUT)
			PeriodicCheck ();

		gMem->Unlock ();
	}
}

DWORD WINAPI ScQueueManager::MainThread_s (void *arg) {
#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_INIT, L"Queue manager thread has started...\n");
#endif

	ScQueueManager *pQueueMan = (ScQueueManager *)arg;
	pQueueMan->MainThread();
	return 0;
}

void ScQueueManager::Start (void) {
	SVSUTIL_ASSERT (gMem->IsLocked());

#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_INIT, L"Starting queue manager main thread...\n");
#endif

	DWORD tid;
	hMainThread = CreateThread (NULL, 0, ScQueueManager::MainThread_s, this, 0, &tid);
}

void ScQueueManager::Stop (void) {
	SVSUTIL_ASSERT (gMem->IsLocked());

#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_INIT, L"Stopping queue manager main thread...\n");
#endif

	if (hMainThread) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_INIT, L"Terminating queue manager thread...\n");
#endif
		TerminateThread (hMainThread, 0);
		::CloseHandle (hMainThread);
	}

	hMainThread = NULL;
}

void ScQueueManager::SaveMessageID (void) {
	SVSUTIL_ASSERT (gMem->IsLocked());

	HKEY hKey;
	LONG hr = RegOpenKeyEx (HKEY_LOCAL_MACHINE, MSMQ_SC_REGISTRY_KEY, 0, KEY_WRITE, &hKey);

	if (hr != ERROR_SUCCESS)
		return;

	RegSetValueEx (hKey, L"MessageID", 0, REG_DWORD, (BYTE *)&uiMessageID, sizeof(DWORD));

	RegCloseKey (hKey);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -