📄 scqman.cxx
字号:
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
//
// Use of this source code is subject to the terms of the Microsoft shared
// source or premium shared source license agreement under which you licensed
// this source code. If you did not accept the terms of the license agreement,
// you are not authorized to use this source code. For the terms of the license,
// please see the license agreement between you and Microsoft or, if applicable,
// see the SOURCE.RTF on your install media or the root of your tools installation.
// THE SOURCE CODE IS PROVIDED "AS IS", WITH NO WARRANTIES.
//
/*++
Module Name:
scqman.cxx
Abstract:
Small client queue manager class support
--*/
#include <sc.hxx>
#include <scqman.hxx>
#include <scqueue.hxx>
#include <scsman.hxx>
#include <scpacket.hxx>
#include <sccomp.hxx>
#include <scorder.hxx>
BOOL UriToQueueFormat(const WCHAR *szQueue, DWORD dwQueueChars, QUEUE_FORMAT *pQueue, WCHAR **ppszQueueBuffer);
static void scqman_Signal (void *pvData) {
SetEvent ((HANDLE)pvData);
}
//
// Constructor
//
// Starts with empty queue lists, no special queues and no main thread,
// allocates the handle bookkeeping structures, and creates the two
// auto-reset events (packet expiration, order-ack timer), both of which
// are signaled right after creation.
//
ScQueueManager::ScQueueManager (unsigned int a_uiMessageID) {
#if defined (SC_VERBOSE)
    scerror_DebugOut (VERBOSE_MASK_INIT, L"Allocating queue manager...\n");
#endif

    // Nothing is running yet.
    hMainThread = NULL;
    fBusy       = 0;

    // No queues have been published.
    pqlIncoming = NULL;
    pqlOutgoing = NULL;

    // Special-purpose queues are not set here.
    pQueueDLQ      = NULL;
    pQueueJournal  = NULL;
    pQueueOrderAck = NULL;
    pQueueOutFRS   = NULL;

    // Counters.
    uiMessageID             = a_uiMessageID;
    iPacketsWaitingOrderAck = 0;

    // Handle bookkeeping.
    pHandleMem = svsutil_AllocFixedMemDescr (sizeof(ScHandleInfo), SCQMAN_HANDLE_INCR);
    pHandles   = SVSNewSimpleHandleSystem(SC_MAX_HANDLES);

    // Auto-reset events, created non-signaled and then signaled at once.
    hPacketExpired = CreateEvent (NULL, FALSE, FALSE, NULL);
    SetEvent (hPacketExpired);

    hOrderAckTimer = CreateEvent (NULL, FALSE, FALSE, NULL);
    SetEvent (hOrderAckTimer);
}
//
// Destructor
//
// Must be entered with the global lock held and with no main thread
// recorded. Closes all handles, destroys every queue on both lists along
// with its list node, then releases the handle bookkeeping structures and
// the two events.
//
ScQueueManager::~ScQueueManager(void) {
    SVSUTIL_ASSERT (gMem->IsLocked());

#if defined (SC_VERBOSE)
    scerror_DebugOut (VERBOSE_MASK_INIT, L"Deallocating queue manager...\n");
#endif

    SVSUTIL_ASSERT (! hMainThread);

    CloseAllHandles ();

    // Destroy incoming queues head-first. The successor is captured before
    // the node is deleted, and the list head only advances afterwards.
    while (pqlIncoming != NULL) {
        ScQueueList *pqlHead = pqlIncoming;
        ScQueueList *pqlRest = pqlHead->pqlNext;

        delete pqlHead->pQueue;
        delete pqlHead;

        pqlIncoming = pqlRest;
    }

    // Same procedure for outgoing queues.
    while (pqlOutgoing != NULL) {
        ScQueueList *pqlHead = pqlOutgoing;
        ScQueueList *pqlRest = pqlHead->pqlNext;

        delete pqlHead->pQueue;
        delete pqlHead;

        pqlOutgoing = pqlRest;
    }

    if (pHandles != NULL)
        delete pHandles;

    if (pHandleMem != NULL)
        svsutil_ReleaseFixedNonEmpty (pHandleMem);

    if (hPacketExpired != NULL)
        ::CloseHandle (hPacketExpired);

    if (hOrderAckTimer != NULL)
        ::CloseHandle (hOrderAckTimer);
}
//
// Locate routing (external) target
//
//
static int GetRoutingTarget (int fSecureSession, WCHAR *lpszFormatName, ScPacketImage *pImage, WCHAR **ppRouteTarget) { // FALSE == not allowed to route at all
SVSUTIL_ASSERT(gMem->IsLocked());
*ppRouteTarget = NULL;
// First, attempt static routing table, first by target queue name...
if (gMachine->RouteTo (lpszFormatName, ppRouteTarget))
return TRUE;
// Then by destination machine...
GUID DestQM = *pImage->sect.pUserHeader->GetDestQM ();
if (gMachine->RouteTo (&DestQM, ppRouteTarget))
return TRUE;
// And finally by source machine...
GUID SourceQM = *pImage->sect.pUserHeader->GetSourceQM ();
if (gMachine->RouteFrom (&SourceQM, ppRouteTarget))
return TRUE;
// If the queue is not specifically mentioned, we can still route it,
// but only if it arrived on secure connection.
return fSecureSession; // if that, route everything
}
// Builds a direct format name that points back at the packet's IPv4 source
// address and stores a newly duplicated copy in *ppRouteTarget.
//
// The prefix is chosen from dwFormatType (HTTP, HTTPS, anything else = TCP).
// For HTTP/HTTPS the path is "/msmq\private$\route$$$"; otherwise it is
// "\private$\route$$$".
//
// Returns FALSE (not allowed / failed) when the packet carries no IPv4
// address, when response-by-IP is disabled on the machine, when formatting
// fails, or when the string cannot be duplicated. *ppRouteTarget is only
// written once formatting has succeeded.
static int GetIpRoute (ScPacketImage *pImage, WCHAR **ppRouteTarget, DWORD dwFormatType) {
    if (! (pImage->flags.fHaveIpv4Addr && gMachine->fResponseByIp))
        return FALSE;

    int fHttpFamily = (dwFormatType == SCFILE_QP_FORMAT_HTTP) || (dwFormatType == SCFILE_QP_FORMAT_HTTPS);

    // Protocol prefix of the direct format name.
    const WCHAR *szPrefix = MSMQ_SC_FORMAT_DIRECT_TCP;
    if (dwFormatType == SCFILE_QP_FORMAT_HTTP)
        szPrefix = MSMQ_SC_FORMAT_DIRECT_HTTP;
    else if (dwFormatType == SCFILE_QP_FORMAT_HTTPS)
        szPrefix = MSMQ_SC_FORMAT_DIRECT_HTTPS;

    // HTTP-style names carry the "/msmq" virtual root in front of the queue path.
    const WCHAR *szTemplate = fHttpFamily
        ? L"%s%d.%d.%d.%d/msmq\\private$\\route$$$"
        : L"%s%d.%d.%d.%d\\private$\\route$$$";

    // The four bytes of the source address, in printing order.
    int iByte1 = pImage->ipSourceAddr.S_un.S_un_b.s_b1;
    int iByte2 = pImage->ipSourceAddr.S_un.S_un_b.s_b2;
    int iByte3 = pImage->ipSourceAddr.S_un.S_un_b.s_b3;
    int iByte4 = pImage->ipSourceAddr.S_un.S_un_b.s_b4;

    WCHAR szBuffer[_MAX_PATH];
    if (FAILED(StringCchPrintfW(szBuffer, _MAX_PATH, szTemplate, szPrefix, iByte1, iByte2, iByte3, iByte4)))
        return FALSE;

    *ppRouteTarget = svsutil_wcsdup (szBuffer);
    return (*ppRouteTarget != NULL);
}
// Takes a string in URL form and finds the end of its virtual-root portion.
//
// The virtual root is whatever lies between the first and second slash
// (either kind) of lpszURL, including both slashes. That prefix is compared,
// case-insensitively and by exact length, against the machine's table of
// known vroots. The table is scanned up to MAX_VROOTS entries, or up to the
// first entry with no vroot string.
//
// Returns a pointer into lpszURL just past the matched vroot (the first part
// of the MSMQ name), or NULL when there is no second slash, nothing follows
// it, or no table entry matches.
static WCHAR * IncrementVirtualRoot(LPWSTR lpszURL) {
    SVSUTIL_ASSERT (gMem->IsLocked());

    LPWSTR pszCursor = lpszURL;

    // Step over the leading separator, if there is one.
    if ((*pszCursor == L'/') || (*pszCursor == L'\\'))
        ++pszCursor;

    // Advance to the separator that terminates the vroot name.
    while (*pszCursor && (*pszCursor != L'\\') && (*pszCursor != L'/'))
        ++pszCursor;

    if (*pszCursor == L'\0')        // no closing separator
        return NULL;

    ++pszCursor;                    // step over the closing separator

    if (*pszCursor == L'\0')        // nothing after the vroot
        return NULL;

    DWORD cchVRoot = (DWORD)(pszCursor - lpszURL);

    for (int iVRoot = 0; iVRoot < MAX_VROOTS; ++iVRoot) {
        if (! gMachine->VRootList[iVRoot].wszVRoot)
            break;                  // end of the populated part of the table

        if (cchVRoot != gMachine->VRootList[iVRoot].ccVRoot)
            continue;

        if (0 == wcsnicmp(gMachine->VRootList[iVRoot].wszVRoot,lpszURL,cchVRoot))
            return pszCursor;
    }

    return NULL;
}
// Resolves a format name to one of the queues on the pqlIncoming list by
// its trailing queue-name component.
//
// When SRMP is enabled and qp.bFormatType is HTTP or HTTPS, the name is
// copied (onto the stack, or into a heap buffer when it does not fit),
// backslashes are turned into slashes, the protocol prefix and host name
// are skipped, the virtual root is validated via IncrementVirtualRoot, and
// the component after the last '/' is used. For every other case the
// component after the last '\' of lpszFormatName is used.
//
// The component is then looked up in the global string hash; a queue
// matches when its lpszQueueName is that same hashed pointer and it is
// not a journal, dead-letter or machine-journal queue.
//
// Returns the matching queue, or NULL if none is found or the temporary
// copy cannot be allocated.
static ScQueue *CrackQueueName (WCHAR *lpszFormatName, ScQueueParms &qp, ScQueueList *pqlIncoming) {
    WCHAR  szStackCopy[_MAX_PATH];
    WCHAR *pszWorkCopy = szStackCopy;   // switched to a heap buffer for long names
    WCHAR *pszLastSep  = NULL;          // separator just before the queue name

    SVSUTIL_ASSERT (qp.bIsIncoming == TRUE);

    int fHttp  = (SCFILE_QP_FORMAT_HTTP  == qp.bFormatType);
    int fHttps = (SCFILE_QP_FORMAT_HTTPS == qp.bFormatType);

    if (gMachine->fUseSRMP && (fHttp || fHttps)) {
        // HTTP(s)://localhost/(VrootName)/...
        // BUGBUG - should I change RouteLocal to do a http://./msmq/... to allow machine name to change?
        int cchWithNul = wcslen (lpszFormatName) + 1;

        if (cchWithNul > SVSUTIL_ARRLEN(szStackCopy)) {
            pszWorkCopy = (WCHAR *)g_funcAlloc (cchWithNul * sizeof(WCHAR), g_pvAllocData);
            if (pszWorkCopy == NULL) {
#if defined (SC_VERBOSE)
                scerror_DebugOut (VERBOSE_MASK_FATAL, L"Can't allocate %d bytes for queue name processing\n", cchWithNul * sizeof(WCHAR));
#endif
                return NULL;
            }
        }

        memcpy (pszWorkCopy, lpszFormatName, cchWithNul * sizeof(WCHAR));
        scutil_ReplaceBackSlashesWithSlashes(pszWorkCopy);

        // Skip the protocol prefix, then the host name.
        WCHAR *pszAfterPrefix = pszWorkCopy + (fHttp ? SVSUTIL_CONSTSTRLEN(MSMQ_SC_FORMAT_DIRECT_HTTP) : SVSUTIL_CONSTSTRLEN(MSMQ_SC_FORMAT_DIRECT_HTTPS));
        WCHAR *pszPath = wcschr (pszAfterPrefix, L'/');

        // Validate and skip the virtual root.
        if (pszPath)
            pszPath = IncrementVirtualRoot(pszPath);

        // The queue name follows the last slash of what remains.
        if (pszPath)
            pszLastSep = wcsrchr (pszPath, L'/');
    } else {
        pszLastSep = wcsrchr (lpszFormatName, L'\\');
    }

    ScQueue *pFound = NULL;

    if (pszLastSep) {
        // Only names already present in the string hash can belong to a queue.
        WCHAR *lpszHashed = svsutil_StringHashCheck (gMem->pStringHash, pszLastSep + 1);

        if (lpszHashed) {
            for (ScQueueList *pql = pqlIncoming; pql; pql = pql->pqlNext) {
                ScQueue *pCandidate = pql->pQueue;

                if (pCandidate->lpszQueueName != lpszHashed)
                    continue;

                // Journal, dead-letter and machine-journal queues are never matched.
                if (pCandidate->qp.bIsJournal || pCandidate->qp.bIsDeadLetter || pCandidate->qp.bIsMachineJournal)
                    continue;

                pFound = pCandidate;
                break;
            }
        }
    }

    if (pszWorkCopy != szStackCopy)
        g_funcFree (pszWorkCopy, g_pvFreeData);

    return pFound;
}
// Returns the outgoing queue registered under lpszFormatName. If there is
// none, creates one through MakeOutgoingQueue with zeroed parameters except
// for the machine's default outgoing quota and the router-queue flag.
// The result is whatever MakeOutgoingQueue returns in that case.
ScQueue *ScQueueManager::FindOrMakeOutgoingQueue(LPWSTR lpszFormatName) {
    ScQueue *pExisting = FindOutgoingByFormat(lpszFormatName);
    if (pExisting)
        return pExisting;

    ScQueueParms qpRouter;
    memset (&qpRouter, 0, sizeof(qpRouter));
    qpRouter.bIsRouterQueue = TRUE;
    qpRouter.uiQuotaK       = (unsigned int)gMachine->uiDefaultOutQuotaK;

    return MakeOutgoingQueue (lpszFormatName, &qpRouter, NULL);
}
//
// Locate (creating an outgoing queue if necessary) the queue that a packet
// should be placed into.
//
// The destination is the next hop of the SRMP <fwd><via> list when the
// packet has one, otherwise the destination queue from the user header.
// Destinations with a queue suffix are rejected.
//
//   - Non-direct destination: may only be routed via the routing tables
//     (outgoing queue for the route target) or, when routing is allowed
//     but no target is given and the destination did not come from a
//     forward list, placed into pQueueOutFRS.
//   - Direct, non-local destination: goes to an outgoing queue for the
//     routing target or for the format name itself. If routing is
//     not allowed, a response (fResponse) may still be routed back by IP
//     via GetIpRoute, unless the destination came from a forward list.
//   - Otherwise (local, or parse error): resolved against the incoming
//     queues with CrackQueueName.
//
// Returns the queue, or NULL when the packet cannot be placed.
// Must be called with the global lock held.
//
ScQueue *ScQueueManager::FindQueueByPacketImage (ScPacketImage *pImage, int fResponse) {
    SVSUTIL_ASSERT (gMem->IsLocked());

    QUEUE_FORMAT qf;
    WCHAR *lpszFwdQueue = NULL;     // buffer handed back by UriToQueueFormat; freed below
    BOOL  fFwdQueue     = FALSE;    // TRUE once the destination is known to come from <fwd><via>

    // We have an SRMP message with <fwd><via> specifying the next hop, this is queue to put packet into.
    WCHAR *szNextHop = pImage->sect.pFwdViaHeader ? GetNextHopOnFwdList(pImage) : NULL;

    if (szNextHop) {
        if (! UriToQueueFormat(szNextHop,wcslen(szNextHop),&qf,&lpszFwdQueue))
            return NULL;
    }
    else
        pImage->sect.pUserHeader->GetDestinationQueue (&qf);

    if (qf.Suffix() != QUEUE_SUFFIX_TYPE_NONE) {
        // Release the forward-queue buffer on this early exit too;
        // otherwise it is leaked for every rejected forwarded packet.
        if (lpszFwdQueue)
            g_funcFree(lpszFwdQueue,g_pvFreeData);
        return NULL;
    }

    // We are direct now.
    WCHAR *lpszFormatName = scutil_QFtoString (&qf);

    // The format name has been rendered, so the buffer is no longer needed.
    if (lpszFwdQueue) {
        g_funcFree(lpszFwdQueue,g_pvFreeData);
        fFwdQueue = TRUE;
    }

    if (! lpszFormatName)
        return NULL;

    ScQueue *pQueue = NULL;

    if (qf.GetType() != QUEUE_FORMAT_TYPE_DIRECT) {     // Non-direct queues can only be routed by endpoints or put into OutFRS.
        WCHAR *lpszRouteTarget = NULL;

        if (GetRoutingTarget (pImage->flags.fSecureSession, lpszFormatName, pImage, &lpszRouteTarget)) {
            if (lpszRouteTarget)    // Since we can't host non-direct queues, don't bother looking for incoming...
                pQueue = FindOrMakeOutgoingQueue (lpszRouteTarget);
            else if (!fFwdQueue)
                pQueue = pQueueOutFRS;
        }

        g_funcFree (lpszFormatName, g_pvFreeData);
        return pQueue;
    }

    ScQueueParms qp;
    memset (&qp, 0, sizeof(qp));

    WCHAR *lpszHostName  = NULL;
    WCHAR *lpszQueueName = NULL;

    if (scutil_ParseNonLocalDirectFormatName (lpszFormatName, lpszHostName, lpszQueueName, &qp)) {   // Failure == Local or error
        SVSUTIL_ASSERT (qp.bIsIncoming == FALSE);

        // Only the parse result (qp) is needed; the split-out names are not.
        g_funcFree (lpszHostName, g_pvFreeData);
        g_funcFree (lpszQueueName, g_pvFreeData);

        WCHAR *lpszRouteTarget = NULL;
        int fFreeRouteTarget = FALSE;   // set only when the target was produced by GetIpRoute

        if (! GetRoutingTarget (pImage->flags.fSecureSession, lpszFormatName, pImage, &lpszRouteTarget)) {
            // Not allowed to route. The only remaining option is answering
            // by IP, and only for responses that did not come from a forward list.
            if (fFwdQueue || (!(fResponse && (fFreeRouteTarget = GetIpRoute (pImage, &lpszRouteTarget,qp.bFormatType))))) {
                g_funcFree (lpszFormatName, g_pvFreeData);
                return NULL;
            }
        }

        pQueue = FindOrMakeOutgoingQueue (lpszRouteTarget ? lpszRouteTarget : lpszFormatName);

        g_funcFree (lpszFormatName, g_pvFreeData);

        // A target obtained from GetRoutingTarget is not freed by this function.
        if (fFreeRouteTarget)
            g_funcFree (lpszRouteTarget, g_pvFreeData);

        return pQueue;
    }

    // Local destination (or unparsable name): look among the incoming queues.
    pQueue = CrackQueueName(lpszFormatName, qp, pqlIncoming);

    g_funcFree (lpszFormatName, g_pvFreeData);

    return pQueue;
}
//
// Locate incoming queue by format name
//
// If the format name is present in the global string hash, the incoming
// list is searched for a queue whose lpszFormatName is that same hashed
// pointer.
//
// Otherwise the name is parsed as a direct format name. A name that parses
// as non-local yields NULL. For the remaining case (local, or an error),
// if the parsed parameters mark it as incoming, the queue is resolved
// by its queue-name component via CrackQueueName; on success, and when
// pfQueueType is not NULL, *pfQueueType receives the parsed format type.
//
// Must be called with the global lock held.
//
ScQueue *ScQueueManager::FindIncomingByFormat (WCHAR *lpszFormatName, int *pfQueueType) {
    SVSUTIL_ASSERT (gMem->IsLocked());

    WCHAR *lpszHashed = svsutil_StringHashCheck (gMem->pStringHash, lpszFormatName);

    if (lpszHashed) {
        // Known string: match queues by hashed pointer identity.
        for (ScQueueList *pql = pqlIncoming; pql != NULL; pql = pql->pqlNext) {
            if (pql->pQueue->lpszFormatName == lpszHashed)
                return pql->pQueue;
        }
        return NULL;
    }

    // Unknown string: see if it's local HTTP queue
    ScQueueParms qpParsed;
    memset (&qpParsed, 0, sizeof(qpParsed));

    WCHAR *lpszHost = NULL;
    WCHAR *lpszName = NULL;

    if (scutil_ParseNonLocalDirectFormatName (lpszFormatName, lpszHost, lpszName, &qpParsed)) { // failure means it's local, or an error
        // Non-local names never correspond to an incoming queue.
        if (lpszHost)
            g_funcFree (lpszHost, g_pvFreeData);
        if (lpszName)
            g_funcFree (lpszName, g_pvFreeData);
        return NULL;
    }

    if (! qpParsed.bIsIncoming)
        return NULL;

    ScQueue *pQueue = CrackQueueName (lpszFormatName, qpParsed, pqlIncoming);

    if (pQueue && pfQueueType)
        *pfQueueType = qpParsed.bFormatType;

    return pQueue;
}
//
// Locate outgoing queue by format name
//
// The format name is first looked up in the global string hash. If it is
// not there, NULL is returned. Otherwise the outgoing list is searched for
// a queue whose lpszFormatName is that same hashed pointer.
//
// Must be called with the global lock held.
//
ScQueue *ScQueueManager::FindOutgoingByFormat (WCHAR *lpszFormatName) {
    SVSUTIL_ASSERT (gMem->IsLocked());

    WCHAR *lpszHashed = svsutil_StringHashCheck (gMem->pStringHash, lpszFormatName);

    if (lpszHashed) {
        for (ScQueueList *pql = pqlOutgoing; pql != NULL; pql = pql->pqlNext) {
            if (pql->pQueue->lpszFormatName == lpszHashed)
                return pql->pQueue;
        }
    }

    return NULL;
}
ScQueue *ScQueueManager::FinishCreation (ScQueue *pQueue, ScQueue *pJournal, int fDelFiles) {
SVSUTIL_ASSERT (gMem->IsLocked());
int fError = FALSE;
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_QUEUE, L"Finishing creation of a queue...\n");
#endif
if ((! pQueue) || (! pQueue->fInitialized) || (pQueue->qp.bHasJournal &&
((! pJournal) || (! pJournal->fInitialized) ||
(! pJournal->qp.bIsJournal)))) {
#if defined (SC_VERBOSE)
scerror_DebugOut (VERBOSE_MASK_QUEUE, L"Queue creation failed because of %s...\n", (! pQueue) ? L"alloc failure" : ((! pQueue->fInitialized) ? L"init failure" : L"flag inconsistency"));
#endif
fError = TRUE;
}
//
// Synchronization on queue is not required yet
// because queue pointer has not yet been published.
//
// There is no way for threads to find out about it.
//
//
// Now publish it...
//
while (! fError) {
if (pQueue->qp.bIsIncoming) {
ScQueueList *pqlNew = new ScQueueList (pQueue, pqlIncoming);
if (! pqlNew) {
fError = TRUE;
break;
}
pqlIncoming = pqlNew;
if (pJournal) {
pqlNew = new ScQueueList (pJournal, pqlIncoming);
if (! pqlNew) {
pqlNew = pqlIncoming;
pqlIncoming = pqlIncoming->pqlNext;
pqlIncoming->pqlPrev = NULL;
delete pqlNew;
fError = TRUE;
break;
}
pqlIncoming = pqlNew;
pQueue->pJournal = pJournal;
}
} else {
SVSUTIL_ASSERT (! pJournal);
ScQueueList *pqlNew = new ScQueueList (pQueue, pqlOutgoing);
if (pqlNew)
pqlOutgoing = pqlNew;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -