⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scqueue.cxx

📁 Windows CE 6.0 Server 源码
💻 CXX
📖 第 1 页 / 共 2 页
字号:
	if (sFile)
		delete sFile;

	if (lpszQueueHost)
		svsutil_StringHashFree (gMem->pStringHash, lpszQueueHost);

	if (lpszQueueName)
		svsutil_StringHashFree (gMem->pStringHash, lpszQueueName);

	if (lpszFormatName)
		svsutil_StringHashFree (gMem->pStringHash, lpszFormatName);

	if (pSess)
		gSessionMan->ReleaseSession (pSess);

	if (pPackets) {
		pPackets->Empty (freePacketData, NULL);
		delete pPackets;
	}

	if (lpszQueueLabel)
		g_funcFree (lpszQueueLabel, g_pvFreeData);

	if (hUpdateEvent)
		CloseHandle (hUpdateEvent);
}

ScPacket *ScQueue::InsertPacket (ScPacket *pPacket) {
	SVSUTIL_ASSERT (! pPacket->pNode);
	SVSUTIL_ASSERT (! pPacket->pTimeoutNode);
	SVSUTIL_ASSERT (pPacket->uiPacketState == SCPACKET_STATE_ALIVE);

	int iRes = (NULL != (pPacket->pNode = pPackets->Insert (pPacket->hkOrderKey, pPacket)));

	if (! iRes) {
		unsigned int uiWhichSection = 0;

		sFile->DeletePacket (pPacket, uiWhichSection);
		sFile->UpdateSection (uiWhichSection);

		svsutil_FreeFixed (pPacket, gMem->pPacketMem);

		return NULL;
	}

	pPacket->pQueue = this;

	if (! qp.bIsIncoming)
		gSessionMan->PacketInserted (pSess);

	if (pPacket->tExpirationTime != INFINITE) {
		pPacket->pTimeoutNode = gMem->pTimeoutTree->Insert (pPacket->tExpirationTime, pPacket);
		SetEvent (gQueueMan->hPacketExpired);
	}

	return pPacket;
}

ScPacket *ScQueue::MakePacket (ScPacketImage *pPacketImage, int iDirEntry, int fPutInFile) {
	SVSUTIL_ASSERT (iQueueSizeB >= 0);

	int iSize = pPacketImage->PersistedSize ();

	if ((unsigned int)(iSize + iQueueSizeB) > qp.uiQuotaK * 1024)
		return NULL;

	if ((unsigned int)(gMachine->iMachineQuotaUsedB + iSize) > gMachine->uiMachineQuotaK * 1024)
		return NULL;

	ScPacket *pPacket = (ScPacket *)svsutil_GetFixed (gMem->pPacketMem);

	if (! pPacket)
		return NULL;

	pPacket->ScPacket::ScPacket();

	pPacket->hkOrderKey = pPacketImage->hkOrderKey;

	pPacket->tCreationTime   = pPacketImage->sect.pUserHeader->GetSentTime();

	unsigned int uiNow = scutil_now ();

	if (qp.bIsJournal || qp.bIsDeadLetter || qp.bIsMachineJournal)
		pPacket->tExpirationTime = INFINITE;
	else {
		unsigned int tTime;

		if (qp.bIsIncoming) {
			tTime = pPacketImage->sect.pUserHeader->GetTimeToLiveDelta ();
			if (tTime != INFINITE)
				tTime += pPacketImage->sect.pBaseHeader->GetAbsoluteTimeToQueue ();
		} else
			tTime = pPacketImage->sect.pBaseHeader->GetAbsoluteTimeToQueue ();

		if (tTime != INFINITE)
			tTime += uiNow;

		pPacket->tExpirationTime = tTime;
	}

	OBJECTID oid;
	pPacketImage->sect.pUserHeader->GetMessageID (&oid);

#if defined (SC_VERBOSE) || defined (SC_INCLUDE_CONSOLE)
	pPacket->uiMessageID     = oid.Uniquifier;
	pPacket->guidSourceQM	 = oid.Lineage;
#endif

	pPacket->uiAckType       = pPacketImage->sect.pPropHeader->GetAckType ();
	pPacket->uiAuditType     = pPacketImage->sect.pUserHeader->GetAuditing ();
	pPacket->iDirEntry	     = iDirEntry;
	pPacket->pImage			 = pPacketImage;

	pPacket->uiSeqN          = pPacketImage->sect.pXactHeader ? pPacketImage->sect.pXactHeader->GetSeqN () : 0;

	if (qp.bIsJournal || qp.bIsDeadLetter || qp.bIsMachineJournal || (pPacket->pImage->sect.pUserHeader->GetDelivery() == MQMSG_DELIVERY_RECOVERABLE)) {
		if (fPutInFile) {
			SVSUTIL_ASSERT (iDirEntry == -1);

			unsigned int uiWhichSection = 0;

			if ((! sFile->BackupPacket (pPacket, uiWhichSection)) ||
				(! sFile->UpdateSection (uiWhichSection))) {
				svsutil_FreeFixed (pPacket, gMem->pPacketMem);
				return NULL;
			}
		}
	} else
		SVSUTIL_ASSERT (iDirEntry == -1);

	tModification = uiNow;

	if (qp.bIsJournal || qp.bIsDeadLetter || qp.bIsMachineJournal || (pPacket->pImage->sect.pUserHeader->GetDelivery() == MQMSG_DELIVERY_RECOVERABLE))
		pPacket->pImage = NULL;

	SetEvent (hUpdateEvent);

	ScPacket *pPacket2 = InsertPacket (pPacket);

	if (pPacket2) {
		if (! pPacket->pImage)
			g_funcFree (pPacketImage, g_pvFreeData);

		iQueueSizeB += iSize;
		gMachine->iMachineQuotaUsedB += iSize;
	} else 
		svsutil_FreeFixed (pPacket, gMem->pPacketMem);

	return pPacket2;
}

int ScQueue::DisposeOfPacket (ScPacket *pPacket) {
	SVSUTIL_ASSERT (pPacket->pQueue == this);

	// The packet might be already gone. In this case the size has already been subtracted
	// from the queue.

	int iSize = 0;
	if (pPacket->pImage)
		iSize = pPacket->pImage->PersistedSize ();
	else if (pPacket->iDirEntry >= 0)
		iSize = sFile->GetPacketSize (pPacket);

	iQueueSizeB -= iSize;
	SVSUTIL_ASSERT (iQueueSizeB >= 0);

	gMachine->iMachineQuotaUsedB -= iSize;
	SVSUTIL_ASSERT (gMachine->iMachineQuotaUsedB >= 0);

	pPacket->uiPacketState = SCPACKET_STATE_DEAD;
	pPacket->tCreationTime = pPacket->tExpirationTime = 0;

#if defined (SC_VERBOSE) || defined (SC_INCLUDE_CONSOLE)
	pPacket->uiMessageID = 0;
	memset (&pPacket->guidSourceQM, 0, sizeof(pPacket->guidSourceQM));
#endif

	pPacket->uiAckType = 0;

	if (pPacket->iDirEntry >= 0) {
		unsigned int uiWhichSection = 0;
		sFile->DeletePacket (pPacket, uiWhichSection);
		sFile->UpdateSection (uiWhichSection);
		pPacket->iDirEntry = -1;
	}

	if (pPacket->pImage) {
		g_funcFree (pPacket->pImage, g_pvFreeData);
		pPacket->pImage = NULL;
	}

	if (pPacket->pTimeoutNode) {
		gMem->pTimeoutTree->Delete (pPacket->pTimeoutNode);
		pPacket->pTimeoutNode = NULL;
		SetEvent (gQueueMan->hPacketExpired);
	}

	if (pPacket->GetRefCount() > 1)
		return TRUE;

	if (pPacket->pNode) {
		pPackets->Delete (pPacket->pNode);
		pPacket->pNode = NULL;

		if (pPackets->IsEmpty())		// Reset the counter...
			hkReceived = 0;
	}

	pPacket->DelRef ();

	svsutil_FreeFixed (pPacket, gMem->pPacketMem);

	tModification = scutil_now ();

	return TRUE;
}

int ScQueue::Advance (ScHandleInfo *pHInfo) {
	SVSUTIL_ASSERT (pHInfo->uiHandleType == SCQMAN_HANDLE_CURSOR);
	SVSUTIL_ASSERT (this == pHInfo->pQueue);
	SVSUTIL_ASSERT (qp.bIsIncoming);

	//
	//	We will be looking for next after base
	//
	SVSTNode *pBaseNode = (! pHInfo->c.pNode) ? pPackets->Min() : pPackets->Next(pHInfo->c.pNode);

	unsigned int uiTime = scutil_now ();

	while (pBaseNode) {
		ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);

		if ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime))
			break;

		pBaseNode = pPackets->Next (pBaseNode);
	}

	if (! pBaseNode)
		return FALSE;

	ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);
	SVSUTIL_ASSERT ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime));

	if (pHInfo->c.pNode) {
		ScPacket *pPrevPacket = (ScPacket *)SVSTree::GetData (pHInfo->c.pNode);
		pPrevPacket->DelRef ();
		if ((pPrevPacket->GetRefCount() == 1) && (pPrevPacket->uiPacketState == SCPACKET_STATE_DEAD))
			DisposeOfPacket (pPrevPacket);
	}

	pPacket->AddRef ();
	pHInfo->c.pNode = pBaseNode;

	return TRUE;
}

int ScQueue::Backup (ScHandleInfo *pHInfo) {
	SVSUTIL_ASSERT (pHInfo->uiHandleType == SCQMAN_HANDLE_CURSOR);
	SVSUTIL_ASSERT (this == pHInfo->pQueue);
	SVSUTIL_ASSERT (qp.bIsIncoming);

	//
	//	We will be looking for prev before base
	//
	if (! pHInfo->c.pNode)
		return FALSE;

	SVSTNode *pBaseNode = pHInfo->c.pNode;

	SVSUTIL_ASSERT (pBaseNode);

	pBaseNode = pPackets->Prev(pBaseNode);

	unsigned int uiTime = scutil_now ();

	while (pBaseNode) {
		ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);

		if ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime))
			break;

		pBaseNode = pPackets->Prev (pBaseNode);
	}

	if (! pBaseNode)
		return FALSE;

	ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);
	SVSUTIL_ASSERT ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime));

	ScPacket *pPrevPacket = (ScPacket *)SVSTree::GetData (pHInfo->c.pNode);
	pPrevPacket->DelRef ();
	if ((pPrevPacket->GetRefCount() == 1) && (pPrevPacket->uiPacketState == SCPACKET_STATE_DEAD))
		DisposeOfPacket (pPrevPacket);

	pPacket->AddRef ();
	pHInfo->c.pNode = pBaseNode;

	return TRUE;
}

int ScQueue::Reset (ScHandleInfo *pHInfo) {
	if (pHInfo->c.pNode) {
		ScPacket *pPrevPacket = (ScPacket *)SVSTree::GetData (pHInfo->c.pNode);
		pPrevPacket->DelRef ();
		if ((pPrevPacket->GetRefCount() == 1) && (pPrevPacket->uiPacketState == SCPACKET_STATE_DEAD))
			DisposeOfPacket (pPrevPacket);
	}

	pHInfo->c.pNode = NULL;

	return TRUE;
}

int ScQueue::BringToMemory (ScPacket *pPacket) {
	if ((pPacket->iDirEntry >= 0) && (! pPacket->pImage))
		return sFile->RestorePacket(pPacket);

	if (pPacket->pImage)
		return TRUE;

#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_FATAL, L"Could not retrieve packet information.\n");
#endif

	return FALSE;
}

void ScQueue::FreeFromMemory (ScPacket *pPacket) {
	if ((pPacket->iDirEntry >= 0) && pPacket->pImage) {
		g_funcFree (pPacket->pImage, g_pvFreeData);
		pPacket->pImage = NULL;
	}
}

void ScQueue::UpdateFile (void) {
	sFile->UpdateSection (SCFILE_UPDATE_HEADER);
}

void ScQueue::Purge (unsigned int uiPurgeType) {
	//
	//	Properly reject all messages...
	//
	unsigned int l_uiQuotaK = qp.uiQuotaK;
	qp.uiQuotaK = 0;

	SVSTNode *pMinNode = NULL;

	for ( ; ; ) {
		SVSTNode *pNode = pMinNode ? pPackets->Next (pMinNode) : pPackets->Min();
		if (! pNode)
			break;

		ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);

		if (pPacket->uiPacketState != SCPACKET_STATE_ALIVE) {
			pMinNode = pNode;
			continue;
		}

		if (qp.bTransactional) {
			if (BringToMemory (pPacket))
				gQueueMan->ForwardTransactionalResponse (pPacket->pImage, uiPurgeType, NULL, NULL);
		}

		gQueueMan->RejectPacket (pPacket, uiPurgeType, pPacket->pQueue);
		DisposeOfPacket (pPacket);
	}

	qp.uiQuotaK = l_uiQuotaK;
}

int ScQueue::PurgeMessage (OBJECTID *poid, unsigned int uiPurgeType) {
	//
	//	Properly reject all messages...
	//

	SVSTNode *pNode = pPackets->Min();

	while (pNode) {
		ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
		if (BringToMemory (pPacket)) {
			OBJECTID oid2;
			pPacket->pImage->sect.pUserHeader->GetMessageID (&oid2);
			if (memcmp (&oid2, poid, sizeof (oid2)) == 0) {
				if (qp.bTransactional)
					gQueueMan->ForwardTransactionalResponse (pPacket->pImage, uiPurgeType, NULL, NULL);

				gQueueMan->RejectPacket (pPacket, uiPurgeType, pPacket->pQueue);
				DisposeOfPacket (pPacket);

				return TRUE;
			}
			FreeFromMemory (pPacket);
		}

		pNode = pPackets->Next (pNode);
	}

	return FALSE;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -