📄 xorprtm.c
字号:
* Clear event state to avoid spurious signals.
*/
ResetEvent(pp->cevent);
result = RegisterWaitForSingleObject(&(pp->cwait), pp->cevent,
pipe_connect_cb, pp, INFINITE,
WT_EXECUTEINIOTHREAD |
WT_EXECUTEONLYONCE);
if (result == 0) {
result = GetLastError();
TRACE1(CONFIGURATION, "Error %u RegisterWaitForSingleObject()", result);
goto fail;
}
/*
* Register a pool thread to wait for data to be received on the pipe.
* We don't cause this to be activated until we post a read request
* from within the connection callback.
* XXX: We want the read callback to be called whenever the
* object is signalled, not just once.
*/
ResetEvent(pp->revent);
result = RegisterWaitForSingleObject(&(pp->rwait), pp->revent,
pipe_read_cb, pp, INFINITE,
WT_EXECUTEINIOTHREAD |
WT_EXECUTEONLYONCE);
if (result == 0) {
result = GetLastError();
TRACE1(CONFIGURATION, "Error %u RegisterWaitForSingleObject()", result);
goto fail;
}
/*
* Post the connection request. If it returns non-zero, then the
* connection attempt is pending and the thread will be signalled
* when complete. If it returns zero, then there's a problem.
* ERROR_NO_DATA means the client disconnected, but we didn't
* call DisconnectNamedPipe().
* ConnectNamedPipe() does not reset the event object associated
* with the OVERLAPPED parameter.
*/
result = ConnectNamedPipe(pp->pipe, &pp->cov);
if (result == 0) {
result = GetLastError();
if (result == ERROR_PIPE_LISTENING) {
TRACE0(NETWORK, "Error: listening; Reconnecting named pipe");
result = ConnectNamedPipe(pp->pipe, &pp->cov);
}
if (result == ERROR_PIPE_CONNECTED) {
TRACE0(NETWORK, "Error: named pipe already connected");
goto fail;
}
if (result == ERROR_NO_DATA) {
TRACE0(NETWORK, "Error: previous session not cleaned up");
goto fail;
}
}
pp->state = PIPE_STATE_LISTEN;
retval = 0;
fail:
if (retval == -1) {
if (pp->cwait != NULL) {
UnregisterWaitEx(pp->cwait, pp->cevent);
ResetEvent(pp->cevent);
pp->cwait = NULL;
}
if (pp->rwait != NULL) {
UnregisterWaitEx(pp->rwait, pp->revent);
ResetEvent(pp->revent);
pp->rwait = NULL;
}
}
TRACE1(ENTER, "Leaving pipe_listen", pp);
return (retval);
}
/*
* Disconnect, but do not close, a pipe handle; and deregister
* any pending waiter threads from its event handles.
*
* XXX: This must be called from primary thread, or lock held if not!
*/
void
pipe_disconnect(pipe_instance_t *pp)
{
TRACE0(ENTER, "Entering pipe_disconnect");
if (pp == NULL)
return;
/*
* Cancel pending I/O before deregistering the callback,
* and disconnect the pipe, to avoid race conditions.
* We also reset the event(s) to avoid being signalled for
* things which haven't actually happened yet.
*
* XXX: To avoid races during shutdown, we may have to
* NULL out the second argument to UnregisterWaitEx().
* We can't, however, do that from a service thread.
*/
if (pp->cwait != NULL) {
UnregisterWaitEx(pp->cwait, pp->cevent);
ResetEvent(pp->cevent);
pp->cwait = NULL;
}
if (pp->rwait != NULL) {
UnregisterWaitEx(pp->rwait, pp->revent);
ResetEvent(pp->revent);
pp->rwait = NULL;
}
if (pp->pipe != NULL) {
CancelIo(pp->pipe);
if (pp->state == PIPE_STATE_CONNECTED ||
pp->state == PIPE_STATE_LISTEN) {
DisconnectNamedPipe(pp->pipe);
}
}
pp->state = PIPE_STATE_INIT;
TRACE0(ENTER, "Leaving pipe_disconnect");
}
void
pipe_destroy(pipe_instance_t *pp)
{
TRACE0(ENTER, "Leaving pipe_destroy");
if (pp == NULL)
return;
pipe_disconnect(pp);
if (pp->revent != NULL) {
CloseHandle(pp->revent);
pp->rov.hEvent = pp->revent = NULL;
}
if (pp->cevent != NULL) {
CloseHandle(pp->cevent);
pp->cov.hEvent = pp->cevent = NULL;
}
if (pp->pipe != NULL) {
CloseHandle(pp->pipe);
pp->pipe = NULL;
}
DeleteCriticalSection(&pp->rcs);
free(pp);
TRACE0(ENTER, "Leaving pipe_destroy");
}
void CALLBACK
pipe_connect_cb(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
pipe_instance_t *pp;
DWORD result;
DWORD nbytes;
pp = (pipe_instance_t *)lpParameter;
EnterCriticalSection(&pp->rcs);
TRACE1(ENTER, "Entering pipe_connect_cb %p", lpParameter);
/* XXX CHECK STATE */
if (pp->state != PIPE_STATE_LISTEN) {
TRACE0(NETWORK, "WARNING: pipe state is not LISTEN");
}
/*
* If you forgot to reset the event object, the following call
* will block until the connection is actually made. We wish to
* run as lockless as possible, so do not block the service thread.
*/
/*
result = GetOverlappedResult(pp->pipe, &pp->cov, &nbytes, TRUE);
*/
pp->state = PIPE_STATE_CONNECTED;
/*
* Post an overlapped read request to capture a message from
* the client.
*/
result = ReadFile(pp->pipe, pp->rbuf, pp->rsize, NULL, &pp->rov);
if (result == 0) {
result = GetLastError();
if (result != ERROR_IO_PENDING) {
TRACE1(ANY, "WARNING: pipe_connect_cb read returned %d", result);
}
}
/* XXX: We need to be able to deal with errors immediately to
* avoid races. */
TRACE0(ENTER, "Leaving pipe_connect_cb");
LeaveCriticalSection(&pp->rcs);
}
/*
* Callback which invokes pipe_listen().
*
* When we are in pipe_read_cb(), we may try to call pipe_listen()
* (after tearing down an old connection). This can cause an infinite
* loop as they execute in the same helper thread, and pipe_listen()
* will try to reschedule pipe_read_cb().
* Therefore, use QueueUserWorkItem() to make sure that pipe_listen()
* is invoked after a context switch.
*/
void WINAPI
pipe_relisten_cb(void *ctx)
{
pipe_instance_t *pp;
pp = (pipe_instance_t *)ctx;
EnterCriticalSection(&pp->rcs);
TRACE1(ENTER, "Entering pipe_relisten_cb %p", ctx);
pipe_disconnect(pp);
pipe_listen(pp);
TRACE0(ENTER, "Leaving pipe_relisten_cb");
LeaveCriticalSection(&pp->rcs);
}
void WINAPI
pipe_reread_cb(void *ctx)
{
pipe_instance_t *pp;
DWORD result;
int failed;
pp = (pipe_instance_t *)ctx;
EnterCriticalSection(&pp->rcs);
TRACE1(ENTER, "Entering pipe_reread_cb %p", ctx);
failed = 0;
if (pp->state != PIPE_STATE_CONNECTED) {
TRACE0(NETWORK, "WARNING: not PIPE_STATE_CONNECTED");
}
/*
* Tear down and wire up read thread callback again.
* This is probably inefficient.
*/
UnregisterWaitEx(pp->rwait, pp->revent);
ResetEvent(pp->revent); /* XXX ReadFile() should do this for us? */
pp->rwait = NULL;
/*
* Post a new read request. Deal with fatal errors.
*/
result = ReadFile(pp->pipe, pp->rbuf, pp->rsize, NULL, &pp->rov);
if (result == 0) {
result = GetLastError();
if (result != ERROR_IO_PENDING) {
TRACE1(ANY, "WARNING: pipe_reread_cb read returned %d", result);
}
if (result == ERROR_BROKEN_PIPE) {
failed = 1;
goto fail;
}
}
/*
* Now, and only now, do we kick off the read thread, in order
* to avoid being preempted if the client disconnects.
*/
result = RegisterWaitForSingleObject(&(pp->rwait), pp->revent,
pipe_read_cb, pp, INFINITE,
WT_EXECUTEINIOTHREAD |
WT_EXECUTEONLYONCE);
if (result == 0) {
result = GetLastError();
TRACE1(CONFIGURATION, "Error %u RegisterWaitForSingleObject()", result);
failed = 1;
}
fail:
/*
* If a fatal error occurred, disconnect the pipe client, and
* listen for a new connection on this instance.
*/
if (failed) {
ResetEvent(pp->revent);
QueueUserWorkItem(
(LPTHREAD_START_ROUTINE)pipe_relisten_cb, (PVOID)pp, WT_EXECUTEINIOTHREAD);
}
out:
TRACE0(ENTER, "Leaving pipe_reread_cb");
LeaveCriticalSection(&pp->rcs);
}
void CALLBACK
pipe_read_cb(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
struct rt_msghdr *rtm;
pipe_instance_t *pp;
DWORD result;
DWORD nbytes;
pp = (pipe_instance_t *)lpParameter;
EnterCriticalSection(&pp->rcs);
TRACE1(ENTER, "Entering pipe_read_cb %p", lpParameter);
if (pp->state != PIPE_STATE_CONNECTED) {
TRACE0(NETWORK, "WARNING: not PIPE_STATE_CONNECTED, bailing.");
/*
* XXX: Is something racy, or is it just me?
* Try to avoid deadlocking by returning if we
* got called when we weren't connected.
*/
goto out;
}
result = GetOverlappedResult(pp->pipe, &pp->rov, &nbytes, TRUE);
if (result == 0) {
result = GetLastError();
TRACE1(NETWORK, "WARNING: pipe_read_cb read returned %d", result);
if (result == ERROR_BROKEN_PIPE) {
/*
* We must queue the new listen on a separate thread to
* avoid infinite recursion.
*/
TRACE0(NETWORK, "Posting listen again.");
ResetEvent(pp->revent);
QueueUserWorkItem(
(LPTHREAD_START_ROUTINE)pipe_relisten_cb, (PVOID)pp, WT_EXECUTEINIOTHREAD);
goto out;
}
}
TRACE1(NETWORK, "Read %d bytes from named pipe.", nbytes);
/*
* Perform sanity checks on input message.
* XXX: We should use a more appropriate errno value.
* We use -1 as ENOBUFS, etc are not part of the namespace.
*/
rtm = (struct rt_msghdr *)&pp->rbuf[0];
if (rtm->rtm_version != RTM_VERSION) {
TRACE1(NETWORK, "Invalid rtm_version %d, dropping.", rtm->rtm_version);
goto drop;
}
/*
* Sanity check size.
*/
if (rtm->rtm_msglen > nbytes ||
nbytes < sizeof(struct rt_msghdr)) {
TRACE1(NETWORK, "Invalid rtm_msglen %d, dropping.", rtm->rtm_msglen);
rtm->rtm_errno = -1;
goto drop;
}
if (rtm->rtm_pid == 0) {
TRACE1(NETWORK, "Invalid rtm_pid %d, dropping.", rtm->rtm_pid);
rtm->rtm_errno = -1;
goto bounce;
}
switch (rtm->rtm_type) {
case RTM_ADD:
result = rtm_add_route(rtm, nbytes);
if (result == 0) {
TRACE0(NETWORK, "route added successfully");
} else {
TRACE0(NETWORK, "failed to add route");
}
rtm->rtm_errno = result;
break;
case RTM_DELETE:
result = rtm_delete_route(rtm, nbytes);
if (result == 0) {
TRACE0(NETWORK, "route deleted successfully");
} else {
TRACE0(NETWORK, "failed to delete route");
}
rtm->rtm_errno = result;
break;
default:
TRACE1(NETWORK, "Invalid rtm_type %d, dropping.", rtm->rtm_type);
rtm->rtm_errno = -1;
break;
}
bounce:
/*
* There is currently no analogue of the BSD SO_LOOPBACK option.
* XXX: Normally processes will hear their own messages echoed across
* the routing socket emulation pipe. Because the broadcast technique
* uses blocking NT I/O, processes must read back their own message
* after issuing it.
*/
broadcast_pipe_message(pp->rbuf, nbytes);
drop:
TRACE0(NETWORK, "Posting read again.");
ResetEvent(pp->revent);
QueueUserWorkItem(
(LPTHREAD_START_ROUTINE)pipe_reread_cb, (PVOID)pp, WT_EXECUTEINIOTHREAD);
out:
TRACE0(ENTER, "Leaving pipe_read_cb");
LeaveCriticalSection(&pp->rcs);
}
static
VOID
FreeEventEntry (
PQUEUE_ENTRY pqeEntry)
{
EE_Destroy(CONTAINING_RECORD(pqeEntry, EVENT_ENTRY, qeEventQueueLink));
}
DWORD
EE_Create (
ROUTING_PROTOCOL_EVENTS rpeEvent,
MESSAGE mMessage,
PEVENT_ENTRY *ppeeEventEntry)
{
DWORD dwErr = NO_ERROR;
PEVENT_ENTRY peeEntry; /* scratch */
/* validate parameters */
if (!ppeeEventEntry)
return ERROR_INVALID_PARAMETER;
*ppeeEventEntry = NULL;
/* allocate the interface entry structure */
MALLOC(&peeEntry, sizeof(EVENT_ENTRY), &dwErr);
if (dwErr != NO_ERROR)
return dwErr;
/* initialize various fields */
InitializeQueueHead(&(peeEntry->qeEventQueueLink));
peeEntry->rpeEvent = rpeEvent;
peeEntry->mMessage = mMessage;
*ppeeEventEntry = peeEntry;
return dwErr;
}
DWORD
EE_Destroy (
PEVENT_ENTRY peeEventEntry)
{
if (!peeEventEntry)
return NO_ERROR;
FREE(peeEventEntry);
return NO_ERROR;
}
DWORD
EnqueueEvent(
ROUTING_PROTOCOL_EVENTS rpeEvent,
MESSAGE mMessage)
{
DWORD dwErr = NO_ERROR;
PEVENT_ENTRY peeEntry = NULL;
dwErr = EE_Create(rpeEvent, mMessage, &peeEntry);
/* destroyed in EE_DequeueEvent */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -