⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 刚才是说明 现在是安装程序在 LINUX环境下进行编程的MPICH安装文件
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/*
 * TCP progress engine: owns the poll() descriptor table, the listening
 * socket, and the helpers that arm a poll element for reading or writing.
 */

#include "mpidi_ch3_impl.h"
#include "pmi.h"
#include <netinet/tcp.h>
#include <sys/poll.h>
#if !defined(INFTIM)
#define INFTIM (-1)
#endif
#include <netdb.h>
/* extern int h_errno; */

/* Counter compared before/after make_progress() by MPIDI_CH3_Progress(). */
volatile unsigned int MPIDI_CH3I_progress_completions = 0;

/* Connection state recorded per poll element. */
typedef enum
{
    TCP_STATE_UNCONNECTED,
    TCP_STATE_LISTENING,
    TCP_STATE_CONNECTING,
    TCP_STATE_OPEN_EXCHANGE,
    TCP_STATE_CONNECTED,
    TCP_STATE_DISCONNECTING,
    TCP_STATE_CLOSED,
    TCP_STATE_FAILED
}
tcp_state_t;

/* Bookkeeping kept in parallel with each entry of poll_fds[]. */
struct pollinfo
{
    MPIDI_VC * vc;              /* owning virtual connection; NULL for the listener */
    tcp_state_t state;
    MPID_Request * send_active; /* request currently being written, if any */
    MPID_Request * recv_active; /* request currently being read, if any */
    MPID_Request req;           /* embedded request used for packet headers */
};

static int listener_elem = -1;
/* NOTE(review): a signed short cannot represent ports above 32767 without
   relying on implementation-defined narrowing; the type is kept because
   MPIDI_CH3I_Listener_get_port() exposes it. */
static short listener_port = 0;
static struct pollfd * poll_fds = NULL;
static struct pollinfo * poll_infos = NULL;
static int poll_num = 0;
static int poll_sz = 0;
static int shutting_down = FALSE;

static int poll_elem_alloc(int fd);
static void poll_elem_free(int elem);
#include "ch3_progress_poll_elem.i"
/* #include "ch3_progress_flags.i" */

static inline void post_pkt_send(int elem);
static inline void post_pkt_recv(int elem);
static inline void post_queued_send(int elem);
static inline void handle_pollin(int elem);
static inline void handle_pollout(int elem);
static inline void make_progress(int is_blocking);

/*
 * DBGMSG((level, "literal")) prints a debug line decorated with connection
 * details.  It expects `vc` and `elem` to be in scope at the call site and
 * `str` to be a string literal: the suffix is attached by adjacent-literal
 * concatenation (the `##` operator must not be used on string literals, as the
 * result is not a valid preprocessing token).
 *
 * NOTE(review): `(unsigned) vc` truncates the pointer where pointers are wider
 * than unsigned int; left as-is to keep the debug output format unchanged.
 */
#if defined(MPICH_DBG_OUTPUT)
#define DBGMSG(e) DbgMsg e
#define DbgMsg(level, str)                                                                              \
{                                                                                                       \
    if (vc)                                                                                             \
    {                                                                                                   \
        MPIDI_DBG_PRINTF((level, FCNAME,                                                                \
                          str ", pg_rank=%d, vc=0x%08x, vc.fd=%d, elem.fd=%d, vc.elem=%d elem=%d",      \
                          vc->tcp.pg_rank, (unsigned) vc, vc->tcp.fd, poll_fds[elem].fd,                \
                          vc->tcp.poll_elem, elem));                                                    \
    }                                                                                                   \
    else                                                                                                \
    {                                                                                                   \
        MPIDI_DBG_PRINTF((level, FCNAME, str ", vc=0x%08x, elem.fd=%d, elem=%d",                        \
                          (unsigned) vc, poll_fds[elem].fd, elem));                                     \
    }                                                                                                   \
}
#else
#define DBGMSG(e)
#endif

void MPIDI_CH3_Progress_start(void)
{
    /* MT - This function is empty for the single-threaded implementation */
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Drive the progress engine.
 *
 * is_blocking - when non-zero, keep calling make_progress() until the global
 *               completion counter changes; when zero, call it exactly once.
 *
 * Returns the difference between the completion counter after and before the
 * call.
 */
int MPIDI_CH3_Progress(int is_blocking)
{
    unsigned count;
    unsigned completions = MPIDI_CH3I_progress_completions;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS);
    MPIDI_DBG_PRINTF((50, FCNAME, "entering, blocking=%s", is_blocking ? "true" : "false"));

    do
    {
        make_progress(is_blocking);
    }
    while (completions == MPIDI_CH3I_progress_completions && is_blocking);

    count = MPIDI_CH3I_progress_completions - completions;

    MPIDI_DBG_PRINTF((50, FCNAME, "exiting, count=%d", count));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS);
    return count;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_poke
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Make a single non-blocking pass through the progress engine. */
void MPIDI_CH3_Progress_poke(void)
{
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_POKE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_POKE);
    make_progress(0);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_POKE);
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_end
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPIDI_CH3_Progress_end(void)
{
    /* MT - This function is empty for the single-threaded implementation */
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Create the non-blocking listening socket, register it in the poll table and
 * remember the port it was bound to.
 *
 * Every system-call failure is trapped with assert(); the function always
 * returns MPI_SUCCESS.
 */
int MPIDI_CH3I_Progress_init(void)
{
    int fd;
    long flags;
    struct sockaddr_in addr;
    socklen_t addr_len;
    int rc;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_INIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_INIT);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    /* establish non-blocking listener */
    fd = socket(PF_INET, SOCK_STREAM, 0);
    assert(fd != -1);
    flags = fcntl(fd, F_GETFL, 0);
    assert(flags != -1);
    rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    assert(rc != -1);

    /* bind to any local address; port 0 is requested and the port actually
       assigned is read back with getsockname() below */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(0);
    rc = bind(fd, (struct sockaddr *) &addr, sizeof(addr));
    assert(rc != -1);
    rc = listen(fd, SOMAXCONN);
    assert(rc != -1);

    /* add listener to poll list */
    listener_elem = poll_elem_alloc(fd);
    assert(listener_elem >= 0);
    poll_infos[listener_elem].vc = NULL;
    poll_infos[listener_elem].state = TCP_STATE_LISTENING;
    poll_infos[listener_elem].send_active = NULL;
    poll_infos[listener_elem].recv_active = NULL;
    poll_fds[listener_elem].events = POLLIN;

    /* record listener port */
    addr_len = sizeof(addr);
    rc = getsockname(fd, (struct sockaddr *) &addr, &addr_len);
    assert(rc != -1);
    listener_port = ntohs(addr.sin_port);

    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_INIT);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_finalize
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Tear down the progress engine: synchronize with the other processes, close
 * the listener and release the poll tables.  Always returns MPI_SUCCESS.
 */
int MPIDI_CH3I_Progress_finalize(void)
{
    /* MT - in a multi-threaded environment, finalize() should signal any
       thread(s) blocking on poll() or select() and wait for those threads to
       complete before destroying the progress engine data structures. */
    int rc;
    int fd;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_FINALIZE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    MPI_Barrier(MPI_COMM_WORLD); /* this barrier may not be necessary */
    shutting_down = TRUE;
    MPI_Barrier(MPI_COMM_WORLD);

    /* Shut down the listener */
    fd = poll_fds[listener_elem].fd;
    poll_fds[listener_elem].events = 0;
    poll_elem_free(listener_elem);
    rc = close(fd);
    assert(rc != -1);
    listener_elem = -1;
    listener_port = 0;

    /* XXX - need to cleanly shutdown other sockets. (close protocol???) */

    if (poll_sz > 0)
    {
        MPIU_Free(poll_fds);
        MPIU_Free(poll_infos);
        poll_fds = NULL;
        poll_infos = NULL;
        poll_sz = 0;
        poll_num = 0;
    }

    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_FINALIZE);
    return MPI_SUCCESS;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Listener_get_port
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Return the listener port recorded by MPIDI_CH3I_Progress_init() (0 when no
   listener is active). */
short MPIDI_CH3I_Listener_get_port(void)
{
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_LISTENER_GET_PORT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_LISTENER_GET_PORT);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_LISTENER_GET_PORT);
    return listener_port;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_TCP_post_connect
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Start a non-blocking TCP connection for `vc`.
 *
 * The peer's host name and port are read from the PMI key/value space under
 * the keys "P<rank>-hostname" and "P<rank>-port".  A socket is created, made
 * non-blocking with TCP_NODELAY set, registered in the poll table, and
 * connect() is issued.  If vc->tcp.fd is already non-negative nothing is done.
 *
 * Lookup, allocation and socket-setup failures are trapped with assert().  If
 * connect() itself fails, the VC is marked MPIDI_CH3I_VC_STATE_FAILED and the
 * socket and poll element are released.
 *
 * Always returns MPI_SUCCESS.
 */
int MPIDI_CH3I_TCP_post_connect(MPIDI_VC * vc)
{
    char * key;
    char * val;
    int key_max_sz;
    int val_max_sz;
    struct hostent * hostent;
    struct sockaddr_in addr;
    int port;
    long flags;
    int nodelay;
    int fd;
    int elem;
    int rc;
    int connect_errno;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_TCP_POST_CONNECT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_TCP_POST_CONNECT);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    if (vc->tcp.fd >= 0)
    {
        goto fn_exit;
    }

    vc->tcp.state = MPIDI_CH3I_VC_STATE_CONNECTING;

    key_max_sz = PMI_KVS_Get_key_length_max();
    key = MPIU_Malloc(key_max_sz);
    assert(key != NULL);
    val_max_sz = PMI_KVS_Get_value_length_max();
    val = MPIU_Malloc(val_max_sz);
    assert(val != NULL);

    rc = snprintf(key, key_max_sz, "P%d-hostname", vc->tcp.pg_rank);
    assert(rc > -1 && rc < key_max_sz);
    rc = PMI_KVS_Get(vc->tcp.pg->kvs_name, key, val);
    assert(rc == 0);

    hostent = gethostbyname(val);
    assert (hostent != NULL);
    assert(hostent->h_addrtype == AF_INET);
    assert(hostent->h_length == sizeof(addr.sin_addr.s_addr));
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    memcpy(&addr.sin_addr.s_addr, hostent->h_addr_list[0],
           sizeof(addr.sin_addr.s_addr));
    /* TODO: rewrite the above code to save all addrs.  each addr should be tried until either a connection is successfully
       established or all addrs have been tried.  this obviously complicates the progress engine error handling as well as the data
       structures that must be maintained between calls. */

    rc = snprintf(key, key_max_sz, "P%d-port", vc->tcp.pg_rank);
    assert(rc > -1 && rc < key_max_sz);
    rc = PMI_KVS_Get(vc->tcp.pg->kvs_name, key, val);
    assert(rc == 0);
    /* Parse as int and narrow to 16 bits: this accepts both a port published
       as a (possibly negative) short and one published as 0..65535, whereas
       "%hd" cannot represent values above 32767. */
    rc = sscanf(val, "%d", &port);
    assert(rc == 1);
    addr.sin_port = htons((unsigned short) port);

    fd = socket(PF_INET, SOCK_STREAM, 0);
    assert(fd != -1);
    flags = fcntl(fd, F_GETFL, 0);
    assert(flags != -1);
    rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    assert(rc != -1);
    nodelay = 1;
    rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
    assert(rc == 0);

    elem = poll_elem_alloc(fd);
    assert(elem >= 0);
    poll_infos[elem].vc = vc;
    poll_infos[elem].state = TCP_STATE_CONNECTING;
    poll_infos[elem].send_active = NULL;
    poll_infos[elem].recv_active = NULL;
    poll_fds[elem].events = 0;
    vc->tcp.poll_elem = elem;

    DBGMSG((65, "calling connect()"));
    do
    {
        rc = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
    }
    while (rc == -1 && errno == EINTR);
    /* capture errno before any other call can overwrite it */
    connect_errno = (rc == -1) ? errno : 0;

    if (rc == 0)
    {
        DBGMSG((65, "connect() suceeded immediately"));
        /* TODO: if connection already formed, start sending open req msg */
        poll_fds[elem].events |= POLLOUT;
    }
    else if (connect_errno == EINPROGRESS)
    {
        DBGMSG((65, "connect() pending"));
        poll_fds[elem].events |= POLLOUT;
    }
    else
    {
        DBGMSG((65, "connect() failed"));
        /* This condition is always false here, so builds with asserts enabled
           still stop at this point, exactly as before. */
        assert(connect_errno == EINPROGRESS);
        /* With asserts compiled out: release the poll element and the socket
           instead of leaving a POLLOUT-armed element attached to a failed VC.
           Same order as the listener teardown in finalize. */
        poll_fds[elem].events = 0;
        poll_elem_free(elem);
        rc = close(fd);
        assert(rc != -1);
        vc->tcp.poll_elem = -1;
        vc->tcp.state = MPIDI_CH3I_VC_STATE_FAILED;
    }

    MPIU_Free(val);
    MPIU_Free(key);

  fn_exit:
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_TCP_POST_CONNECT);
    return MPI_SUCCESS;
}

/*
 * Make `req` the active receive request on the poll element owned by `vc` and
 * enable POLLIN for that element.
 */
void MPIDI_CH3I_TCP_post_read(MPIDI_VC * vc, MPID_Request * req)
{
    int elem = vc->tcp.poll_elem;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_TCP_POST_READ);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_TCP_POST_READ);

    assert(poll_infos[elem].vc == vc);
    assert(poll_infos[elem].vc->tcp.fd == poll_fds[elem].fd);

    poll_infos[elem].recv_active = req;
    poll_fds[elem].events |= POLLIN;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_TCP_POST_READ);
}

/*
 * Make `req` the active send request on the poll element owned by `vc` and
 * enable POLLOUT for that element.
 */
void MPIDI_CH3I_TCP_post_write(MPIDI_VC * vc, MPID_Request * req)
{
    int elem = vc->tcp.poll_elem;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_TCP_POST_WRITE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_TCP_POST_WRITE);

    assert(poll_infos[elem].vc == vc);
    assert(poll_infos[elem].vc->tcp.fd == poll_fds[elem].fd);

    /* req better be the request at the head of the send queue */

    poll_infos[elem].send_active = req;
    poll_fds[elem].events |= POLLOUT;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_TCP_POST_WRITE);
}

/*
 * MPIDI_CH3I_Request_adjust_iov()
 *
 * Adjust the iovec in the request by the supplied number of bytes.  If the iovec has been consumed, return true; otherwise return
 * false.
 *
 * Entries starting at req->tcp.iov_offset whose length is <= the remaining
 * byte count are skipped whole; the first entry that is only partly covered
 * has its buffer pointer advanced and its length reduced.  req->tcp.iov_offset
 * is updated to the first entry that still has data outstanding.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Request_adjust_iov
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Request_adjust_iov(MPID_Request * req, MPIDI_msg_sz_t nb)
{
    int offset = req->tcp.iov_offset;
    const int count = req->ch3.iov_count;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);

    while (offset < count)
    {
        if (req->ch3.iov[offset].MPID_IOV_LEN <= nb)
        {
            /* this entry was transferred completely */
            nb -= req->ch3.iov[offset].MPID_IOV_LEN;
            offset++;
        }
        else
        {
            /* this entry was transferred only in part */
            req->ch3.iov[offset].MPID_IOV_BUF =
                (char *) req->ch3.iov[offset].MPID_IOV_BUF + nb;
            req->ch3.iov[offset].MPID_IOV_LEN -= nb;
            req->tcp.iov_offset = offset;
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);
            return FALSE;
        }
    }

    req->tcp.iov_offset = offset;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);
    return TRUE;
}

#undef FUNCNAME
#undef FCNAME
/*
 * Point the element's embedded request at its own packet buffer (a single
 * iov entry of sizeof(MPIDI_CH3_Pkt_t) bytes), make it the active send
 * request and enable POLLOUT.
 */
static inline void post_pkt_send(int elem)
{
    MPIDI_STATE_DECL(MPID_STATE_POST_PKT_SEND);

    MPIDI_FUNC_ENTER(MPID_STATE_POST_PKT_SEND);

    poll_infos[elem].req.ch3.iov[0].MPID_IOV_BUF =
        (char *)&poll_infos[elem].req.tcp.pkt;
    poll_infos[elem].req.ch3.iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    poll_infos[elem].req.ch3.iov_count = 1;
    poll_infos[elem].req.tcp.iov_offset = 0;
    poll_infos[elem].req.ch3.ca = MPIDI_CH3I_CA_HANDLE_PKT;
    poll_infos[elem].send_active = &poll_infos[elem].req;
    poll_fds[elem].events |= POLLOUT;
    /* TODO: try sending pkt immediately - if we are already processing sends for this connection, then the send will be attempted
       as soon as we return to handle_pollout() */

    MPIDI_FUNC_EXIT(MPID_STATE_POST_PKT_SEND);
}

/*
 * Point the element's embedded request at its own packet buffer (a single
 * iov entry of sizeof(MPIDI_CH3_Pkt_t) bytes), make it the active receive
 * request and enable POLLIN.
 */
static inline void post_pkt_recv(int elem)
{
    MPIDI_STATE_DECL(MPID_STATE_POST_PKT_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_POST_PKT_RECV);

    poll_infos[elem].req.ch3.iov[0].MPID_IOV_BUF =
        (char *)&poll_infos[elem].req.tcp.pkt;
    poll_infos[elem].req.ch3.iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    poll_infos[elem].req.ch3.iov_count = 1;
    poll_infos[elem].req.tcp.iov_offset = 0;
    poll_infos[elem].req.ch3.ca = MPIDI_CH3I_CA_HANDLE_PKT;
    poll_infos[elem].recv_active = &poll_infos[elem].req;
    poll_fds[elem].events |= POLLIN;

    MPIDI_FUNC_EXIT(MPID_STATE_POST_PKT_RECV);
}

#undef FUNCNAME
#define FUNCNAME post_queued_send
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Make the head of the VC's send queue the active send request for `elem`.
 * POLLOUT is enabled when the queue has a head and cleared when it is empty.
 */
static inline void post_queued_send(int elem)
{
    MPIDI_VC * vc = poll_infos[elem].vc;
    MPIDI_STATE_DECL(MPID_STATE_POST_QUEUED_SEND);

    MPIDI_FUNC_ENTER(MPID_STATE_POST_QUEUED_SEND);

    assert(vc != NULL);
    poll_infos[elem].send_active = MPIDI_CH3I_SendQ_head(vc); /* MT */
    if (poll_infos[elem].send_active != NULL)
    {
        MPIDI_DBG_PRINTF((75, FCNAME, "elem=%d, queued message, send active", elem));
        poll_fds[elem].events |= POLLOUT;
    }
    else
    {
        MPIDI_DBG_PRINTF((75, FCNAME, "elem=%d, queue empty, send deactivated", elem));
        poll_fds[elem].events &= ~POLLOUT;
    }

    MPIDI_FUNC_EXIT(MPID_STATE_POST_QUEUED_SEND);
}

#undef FUNCNAME
#define FUNCNAME handle_error
#undef FCNAME

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -