📄 scqueue.cxx
字号:
if (sFile)
delete sFile;
if (lpszQueueHost)
svsutil_StringHashFree (gMem->pStringHash, lpszQueueHost);
if (lpszQueueName)
svsutil_StringHashFree (gMem->pStringHash, lpszQueueName);
if (lpszFormatName)
svsutil_StringHashFree (gMem->pStringHash, lpszFormatName);
if (pSess)
gSessionMan->ReleaseSession (pSess);
if (pPackets) {
pPackets->Empty (freePacketData, NULL);
delete pPackets;
}
if (lpszQueueLabel)
g_funcFree (lpszQueueLabel, g_pvFreeData);
if (hUpdateEvent)
CloseHandle (hUpdateEvent);
}
ScPacket *ScQueue::InsertPacket (ScPacket *pPacket) {
SVSUTIL_ASSERT (! pPacket->pNode);
SVSUTIL_ASSERT (! pPacket->pTimeoutNode);
SVSUTIL_ASSERT (pPacket->uiPacketState == SCPACKET_STATE_ALIVE);
int iRes = (NULL != (pPacket->pNode = pPackets->Insert (pPacket->hkOrderKey, pPacket)));
if (! iRes) {
unsigned int uiWhichSection = 0;
sFile->DeletePacket (pPacket, uiWhichSection);
sFile->UpdateSection (uiWhichSection);
svsutil_FreeFixed (pPacket, gMem->pPacketMem);
return NULL;
}
pPacket->pQueue = this;
if (! qp.bIsIncoming)
gSessionMan->PacketInserted (pSess);
if (pPacket->tExpirationTime != INFINITE) {
pPacket->pTimeoutNode = gMem->pTimeoutTree->Insert (pPacket->tExpirationTime, pPacket);
SetEvent (gQueueMan->hPacketExpired);
}
return pPacket;
}
ScPacket *ScQueue::MakePacket (ScPacketImage *pPacketImage, int iDirEntry, int fPutInFile) {
SVSUTIL_ASSERT (iQueueSizeB >= 0);
int iSize = pPacketImage->PersistedSize ();
if ((unsigned int)(iSize + iQueueSizeB) > qp.uiQuotaK * 1024)
return NULL;
if ((unsigned int)(gMachine->iMachineQuotaUsedB + iSize) > gMachine->uiMachineQuotaK * 1024)
return NULL;
ScPacket *pPacket = (ScPacket *)svsutil_GetFixed (gMem->pPacketMem);
if (! pPacket)
return NULL;
pPacket->ScPacket::ScPacket();
pPacket->hkOrderKey = pPacketImage->hkOrderKey;
pPacket->tCreationTime = pPacketImage->sect.pUserHeader->GetSentTime();
unsigned int uiNow = scutil_now ();
if (qp.bIsJournal || qp.bIsDeadLetter || qp.bIsMachineJournal)
pPacket->tExpirationTime = INFINITE;
else {
unsigned int tTime;
if (qp.bIsIncoming) {
tTime = pPacketImage->sect.pUserHeader->GetTimeToLiveDelta ();
if (tTime != INFINITE)
tTime += pPacketImage->sect.pBaseHeader->GetAbsoluteTimeToQueue ();
} else
tTime = pPacketImage->sect.pBaseHeader->GetAbsoluteTimeToQueue ();
if (tTime != INFINITE)
tTime += uiNow;
pPacket->tExpirationTime = tTime;
}
OBJECTID oid;
pPacketImage->sect.pUserHeader->GetMessageID (&oid);
#if defined (SC_VERBOSE) || defined (SC_INCLUDE_CONSOLE)
pPacket->uiMessageID = oid.Uniquifier;
pPacket->guidSourceQM = oid.Lineage;
#endif
pPacket->uiAckType = pPacketImage->sect.pPropHeader->GetAckType ();
pPacket->uiAuditType = pPacketImage->sect.pUserHeader->GetAuditing ();
pPacket->iDirEntry = iDirEntry;
pPacket->pImage = pPacketImage;
pPacket->uiSeqN = pPacketImage->sect.pXactHeader ? pPacketImage->sect.pXactHeader->GetSeqN () : 0;
if (qp.bIsJournal || qp.bIsDeadLetter || qp.bIsMachineJournal || (pPacket->pImage->sect.pUserHeader->GetDelivery() == MQMSG_DELIVERY_RECOVERABLE)) {
if (fPutInFile) {
SVSUTIL_ASSERT (iDirEntry == -1);
unsigned int uiWhichSection = 0;
if ((! sFile->BackupPacket (pPacket, uiWhichSection)) ||
(! sFile->UpdateSection (uiWhichSection))) {
svsutil_FreeFixed (pPacket, gMem->pPacketMem);
return NULL;
}
}
} else
SVSUTIL_ASSERT (iDirEntry == -1);
tModification = uiNow;
if (qp.bIsJournal || qp.bIsDeadLetter || qp.bIsMachineJournal || (pPacket->pImage->sect.pUserHeader->GetDelivery() == MQMSG_DELIVERY_RECOVERABLE))
pPacket->pImage = NULL;
SetEvent (hUpdateEvent);
ScPacket *pPacket2 = InsertPacket (pPacket);
if (pPacket2) {
if (! pPacket->pImage)
g_funcFree (pPacketImage, g_pvFreeData);
iQueueSizeB += iSize;
gMachine->iMachineQuotaUsedB += iSize;
} else
svsutil_FreeFixed (pPacket, gMem->pPacketMem);
return pPacket2;
}
int ScQueue::DisposeOfPacket (ScPacket *pPacket) {
SVSUTIL_ASSERT (pPacket->pQueue == this);
// The packet might be already gone. In this case the size has already been subtracted
// from the queue.
int iSize = 0;
if (pPacket->pImage)
iSize = pPacket->pImage->PersistedSize ();
else if (pPacket->iDirEntry >= 0)
iSize = sFile->GetPacketSize (pPacket);
iQueueSizeB -= iSize;
SVSUTIL_ASSERT (iQueueSizeB >= 0);
gMachine->iMachineQuotaUsedB -= iSize;
SVSUTIL_ASSERT (gMachine->iMachineQuotaUsedB >= 0);
pPacket->uiPacketState = SCPACKET_STATE_DEAD;
pPacket->tCreationTime = pPacket->tExpirationTime = 0;
#if defined (SC_VERBOSE) || defined (SC_INCLUDE_CONSOLE)
pPacket->uiMessageID = 0;
memset (&pPacket->guidSourceQM, 0, sizeof(pPacket->guidSourceQM));
#endif
pPacket->uiAckType = 0;
if (pPacket->iDirEntry >= 0) {
unsigned int uiWhichSection = 0;
sFile->DeletePacket (pPacket, uiWhichSection);
sFile->UpdateSection (uiWhichSection);
pPacket->iDirEntry = -1;
}
if (pPacket->pImage) {
g_funcFree (pPacket->pImage, g_pvFreeData);
pPacket->pImage = NULL;
}
if (pPacket->pTimeoutNode) {
gMem->pTimeoutTree->Delete (pPacket->pTimeoutNode);
pPacket->pTimeoutNode = NULL;
SetEvent (gQueueMan->hPacketExpired);
}
if (pPacket->GetRefCount() > 1)
return TRUE;
if (pPacket->pNode) {
pPackets->Delete (pPacket->pNode);
pPacket->pNode = NULL;
if (pPackets->IsEmpty()) // Reset the counter...
hkReceived = 0;
}
pPacket->DelRef ();
svsutil_FreeFixed (pPacket, gMem->pPacketMem);
tModification = scutil_now ();
return TRUE;
}
int ScQueue::Advance (ScHandleInfo *pHInfo) {
SVSUTIL_ASSERT (pHInfo->uiHandleType == SCQMAN_HANDLE_CURSOR);
SVSUTIL_ASSERT (this == pHInfo->pQueue);
SVSUTIL_ASSERT (qp.bIsIncoming);
//
// We will be looking for next after base
//
SVSTNode *pBaseNode = (! pHInfo->c.pNode) ? pPackets->Min() : pPackets->Next(pHInfo->c.pNode);
unsigned int uiTime = scutil_now ();
while (pBaseNode) {
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);
if ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime))
break;
pBaseNode = pPackets->Next (pBaseNode);
}
if (! pBaseNode)
return FALSE;
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);
SVSUTIL_ASSERT ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime));
if (pHInfo->c.pNode) {
ScPacket *pPrevPacket = (ScPacket *)SVSTree::GetData (pHInfo->c.pNode);
pPrevPacket->DelRef ();
if ((pPrevPacket->GetRefCount() == 1) && (pPrevPacket->uiPacketState == SCPACKET_STATE_DEAD))
DisposeOfPacket (pPrevPacket);
}
pPacket->AddRef ();
pHInfo->c.pNode = pBaseNode;
return TRUE;
}
int ScQueue::Backup (ScHandleInfo *pHInfo) {
SVSUTIL_ASSERT (pHInfo->uiHandleType == SCQMAN_HANDLE_CURSOR);
SVSUTIL_ASSERT (this == pHInfo->pQueue);
SVSUTIL_ASSERT (qp.bIsIncoming);
//
// We will be looking for prev before base
//
if (! pHInfo->c.pNode)
return FALSE;
SVSTNode *pBaseNode = pHInfo->c.pNode;
SVSUTIL_ASSERT (pBaseNode);
pBaseNode = pPackets->Prev(pBaseNode);
unsigned int uiTime = scutil_now ();
while (pBaseNode) {
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);
if ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime))
break;
pBaseNode = pPackets->Prev (pBaseNode);
}
if (! pBaseNode)
return FALSE;
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pBaseNode);
SVSUTIL_ASSERT ((pPacket->uiPacketState == SCPACKET_STATE_ALIVE) && did_not_expire (pPacket->tExpirationTime, uiTime));
ScPacket *pPrevPacket = (ScPacket *)SVSTree::GetData (pHInfo->c.pNode);
pPrevPacket->DelRef ();
if ((pPrevPacket->GetRefCount() == 1) && (pPrevPacket->uiPacketState == SCPACKET_STATE_DEAD))
DisposeOfPacket (pPrevPacket);
pPacket->AddRef ();
pHInfo->c.pNode = pBaseNode;
return TRUE;
}
int ScQueue::Reset (ScHandleInfo *pHInfo) {
if (pHInfo->c.pNode) {
ScPacket *pPrevPacket = (ScPacket *)SVSTree::GetData (pHInfo->c.pNode);
pPrevPacket->DelRef ();
if ((pPrevPacket->GetRefCount() == 1) && (pPrevPacket->uiPacketState == SCPACKET_STATE_DEAD))
DisposeOfPacket (pPrevPacket);
}
pHInfo->c.pNode = NULL;
return TRUE;
}
int ScQueue::BringToMemory (ScPacket *pPacket) {
if ((pPacket->iDirEntry >= 0) && (! pPacket->pImage))
return sFile->RestorePacket(pPacket);
if (pPacket->pImage)
return TRUE;
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_FATAL, L"Could not retrieve packet information.\n");
#endif
return FALSE;
}
void ScQueue::FreeFromMemory (ScPacket *pPacket) {
if ((pPacket->iDirEntry >= 0) && pPacket->pImage) {
g_funcFree (pPacket->pImage, g_pvFreeData);
pPacket->pImage = NULL;
}
}
void ScQueue::UpdateFile (void) {
sFile->UpdateSection (SCFILE_UPDATE_HEADER);
}
void ScQueue::Purge (unsigned int uiPurgeType) {
//
// Properly reject all messages...
//
unsigned int l_uiQuotaK = qp.uiQuotaK;
qp.uiQuotaK = 0;
SVSTNode *pMinNode = NULL;
for ( ; ; ) {
SVSTNode *pNode = pMinNode ? pPackets->Next (pMinNode) : pPackets->Min();
if (! pNode)
break;
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
if (pPacket->uiPacketState != SCPACKET_STATE_ALIVE) {
pMinNode = pNode;
continue;
}
if (qp.bTransactional) {
if (BringToMemory (pPacket))
gQueueMan->ForwardTransactionalResponse (pPacket->pImage, uiPurgeType, NULL, NULL);
}
gQueueMan->RejectPacket (pPacket, uiPurgeType, pPacket->pQueue);
DisposeOfPacket (pPacket);
}
qp.uiQuotaK = l_uiQuotaK;
}
int ScQueue::PurgeMessage (OBJECTID *poid, unsigned int uiPurgeType) {
//
// Properly reject all messages...
//
SVSTNode *pNode = pPackets->Min();
while (pNode) {
ScPacket *pPacket = (ScPacket *)SVSTree::GetData (pNode);
if (BringToMemory (pPacket)) {
OBJECTID oid2;
pPacket->pImage->sect.pUserHeader->GetMessageID (&oid2);
if (memcmp (&oid2, poid, sizeof (oid2)) == 0) {
if (qp.bTransactional)
gQueueMan->ForwardTransactionalResponse (pPacket->pImage, uiPurgeType, NULL, NULL);
gQueueMan->RejectPacket (pPacket, uiPurgeType, pPacket->pQueue);
DisposeOfPacket (pPacket);
return TRUE;
}
FreeFromMemory (pPacket);
}
pNode = pPackets->Next (pNode);
}
return FALSE;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -