📄 sock_iocp.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "sock.h"#include <mswsock.h>#include <stdio.h>typedef enum { SOCK_INVALID, SOCK_LISTENER, SOCK_SOCKET } SOCK_TYPE;typedef int SOCK_STATE;#define SOCK_ACCEPTING 0x0001#define SOCK_ACCEPTED 0x0002#define SOCK_CONNECTING 0x0004#define SOCK_READING 0x0008#define SOCK_WRITING 0x0010typedef struct sock_buffer{ int use_iov; DWORD num_bytes; OVERLAPPED ovl; void *buffer; int bufflen; SOCK_IOV *iov; int iovlen; int index; int total; int (*progress_update)(int,void*);} sock_buffer;typedef struct sock_state_t{ SOCK_TYPE type; SOCK_STATE state; SOCKET sock; /* listener/accept structures */ SOCKET listen_sock; char accept_buffer[sizeof(struct sockaddr_in)*2+32]; /* read and write structures */ sock_buffer read; sock_buffer write; /* user pointer */ void *user_ptr;} sock_state_t;#define DEFAULT_NUM_RETRIES 10static int g_connection_attempts = DEFAULT_NUM_RETRIES;static int g_num_cp_threads = 2;/* utility allocator functions */typedef struct BlockAllocator_struct * BlockAllocator;BlockAllocator BlockAllocInit(unsigned int blocksize, int count, int incrementsize, void *(* alloc_fn)(unsigned int size), void (* free_fn)(void *p));int BlockAllocFinalize(BlockAllocator *p);void * BlockAlloc(BlockAllocator p);int BlockFree(BlockAllocator p, void *pBlock);struct BlockAllocator_struct{ void **pNextFree; void *(* alloc_fn)(size_t size); void (* free_fn)(void *p); struct BlockAllocator_struct *pNextAllocation; unsigned int nBlockSize; int nCount, nIncrementSize;#ifdef WITH_ALLOCATOR_LOCKING MPIDU_Lock_t lock;#endif};static int g_nLockSpinCount = 100;#ifdef WITH_ALLOCATOR_LOCKINGtypedef volatile long MPIDU_Lock_t;#include <errno.h>#ifdef HAVE_WINDOWS_H#include <winsock2.h>#include <windows.h>#endifstatic inline void MPIDU_Init_lock( MPIDU_Lock_t *lock ){ *(lock) = 0;}static inline void MPIDU_Lock( MPIDU_Lock_t *lock ){ int i; for (;;) { for (i=0; i<g_nLockSpinCount; i++) { if (*lock == 0) {#ifdef HAVE_INTERLOCKEDEXCHANGE if (InterlockedExchange((LPLONG)lock, 1) == 0) { /*printf("lock %x\n", lock);fflush(stdout);*/ MPID_PROFILE_OUT(MPIDU_BUSY_LOCK); return; }#elif defined(HAVE_COMPARE_AND_SWAP) if (compare_and_swap(lock, 0, 1) == 1) { MPID_PROFILE_OUT(MPIDU_BUSY_LOCK); return; }#else#error Atomic memory operation needed to implement busy locks#endif } } MPIDU_Yield(); }}static inline void MPIDU_Unlock( MPIDU_Lock_t *lock ){ *(lock) = 0;}static inline void MPIDU_Busy_wait( MPIDU_Lock_t *lock ){ int i; for (;;) { for (i=0; i<g_nLockSpinCount; i++) if (!*lock) { return; } MPIDU_Yield(); }}static inline void MPIDU_Free_lock( MPIDU_Lock_t *lock ){}/*@ MPIDU_Compare_swap - Parameters:+ void **dest. void *new_val. void *compare_val. MPIDU_Lock_t *lock- void **original_val Notes:@*/static inline int MPIDU_Compare_swap( void **dest, void *new_val, void *compare_val, MPIDU_Lock_t *lock, void **original_val ){ /* dest = pointer to value to be checked (address size) new_val = value to set dest to if *dest == compare_val original_val = value of dest prior to this operation */#ifdef HAVE_NT_LOCKS /* *original_val = (void*)InterlockedCompareExchange(dest, new_val, compare_val); */ *original_val = InterlockedCompareExchangePointer(dest, new_val, compare_val);#elif defined(HAVE_COMPARE_AND_SWAP) if (compare_and_swap((volatile long *)dest, (long)compare_val, (long)new_val)) *original_val = new_val;#else#error Locking functions not defined#endif return 0;}#endif /* WITH_ALLOCATOR_LOCKING */static BlockAllocator BlockAllocInit(unsigned int blocksize, int count, int incrementsize, void *(* alloc_fn)(unsigned int size), void (* free_fn)(void *p)){ BlockAllocator p; void **ppVoid; int i; p = alloc_fn( sizeof(struct BlockAllocator_struct) + ((blocksize + sizeof(void**)) * count) ); p->alloc_fn = alloc_fn; p->free_fn = free_fn; p->nIncrementSize = incrementsize; p->pNextAllocation = NULL; p->nCount = count; p->nBlockSize = blocksize; p->pNextFree = (void**)(p + 1);#ifdef WITH_ALLOCATOR_LOCKING MPIDU_Init_lock(&p->lock);#endif ppVoid = (void**)(p + 1); for (i=0; i<count-1; i++) { *ppVoid = (void*)((char*)ppVoid + sizeof(void**) + blocksize); ppVoid = *ppVoid; } *ppVoid = NULL; return p;}static int BlockAllocFinalize(BlockAllocator *p){ if (*p == NULL) return 0; BlockAllocFinalize(&(*p)->pNextAllocation); if ((*p)->free_fn != NULL) (*p)->free_fn(*p); *p = NULL; return 0;}static void * BlockAlloc(BlockAllocator p){ void *pVoid; #ifdef WITH_ALLOCATOR_LOCKING MPIDU_Lock(&p->lock);#endif pVoid = p->pNextFree + 1; if (*(p->pNextFree) == NULL) { BlockAllocator pIter = p; while (pIter->pNextAllocation != NULL) pIter = pIter->pNextAllocation; pIter->pNextAllocation = BlockAllocInit(p->nBlockSize, p->nIncrementSize, p->nIncrementSize, p->alloc_fn, p->free_fn); p->pNextFree = pIter->pNextFree; } else p->pNextFree = *(p->pNextFree);#ifdef WITH_ALLOCATOR_LOCKING MPIDU_Unlock(&p->lock);#endif return pVoid;}static int BlockFree(BlockAllocator p, void *pBlock){#ifdef WITH_ALLOCATOR_LOCKING MPIDU_Lock(&p->lock);#endif ((void**)pBlock)--; *((void**)pBlock) = p->pNextFree; p->pNextFree = pBlock;#ifdef WITH_ALLOCATOR_LOCKING MPIDU_Unlock(&p->lock);#endif return 0;}/* utility socket functions */static int easy_create(SOCKET *sock, int port, unsigned long addr){ struct linger linger; int optval, len; SOCKET temp_sock; SOCKADDR_IN sockAddr; /* create the non-blocking socket */ temp_sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (temp_sock == INVALID_SOCKET) return SOCK_FAIL; memset(&sockAddr,0,sizeof(sockAddr)); sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = addr; sockAddr.sin_port = htons((unsigned short)port); if (bind(temp_sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR) return SOCK_FAIL; /* Set the linger on close option */ linger.l_onoff = 1 ; linger.l_linger = 60; setsockopt(temp_sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger)); /* set the socket buffers */ len = sizeof(int); if (!getsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len)) { optval = 64*1024; setsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int)); } len = sizeof(int); if (!getsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len)) { optval = 64*1024; setsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int)); } /* prevent the socket from being inherited by child processes */ DuplicateHandle( GetCurrentProcess(), (HANDLE)temp_sock, GetCurrentProcess(), (HANDLE*)sock, 0, FALSE, DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS); /* Set the no-delay option */ setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval)); return SOCK_SUCCESS;}static inline int easy_get_sock_info(SOCKET sock, char *name, int *port){ struct sockaddr_in addr; int name_len = sizeof(addr); getsockname(sock, (struct sockaddr*)&addr, &name_len); *port = ntohs(addr.sin_port); gethostname(name, 100); return 0;}static inline void init_state_struct(sock_state_t *p){ p->listen_sock = INVALID_SOCKET; p->sock = INVALID_SOCKET; p->user_ptr = NULL; p->type = 0; p->state = 0; p->read.total = 0; p->read.num_bytes = 0; p->read.buffer = NULL; p->read.iov = NULL; p->read.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); p->read.ovl.Offset = 0; p->read.ovl.OffsetHigh = 0; p->read.progress_update = NULL; p->write.total = 0; p->write.num_bytes = 0; p->write.buffer = NULL; p->write.iov = NULL; p->write.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); p->write.ovl.Offset = 0; p->write.ovl.OffsetHigh = 0; p->write.progress_update = NULL;}static inline int post_next_accept(sock_state_t * listen_state){ int error; listen_state->state = SOCK_ACCEPTING; listen_state->sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (listen_state->sock == INVALID_SOCKET) return SOCK_FAIL; if (!AcceptEx( listen_state->listen_sock, listen_state->sock, listen_state->accept_buffer, 0, sizeof(struct sockaddr_in)+16, sizeof(struct sockaddr_in)+16, &listen_state->read.num_bytes, &listen_state->read.ovl)) { error = WSAGetLastError(); if (error == ERROR_IO_PENDING) return TRUE; printf("AcceptEx failed with error %d\n", error); return FALSE; } return TRUE;}/* sock functions */static BlockAllocator g_StateAllocator;int sock_init(){ char *szNum; WSADATA wsaData; int err; /* Start the Winsock dll */ if ((err = WSAStartup(MAKEWORD(2, 0), &wsaData)) != 0) return err; /* get the connection retry value */ szNum = getenv("SOCK_CONNECT_TRIES"); if (szNum != NULL) { g_connection_attempts = atoi(szNum); if (g_connection_attempts < 1) g_connection_attempts = DEFAULT_NUM_RETRIES; } g_StateAllocator = BlockAllocInit(sizeof(sock_state_t), 1000, 500, malloc, free); return SOCK_SUCCESS;}int sock_finalize(){ WSACleanup(); BlockAllocFinalize(&g_StateAllocator); return SOCK_SUCCESS;}int sock_create_set(sock_set_t *set){ HANDLE port; port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, g_num_cp_threads); if (port != NULL) { *set = port; return SOCK_SUCCESS; } return SOCK_FAIL;}int sock_destroy_set(sock_set_t set){ BOOL b; b = CloseHandle(set); if (b == TRUE) return SOCK_SUCCESS; return SOCK_FAIL;}static int listening = 0;int sock_listen(sock_set_t set, void * user_ptr, int *port, sock_t *listener){ char host[100]; DWORD num_read = 0; sock_state_t * listen_state; if (listening) return SOCK_FAIL; listening = 1; listen_state = (sock_state_t*)BlockAlloc(g_StateAllocator); init_state_struct(listen_state);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -