📄 ntp_iocompletionport.c
字号:
#ifdef HAVE_CONFIG_H# include <config.h>#endif#if defined (HAVE_IO_COMPLETION_PORT)#include <stddef.h>#include <stdio.h>#include <process.h>#include "ntp_stdlib.h"#include "syslog.h"#include "ntp_machine.h"#include "ntp_fp.h"#include "ntp.h"#include "ntpd.h"#include "ntp_refclock.h"#include "ntp_iocompletionport.h"#include "transmitbuff.h"#include "ntp_request.h"#include "ntp_io.h"/* * Request types */enum { SOCK_RECV, SOCK_SEND, CLOCK_READ, CLOCK_WRITE};typedef struct IoCompletionInfo { OVERLAPPED overlapped; int request_type; union { recvbuf_t *rbuf; transmitbuf_t *tbuf; } buff_space;} IoCompletionInfo;#define recv_buf buff_space.rbuf#define trans_buf buff_space.tbuf/* * local function definitions */static int QueueIORead( struct refclockio *, recvbuf_t *buff, IoCompletionInfo *lpo);static int OnSocketRecv(DWORD, IoCompletionInfo *, DWORD, int);static int OnIoReadComplete(DWORD, IoCompletionInfo *, DWORD, int);static int OnWriteComplete(DWORD, IoCompletionInfo *, DWORD, int);static HANDLE hHeapHandle = NULL;static HANDLE hIoCompletionPort = NULL;static HANDLE WaitableIoEventHandle = NULL;static HANDLE WaitableExitEventHandle = NULL;#define MAXHANDLES 3HANDLE WaitHandles[MAXHANDLES] = { NULL, NULL, NULL };#define USE_HEAPIoCompletionInfo *GetHeapAlloc(char *fromfunc){ IoCompletionInfo *lpo;#ifdef USE_HEAP lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY, sizeof(IoCompletionInfo));#else lpo = (IoCompletionInfo *) calloc(1, sizeof(IoCompletionInfo));#endif#ifdef DEBUG if (debug > 3) { printf("Allocation %d memory for %s, ptr %x\n", sizeof(IoCompletionInfo), fromfunc, lpo); }#endif return (lpo);}voidFreeHeap(IoCompletionInfo *lpo, char *fromfunc){#ifdef DEBUG if (debug > 3) { printf("Freeing memory for %s, ptr %x\n", fromfunc, lpo); }#endif#ifdef USE_HEAP HeapFree(hHeapHandle, 0, lpo);#else free(lpo);#endif}transmitbuf_t *get_trans_buf(){ transmitbuf_t *tb = calloc(sizeof(transmitbuf_t), 1); tb->wsabuf.len = 0; tb->wsabuf.buf = (char *) &tb->pkt; return (tb);}voidfree_trans_buf(transmitbuf_t *tb){ free(tb);}HANDLEget_io_event(){ return( WaitableIoEventHandle );}HANDLEget_exit_event(){ return( WaitableExitEventHandle );}/* This function will add an entry to the I/O completion port * that will signal the I/O thread to exit (gracefully) */static voidsignal_io_completion_port_exit(){ if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) { msyslog(LOG_ERR, "Can't request service thread to exit: %m"); exit(1); }}static voidiocompletionthread(void *NotUsed){ BOOL bSuccess = FALSE; int errstatus = 0; DWORD BytesTransferred = 0; DWORD Key = 0; IoCompletionInfo * lpo = NULL; /* Set the thread priority high enough so I/O will * preempt normal recv packet processing, but not * higher than the timer sync thread. */ if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL)) { msyslog(LOG_ERR, "Can't set thread priority: %m"); } while (TRUE) { bSuccess = GetQueuedCompletionStatus(hIoCompletionPort, &BytesTransferred, &Key, & (LPOVERLAPPED) lpo, INFINITE); if (lpo == NULL) {#ifdef DEBUG if (debug > 2) { printf("Overlapped IO Thread Exits: \n"); }#endif break; /* fail */ } /* * Deal with errors */ errstatus = 0; if (!bSuccess) { errstatus = GetLastError(); if (BytesTransferred == 0 && errstatus == WSA_OPERATION_ABORTED) {#ifdef DEBUG if (debug > 2) { printf("Transfer Operation aborted\n"); }#endif } else { msyslog(LOG_ERR, "Error transferring packet after %d bytes: %m", BytesTransferred); } } /* * Invoke the appropriate function based on * the value of the request_type */ switch(lpo->request_type) { case CLOCK_READ: OnIoReadComplete(Key, lpo, BytesTransferred, errstatus); break; case SOCK_RECV: OnSocketRecv(Key, lpo, BytesTransferred, errstatus); break; case SOCK_SEND: case CLOCK_WRITE: OnWriteComplete(Key, lpo, BytesTransferred, errstatus); break; default:#if DEBUG if (debug > 2) { printf("Unknown request type %d found in completion port\n", lpo->request_type); }#endif break; } }}/* Create/initialise the I/O creation port */extern voidinit_io_completion_port( void ){ /* * Create a handle to the Heap */ hHeapHandle = HeapCreate(0, 20*sizeof(IoCompletionInfo), 0); if (hHeapHandle == NULL) { msyslog(LOG_ERR, "Can't initialize Heap: %m"); exit(1); } /* Create the event used to signal an IO event */ WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, "WaitableIoEventHandle"); if (WaitableIoEventHandle == NULL) { msyslog(LOG_ERR, "Can't create I/O event handle: %m - another process may be running - EXITING"); exit(1); } /* Create the event used to signal an exit event */ WaitableExitEventHandle = CreateEvent(NULL, FALSE, FALSE, "WaitableExitEventHandle"); if (WaitableExitEventHandle == NULL) { msyslog(LOG_ERR, "Can't create exit event handle: %m - another process may be running - EXITING"); exit(1); } /* Create the IO completion port */ hIoCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (hIoCompletionPort == NULL) { msyslog(LOG_ERR, "Can't create I/O completion port: %m"); exit(1); } /* * Initialize the Wait Handles */ WaitHandles[0] = get_io_event(); WaitHandles[1] = get_exit_event(); /* exit request */ WaitHandles[2] = get_timer_handle(); /* Have one thread servicing I/O - there were 4, but this would * somehow cause NTP to stop replying to ntpq requests; TODO */ _beginthread(iocompletionthread, 0, NULL);} extern voiduninit_io_completion_port( void ){ if (hIoCompletionPort != NULL) { /* Get each of the service threads to exit */ signal_io_completion_port_exit(); }}static int QueueIORead( struct refclockio *rio, recvbuf_t *buff, IoCompletionInfo *lpo) { lpo->request_type = CLOCK_READ; lpo->recv_buf = buff; buff->fd = rio->fd; if (!ReadFile((HANDLE) buff->fd, buff->wsabuff.buf, buff->wsabuff.len, NULL, (LPOVERLAPPED) lpo)) { DWORD Result = GetLastError(); switch (Result) { case NO_ERROR : case ERROR_HANDLE_EOF : case ERROR_IO_PENDING : break ; /* * Something bad happened */ default: msyslog(LOG_ERR, "Can't read from Refclock: %m"); freerecvbuf(buff); return 0; } } return 1;}/* Return 1 on Successful Read */static int OnIoReadComplete(DWORD i, IoCompletionInfo *lpo, DWORD Bytes, int errstatus){ recvbuf_t *buff; recvbuf_t *newbuff; struct refclockio * rio = (struct refclockio *) i; l_fp arrival_time; get_systime(&arrival_time); /* * Get the recvbuf pointer from the overlapped buffer. */ buff = (recvbuf_t *) lpo->recv_buf; /* * Get a new recv buffer for the next packet */ newbuff = get_free_recv_buffer_alloc(); if (newbuff == NULL) { /* * recv buffers not available so we drop the packet * and reuse the buffer. */ newbuff = buff; } else { /* * ignore 0 bytes read due to timeout's and closure on fd */ if (Bytes > 0 && errstatus != WSA_OPERATION_ABORTED) { memcpy(&buff->recv_time, &arrival_time, sizeof(arrival_time)); buff->recv_length = (int) Bytes; buff->receiver = rio->clock_recv; buff->dstadr = NULL; buff->recv_srcclock = rio->srcclock; add_full_recv_buffer(buff); if( !SetEvent( WaitableIoEventHandle ) ) {#ifdef DEBUG if (debug > 3) { printf( "Error %d setting IoEventHandle\n", GetLastError() ); }#endif } } else { freerecvbuf(buff); } } QueueIORead( rio, newbuff, lpo ); return 1;}/* Add a reference clock data structures I/O handles to * the I/O completion port. Return 1 if any error. */ intio_completion_port_add_clock_io( struct refclockio *rio ){ IoCompletionInfo *lpo; recvbuf_t *buff; if (NULL == CreateIoCompletionPort((HANDLE) rio->fd, hIoCompletionPort, (DWORD) rio, 0)) { msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m"); return 1; } lpo = (IoCompletionInfo *) GetHeapAlloc("io_completion_port_add_clock_io"); if (lpo == NULL) { msyslog(LOG_ERR, "Can't allocate heap for completion port: %m"); return 1; } buff = get_free_recv_buffer_alloc(); if (buff == NULL) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -