📄 scorder.cxx
字号:
pSeq->pNextHash = pSeqBuckets[iPos];
pSeqBuckets[iPos] = pSeq;
pSeq->fActive = FALSE;
pSeq->ipAddr.S_un.S_addr = INADDR_NONE;
pSeq->pNextActive = NULL;
pSeq->pPrevActive = NULL;
pSeq->pQueue = pQueueLocal;
pSeq->p = &pCluster->pos[i];
}
}
}
CloseHandle (hFile);
PruneStringHash ();
return TRUE;
}
void ScSequenceCollection::SendAcks (void) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Sending acks...\n");
#endif
ScOrderSeq *pSeq = pSeqActive;
unsigned int uiNowS = scutil_now ();
while (pSeq) {
SVSUTIL_ASSERT (pSeq->fActive);
SVSUTIL_ASSERT (pSeq->p);
SVSUTIL_ASSERT (pSeq->pQueue);
ScOrderSeq *pSeqNext = pSeq->pNextActive;
if (uiNowS - pSeq->p->uiLastAccessS < SCSEQUENCE_R_ACKPERIOD * gMachine->uiOrderAckScale) {
SVSUTIL_ASSERT (pSeq->p->llCurrentSeqID && pSeq->p->uiCurrentSeqN);
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Sending ack for sequence %016I64x:%d to ack queue %s from transact queue %s\n", pSeq->p->llCurrentSeqID, pSeq->p->uiCurrentSeqN, GetStringFromGUID(&pSeq->p->guidOrderAck), pSeq->pQueue->lpszFormatName);
#endif
gQueueMan->SendOrderAck (pSeq);
} else { // Deactivate it...
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Deactivating order sequence %016I64x:%d to ack queue %s from transact queue %s\n", pSeq->p->llCurrentSeqID, pSeq->p->uiCurrentSeqN, GetStringFromGUID(&pSeq->p->guidOrderAck), pSeq->pQueue->lpszFormatName);
#endif
if (pSeq == pSeqActive) {
SVSUTIL_ASSERT (! pSeq->pPrevActive);
pSeqActive = pSeq->pNextActive;
} else
SVSUTIL_ASSERT (pSeq->pPrevActive);
if (pSeq->pPrevActive)
pSeq->pPrevActive->pNextActive = pSeq->pNextActive;
if (pSeq->pNextActive)
pSeq->pNextActive->pPrevActive = pSeq->pPrevActive;
pSeq->fActive = FALSE;
pSeq->pNextActive = pSeq->pPrevActive = NULL;
}
pSeq = pSeqNext;
}
if (pSeqActive)
svsutil_SetAttrTimer (gMem->pTimer, (SVSHandle)hSequencePulse, SCSEQUENCE_R_ACKSEND * gMachine->uiOrderAckScale * 1000);
}
//
// Persistent String-GUID map
//
//
//	String-keyed lookup. Scans the pStringBuckets chain selected by Hash(sz) and
//	returns the first entry whose szString compares equal to sz under wcsicmp,
//	or NULL when the chain holds no such entry.
//
StringGuidHash *ScSequenceCollection::GetHashBucket (WCHAR *sz) {
	unsigned int uiBucket = Hash (sz);

	for (StringGuidHash *pEntry = pStringBuckets[uiBucket] ; pEntry ; pEntry = pEntry->pNextInString) {
		if (0 == wcsicmp (pEntry->szString, sz))
			return pEntry;
	}

	return NULL;
}
//
//	GUID-keyed lookup. Scans the pGuidBuckets chain selected by Hash(pguid) and
//	returns the first entry whose guid equals *pguid, or NULL when the chain
//	holds no such entry.
//
StringGuidHash *ScSequenceCollection::GetHashBucket (GUID *pguid) {
	unsigned int uiBucket = Hash (pguid);

	for (StringGuidHash *pEntry = pGuidBuckets[uiBucket] ; pEntry ; pEntry = pEntry->pNextInGUID) {
		if (pEntry->guid == *pguid)
			return pEntry;
	}

	return NULL;
}
//
//	Registers a new (string, GUID) pair in both in-memory hash tables and,
//	when fAddToFile is non-zero, first appends it to the persistent string store.
//
//	szString   - zero-terminated string half of the pair
//	pguid      - GUID half of the pair
//	fAddToFile - non-zero: append the pair to the string file before hashing it
//
//	Returns a Win32 status code:
//	  ERROR_SUCCESS        the pair was added
//	  ERROR_DUPLICATE_TAG  the string or the GUID is already in the hash
//	  ERROR_OPEN_FAILED    OpenStrings () failed
//	  ERROR_WRITE_FAULT    the append failed (the file is truncated back to
//	                       its previous length when that position is known)
//	  ERROR_OUTOFMEMORY    the hash node could not be allocated
//
//	The persistence failures used to return FALSE, which has the same value (0)
//	as ERROR_SUCCESS, so callers comparing against ERROR_SUCCESS took a failed
//	add for a successful one.
//
int ScSequenceCollection::AddNewPairToHash (WCHAR *szString, GUID *pguid, int fAddToFile) {
	if (GetHashBucket (szString)) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_ORDER, L"Duplicate string %s in hash\n", szString);
#endif
		return ERROR_DUPLICATE_TAG;
	}

	if (GetHashBucket (pguid)) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_ORDER, L"Duplicate guid " SC_GUID_FORMAT L"in hash\n", SC_GUID_ELEMENTS(pguid));
#endif
		return ERROR_DUPLICATE_TAG;
	}

	if (fAddToFile) {
		if (! OpenStrings ())
			return ERROR_OPEN_FAILED;

		uiLastStringAccessT = GetTickCount ();

		// On-disk record: fixed header (everything before szString), then the
		// string including its terminating zero.
		ScStringStoreEntry e;
		e.guid = *pguid;
		e.uiStringSize = sizeof(WCHAR) * (1 + wcslen (szString));

		// Remember where this record starts so a partial write can be rolled back.
		DWORD dwCurrentFilePosition = SetFilePointer (hStringFile, 0, NULL, FILE_CURRENT);
		DWORD dwWrit = 0;

		if ((! WriteFile (hStringFile, &e, offsetof(ScStringStoreEntry, szString), &dwWrit, NULL)) ||
			(dwWrit != offsetof(ScStringStoreEntry, szString)) ||
			(! WriteFile (hStringFile, szString, e.uiStringSize, &dwWrit, NULL)) ||
			(dwWrit != e.uiStringSize)) {
			// 0xffffffff means the position query itself failed; nothing to roll back to.
			if (dwCurrentFilePosition != 0xffffffff) {
				SetFilePointer (hStringFile, dwCurrentFilePosition, NULL, FILE_BEGIN);
				SetEndOfFile (hStringFile);
				FlushFileBuffers (hStringFile);
			}

			return ERROR_WRITE_FAULT;
		}

		FlushFileBuffers (hStringFile);
	}

	unsigned int cStringHash = Hash (szString);
	unsigned int cGuidHash   = Hash (pguid);

	// The node is allocated with the string stored inline at its tail.
	// NOTE: if this allocation fails after a successful append, the record
	// stays in the file although it is absent from the in-memory hash.
	StringGuidHash *pH = (StringGuidHash *)g_funcAlloc (offsetof (StringGuidHash, szString) + sizeof(WCHAR)*(1 + wcslen (szString)),g_pvAllocData);

	if (! pH) {
#if defined (SC_VERBOSE)
		scerror_DebugOut (VERBOSE_MASK_ORDER, L"Can't alloc StringGuidHash! - out of memory\n");
#endif
		return ERROR_OUTOFMEMORY;
	}

	pH->guid  = *pguid;
	pH->uiRef = 0;
	wcscpy (pH->szString, szString);

	// Push the node onto the head of both bucket chains.
	pH->pNextInGUID = pGuidBuckets[cGuidHash];
	pGuidBuckets[cGuidHash] = pH;

	pH->pNextInString = pStringBuckets[cStringHash];
	pStringBuckets[cStringHash] = pH;

	return ERROR_SUCCESS;
}
int ScSequenceCollection::LoadStringHash (void) {
DWORD dwAttrib = GetFileAttributes (lpszStringName);
if (dwAttrib == 0xffffffff) { // Does not exist. Check for .new file
WCHAR szNewName[_MAX_PATH];
if (FAILED(StringCchCopyW(szNewName,_MAX_PATH,lpszStringName)) ||
FAILED(StringCchCatW(szNewName,_MAX_PATH,L".new")))
{
SVSUTIL_ASSERT(0);
return FALSE;
}
if (0xffffffff != GetFileAttributes (szNewName))
MoveFile (szNewName, lpszStringName);
}
HANDLE hFile = CreateFile (lpszStringName, GENERIC_READ, 0, NULL, OPEN_ALWAYS, FILE_FLAG_RANDOM_ACCESS, NULL);
if (hFile == INVALID_HANDLE_VALUE)
return FALSE;
int iRes = FALSE;
WCHAR szString [MSMQ_SC_MAX_URI_BYTES/sizeof(WCHAR)];
for ( ; ; ) {
ScStringStoreEntry e;
DWORD dwRead = 0;
if (! ReadFile (hFile, &e, offsetof (ScStringStoreEntry, szString), &dwRead, NULL)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"String store corrupt: read failed (header)\n");
#endif
break;
}
if (dwRead == 0) {
iRes = TRUE;
break;
}
if (dwRead != offsetof (ScStringStoreEntry, szString)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"String store corrupt: failed reading %d bytes of header\n", offsetof (ScStringStoreEntry, szString));
#endif
break;
}
if ((e.uiStringSize > sizeof(szString)) || (e.uiStringSize == 0) || (e.uiStringSize & 1)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"String store corrupt: too big or 0-sized string (%d bytes)\n", e.uiStringSize);
#endif
break;
}
if ((! ReadFile (hFile, szString, e.uiStringSize, &dwRead, NULL)) || (dwRead != e.uiStringSize)) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"String store corrupt: read failed (string)\n");
#endif
break;
}
int cChar = e.uiStringSize / sizeof(WCHAR);
if (szString[cChar - 1] != '\0') { // MUST zero-terminate!
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"String store corrupt: string non-zero-terminated\n");
#endif
break;
}
if (ERROR_SUCCESS != AddNewPairToHash (szString, &e.guid, FALSE))
break;
}
CloseHandle (hFile);
return iRes;
}
void ScSequenceCollection::PruneStringHash (void) {
for (int i = 0 ; i < SCSEQUENCE_HASH_BUCKETS ; ++i) {
StringGuidHash *p = pStringBuckets[i];
while (p) {
p->uiRef = 0;
p = p->pNextInString;
}
}
for (i = 0 ; i < SCSEQUENCE_HASH_BUCKETS ; ++i) {
ScOrderSeq *pSeq = pSeqBuckets[i];
while (pSeq) {
StringGuidHash *pH = GetHashBucket (&pSeq->p->guidOrderAck);
if (pH)
pH->uiRef++;
else {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Error: no string for queue name guid" SC_GUID_FORMAT L"\n", SC_GUID_ELEMENTS((&pSeq->p->guidOrderAck)));
#endif
SVSUTIL_ASSERT (0);
}
pH = GetHashBucket (&pSeq->p->guidQueueName);
if (pH)
pH->uiRef++;
else {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Error: no string for queue name guid" SC_GUID_FORMAT L"\n", SC_GUID_ELEMENTS((&pSeq->p->guidQueueName)));
#endif
SVSUTIL_ASSERT (0);
}
pH = GetHashBucket (&pSeq->p->guidStreamId);
if (pH)
pH->uiRef++;
else {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Error: no string for streamid guid" SC_GUID_FORMAT L"\n", SC_GUID_ELEMENTS((&pSeq->p->guidStreamId)));
#endif
SVSUTIL_ASSERT (0);
}
pSeq = pSeq->pNextHash;
}
}
int iEmpty = 0;
int iTotal = 0;
for (i = 0 ; i < SCSEQUENCE_HASH_BUCKETS ; ++i) {
StringGuidHash *p = pStringBuckets[i];
while (p) {
++iTotal;
if (p->uiRef == 0)
++iEmpty;
p = p->pNextInString;
}
}
if (iEmpty * SCSEQUENCE_STALEFACTOR > iTotal) { // Rewrite
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Rewriting string hash database\n");
#endif
CloseStrings ();
WCHAR szNewFile[_MAX_PATH];
wcscpy (szNewFile, lpszStringName);
wcscat (szNewFile, L".new");
HANDLE hFile = CreateFile (szNewFile, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, 0, NULL);
if (hFile == INVALID_HANDLE_VALUE) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_ORDER, L"Can't recreate %s\n", szNewFile);
#endif
return;
}
int fError = FALSE;
for (i = 0 ; (i < SCSEQUENCE_HASH_BUCKETS) && (! fError); ++i) {
StringGuidHash *p = pStringBuckets[i];
while (p) {
++iTotal;
if (p->uiRef != 0) {
ScStringStoreEntry e;
e.guid = p->guid;
e.uiStringSize = sizeof(WCHAR) * (1 + wcslen (p->szString));
DWORD dwWrit = 0;
if ((! WriteFile (hStringFile, &e, offsetof(ScStringStoreEntry, szString), &dwWrit, NULL)) ||
(dwWrit != offsetof(ScStringStoreEntry, szString)) ||
(! WriteFile (hStringFile, p->szString, e.uiStringSize, &dwWrit, NULL)) ||
(dwWrit != e.uiStringSize)) {
fError = TRUE;
break;
}
}
p = p->pNextInString;
}
}
CloseHandle (hFile);
if (fError)
DeleteFile (szNewFile);
else {
DeleteFile (lpszStringName);
MoveFile (szNewFile, lpszStringName);
}
}
}
//
//	Returns the string stored in the hash entry for *pguid, or NULL when
//	GetHashBucket finds no entry for it. The returned pointer refers to the
//	entry's own szString member.
//
WCHAR *ScSequenceCollection::GetStringFromGUID (GUID *pguid) {
	StringGuidHash *pEntry = GetHashBucket (pguid);

	if (pEntry)
		return pEntry->szString;

	return NULL;
}
//
//	Maps szString to a GUID, written to *pGuid.
//
//	If the string is already in the hash, its GUID is copied out and TRUE is
//	returned. Otherwise a GUID is generated with scutil_uuidgen and the pair is
//	added with AddNewPairToHash (..., TRUE); generation is repeated for as long
//	as that call returns ERROR_DUPLICATE_TAG.
//
//	Returns TRUE when the final AddNewPairToHash status equals ERROR_SUCCESS,
//	FALSE otherwise.
//
int ScSequenceCollection::HashStringToGUID (WCHAR *szString, GUID *pGuid) {
	StringGuidHash *pKnown = GetHashBucket (szString);

	if (pKnown) {
		*pGuid = pKnown->guid;
		return TRUE;
	}

	int iStatus;

	for ( ; ; ) {
		scutil_uuidgen (pGuid);
		iStatus = AddNewPairToHash (szString, pGuid, TRUE);

		if (iStatus != ERROR_DUPLICATE_TAG)
			break;
	}

	if (iStatus == ERROR_SUCCESS)
		return TRUE;

	return FALSE;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -