📄 notify.cpp
字号:
it = m_listMsgQueues.begin();
it->init();
}
else
return NULL;
}
notification_sink sink(dwNotificationType, pwszUSN, pwszQueryString, *it, hOwner);
if(m_listSinks.push_front(sink))
return m_listSinks.begin()->getHandle();
else
return NULL;
}
/******************************************************************************
 *   notification_mgr::deregister_notification:
 *
 *   - removes the first notification_sink whose getHandle() equals
 *     hSubscription from m_listSinks
 *   - leaves the list untouched when no sink reports that handle
 *   - the whole search runs under a ce::gate constructed on m_cs
 *
 ******************************************************************************/
void notification_mgr::deregister_notification(HANDLE hSubscription)
{
    ce::gate<ce::critical_section> lock(m_cs);

    // hSubscription is not trusted to be a valid iterator, so the list is
    // walked and each sink is asked for its handle instead
    ce::list<notification_sink>::iterator itSink = m_listSinks.begin();
    ce::list<notification_sink>::iterator itStop = m_listSinks.end();

    while(itSink != itStop)
    {
        if(itSink->getHandle() == hSubscription)
        {
            // only one sink is removed per call
            m_listSinks.erase(itSink);
            return;
        }

        ++itSink;
    }
}
/******************************************************************************
 *   notification_mgr::cleanup_notifications:
 *
 *   - removes every notification_sink whose getOwner() equals hOwner
 *   - then deinit()s and removes every msg_queue whose owner() equals hOwner
 *   - both passes run under a single ce::gate constructed on m_cs
 *
 ******************************************************************************/
void notification_mgr::cleanup_notifications(HANDLE hOwner)
{
    ce::gate<ce::critical_section> lock(m_cs);

    // pass 1: sinks created by hOwner
    ce::list<notification_sink>::iterator itSink = m_listSinks.begin();
    ce::list<notification_sink>::iterator itSinkStop = m_listSinks.end();

    while(itSink != itSinkStop)
    {
        if(itSink->getOwner() == hOwner)
        {
            // advance before erasing; only the saved position is handed to erase
            ce::list<notification_sink>::iterator itDoomed = itSink++;
            m_listSinks.erase(itDoomed);
        }
        else
        {
            ++itSink;
        }
    }

    // pass 2: message queues created by hOwner
    ce::list<msg_queue>::iterator itQueue = m_listMsgQueues.begin();
    ce::list<msg_queue>::iterator itQueueStop = m_listMsgQueues.end();

    while(itQueue != itQueueStop)
    {
        if(itQueue->owner() == hOwner)
        {
            // the queue is shut down before its list entry goes away
            itQueue->deinit();

            ce::list<msg_queue>::iterator itDoomed = itQueue++;
            m_listMsgQueues.erase(itDoomed);
        }
        else
        {
            ++itQueue;
        }
    }
}
/******************************************************************************
* notification_sink::ctor
*
* - notification sink constructor
* - initializes m_dwType, m_strUSN, m_strQueryString, m_hOwner and m_MsgQueue
*   from the corresponding arguments
* - initializes m_Watermark from Watermark(MAXIMUM_WATERMARK_LEVEL, 0)
* - the constructor body itself is empty; all work happens in the
*   member-initializer list
*
******************************************************************************/
// First argument passed to Watermark for every sink.
// NOTE(review): the meaning of the second argument (0) is defined by Watermark,
// which is declared elsewhere - confirm before changing either value.
const static int MAXIMUM_WATERMARK_LEVEL = 5;
// NOTE(review): MsgQueue is received by reference; whether m_MsgQueue keeps a
// reference or a copy depends on the member declaration, which is not visible
// here. Members are initialized in declaration order, not in the order listed.
notification_sink::notification_sink(DWORD dwType, LPCWSTR pwszUSN, LPCWSTR pwszQueryString, msg_queue& MsgQueue, HANDLE hOwner)
: m_dwType(dwType),
m_strUSN(pwszUSN),
m_strQueryString(pwszQueryString),
m_hOwner(hOwner),
m_MsgQueue(MsgQueue),
m_Watermark(Watermark(MAXIMUM_WATERMARK_LEVEL, 0))
{
}
/******************************************************************************
 *   notification_sink::event
 *
 *   - performs an event notification
 *   - writes a upnp event to the message queue: one event_msg_hdr followed by
 *     hdr.nNumberOfBlocks body blocks of at most MAX_MSG_SIZE bytes each
 *   - pwszMessage: NUL-terminated event text; a NULL pointer is traced and
 *     ignored
 *   - dwEventSEQ:  sequence number copied into the header
 *   - when a body block cannot be written, a terminate message is written to
 *     the queue; every write failure is traced with the error code of the
 *     write that failed. Nothing is reported back to the caller.
 *
 ******************************************************************************/
void notification_sink::event(LPCWSTR pwszMessage, DWORD dwEventSEQ)
{
    // wcslen below would dereference a NULL message
    if(!pwszMessage)
    {
        TraceTag(ttidError, "%s: NULL event message", __FUNCTION__);
        return;
    }

    ce::gate<ce::critical_section_with_copy> _gate(m_cs);

    // size of the message body in bytes, terminator excluded
    DWORD dwMsgSize = static_cast<DWORD>(wcslen(pwszMessage) * sizeof(*pwszMessage));

    // zero the whole header first, as byebye() and alive() do, so that fields
    // not assigned below never carry stack garbage into the queue
    event_msg_hdr hdr = {0};

    hdr.hSubscription = getHandle();
    hdr.nt = NOTIFY_PROP_CHANGE;
    hdr.nNumberOfBlocks = (dwMsgSize + MAX_MSG_SIZE - 1) / MAX_MSG_SIZE; // round up
    hdr.dwEventSEQ = dwEventSEQ;

    ce::gate<msg_queue> _msg_gate(m_MsgQueue);

    // NOTE(review): check_terminate_msg() is defined elsewhere; a false result
    // means nothing may be written now - confirm the exact contract there
    if(!m_MsgQueue.check_terminate_msg())
        return;

    // error code of the write that failed; captured right at the failure
    // because the calls made before the trace below may overwrite the
    // thread's last-error value
    DWORD dwError = 0;

    // write header to message queue
    if(WriteMsgQueue(m_MsgQueue, &hdr, sizeof(hdr), 0, 0))
    {
        // message is unicode text so the length is even
        Assert(dwMsgSize / 2 == (dwMsgSize + 1) / 2);

        // MAX_MSG_SIZE is also even
        Assert(MAX_MSG_SIZE / 2 == (MAX_MSG_SIZE + 1) / 2);

        // WriteMsgQueue takes a non-const buffer; the body is addressed in bytes
        char* pbBody = const_cast<char*>(reinterpret_cast<const char*>(pwszMessage));

        int i;

        // write event message body to message queue
        for(i = 0; i < hdr.nNumberOfBlocks; ++i)
        {
            // every block is MAX_MSG_SIZE bytes except possibly the last one
            const DWORD cbBlock = __min(MAX_MSG_SIZE, dwMsgSize - i * MAX_MSG_SIZE);

            // both sizes are even, so there can't be a valid block of 1 byte -
            // a 1 byte message is used as the terminator message
            Assert(cbBlock != 1);

            if(!WriteMsgQueue(m_MsgQueue, pbBody + i * MAX_MSG_SIZE, cbBlock, 0, 0))
            {
                dwError = GetLastError();
                m_MsgQueue.write_terminate_msg();
                break;
            }
        }

        if(i == hdr.nNumberOfBlocks)
        {
            // header and all body blocks were written
            DebugMessageQueueSize(m_MsgQueue, __FUNCTION__);
            return;
        }
    }
    else
    {
        dwError = GetLastError();
    }

    DebugMessageQueueSize(m_MsgQueue, __FUNCTION__);

    TraceTag(ttidError, "%s: Error writing event message to queue. [%d]", __FUNCTION__, dwError);
}
/******************************************************************************
 *   notification_sink::byebye
 *
 *   - performs a byebye notification
 *   - writes a byebye event to the message queue: one event_msg_hdr followed
 *     by a single block holding the USN header converted to wide characters
 *   - pSsdpRequest: request whose Headers[SSDP_USN] supplies the USN; when the
 *     conversion fails the notification is dropped and traced
 *   - when the USN block cannot be written, a terminate message is written to
 *     the queue; every write failure is traced with the error code of the
 *     write that failed. Nothing is reported back to the caller.
 *
 ******************************************************************************/
void notification_sink::byebye(PSSDP_REQUEST pSsdpRequest)
{
    ce::gate<ce::critical_section_with_copy> _gate(m_cs);

    event_msg_hdr hdr = {0};
    ce::wstring strUSN;

    hdr.hSubscription = getHandle();
    hdr.nt = NOTIFY_BYEBYE;
    hdr.nNumberOfBlocks = 1; // the USN is the only body block

    if(!ce::MultiByteToWideChar(CP_ACP, pSsdpRequest->Headers[SSDP_USN], -1, &strUSN))
    {
        TraceTag(ttidError, "%s: Invalid or missing USN header in byebye message", __FUNCTION__);
        return;
    }

    ce::gate<msg_queue> _msg_gate(m_MsgQueue);

    // NOTE(review): check_terminate_msg() is defined elsewhere; a false result
    // means nothing may be written now - confirm the exact contract there
    if(!m_MsgQueue.check_terminate_msg())
        return;

    // error code of the write that failed; captured right at the failure
    // because write_terminate_msg() and DebugMessageQueueSize() run before the
    // trace below and may overwrite the thread's last-error value
    DWORD dwError = 0;

    if(WriteMsgQueue(m_MsgQueue, &hdr, sizeof(hdr), 0, 0))
    {
        if(WriteMsgQueue(m_MsgQueue, const_cast<LPWSTR>(static_cast<LPCWSTR>(strUSN)), strUSN.length() * sizeof(wchar_t), 0, 0))
        {
            DebugMessageQueueSize(m_MsgQueue, __FUNCTION__);
            return;
        }

        dwError = GetLastError();

        // the header is already in the queue without its body
        m_MsgQueue.write_terminate_msg();
    }
    else
    {
        dwError = GetLastError();
    }

    DebugMessageQueueSize(m_MsgQueue, __FUNCTION__);

    TraceTag(ttidError, "%s: Error writing byebye message to queue. [%d]", __FUNCTION__, dwError);
}
/******************************************************************************
 *   notification_sink::alive
 *
 *   - performs an alive notification
 *   - writes an alive event to the message queue: one event_msg_hdr followed
 *     by the USN block, the LOCATION block and, when the NLS header could be
 *     converted, the NLS block (hdr.nNumberOfBlocks is 3 or 2 accordingly)
 *   - pSsdpRequest: request supplying the USN, LOCATION, NLS and CACHE-CONTROL
 *     headers; USN, LOCATION and a parsable "max-age=" CACHE-CONTROL value are
 *     required, otherwise the notification is dropped and traced
 *   - when a body block cannot be written, a terminate message is written to
 *     the queue; every write failure is traced with the error code of the
 *     write that failed. Nothing is reported back to the caller.
 *
 ******************************************************************************/
void notification_sink::alive(PSSDP_REQUEST pSsdpRequest)
{
    ce::gate<ce::critical_section_with_copy> _gate(m_cs);

    event_msg_hdr hdr = {0};
    ce::wstring strUSN;
    ce::wstring strLocation;
    ce::wstring strNLS;

    hdr.hSubscription = getHandle();
    hdr.nt = NOTIFY_ALIVE;
    hdr.nNumberOfBlocks = 3;

    if(!ce::MultiByteToWideChar(CP_ACP, pSsdpRequest->Headers[SSDP_USN], -1, &strUSN))
    {
        TraceTag(ttidError, "%s: Invalid or missing USN header in alive message", __FUNCTION__);
        return;
    }

    if(!ce::MultiByteToWideChar(CP_ACP, pSsdpRequest->Headers[SSDP_LOCATION], -1, &strLocation))
    {
        TraceTag(ttidError, "%s: Invalid or missing LOCATION header in alive message", __FUNCTION__);
        return;
    }

    if(!ce::MultiByteToWideChar(CP_ACP, pSsdpRequest->Headers[SSDP_NLS], -1, &strNLS))
    {
        // NLS is optional: without it only USN and LOCATION are sent
        hdr.nNumberOfBlocks = 2;
    }

    // a missing header must be rejected before sscanf sees it - sscanf does not
    // accept a NULL input string
    if(!pSsdpRequest->Headers[SSDP_CACHECONTROL] ||
       1 != sscanf(pSsdpRequest->Headers[SSDP_CACHECONTROL], "max-age=%d", &hdr.dwLifeTime))
    {
        TraceTag(ttidError, "%s: Invalid or missing CACHE-CONTROL header in alive message", __FUNCTION__);
        return;
    }

    ce::gate<msg_queue> _msg_gate(m_MsgQueue);

    // NOTE(review): check_terminate_msg() is defined elsewhere; a false result
    // means nothing may be written now - confirm the exact contract there
    if(!m_MsgQueue.check_terminate_msg())
        return;

    // error code of the write that failed; captured right at the failure
    // because write_terminate_msg() and DebugMessageQueueSize() run before the
    // trace below and may overwrite the thread's last-error value
    DWORD dwError = 0;

    if(WriteMsgQueue(m_MsgQueue, &hdr, sizeof(hdr), 0, 0))
    {
        // blocks are written in order and the first failure stops the chain;
        // the NLS block is skipped when the header announced only two blocks
        const bool fBodyWritten =
            WriteMsgQueue(m_MsgQueue, const_cast<LPWSTR>(static_cast<LPCWSTR>(strUSN)), strUSN.length() * sizeof(wchar_t), 0, 0) &&
            WriteMsgQueue(m_MsgQueue, const_cast<LPWSTR>(static_cast<LPCWSTR>(strLocation)), strLocation.length() * sizeof(wchar_t), 0, 0) &&
            (hdr.nNumberOfBlocks == 2 ||
             WriteMsgQueue(m_MsgQueue, const_cast<LPWSTR>(static_cast<LPCWSTR>(strNLS)), strNLS.length() * sizeof(wchar_t), 0, 0));

        if(fBodyWritten)
        {
            DebugMessageQueueSize(m_MsgQueue, __FUNCTION__);
            return;
        }

        dwError = GetLastError();

        // the header is already in the queue without its complete body
        m_MsgQueue.write_terminate_msg();
    }
    else
    {
        dwError = GetLastError();
    }

    DebugMessageQueueSize(m_MsgQueue, __FUNCTION__);

    TraceTag(ttidError, "%s: Error writing alive message to queue. [%d]", __FUNCTION__, dwError);
}
/******************************************************************************
 *   DebugMessageQueueSize:
 *
 *   - DEBUG builds: queries the queue with GetMsgQueueInfo and traces the
 *     number of messages currently in it, prefixed with the caller's name
 *   - other builds: does nothing
 *   - msgQueue: queue to inspect (received by value)
 *   - function: text printed in front of the trace line
 *
 ******************************************************************************/
void DebugMessageQueueSize(msg_queue msgQueue, const char* function)
{
#ifdef DEBUG
    MSGQUEUEINFO info;

    // only the size field is filled in before the query
    info.dwSize = sizeof(info);

    // nothing is traced when the query itself fails
    if(!GetMsgQueueInfo(msgQueue, &info))
        return;

    TraceTag(ttidTrace, "%s: msg queue size: %d", function, info.dwCurrentMessages);
#endif
}
/******************************************************************************
 *   init:
 *
 *   - initialize the message queue: calls CreateMsgQueue with the name in
 *     m_strName and stores the result in m_handle
 *   - DEBUG builds assert that the queue already existed
 *     (ERROR_ALREADY_EXISTS) and that the resulting handle is valid
 *   - no status is returned to the caller
 *
 ******************************************************************************/
void msg_queue::init()
{
    MSGQUEUEOPTIONS options = {0};

    //
    // NOTE(review): the original remark here began mid-sentence. It appears to
    // say that the options are validated before the check whether a msgqueue
    // with this name already exists; because of this cbMaxMessage can't be 0
    //
    options.cbMaxMessage = 1;
    options.dwSize = sizeof(MSGQUEUEOPTIONS);

    // create message queue
    m_handle = CreateMsgQueue(m_strName, &options);

#ifdef DEBUG
    // the last error is read in a statement of its own: an assignment placed
    // inside Assert() would leave errCode unset wherever Assert expands to nothing
    DWORD errCode = GetLastError();

    Assert(errCode == ERROR_ALREADY_EXISTS);

    if(errCode != ERROR_ALREADY_EXISTS)
        TraceTag(ttidError, "CreateMsgQueue failed not already exists [%d]", errCode);

    Assert(m_handle.valid());
#endif
}
/******************************************************************************
* deinit:
*
* - ensure that the message queue is closed
*
******************************************************************************/
void msg_queue::deinit()
{
CloseMsgQueue(m_handle);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -