⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sock.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"
#include "mpidu_sock.h"

/* Two-level stringification so that a macro argument (e.g. FUNCNAME) is
 * expanded before it is turned into a string literal. */
#define MPIDI_QUOTE(A) MPIDI_QUOTE2(A)
#define MPIDI_QUOTE2(A) #A

/* Parenthesized so the expansion stays a single operand in any expression. */
#define SOCKI_TCP_BUFFER_SIZE       (32*1024)
#define SOCKI_DESCRIPTION_LENGTH    256
#define SOCKI_NUM_PREPOSTED_ACCEPTS 32

/* Kind of object a sock_state_t describes.  SOCKI_INVALID has the value 0. */
typedef enum SOCKI_TYPE
{
    SOCKI_INVALID,
    SOCKI_LISTENER,
    SOCKI_SOCKET,
    SOCKI_WAKEUP
} SOCKI_TYPE;

/* State word stored in sock_state_t.state; the SOCKI_* values below are
 * distinct bits. */
typedef int SOCKI_STATE;
#define SOCKI_ACCEPTING  0x0001
#define SOCKI_CONNECTING 0x0004
#define SOCKI_READING    0x0008
#define SOCKI_WRITING    0x0010

/* Bookkeeping for one direction of I/O; sock_state_t embeds one instance
 * for reads and one for writes. */
typedef struct sock_buffer
{
    DWORD num_bytes;    /* byte count filled in by the I/O call */
    OVERLAPPED ovl;     /* overlapped structure handed to the I/O call */
    MPID_IOV tiov;      /* single embedded iov entry */
#ifdef USE_SOCK_IOV_COPY
    MPID_IOV iov[MPID_IOV_MAXLEN];  /* private copy of the iov array */
#else
    MPID_IOV *iov;      /* pointer to an iov array stored elsewhere */
#endif
    int iovlen;         /* number of entries in iov */
    int index;          /* NOTE(review): presumably the current iov entry - confirm */
    MPIU_Size_t total;  /* NOTE(review): presumably bytes transferred so far - confirm */
    MPIDU_Sock_progress_update_func_t progress_update;  /* optional callback; NULL when unset */
} sock_buffer;

/* Per-socket state record.  Records are chained through next_sock onto the
 * global g_sock_list by init_state_struct(). */
typedef struct sock_state_t
{
    SOCKI_TYPE type;
    SOCKI_STATE state;
    SOCKET sock;
    MPIDU_Sock_set_t set;
    int closing;
    int pending_operations;
    /* listener/accept structures */
    SOCKET listen_sock;
    /* room for two sockaddr_in addresses plus padding, as consumed by AcceptEx */
    char accept_buffer[sizeof(struct sockaddr_in)*2+32];
    /* read and write structures */
    sock_buffer read;
    sock_buffer write;
    /* connect structures */
    char *cur_host;
    char host_description[SOCKI_DESCRIPTION_LENGTH];
    /* user pointer */
    void *user_ptr;
    /* internal list */
    struct sock_state_t *list, *next;
    int accepted;
    int listener_closed;
    /* timing variables for completion notification */
    /*
    double rt1, rt2;
    double wt1, wt2;
    double ct1, ct2;
    */
    struct sock_state_t *next_sock;     /* link in the global g_sock_list */
} sock_state_t;

/* File-scope configuration and bookkeeping. */
static int g_num_cp_threads = 2;
static int g_socket_buffer_size = SOCKI_TCP_BUFFER_SIZE;
static int g_socket_rbuffer_size = SOCKI_TCP_BUFFER_SIZE;   /* value applied to SO_RCVBUF */
static int g_socket_sbuffer_size = SOCKI_TCP_BUFFER_SIZE;   /* value applied to SO_SNDBUF */
static int g_init_called = 0;
static int g_num_posted_accepts = SOCKI_NUM_PREPOSTED_ACCEPTS;
/* Port range used when a caller asks for port 0; a value of 0 disables it. */
static int g_min_port = 0;
static int
g_max_port = 0;static sock_state_t *g_sock_list = NULL;/* empty structure used to wake up a sock_wait call */sock_state_t g_wakeup_state;static void translate_error(int error, char *msg, char *prepend){    HLOCAL str;    int num_bytes;    num_bytes = FormatMessage(	FORMAT_MESSAGE_FROM_SYSTEM |	FORMAT_MESSAGE_ALLOCATE_BUFFER,	0,	error,	MAKELANGID( LANG_NEUTRAL, SUBLANG_DEFAULT ),	(LPTSTR) &str,	0,0);    if (num_bytes == 0)    {	if (prepend != NULL)	    MPIU_Strncpy(msg, prepend, 1024);	else	    *msg = '\0';    }    else    {	if (prepend == NULL)	    memcpy(msg, str, num_bytes+1);	else	    MPIU_Snprintf(msg, 1024, "%s%s", prepend, (const char*)str);	LocalFree(str);	strtok(msg, "\r\n");    }}static char *get_error_string(int error_code){    /* obviously not thread safe to store a message like this */    static char error_msg[1024];    translate_error(error_code, error_msg, NULL);    return error_msg;}#undef FUNCNAME#define FUNCNAME easy_create#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int easy_create_ranged(SOCKET *sock, int port, unsigned long addr){    int mpi_errno;    /*struct linger linger;*/    int optval, len;    SOCKET temp_sock;    SOCKADDR_IN sockAddr;    int use_range = 0;    /* create the non-blocking socket */    temp_sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);    if (temp_sock == INVALID_SOCKET)    {	mpi_errno = WSAGetLastError();	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(mpi_errno), mpi_errno);	return mpi_errno;    }        if (port == 0 && g_min_port != 0 && g_max_port != 0)    {	use_range = 1;	port = g_min_port;    }    memset(&sockAddr,0,sizeof(sockAddr));        sockAddr.sin_family = AF_INET;    sockAddr.sin_addr.s_addr = addr;    sockAddr.sin_port = htons((unsigned short)port);    for (;;)    {	if (bind(temp_sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR)	{	    if (use_range)	    {		
port++;		if (port > g_max_port)		{		    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", 0);		    return mpi_errno;		}		sockAddr.sin_port = htons((unsigned short)port);	    }	    else	    {		mpi_errno = WSAGetLastError();		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(mpi_errno), mpi_errno);		return mpi_errno;	    }	}	else	{	    break;	}    }    /* Set the linger on close option */    /*    linger.l_onoff = 1 ;    linger.l_linger = 60;    setsockopt(temp_sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));    */    /* set the socket to non-blocking */    /*    optval = TRUE;    ioctlsocket(temp_sock, FIONBIO, &optval);    */    /* set the socket buffers */    len = sizeof(int);    if (!getsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len))    {	optval = g_socket_rbuffer_size;	setsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int));    }    len = sizeof(int);    if (!getsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len))    {	optval = g_socket_sbuffer_size;	setsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int));    }    /* prevent the socket from being inherited by child processes */    if (!DuplicateHandle(	GetCurrentProcess(), (HANDLE)temp_sock,	GetCurrentProcess(), (HANDLE*)sock,	0, FALSE, DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS))    {	mpi_errno = GetLastError();	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**duphandle", "**duphandle %s %d", get_error_string(mpi_errno), mpi_errno);	return mpi_errno;    }    /* Set the no-delay option */    setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval));    return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME easy_create#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int 
easy_create(SOCKET *sock, int port, unsigned long addr)
{
    /*
     * Create an overlapped TCP socket bound to the given address and port.
     *
     *   sock - receives the non-inheritable duplicate of the new socket
     *          handle.
     *   port - port to bind in host byte order.
     *   addr - stored directly into sin_addr.s_addr with no byte-order
     *          conversion.
     *
     * Returns MPI_SUCCESS, or an error code built by MPIR_Err_create_code.
     * The temporary socket is closed when bind fails.
     */
    int mpi_errno;
    int wsa_error;
    /*struct linger linger;*/
    int optval, len;
    SOCKET temp_sock;
    SOCKADDR_IN sockAddr;

    /* create the non-blocking socket */
    temp_sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (temp_sock == INVALID_SOCKET)
    {
        wsa_error = WSAGetLastError();
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(wsa_error), wsa_error);
        return mpi_errno;
    }

    memset(&sockAddr,0,sizeof(sockAddr));

    sockAddr.sin_family = AF_INET;
    sockAddr.sin_addr.s_addr = addr;
    sockAddr.sin_port = htons((unsigned short)port);

    if (bind(temp_sock, (SOCKADDR*)&sockAddr, sizeof(sockAddr)) == SOCKET_ERROR)
    {
        /* read the error before closesocket() can replace it, then release
         * the socket so a failed bind does not leak it */
        wsa_error = WSAGetLastError();
        closesocket(temp_sock);
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**socket", "**socket %s %d", get_error_string(wsa_error), wsa_error);
        return mpi_errno;
    }

    /* Set the linger on close option */
    /*
    linger.l_onoff = 1 ;
    linger.l_linger = 60;
    setsockopt(temp_sock, SOL_SOCKET, SO_LINGER, (char*)&linger, sizeof(linger));
    */

    /* set the socket to non-blocking */
    /*
    optval = TRUE;
    ioctlsocket(temp_sock, FIONBIO, &optval);
    */

    /* set the socket buffers (best effort: failures are ignored) */
    len = sizeof(int);
    if (!getsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, &len))
    {
        optval = g_socket_rbuffer_size;
        setsockopt(temp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&optval, sizeof(int));
    }
    len = sizeof(int);
    if (!getsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, &len))
    {
        optval = g_socket_sbuffer_size;
        setsockopt(temp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&optval, sizeof(int));
    }

    /* prevent the socket from being inherited by child processes;
     * DUPLICATE_CLOSE_SOURCE disposes of temp_sock whether or not the
     * duplication succeeds, so no explicit close is needed here */
    if (!DuplicateHandle(
        GetCurrentProcess(), (HANDLE)temp_sock,
        GetCurrentProcess(), (HANDLE*)sock,
        0, FALSE, DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS))
    {
        wsa_error = GetLastError();
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**duphandle", "**duphandle %s %d", get_error_string(wsa_error), wsa_error);
        return mpi_errno;
    }

    /* Set the no-delay option.  The flag is assigned explicitly: it used to
     * reuse whatever the buffer-size code left in optval, which is
     * indeterminate when both getsockopt calls fail. */
    optval = TRUE;
    setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(optval));

    return MPI_SUCCESS;
}

/*
 * easy_get_sock_info - report the local port of a socket and the fully
 * qualified DNS name of this computer.
 *
 *   sock - socket to query with getsockname.
 *   name - output buffer for the host name; this function tells
 *          GetComputerNameEx that it holds 100 characters, so the caller
 *          must provide at least that much.
 *   port - receives the local port in host byte order, or 0 when
 *          getsockname fails.
 *
 * Always returns 0.  When a lookup fails the matching output is set to a
 * defined fallback (port 0, empty name) instead of being left indeterminate.
 */
static inline int easy_get_sock_info(SOCKET sock, char *name, int *port)
{
    struct sockaddr_in addr;
    int name_len = sizeof(addr);
    DWORD len = 100;

    memset(&addr, 0, sizeof(addr));
    if (getsockname(sock, (struct sockaddr*)&addr, &name_len) == SOCKET_ERROR)
        *port = 0;
    else
        *port = ntohs(addr.sin_port);

    /*GetComputerName(name, &len);*/
    if (!GetComputerNameEx(ComputerNameDnsFullyQualified, name, &len))
        name[0] = '\0';
    /*gethostname(name, 100);*/
    return 0;
}

/*
 * init_state_struct - put a sock_state_t into its initial state and push it
 * onto the front of the global g_sock_list.
 *
 * A manual-reset, initially non-signalled event is created for each of the
 * read and write OVERLAPPED structures.  The CreateEvent results are not
 * checked and this function has no way to report a failure, so hEvent may
 * be NULL afterwards.  The fields cur_host, host_description, accept_buffer,
 * and the sock_buffer index members are not touched here.
 */
static inline void init_state_struct(sock_state_t *p)
{
    p->listen_sock = INVALID_SOCKET;
    p->sock = INVALID_SOCKET;
    p->set = INVALID_HANDLE_VALUE;
    p->user_ptr = NULL;
    p->type = SOCKI_INVALID;
    p->state = 0;
    p->closing = FALSE;
    p->pending_operations = 0;

    /* read side */
    p->read.total = 0;
    p->read.num_bytes = 0;
    p->read.tiov.MPID_IOV_BUF = NULL;
#ifndef USE_SOCK_IOV_COPY
    p->read.iov = NULL;
#endif
    p->read.iovlen = 0;
    /* clear the whole OVERLAPPED (Internal, InternalHigh, Offset,
     * OffsetHigh) before attaching the event handle */
    memset(&p->read.ovl, 0, sizeof(p->read.ovl));
    p->read.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    p->read.progress_update = NULL;

    /* write side */
    p->write.total = 0;
    p->write.num_bytes = 0;
    p->write.tiov.MPID_IOV_BUF = NULL;
#ifndef USE_SOCK_IOV_COPY
    p->write.iov = NULL;
#endif
    p->write.iovlen = 0;
    memset(&p->write.ovl, 0, sizeof(p->write.ovl));
    p->write.ovl.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    p->write.progress_update = NULL;

    p->list = NULL;
    p->next = NULL;
    p->accepted = 0;
    p->listener_closed = 0;
    /*
    p->bogus_t1 = 0;
    p->bogus_t2 = 0;
    */

    /* link into the global list of all socket state records */
    p->next_sock = g_sock_list;
    g_sock_list = p;
}

#undef FUNCNAME
#define FUNCNAME post_next_accept
#undef FCNAME
/* NOTE: the replacement text for the FCNAME definition below sits at the
 * start of the following source line (artifact of how this listing was
 * extracted). */
#define FCNAME
MPIDI_QUOTE(FUNCNAME)static inline int post_next_accept(sock_state_t * context){    int mpi_errno;    context->state = SOCKI_ACCEPTING;    context->accepted = 0;    /*printf("posting an accept.\n");fflush(stdout);*/    context->sock = WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);    if (context->sock == INVALID_SOCKET)    {	mpi_errno = WSAGetLastError();	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno);	return mpi_errno;    }    if (!AcceptEx(	context->listen_sock, 	context->sock, 	context->accept_buffer, 	0, 	sizeof(struct sockaddr_in)+16, 	sizeof(struct sockaddr_in)+16, 	&context->read.num_bytes,	&context->read.ovl))    {	mpi_errno = WSAGetLastError();	if (mpi_errno == ERROR_IO_PENDING)	    return MPI_SUCCESS;	/*MPIU_Error_printf("AcceptEx failed with error %d\n", error);fflush(stdout);*/	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**fail", "**fail %s %d", get_error_string(mpi_errno), mpi_errno);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -