⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ntp_iocompletionport.c

📁 一个开源的NTP服务器客户端实现代码
💻 C
字号:
/*
 * ntp_iocompletionport.c - overlapped (asynchronous) I/O for ntpd on
 * Windows, built around a single I/O completion port.
 *
 * Sockets and reference-clock handles are associated with the port; one
 * service thread dequeues completions and dispatches them by request type.
 * Everything is compiled only when HAVE_IO_COMPLETION_PORT is defined.
 */
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif

#if defined (HAVE_IO_COMPLETION_PORT)

#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <process.h>
#include "ntp_stdlib.h"
#include "syslog.h"
#include "ntp_machine.h"
#include "ntp_fp.h"
#include "ntp.h"
#include "ntpd.h"
#include "ntp_refclock.h"
#include "ntp_iocompletionport.h"
#include "transmitbuff.h"

/*
 * Request types
 */
enum {
	SOCK_RECV,
	SOCK_SEND,
	CLOCK_READ,
	CLOCK_WRITE
};

/*
 * Per-request context handed to the kernel with every overlapped call.
 *
 * The OVERLAPPED member must stay first: the code casts between
 * IoCompletionInfo * and LPOVERLAPPED in both directions.
 * For SOCK_SEND / CLOCK_WRITE requests 'buff' actually holds a
 * transmitbuf_t * that has been cast to recvbuf_t *.
 */
typedef struct IoCompletionInfo {
	OVERLAPPED			overlapped;
	int				request_type;
	recvbuf_t			*buff;
} IoCompletionInfo;

/*
 * local function definitions
 */
static int QueueIORead( struct refclockio *, recvbuf_t *buff, IoCompletionInfo *lpo);

static int OnSocketRecv(DWORD, IoCompletionInfo *, DWORD);
static int OnIoReadComplete(DWORD, IoCompletionInfo *, DWORD);
static int OnWriteComplete(DWORD, IoCompletionInfo *, DWORD);

#define BUFCHECK_SECS	10
static void	TransmitCheckThread(void *NotUsed);
static HANDLE hHeapHandle = NULL;
static HANDLE hIoCompletionPort = NULL;
static HANDLE WaitableIoEventHandle = NULL;

/* Handles waited on by GetReceivedBuffers(): exit request, timer, I/O event */
#define MAXHANDLES 3
HANDLE WaitHandles[MAXHANDLES] = { NULL, NULL, NULL };

/*
 * Allocate one zero-filled IoCompletionInfo from the private heap.
 *
 * fromfunc - caller name, used only in the debug trace.
 * Returns the new block, or NULL when HeapAlloc fails.
 */
IoCompletionInfo *
GetHeapAlloc(char *fromfunc)
{
	IoCompletionInfo *lpo;

	lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
			     HEAP_ZERO_MEMORY,
			     sizeof(IoCompletionInfo));
#ifdef DEBUG
	if (debug > 1) {
		/* casts keep the arguments in step with the conversion specifiers */
		printf("Allocation %d memory for %s, ptr %p\n",
		       (int) sizeof(IoCompletionInfo), fromfunc, (void *) lpo);
	}
#endif
	return (lpo);
}

/*
 * Return an IoCompletionInfo obtained from GetHeapAlloc() to the private heap.
 *
 * fromfunc - caller name, used only in the debug trace.
 */
void
FreeHeap(IoCompletionInfo *lpo, char *fromfunc)
{
#ifdef DEBUG
	if (debug > 1)
	{
		printf("Freeing memory for %s, ptr %p\n", fromfunc, (void *) lpo);
	}
#endif
	HeapFree(hHeapHandle, 0, lpo);
}

/*
 * Return the event handle that the completion handlers signal after
 * processing a receive.
 */
HANDLE
get_io_event()
{
	return( WaitableIoEventHandle );
}

/*  This function will add an entry to the I/O completion port
 *  that will signal the I/O thread to exit (gracefully)
 */
static void
signal_io_completion_port_exit(void)
{
	/* A packet with a NULL overlapped pointer is the thread's exit signal. */
	if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) {
		msyslog(LOG_ERR, "Can't request service thread to exit: %m");
		exit(1);
	}
}

/*
 * Service thread: dequeue completion packets and dispatch each one on its
 * request_type. Leaves the loop when a packet carries no overlapped pointer.
 */
static void
iocompletionthread(void *NotUsed)
{
	BOOL bSuccess = FALSE;
	int errstatus;
	DWORD BytesTransferred = 0;
	DWORD Key = 0;
	IoCompletionInfo * lpo = NULL;

	(void) NotUsed;

	/*	Set the thread priority high enough so I/O will
	 *	preempt normal recv packet processing, but not
	 * 	higher than the timer sync thread.
	 */
	if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL)) {
		msyslog(LOG_ERR, "Can't set thread priority: %m");
	}

	while (TRUE) {
		/*
		 * Cast the address of the pointer rather than taking the
		 * address of a cast expression, which is not an lvalue.
		 * NOTE(review): Key is a DWORD and is later cast back to a
		 * pointer, which only round-trips on 32-bit builds - confirm.
		 */
		bSuccess = GetQueuedCompletionStatus(hIoCompletionPort,
					  &BytesTransferred,
					  &Key,
					  (LPOVERLAPPED *) &lpo,
					  INFINITE);
		if (lpo == NULL)
		{
#ifdef DEBUG
			msyslog(LOG_INFO, "Overlapped IO Thread Exits: %m");
#endif
			break; /* fail */
		}

		/*
		 * Deal with errors
		 */
		if (!bSuccess)
		{
			errstatus = GetLastError();
			if (BytesTransferred == 0 && errstatus == WSA_OPERATION_ABORTED)
			{
				msyslog(LOG_ERR, "Transfer Operation aborted: %m");
			}
			else
			{
				msyslog(LOG_ERR, "Error transferring packet after %d bytes: %m", (int) BytesTransferred);
			}
		}

		/*
		 * Invoke the appropriate function based on
		 * the value of the request_type
		 */
		switch(lpo->request_type)
		{
		case CLOCK_READ:
			OnIoReadComplete(Key, lpo, BytesTransferred);
			break;
		case SOCK_RECV:
			OnSocketRecv(Key, lpo, BytesTransferred);
			break;
		case SOCK_SEND:
		case CLOCK_WRITE:
			OnWriteComplete(Key, lpo, BytesTransferred);
			break;
		default:
#ifdef DEBUG
			if (debug > 3) {
				printf("Unknown request type %d found in completion port\n",
					lpo->request_type);
			}
#endif
			break;
		}
	}
}

/*  Create/initialise the I/O creation port
 *
 *  Sets up the private heap, the I/O event, the completion port and the
 *  wait-handle table, then starts the service thread. Exits the process
 *  if the heap, the I/O event or the port cannot be created.
 */
extern void
init_io_completion_port(
	void
	)
{
	/*
	 * Create a handle to the Heap
	 */
	hHeapHandle = HeapCreate(0, 20*sizeof(IoCompletionInfo), 0);
	if (hHeapHandle == NULL)
	{
		msyslog(LOG_ERR, "Can't initialize Heap: %m");
		exit(1);
	}

	/* Create the event used to signal an IO event
	 */
	WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, "WaitableIoEventHandle");
	if (WaitableIoEventHandle == NULL) {
		msyslog(LOG_ERR, "Can't create I/O event handle: %m");
		exit(1);
	}

	/* Create the IO completion port
	 */
	hIoCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	if (hIoCompletionPort == NULL) {
		msyslog(LOG_ERR, "Can't create I/O completion port: %m");
		exit(1);
	}

	/*
	 * Initialize the Wait Handles
	 * NOTE(review): the result of this CreateEvent call is not checked.
	 */
	WaitHandles[0] = CreateEvent(NULL, FALSE, FALSE, "WaitHandles0"); /* exit request */
	WaitHandles[1] = get_timer_handle();
	WaitHandles[2] = get_io_event();

	/* Have one thread servicing I/O - there were 4, but this would
	 * somehow cause NTP to stop replying to ntpq requests; TODO
	 */
	_beginthread(iocompletionthread, 0, NULL);
}

/*
 * Ask the service thread to exit, if the completion port was created.
 */
extern void
uninit_io_completion_port(
	void
	)
{
	if (hIoCompletionPort != NULL) {
		/*  Get each of the service threads to exit
		*/
		signal_io_completion_port_exit();
	}
}

/*
 * Start an overlapped read on a reference clock handle.
 *
 * Clears both 'lpo' and 'buff' before use.
 * Returns 1 when the read was issued, 0 on failure. On failure 'buff' has
 * already been handed to freerecvbuf(); 'lpo' still belongs to the caller.
 */
static int
QueueIORead( struct refclockio *rio, recvbuf_t *buff, IoCompletionInfo *lpo)
{
	memset(lpo, 0, sizeof(IoCompletionInfo));
	memset(buff, 0, sizeof(recvbuf_t));

	lpo->request_type = CLOCK_READ;
	lpo->buff = buff;

	buff->fd = rio->fd;
	if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, (LPOVERLAPPED) lpo)) {
		DWORD Result = GetLastError();
		switch (Result) {
		case NO_ERROR :
		case ERROR_HANDLE_EOF :
		case ERROR_IO_PENDING :
			break ;
		/*
		 * Something bad happened
		 */
		default:
			msyslog(LOG_ERR, "Can't read from Refclock: %m");
			freerecvbuf(buff);
			return 0;
		}
	}
	return 1;
}

/*
 * Completion handler for CLOCK_READ requests.
 *
 * i     - completion key: the struct refclockio * registered with the port.
 * lpo   - request context; reused for the next read.
 * Bytes - number of bytes transferred.
 *
 * Return 1 on Successful Read, 0 when the next read could not be queued
 * (in which case 'lpo' has been released).
 */
static int
OnIoReadComplete(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
{
	recvbuf_t *buff;
	recvbuf_t *newbuff;
	struct refclockio * rio = (struct refclockio *) i;

	/*
	 * Get the recvbuf pointer from the overlapped buffer.
	 */
	buff = (recvbuf_t *) lpo->buff;

	/*
	 * Get a new recv buffer for the next packet
	 */
	newbuff = get_free_recv_buffer();
	if (newbuff == NULL) {
		/*
		 * recv buffers not available so we drop the packet
		 * and reuse the buffer.
		 */
		newbuff = buff;
	}
	else
	{
		if (Bytes > 0) { /* ignore 0 bytes read due to timeout's */
			get_systime(&buff->recv_time);
			buff->recv_length = (int) Bytes;
			buff->receiver = rio->clock_recv;
			buff->dstadr = NULL;
			buff->recv_srcclock = rio->srcclock;
			add_full_recv_buffer(buff);
		}
		else
		{
			freerecvbuf(buff);
		}
	}
	if( !SetEvent( WaitableIoEventHandle ) ) {
#ifdef DEBUG
		if (debug > 3) {
			printf( "Error %d setting IoEventHandle\n", (int) GetLastError() );
		}
#endif
	}

	if (!QueueIORead( rio, newbuff, lpo )) {
		/*
		 * The read was never issued, so this context would otherwise
		 * be orphaned.
		 */
		FreeHeap(lpo, "OnIoReadComplete");
		return 0;
	}
	return 1;
}

/*  Add a reference clock data structures I/O handles to
 *  the I/O completion port. Return 1 if any error.
 */
int
io_completion_port_add_clock_io(
	struct refclockio *rio
	)
{
	IoCompletionInfo *lpo;
	recvbuf_t *buff;

	if (NULL == CreateIoCompletionPort((HANDLE) rio->fd, hIoCompletionPort, (DWORD) rio, 0)) {
		msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m");
		return 1;
	}

	lpo = (IoCompletionInfo *) GetHeapAlloc("io_completion_port_add_clock_io");
	if (lpo == NULL)
	{
		msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
		return 1;
	}
	buff = get_free_recv_buffer();

	if (buff == NULL)
	{
		msyslog(LOG_ERR, "Can't allocate memory for clock socket: %m");
		FreeHeap(lpo, "io_completion_port_add_clock_io");
		return 1;
	}

	if (!QueueIORead( rio, buff, lpo )) {
		/* QueueIORead already released buff; release the context too. */
		FreeHeap(lpo, "io_completion_port_add_clock_io");
		return 1;
	}
	return 0;
}

/*
 * Queue a receiver on a socket.
 *
 * Returns 0 if no buffer can be queued: 'buff' is NULL, or WSARecvFrom
 * failed with WSAENOTSOCK or WSAEFAULT (in which case 'buff' has been handed
 * to freerecvbuf()). Returns 1 otherwise, including for other error codes,
 * which are ignored. 'lpo' always remains owned by the caller.
 */
static unsigned long
QueueSocketRecv(SOCKET s, recvbuf_t *buff, IoCompletionInfo *lpo)
{
	int AddrLen;

	lpo->request_type = SOCK_RECV;
	lpo->buff = buff;

	if (buff != NULL) {
		DWORD BytesReceived = 0;
		DWORD Flags = 0;
		buff->fd = s;
		AddrLen = sizeof(struct sockaddr_in);

		if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1,
						&BytesReceived, &Flags,
						(struct sockaddr *) &buff->recv_srcadr, (LPINT) &AddrLen,
						(LPOVERLAPPED) lpo, NULL)) {
			DWORD Result = WSAGetLastError();
			switch (Result) {
				case NO_ERROR :
				case WSA_IO_INCOMPLETE :
				case WSA_WAIT_IO_COMPLETION :
				case WSA_IO_PENDING :
					break ;

				case WSAENOTSOCK :
					netsyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m");
					/* return the buffer */
					freerecvbuf(buff);
					return 0;

				case WSAEFAULT :
					netsyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
					/* return the buffer */
					freerecvbuf(buff);
					return 0;

				default :
				  /* nop */ ;
			}
		}
	}
	else
		return 0;
	return 1;
}

/*
 * Completion handler for SOCK_RECV requests.
 *
 * i     - completion key: the struct interface * registered with the port.
 * lpo   - request context; reused for the next receive.
 * Bytes - number of bytes transferred.
 *
 * Returns 0 if any Error (the next receive could not be queued, in which
 * case 'lpo' has been released), 1 otherwise.
 */
static int
OnSocketRecv(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
{
	struct recvbuf *buff = NULL;
	recvbuf_t *newbuff;
	struct interface * inter = (struct interface *) i;

	/*  Convert the overlapped pointer back to a recvbuf pointer.
	*/
	buff = (struct recvbuf *) lpo->buff;
	get_systime(&buff->recv_time);

	/*
	 * Get a new recv buffer for the next packet
	 */
	newbuff = get_free_recv_buffer();
	if (newbuff == NULL) {
		/*
		 * recv buffers not available so we drop the packet
		 * and reuse the buffer.
		 */
		newbuff = buff;
	}
	else
	{
		if (Bytes > 0 && inter->ignore_packets == ISC_FALSE) {
			buff->recv_length = (int) Bytes;
			buff->receiver = receive;
			buff->dstadr = inter;
#ifdef DEBUG
			if (debug > 3)
				printf("Received %d bytes from %s\n", (int) Bytes, stoa(&buff->recv_srcadr));
#endif
			add_full_recv_buffer(buff);
		}
		else {
			freerecvbuf(buff);
		}
	}
	if( !SetEvent( WaitableIoEventHandle ) ) {
#ifdef DEBUG
		if (debug > 3) {
			printf( "Error %d setting IoEventHandle\n", (int) GetLastError() );
		}
#endif
	}

	if (!QueueSocketRecv(inter->fd, newbuff, lpo)) {
		/*
		 * The receive was never issued, so this context would
		 * otherwise be orphaned.
		 */
		FreeHeap(lpo, "OnSocketRecv");
		return 0;
	}
	return 1;
}

/*  Add a socket handle to the I/O completion port, and send an I/O
 *  read request to the kernel.
 *
 *  Note: As per the winsock documentation, we use WSARecvFrom. Using
 *        ReadFile() is less efficient.
 *
 *  Returns 0 on success, 1 on any error.
 */
extern int
io_completion_port_add_socket(SOCKET fd, struct interface *inter)
{
	IoCompletionInfo *lpo;
	recvbuf_t *buff;

	if (fd != INVALID_SOCKET) {
		if (NULL == CreateIoCompletionPort((HANDLE) fd, hIoCompletionPort,
						   (DWORD) inter, 0)) {
			msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m");
			return 1;
		}
	}

	lpo = (IoCompletionInfo *) GetHeapAlloc("io_completion_port_add_socket");
	if (lpo == NULL)
	{
		msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
		return 1;
	}

	buff = get_free_recv_buffer();

	if (buff == NULL)
	{
		msyslog(LOG_ERR, "Can't allocate memory for network socket: %m");
		FreeHeap(lpo, "io_completion_port_add_socket");
		return 1;
	}

	if (!QueueSocketRecv(fd, buff, lpo)) {
		/* QueueSocketRecv already released buff; release the context too. */
		FreeHeap(lpo, "io_completion_port_add_socket");
		return 1;
	}
	return 0;
}

/*
 * Completion handler for SOCK_SEND and CLOCK_WRITE requests: releases the
 * transmit buffer and the request context. Always returns 1.
 */
static int
OnWriteComplete(DWORD Key, IoCompletionInfo *lpo, DWORD Bytes)
{
	transmitbuf_t *buff;
	(void) Bytes;
	(void) Key;

	buff = (transmitbuf_t *) lpo->buff;
	free_transmit_buffer(buff);
	FreeHeap(lpo, "OnWriteComplete");
	return 1;
}

/*
 * Send a packet asynchronously through the interface's socket.
 *
 * inter - interface whose 'fd' socket is used.
 * pkt   - data to send; copied into a transmit buffer.
 * len   - number of bytes to send.
 * dest  - destination address.
 *
 * Returns ERROR_SUCCESS when the send completed or is pending,
 * ERROR_OUTOFMEMORY when no request context or transmit buffer is available,
 * ERROR_INSUFFICIENT_BUFFER when 'len' exceeds the transmit buffer, and
 * SOCKET_ERROR (as a DWORD) when WSASendTo fails outright.
 *
 * NOTE(review): the address length passed to WSASendTo is always
 * sizeof(struct sockaddr_in) although 'dest' is a sockaddr_storage - confirm
 * whether non-IPv4 destinations are expected here.
 */
DWORD
io_completion_port_sendto(
	struct interface *inter,
	struct pkt *pkt,
	int len,
	struct sockaddr_storage* dest)
{
	transmitbuf_t *buff = NULL;
	DWORD Result = ERROR_SUCCESS;
	int errval;
	int AddrLen;
	IoCompletionInfo *lpo;
	DWORD BytesSent = 0;
	DWORD Flags = 0;

	lpo = (IoCompletionInfo *) GetHeapAlloc("io_completion_port_sendto");

	if (lpo == NULL)
		return ERROR_OUTOFMEMORY;

	if (len > sizeof(buff->pkt)) {
		/* Nothing was queued, so the context must be released here. */
		FreeHeap(lpo, "io_completion_port_sendto");
#ifdef DEBUG
		if (debug) printf("Packet too large: %d Bytes\n", len);
#endif
		return ERROR_INSUFFICIENT_BUFFER;
	}

	buff = get_free_transmit_buffer();
	if (buff == NULL) {
		msyslog(LOG_ERR, "No more transmit buffers left - data discarded");
		FreeHeap(lpo, "io_completion_port_sendto");
		return ERROR_OUTOFMEMORY;
	}

	memcpy(&buff->pkt, pkt, len);
	buff->wsabuf.buf = buff->pkt;
	buff->wsabuf.len = len;

	AddrLen = sizeof(struct sockaddr_in);
	lpo->request_type = SOCK_SEND;
	lpo->buff = (recvbuf_t *) buff;

	Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, AddrLen, (LPOVERLAPPED) lpo, NULL);

	if(Result == SOCKET_ERROR)
	{
		errval = WSAGetLastError();
		switch (errval) {

		case NO_ERROR :
		case WSA_IO_INCOMPLETE :
		case WSA_WAIT_IO_COMPLETION :
		case WSA_IO_PENDING :
			Result = ERROR_SUCCESS;
			break ;

		/*
		 * Something bad happened
		 */
		default :
			netsyslog(LOG_ERR, "WSASendTo - error sending message: %m");
			free_transmit_buffer(buff);
			FreeHeap(lpo, "io_completion_port_sendto");
			break;
		}
	}
#ifdef DEBUG
	if (debug > 3)
		printf("WSASendTo - %d bytes to %s : %d\n", len, stoa(dest), (int) Result);
#endif
	return (Result);
}

/*
 * Async IO Write
 *
 * fd  - handle to write to.
 * pkt - data to write; copied into a transmit buffer.
 * len - number of bytes to write.
 *
 * Returns 'len' when WriteFile reports immediate success, 0 when the write
 * is pending or failed, ERROR_OUTOFMEMORY when no request context or
 * transmit buffer is available, and ERROR_INSUFFICIENT_BUFFER when 'len'
 * exceeds the transmit buffer.
 *
 * NOTE(review): byte counts and Win32 error codes share this return value,
 * and a pending write is indistinguishable from a failed one - confirm what
 * callers expect before tightening it.
 */
DWORD
io_completion_port_write(
	HANDLE fd,
	char *pkt,
	int len)
{
	DWORD errval;
	transmitbuf_t *buff = NULL;
	DWORD lpNumberOfBytesWritten;
	DWORD Result;
	IoCompletionInfo *lpo;

	lpo = (IoCompletionInfo *) GetHeapAlloc("io_completion_port_write");

	if (lpo == NULL)
		return ERROR_OUTOFMEMORY;

	if (len > sizeof(buff->pkt)) {
		/* Nothing was queued, so the context must be released here. */
		FreeHeap(lpo, "io_completion_port_write");
#ifdef DEBUG
		if (debug) printf("Packet too large: %d Bytes\n", len);
#endif
		return ERROR_INSUFFICIENT_BUFFER;
	}

	buff = get_free_transmit_buffer();
	if (buff == NULL) {
		msyslog(LOG_ERR, "No more transmit buffers left - data discarded");
		FreeHeap(lpo, "io_completion_port_write");
		/* Stop here: lpo is gone and there is no buffer to fill. */
		return ERROR_OUTOFMEMORY;
	}

	lpo->request_type = CLOCK_WRITE;
	lpo->buff = (recvbuf_t *) buff;
	memcpy(&buff->pkt, pkt, len);

	/* WriteFile returns a BOOL; FALSE covers both pending and failed. */
	Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, (LPOVERLAPPED) lpo);

	if (!Result)
	{
		errval = GetLastError();
		switch (errval) {

		case NO_ERROR :
		case WSA_IO_INCOMPLETE :
		case WSA_WAIT_IO_COMPLETION :
		case WSA_IO_PENDING :
			/* still in flight; OnWriteComplete releases the resources */
			break ;

		default :
			netsyslog(LOG_ERR, "WriteFile - error sending message: %m");
			free_transmit_buffer(buff);
			FreeHeap(lpo, "io_completion_port_write");
			break;
		}
	}
#ifdef DEBUG
	if (debug > 2) {
		printf("WriteFile - %d bytes %d\n", len, (int) Result);
	}
#endif
	if (Result)
		return len;
	return ERROR_SUCCESS;
}

/*
 * GetReceivedBuffers
 * Note that this is in effect the main loop for processing requests
 * both send and receive. This should be reimplemented
 *
 * Blocks (alertably) until one of WaitHandles is signalled, handles it, and
 * returns the value of full_recvbuffs(). Exits the process when the
 * exit-request event fires.
 */
int GetReceivedBuffers()
{
	DWORD Index = WaitForMultipleObjectsEx(MAXHANDLES, WaitHandles, FALSE, INFINITE, TRUE);
	switch (Index) {
	case WAIT_OBJECT_0 + 0 : /* exit request */
		exit(0);
		break;
	case WAIT_OBJECT_0 + 1 : /* timer */
		timer();
		break;
	case WAIT_OBJECT_0 + 2 : /* Io event */
# ifdef DEBUG
		if ( debug > 3 )
		{
			printf( "IoEvent occurred\n" );
		}
# endif
		break;
	case WAIT_IO_COMPLETION : /* loop */
	case WAIT_TIMEOUT :
		break;
	case WAIT_FAILED:
		msyslog(LOG_ERR, "ntpd: WaitForMultipleObjectsEx Failed: Error: %m");
		break;

		/* For now do nothing if not expected */
	default:
		break;

	} /* switch */

	return (full_recvbuffs());	/* get received buffers */
}

#else
  static int NonEmptyCompilationUnit;
#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -