⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 srmpsend.cxx

📁 Windows CE 6.0 Server 源码
💻 CXX
📖 第 1 页 / 共 3 页
字号:
//
// Copyright (c) Microsoft Corporation.  All rights reserved.
//
//
// Use of this source code is subject to the terms of the Microsoft shared
// source or premium shared source license agreement under which you licensed
// this source code. If you did not accept the terms of the license agreement,
// you are not authorized to use this source code. For the terms of the license,
// please see the license agreement between you and Microsoft or, if applicable,
// see the SOURCE.RTF on your install media or the root of your tools installation.
// THE SOURCE CODE IS PROVIDED "AS IS", WITH NO WARRANTIES.
//
/*++


Module Name:

    SrmpSend.cxx

Abstract:

    Creates an HTTP packet based on message and sends it to intented recepient.
 
--*/

#include <windows.h>
#include <wininet.h>
#include <mq.h>
#include <service.h>

#include "sc.hxx"
#include "scsrmp.hxx"
#include "scsman.hxx"
#include "scpacket.hxx"
#include "SrmpAccept.hxx"
#include "scdefs.hxx"
#include "srmpparse.hxx"
#include "fntoken.h"
#include "scutil.hxx"
#include "scqueue.hxx"
#include "scqman.hxx"
#include "SrmpFwd.hxx"

// use SVSXMLEncode::Append() on values below because we don't want to escape the quote characters.
const WCHAR cszXMLHeader[]          = L"<?xml version=\"1.0\" ?>";
const WCHAR cszEnvelopeStartNS[]    = L"<se:Envelope xmlns:se=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns=\"http://schemas.xmlsoap.org/srmp/\">";
const WCHAR cszMustUnderstandNS[]   = L" se:mustUnderstand=\"1\">";
const WCHAR cszPathNS[]             = L"<path xmlns=\"http://schemas.xmlsoap.org/rp/\"";
const WCHAR cszPropertiesNS[]       = L"<properties";
const WCHAR cszServicesNS[]         = L"<services";
const WCHAR cszStreamNS[]           = L"<Stream";
const WCHAR cszMsmqNS[]             = L"<Msmq xmlns=\"msmq.namespace.xml\">";

const WCHAR cszHeaderNS[]           = L"se:Header";
const WCHAR cszBodyNS[]             = L"se:Body";
const WCHAR cszEnvelopeNS[]         = L"se:Envelope";

DWORD CalcSrmpFwdSize(WCHAR *szQueueName, ScPacketImage *pPacketImage);

BOOL  SendPath(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendProps(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendServices(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendStream(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendStreamRcpt(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendDeliveryRcpt(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendCommitRcpt(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendMsmq(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);
BOOL  SendUserHeader(SVSXMLEncode *pXmlEncode, ScPacketImage *pImage);

DWORD GetExpiresFromPacketImage(ScPacketImage *pImage);
BOOL  SetMQCurrentTime(SVSXMLEncode *pXmlEncode, const WCHAR *szElementName);
BOOL  SetMsgID(SVSXMLEncode *pXmlEncode, OBJECTID *pMsgId);

int BuildHttpHeadersAndMimeBody(ScPacketImage *pImage, CHAR *szSoapEnv, DWORD ccSoapEnv, BOOL fSecure, PSrmpIOCTLPacket pIOPacket, DWORD ccHeaders, SVSSimpleBuffer &cBodyBuf);
CHAR* BuildSoapEnvelope(ScPacketImage *pImage, DWORD *pccSendBuffer);


BOOL StatusCodeSuccess(DWORD dwStatus) {
	return (dwStatus == 200) || (dwStatus == 201) || (dwStatus == 202) || (dwStatus == 203) || (dwStatus == 204) || (dwStatus == 205);
}

BOOL StatusCodeRetriableError(DWORD dwStatus) {
	if (!gMachine->fUntrustedNetwork) {
		// on a trusted network we're willing to take chance that there could
		// be some other network error causing the status code to be incorrect.
		return ! StatusCodeSuccess(dwStatus);
	}

	// On an untrusted net it's possible a malicious app has pointed us off to
	// a non-existant on not setup machine, in which case we'll only retry on a 
	// certain set of errors.
	return (dwStatus == 408) || (dwStatus == 500) || (dwStatus == 502) || (dwStatus == 503) || (dwStatus == 504);
}


#define SRMP_INTERNET_TIMEOUT  (30*1000)  /* 30 seconds */

class CSrmpCallBack {
	friend void CALLBACK SrmpCallback(HINTERNET hInternet,DWORD_PTR dwContext,DWORD dwInternetStatus,LPVOID lpvStatusInformation,DWORD dwStatusInformationLength);
	HANDLE hEventComplete;
	HANDLE hEventClosed;

public:
	DWORD  dwResult;
	DWORD  dwError;

	CSrmpCallBack(void) {
		memset(this,0,sizeof(*this));
		dwResult = 500;
		hEventComplete = CreateEvent(NULL,FALSE,FALSE,NULL);
		hEventClosed   = CreateEvent(NULL,FALSE,FALSE,NULL);
	}

	BOOL IsInitialized(void) { return (hEventComplete && hEventClosed ) ? TRUE : FALSE; }

	BOOL Wait(DWORD dwTimeout) {
		return (WAIT_TIMEOUT != WaitForSingleObject(hEventComplete,dwTimeout));
	}

	void Failed(void) {
		// make sure we don't block forever on deinitializing + waiting for hEventClosed
		if (hEventClosed) {
			CloseHandle(hEventClosed);
			hEventClosed = 0;
		}
	}

	~CSrmpCallBack(void) {
		if (hEventClosed) {
			// make sure we get closed to make sure that our async caller isn't called after class is deallocated.
			WaitForSingleObject(hEventClosed,INFINITE);
			CloseHandle(hEventClosed);
		}

		if (hEventComplete)
			CloseHandle(hEventComplete);
	}
};

#if defined (InternetSetStatusCallback)
// Wininet defines this to InternetSetStatusCallback to (A|W) based on ANSI/UNICODE,
// httplite only exports InternetSetStatusCallback.
// 
#undef InternetSetStatusCallback
INTERNETAPI_(INTERNET_STATUS_CALLBACK) InternetSetStatusCallback(
																 IN HINTERNET hInternet,
																 IN INTERNET_STATUS_CALLBACK lpfnInternetCallback
																 );
#endif

int ScSession::InitializeWininet(CSrmpCallBack *pCallback) {
	DWORD          dwTimeout = SRMP_INTERNET_TIMEOUT;
	INTERNET_PORT  iPort     = (qType == SCFILE_QP_FORMAT_HTTPS) ? INTERNET_DEFAULT_HTTPS_PORT : INTERNET_DEFAULT_HTTP_PORT;

	if (NULL == (hInternetSession = InternetOpenA(NULL,INTERNET_OPEN_TYPE_PRECONFIG,NULL,NULL,INTERNET_FLAG_ASYNC))) {
		scerror_DebugOutM(VERBOSE_MASK_SRMP,(L"InternetOpen fails, GLE=0x%08x\r\n",GetLastError()));
		fSessionState = SCSESSION_STATE_INACTIVE;
		return FALSE;
	}

	InternetSetOption(hInternetSession, INTERNET_OPTION_RECEIVE_TIMEOUT, &dwTimeout, sizeof(dwTimeout));
	InternetSetOption(hInternetSession, INTERNET_OPTION_SEND_TIMEOUT, &dwTimeout, sizeof(dwTimeout));
	InternetSetOption(hInternetSession, INTERNET_OPTION_DATA_RECEIVE_TIMEOUT, &dwTimeout, sizeof(dwTimeout));
	InternetSetOption(hInternetSession, INTERNET_OPTION_DATA_SEND_TIMEOUT, &dwTimeout, sizeof(dwTimeout));
	InternetSetOption(hInternetSession, INTERNET_OPTION_CONNECT_TIMEOUT , &dwTimeout,sizeof(dwTimeout));

	if (NULL == (hInternetConnect = InternetConnectA(hInternetSession,lpszmbHostName,iPort,NULL,NULL,INTERNET_SERVICE_HTTP,0,(DWORD)pCallback))) {
		scerror_DebugOutM(VERBOSE_MASK_SRMP,(L"InternetConnect fails, GLE=0x%08x\r\n",GetLastError()));
		fSessionState = SCSESSION_STATE_INACTIVE;
		return FALSE;
	}

	if (INTERNET_INVALID_STATUS_CALLBACK == InternetSetStatusCallback(hInternetConnect,&SrmpCallback)) {
		scerror_DebugOutM(VERBOSE_MASK_SRMP,(L"InternetConnect fails, GLE=0x%08x\r\n",GetLastError()));
		fSessionState = SCSESSION_STATE_INACTIVE;
		return FALSE;
	}

	return TRUE;
}

// We'll try to send a message this many times before we end the session.
#define  SRMP_MAX_RETRIES             5
// Wait this many milliseconds on HTTP request recoverable failures.
#define  SRMP_RETRY_WAIT              (25*1000)

void ScSession::ServiceThreadHttpW(void) {
#if defined (SC_VERBOSE)
	scerror_DebugOut (VERBOSE_MASK_SESSION, L"Sending HTTP thread for %s\r\n", lpszHostName);
#endif

	int            iStatus   = 200;
	BOOL           fClearAttrTimer = FALSE;
	int            iRetries  = 0;

	fSessionState = SCSESSION_STATE_CONNECTING;

	CSrmpCallBack connectCallback;
	if (!connectCallback.IsInitialized()) {
#if defined (SC_VERBOSE)
		scerror_DebugOut(VERBOSE_MASK_SRMP,L"ServiceThreadHttpW cannot CreateEvent, GLE=0x%08x\r\n",GetLastError());
#endif
		return;
	}

	gMem->Lock();

	int uiSCSESSION_IDLE_TIMEOUT = gMachine->uiIdleTimeout * 1000;

	if (!InitializeWininet(&connectCallback)) {
		connectCallback.Failed();
		goto done;
	}

	// connectCallback.Wait(2000);

	fSessionState = SCSESSION_STATE_OPERATING;

	SetEvent(hEvent);	// Do the first scan...
	while (fSessionState == SCSESSION_STATE_OPERATING) {
		ScPacket *pPacket = NULL;
		ScQueue  *pQueue  = NULL;
		SVSUTIL_ASSERT (gMem->IsLocked ());

		gMem->Unlock();
		int iResp = WaitForSingleObject (hEvent, uiSCSESSION_IDLE_TIMEOUT);

		gMem->Lock ();
		if (fClearAttrTimer) {
			svsutil_ClearAttrTimer(gMem->pTimer,(SVSHandle)hEvent);
			fClearAttrTimer = FALSE;
		}
		
		if (iResp == WAIT_TIMEOUT)
			break;

		if (fSessionState == SCSESSION_STATE_EXITING) {
			// don't goto done; because another thread has initiated FinishSession()
			gMem->Unlock ();
			return; 
		}

		if (! ExtractNextPacket(pPacket, pQueue, pQueue))
			continue;

		// message to remote queue
		iStatus = BuildAndSendSRMP(pPacket,pQueue);
		SVSUTIL_ASSERT (gMem->IsLocked());

		if (StatusCodeRetriableError(iStatus)) {
			iRetries++;
			SVSUTIL_ASSERT(iRetries <= SRMP_MAX_RETRIES);

			if (iRetries == SRMP_MAX_RETRIES) {
				scerror_DebugOutM(VERBOSE_MASK_SRMP,(L"ServiceThreadHttpW cannot successfully send message, closing down session to try again later\r\n"));
				break;
			}
		
			// On server errors we assume it's temporary and that we can try again later.
			BOOL fSwallowEvent = (pSentPackets || pSentRelPackets) ? TRUE : FALSE;

			ReturnUnacketPacketsToQueues();
			// we have to do this because when packet is inserted back 
			// into the queue SetEvent(hEvent) is called.
			if (fSwallowEvent)
				WaitForSingleObject(hEvent,0);
				
			if (fSessionState == SCSESSION_STATE_OPERATING) {
				fClearAttrTimer = TRUE;
				svsutil_SetAttrTimer(gMem->pTimer,(SVSHandle)hEvent,SRMP_RETRY_WAIT);
			}
			continue;
		}
		iRetries = 0;

		// either we succeeded or we encountered an non-recoverable error, in either
		// event discard the packet and move onto next message in queue.

		// Dispose of packet.  We only queue up one at a time so check is trivial.
		if (pSentPackets) {
			svsutil_FreeFixed(pSentPackets, gMem->pAckNodeMem);
			pSentPackets = NULL;
		}
		else if (pSentRelPackets) {
			svsutil_FreeFixed(pSentRelPackets, gMem->pAckNodeMem);
			pSentRelPackets = NULL;
		}
		gQueueMan->JournalPacket(pPacket);
		pQueue->DisposeOfPacket(pPacket);
		SetEvent(hEvent); // immediatly try to send next packet.
	}

done:
	SVSUTIL_ASSERT(gMem->IsLocked());

	if (fClearAttrTimer)
		svsutil_ClearAttrTimer(gMem->pTimer,(SVSHandle)hEvent);

	FinishSession();
	fSessionState = SCSESSION_STATE_INACTIVE;
	gSessionMan->ReleaseSession (this);
	gMem->Unlock();
}

// put a preceding '/' in front of name if needed.
PSTR AllocURL(PCWSTR wszURL) {
	PSTR pszOut = 0;
	int iAddSlash = (wszURL[0] != '\\' && wszURL[0] != '/') ? 1 : 0;

	int  iOutLen = WideCharToMultiByte(CP_ACP, 0, wszURL, -1, 0, 0, 0, 0);
	if(!iOutLen)
		goto error;

	iOutLen += iAddSlash;

	pszOut = (PSTR)g_funcAlloc(iOutLen*sizeof(WCHAR), g_pvAllocData);
	if(!pszOut)
		goto error;


	if (iAddSlash) {
		pszOut[0] = '/';
	}

	if(WideCharToMultiByte(CP_ACP, 0, wszURL, -1, pszOut+iAddSlash, iOutLen-iAddSlash, 0, 0))
		return pszOut;

error:
	if (pszOut)
		g_funcFree(pszOut,g_pvFreeData);
	return FALSE;
}


int ScSession::BuildAndSendSRMP(ScPacket *pPacket, ScQueue *pQueue)  {
	CHAR            *szSoapEnv = NULL;
	DWORD           ccSoapEnv;
	int             iStatusCode = 503; // if we fail it's most likely because of malloc failure, we can try again
	CHAR            *szURL     = NULL;
	CHAR            *szTarget  = NULL;
	ScPacketImage   *pNewImage = NULL;
	SrmpIOCTLPacket ioPacket;
	CHAR szHeaders[2048];
	SVSSimpleBuffer cBodyBuf;

	// SendUP() stuff...
	SVSUTIL_ASSERT (gMem->IsLocked ());
	SVSUTIL_ASSERT (! pQueue->qp.bIsIncoming);

	scerror_DebugOutM (VERBOSE_MASK_SRMP, (L"Sending user packet %08x ptr %08x queue %s to %s\n", pPacket->uiMessageID, pPacket, pQueue->lpszFormatName, lpszHostName));

	if (pQueue->qp.bTransactional) {
		SVSUTIL_ASSERT (pPacket->iDirEntry >= 0);
		SVSUTIL_ASSERT (pPacket->pNode);

		SVSTNode *pPrevNode = pQueue->pPackets->Prev (pPacket->pNode);
		ScPacket *pPrev = pPrevNode ? (ScPacket *)SVSTree::GetData(pPrevNode) : NULL;

		pPacket->uiPacketState = SCPACKET_STATE_WAITORDERACK;
		pPacket->pImage->sect.pXactHeader->SetPrevSeqN (pPrev ? pPrev->uiSeqN : 0);
	} else {
		SentPacket *pSP = (SentPacket *)svsutil_GetFixed (gMem->pAckNodeMem);
		if (NULL == pSP) {
			scerror_DebugOutM (VERBOSE_MASK_SESSION, (L"No memory - failing session %s...\n", lpszHostName));
			return iStatusCode;
		}

		pSP->pPacket      = pPacket;

		++usPacketsSent;
		if (! usPacketsSent)
			++usPacketsSent;

		if (pPacket->iDirEntry >= 0) {
			++usRelPacketsSent;
			if (! usRelPacketsSent)
				++usRelPacketsSent;

			pSP->usNum = usRelPacketsSent;
			pSP->pNext = pSentRelPackets;

			pSentRelPackets = pSP;
		} else {
			pSP->usNum = usPacketsSent;
			pSP->pNext = pSentPackets;

			pSentPackets = pSP;
		}
	}

	unsigned int tTRQ = pPacket->pImage->sect.pBaseHeader->GetAbsoluteTimeToQueue ();
	unsigned int tBas = pPacket->pImage->sect.pUserHeader->GetSentTime ();
	unsigned int tNow = scutil_now ();

	if (tTRQ != INFINITE)
		pPacket->pImage->sect.pBaseHeader->SetAbsoluteTimeToQueue (tTRQ - (tNow - tBas));


	++gSessionMan->fBusy;
	pNewImage = scqman_DupImage(pPacket->pImage);

	pPacket->pImage->sect.pBaseHeader->SetAbsoluteTimeToQueue (tTRQ);
	pPacket->pImage->sect.pBaseHeader->IncludeSession (FALSE);

	pQueue->FreeFromMemory(pPacket);

	if (! pNewImage) {
		scerror_DebugOutM (VERBOSE_MASK_SESSION, (L"No memory - failing session %s...\n", lpszHostName));
		--gSessionMan->fBusy;
		return iStatusCode;
	}
	BOOL fSecure = IsSecure();

	// If we are to forward an SRMP message create and send it in ForwardSrmpMessage.
	if (pNewImage->sect.pUserHeader->SrmpIsIncluded()) {
		WCHAR *szQueueName;
		if (! gMachine->RouteLocalReverseLookup(pQueue->lpszQueueName,&szQueueName))
			szQueueName = pQueue->lpszQueueName;

		CSrmpFwd cSrmpFwd(this,szQueueName,pNewImage->sect.pFwdViaHeader ? (WCHAR*)pNewImage->sect.pFwdViaHeader->GetData() : L"");
		gMem->Unlock();

		if (!cSrmpFwd.IsInitailized())
			goto done;
		
		iStatusCode = ForwardSrmpMessage(&cSrmpFwd,pNewImage,pQueue->lpszQueueName);
		goto done;
	}

	gMem->Unlock();

	if (NULL == (szSoapEnv = BuildSoapEnvelope(pNewImage,&ccSoapEnv)))
		goto done;

	memset(&ioPacket,0,sizeof(ioPacket));
	ioPacket.pszHeaders = szHeaders;

	if (! BuildHttpHeadersAndMimeBody(pNewImage,szSoapEnv,ccSoapEnv,fSecure,&ioPacket,sizeof(szHeaders),cBodyBuf))
		goto done;

	if (NULL == (szURL = AllocURL(pQueue->lpszQueueName)))
		goto done;

	iStatusCode = SendHttpMsg(szURL,fSecure,&ioPacket); 
done:
	SVSUTIL_ASSERT(! gMem->IsLocked());

	if (szURL)
		g_funcFree(szURL,g_pvFreeData);

	if (szSoapEnv)
		g_funcFree(szSoapEnv,g_pvFreeData);

	if (pNewImage)
		g_funcFree(pNewImage,g_pvFreeData);
	
	gMem->Lock();
	--gSessionMan->fBusy;
	return iStatusCode;
}


CHAR *BuildSoapEnvelope(ScPacketImage *pImage, DWORD *pccSendBuffer) {
	SVSXMLEncode    xmlEncode(4096);
	CUserHeader     *pUserHeader       = pImage->sect.pUserHeader;
	CSoapSection    *pSoapBodySection  = pImage->sect.pSoapBodySection;
	DWORD           cbAlloc = 16384;
	CHAR            *szSendBuffer;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -