📄 ch3_progress.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include "pmi.h"
#include <netinet/tcp.h>
#include <sys/poll.h>
#if !defined(INFTIM)
#define INFTIM -1
#endif
#include <netdb.h>
/* extern int h_errno; */

/* Completion counter; MPIDI_CH3_Progress() compares it before and after calling make_progress(). */
volatile unsigned int MPIDI_CH3I_progress_completions = 0;

/* Per-socket connection state stored in struct pollinfo. */
typedef enum
{
    TCP_STATE_UNCONNECTED,
    TCP_STATE_LISTENING,
    TCP_STATE_CONNECTING,
    TCP_STATE_OPEN_EXCHANGE,
    TCP_STATE_CONNECTED,
    TCP_STATE_DISCONNECTING,
    TCP_STATE_CLOSED,
    TCP_STATE_FAILED
}
tcp_state_t;

/*
 * Bookkeeping kept alongside each entry of poll_fds[] (same index).
 *
 * vc          - virtual connection using this socket (NULL for the listener)
 * state       - connection state of the socket
 * send_active - request currently posted for sending, or NULL
 * recv_active - request currently posted for receiving, or NULL
 * req         - embedded request that post_pkt_send()/post_pkt_recv() point at its own packet buffer
 */
struct pollinfo
{
    MPIDI_VC * vc;
    tcp_state_t state;
    MPID_Request * send_active;
    MPID_Request * recv_active;
    MPID_Request req;
};

static int listener_elem = -1;              /* poll element of the listening socket, -1 when none */
static short listener_port = 0;             /* port reported by getsockname() for the listener */
static struct pollfd * poll_fds = NULL;
static struct pollinfo * poll_infos = NULL;
static int poll_num = 0;
static int poll_sz = 0;
static int shutting_down = FALSE;

static int poll_elem_alloc(int fd);
static void poll_elem_free(int elem);
#include "ch3_progress_poll_elem.i"
/* #include "ch3_progress_flags.i" */

static inline void post_pkt_send(int elem);
static inline void post_pkt_recv(int elem);
static inline void post_queued_send(int elem);
static inline void handle_pollin(int elem);
static inline void handle_pollout(int elem);
static inline void make_progress(int is_blocking);

/*
 * DBGMSG((level, "text")) - debug trace that appends connection details to the message.
 *
 * The expansion reads the local variables `vc` and `elem` and the FCNAME macro, so all three must be in scope
 * wherever DBGMSG is used.  The message must be a string literal: it is joined to the suffix by adjacent string
 * literal concatenation (pasting two string literals with ## does not produce a valid preprocessing token).
 */
#if defined(MPICH_DBG_OUTPUT)
#define DBGMSG(e) DbgMsg e
#define DbgMsg(level, str) \
{ \
    if (vc) \
    { \
        MPIDI_DBG_PRINTF((level, FCNAME, str ", pg_rank=%d, vc=0x%08x, vc.fd=%d, elem.fd=%d, vc.elem=%d elem=%d", \
                          vc->tcp.pg_rank, (unsigned) vc, vc->tcp.fd, poll_fds[elem].fd, vc->tcp.poll_elem, elem)); \
    } \
    else \
    { \
        MPIDI_DBG_PRINTF((level, FCNAME, str ", vc=0x%08x, elem.fd=%d, elem=%d", (unsigned) vc, poll_fds[elem].fd, elem)); \
    } \
}
#else
#define DBGMSG(e)
#endif

/*
 * MPIDI_CH3_Progress_start()
 *
 * No-op in this implementation.
 */
void MPIDI_CH3_Progress_start()
{
    /* MT - This function is empty for the single-threaded implementation */
}

/*
 * MPIDI_CH3_Progress()
 *
 * Calls make_progress() at least once.  When is_blocking is non-zero, keeps calling it until
 * MPIDI_CH3I_progress_completions differs from the value sampled on entry.
 *
 * Returns the change in MPIDI_CH3I_progress_completions observed across the call.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_Progress(int is_blocking)
{
    unsigned register count;
    unsigned completions = MPIDI_CH3I_progress_completions;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS);
    MPIDI_DBG_PRINTF((50, FCNAME, "entering, blocking=%s", is_blocking ? "true" : "false"));

    do
    {
        make_progress(is_blocking);
    }
    while (completions == MPIDI_CH3I_progress_completions && is_blocking);

    count = MPIDI_CH3I_progress_completions - completions;

    MPIDI_DBG_PRINTF((50, FCNAME, "exiting, count=%d", count));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS);
    return count;
}

/*
 * MPIDI_CH3_Progress_poke()
 *
 * Makes a single non-blocking pass through make_progress().
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_poke
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPIDI_CH3_Progress_poke()
{
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_POKE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_POKE);
    make_progress(0);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_POKE);
}

/*
 * MPIDI_CH3_Progress_end()
 *
 * No-op in this implementation.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_end
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPIDI_CH3_Progress_end()
{
    /* MT - This function is empty for the single-threaded implementation */
}

/*
 * MPIDI_CH3I_Progress_init()
 *
 * Creates a non-blocking TCP listening socket bound to INADDR_ANY with port 0, registers it in the poll list
 * with POLLIN enabled, and records the port returned by getsockname() in listener_port.
 *
 * Returns MPI_SUCCESS.  Every system call failure is checked with assert() only.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_init()
{
    int fd;
    long flags;
    struct sockaddr_in addr;
    socklen_t addr_len;
    int rc;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_INIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_INIT);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    /* establish non-blocking listener */
    fd = socket(PF_INET, SOCK_STREAM, 0);
    assert(fd != -1);

    flags = fcntl(fd, F_GETFL, 0);
    assert(flags != -1);
    rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    assert(rc != -1);

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(0);
    rc = bind(fd, (struct sockaddr *) &addr, sizeof(addr));
    assert(rc != -1);

    rc = listen(fd, SOMAXCONN);
    assert(rc != -1);

    /* add listener to poll list */
    listener_elem = poll_elem_alloc(fd);
    assert(listener_elem >= 0);
    poll_infos[listener_elem].vc = NULL;
    poll_infos[listener_elem].state = TCP_STATE_LISTENING;
    poll_infos[listener_elem].send_active = NULL;
    poll_infos[listener_elem].recv_active = NULL;
    poll_fds[listener_elem].events = POLLIN;

    /* record listener port */
    addr_len = sizeof(addr);
    rc = getsockname(fd, (struct sockaddr *) &addr, &addr_len);
    assert(rc != -1);
    listener_port = ntohs(addr.sin_port);

    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_INIT);
    return MPI_SUCCESS;
}

/*
 * MPIDI_CH3I_Progress_finalize()
 *
 * Synchronizes with MPI_Barrier(), sets shutting_down, closes the listening socket (if one was registered) and
 * frees the poll arrays.
 *
 * Returns MPI_SUCCESS.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_finalize
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_finalize()
{
    /* MT - in a multi-threaded environment, finalize() should signal any thread(s) blocking on poll() or select()
       and wait for those threads to complete before destroying the progress engine data structures. */

    int rc;
    int fd;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_FINALIZE);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    MPI_Barrier(MPI_COMM_WORLD); /* this barrier may not be necessary */
    shutting_down = TRUE;
    MPI_Barrier(MPI_COMM_WORLD);

    /* Shut down the listener.  listener_elem is -1 until init registers it, so do not index with it blindly. */
    if (listener_elem >= 0)
    {
        fd = poll_fds[listener_elem].fd;
        poll_fds[listener_elem].events = 0;
        poll_elem_free(listener_elem);
        rc = close(fd);
        assert(rc != -1);
    }
    listener_elem = -1;
    listener_port = 0;

    /* XXX - need to cleanly shutdown other sockets. (close protocol???) */

    if (poll_sz > 0)
    {
        MPIU_Free(poll_fds);
        MPIU_Free(poll_infos);
        poll_fds = NULL;
        poll_infos = NULL;
        poll_sz = 0;
        poll_num = 0;
    }

    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_FINALIZE);
    return MPI_SUCCESS;
}

/*
 * MPIDI_CH3I_Listener_get_port()
 *
 * Returns the listener port recorded by MPIDI_CH3I_Progress_init() (0 after finalize).
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Listener_get_port
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
short MPIDI_CH3I_Listener_get_port()
{
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_LISTENER_GET_PORT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_LISTENER_GET_PORT);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_LISTENER_GET_PORT);
    return listener_port;
}

/*
 * MPIDI_CH3I_TCP_post_connect()
 *
 * Starts a non-blocking connect() for the given virtual connection.  Does nothing if vc->tcp.fd is already
 * non-negative.  Otherwise the peer's host name and port are read from the PMI key/value space (keys
 * "P<rank>-hostname" and "P<rank>-port"), the name is resolved with gethostbyname() (only the first address is
 * used), and a non-blocking socket with TCP_NODELAY is created and registered in the poll list.
 *
 * If connect() succeeds or is in progress, POLLOUT is enabled on the new poll element.  If connect() fails for
 * any other reason, the poll element is released, the socket is closed, vc->tcp.poll_elem is reset to -1 and
 * vc->tcp.state is set to MPIDI_CH3I_VC_STATE_FAILED.
 *
 * Always returns MPI_SUCCESS; a failed connect is reported only through the VC state.  Lookup, resolution and
 * socket setup failures are checked with assert() only.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_TCP_post_connect
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_TCP_post_connect(MPIDI_VC * vc)
{
    char * key;
    char * val;
    int key_max_sz;
    int val_max_sz;
    struct hostent * hostent;
    struct sockaddr_in addr;
    short port;
    long flags;
    int nodelay;
    int fd;
    int elem;
    int rc;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_TCP_POST_CONNECT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_TCP_POST_CONNECT);
    MPIDI_DBG_PRINTF((60, FCNAME, "entering"));

    /* nothing has been allocated yet, so it is safe to skip the frees below */
    if (vc->tcp.fd >= 0)
    {
        goto fn_exit;
    }

    vc->tcp.state = MPIDI_CH3I_VC_STATE_CONNECTING;

    key_max_sz = PMI_KVS_Get_key_length_max();
    key = MPIU_Malloc(key_max_sz);
    assert(key != NULL);
    val_max_sz = PMI_KVS_Get_value_length_max();
    val = MPIU_Malloc(val_max_sz);
    assert(val != NULL);

    rc = snprintf(key, key_max_sz, "P%d-hostname", vc->tcp.pg_rank);
    assert(rc > -1 && rc < key_max_sz);
    rc = PMI_KVS_Get(vc->tcp.pg->kvs_name, key, val);
    assert(rc == 0);

    hostent = gethostbyname(val);
    assert (hostent != NULL);
    assert(hostent->h_addrtype == AF_INET);
    assert(hostent->h_length == sizeof(addr.sin_addr.s_addr));
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    memcpy(&addr.sin_addr.s_addr, hostent->h_addr_list[0], sizeof(addr.sin_addr.s_addr));
    /* TODO: rewrite the above code to save all addrs.  each addr should be tried until either a connection is
       successfully established or all addrs have been tried.  this obviously complicates the progress engine
       error handling as well as the data structures that must be maintained between calls. */

    rc = snprintf(key, key_max_sz, "P%d-port", vc->tcp.pg_rank);
    assert(rc > -1 && rc < key_max_sz);
    rc = PMI_KVS_Get(vc->tcp.pg->kvs_name, key, val);
    assert(rc == 0);
    rc = sscanf(val, "%hd", &port);
    assert(rc == 1);
    addr.sin_port = htons(port);

    fd = socket(PF_INET, SOCK_STREAM, 0);
    assert(fd != -1);
    flags = fcntl(fd, F_GETFL, 0);
    assert(flags != -1);
    rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    assert(rc != -1);
    nodelay = 1;
    rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
    assert(rc == 0);

    elem = poll_elem_alloc(fd);
    assert(elem >= 0);
    poll_infos[elem].vc = vc;
    poll_infos[elem].state = TCP_STATE_CONNECTING;
    poll_infos[elem].send_active = NULL;
    poll_infos[elem].recv_active = NULL;
    poll_fds[elem].events = 0;
    vc->tcp.poll_elem = elem;

    DBGMSG((65, "calling connect()"));
    do
    {
        rc = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
    }
    while (rc == -1 && errno == EINTR);

    if (rc == 0)
    {
        DBGMSG((65, "connect() suceeded immediately"));
        /* TODO: if connection already formed, start sending open req msg */
        poll_fds[elem].events |= POLLOUT;
    }
    else if (errno == EINPROGRESS)
    {
        DBGMSG((65, "connect() pending"));
        poll_fds[elem].events |= POLLOUT;
    }
    else
    {
        DBGMSG((65, "connect() failed"));
        /* The connection will never complete: release the poll element and the socket (same order finalize uses
           for the listener) so neither leaks, then detach the VC and mark it failed. */
        poll_fds[elem].events = 0;
        poll_elem_free(elem);
        rc = close(fd);
        assert(rc != -1);
        vc->tcp.poll_elem = -1;
        vc->tcp.state = MPIDI_CH3I_VC_STATE_FAILED;
    }

    MPIU_Free(val);
    MPIU_Free(key);

  fn_exit:
    MPIDI_DBG_PRINTF((60, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_TCP_POST_CONNECT);
    return MPI_SUCCESS;
}

/*
 * MPIDI_CH3I_TCP_post_read()
 *
 * Makes req the active receive request on the VC's poll element and enables POLLIN for it.
 */
void MPIDI_CH3I_TCP_post_read(MPIDI_VC * vc, MPID_Request * req)
{
    register int elem = vc->tcp.poll_elem;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_TCP_POST_READ);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_TCP_POST_READ);

    assert(poll_infos[elem].vc == vc);
    assert(poll_infos[elem].vc->tcp.fd == poll_fds[elem].fd);

    poll_infos[elem].recv_active = req;
    poll_fds[elem].events |= POLLIN;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_TCP_POST_READ);
}

/*
 * MPIDI_CH3I_TCP_post_write()
 *
 * Makes req the active send request on the VC's poll element and enables POLLOUT for it.
 */
void MPIDI_CH3I_TCP_post_write(MPIDI_VC * vc, MPID_Request * req)
{
    register int elem = vc->tcp.poll_elem;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_TCP_POST_WRITE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_TCP_POST_WRITE);

    assert(poll_infos[elem].vc == vc);
    assert(poll_infos[elem].vc->tcp.fd == poll_fds[elem].fd);

    /* req better be the request at the head of the send queue */
    poll_infos[elem].send_active = req;
    poll_fds[elem].events |= POLLOUT;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_TCP_POST_WRITE);
}

/*
 * MPIDI_CH3I_Request_adjust_iov()
 *
 * Adjust the iovec in the request by the supplied number of bytes.  If the iovec has been consumed, return true;
 * otherwise return false.
 *
 * Entries starting at req->tcp.iov_offset that are fully covered by nb are skipped; a partially covered entry has
 * its buffer pointer advanced and its length reduced.  req->tcp.iov_offset is updated in both cases.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Request_adjust_iov
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Request_adjust_iov(MPID_Request * req, MPIDI_msg_sz_t nb)
{
    int offset = req->tcp.iov_offset;
    const int count = req->ch3.iov_count;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);

    while (offset < count)
    {
        if (req->ch3.iov[offset].MPID_IOV_LEN <= nb)
        {
            /* this entry is completely consumed */
            nb -= req->ch3.iov[offset].MPID_IOV_LEN;
            offset++;
        }
        else
        {
            /* partially consumed entry: trim it and stop here */
            req->ch3.iov[offset].MPID_IOV_BUF = (char *) req->ch3.iov[offset].MPID_IOV_BUF + nb;
            req->ch3.iov[offset].MPID_IOV_LEN -= nb;
            req->tcp.iov_offset = offset;
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);
            return FALSE;
        }
    }

    req->tcp.iov_offset = offset;

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_REQUEST_ADJUST_IOV);
    return TRUE;
}

#undef FUNCNAME
#undef FCNAME
/*
 * post_pkt_send()
 *
 * Points the element's embedded request at its own packet buffer (a single iov entry of sizeof(MPIDI_CH3_Pkt_t)
 * bytes), makes it the active send request and enables POLLOUT.
 */
static inline void post_pkt_send(int elem)
{
    MPIDI_STATE_DECL(MPID_STATE_POST_PKT_SEND);

    MPIDI_FUNC_ENTER(MPID_STATE_POST_PKT_SEND);

    poll_infos[elem].req.ch3.iov[0].MPID_IOV_BUF = (char *)&poll_infos[elem].req.tcp.pkt;
    poll_infos[elem].req.ch3.iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    poll_infos[elem].req.ch3.iov_count = 1;
    poll_infos[elem].req.tcp.iov_offset = 0;
    poll_infos[elem].req.ch3.ca = MPIDI_CH3I_CA_HANDLE_PKT;
    poll_infos[elem].send_active = &poll_infos[elem].req;
    poll_fds[elem].events |= POLLOUT;

    /* TODO: try sending pkt immediately - if we are already processing sends for this connection, then the send
       will be attempted as soon as we return to handle_pollout() */

    MPIDI_FUNC_EXIT(MPID_STATE_POST_PKT_SEND);
}

/*
 * post_pkt_recv()
 *
 * Points the element's embedded request at its own packet buffer (a single iov entry of sizeof(MPIDI_CH3_Pkt_t)
 * bytes), makes it the active receive request and enables POLLIN.
 */
static inline void post_pkt_recv(int elem)
{
    MPIDI_STATE_DECL(MPID_STATE_POST_PKT_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_POST_PKT_RECV);

    poll_infos[elem].req.ch3.iov[0].MPID_IOV_BUF = (char *)&poll_infos[elem].req.tcp.pkt;
    poll_infos[elem].req.ch3.iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    poll_infos[elem].req.ch3.iov_count = 1;
    poll_infos[elem].req.tcp.iov_offset = 0;
    poll_infos[elem].req.ch3.ca = MPIDI_CH3I_CA_HANDLE_PKT;
    poll_infos[elem].recv_active = &poll_infos[elem].req;
    poll_fds[elem].events |= POLLIN;

    MPIDI_FUNC_EXIT(MPID_STATE_POST_PKT_RECV);
}

/*
 * post_queued_send()
 *
 * Makes the head of the VC's send queue the active send request.  POLLOUT is enabled when the queue has a
 * request and cleared when it is empty.
 */
#undef FUNCNAME
#define FUNCNAME post_queued_send
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline void post_queued_send(int elem)
{
    MPIDI_VC * vc = poll_infos[elem].vc;
    MPIDI_STATE_DECL(MPID_STATE_POST_QUEUED_SEND);

    MPIDI_FUNC_ENTER(MPID_STATE_POST_QUEUED_SEND);

    assert(vc != NULL);
    poll_infos[elem].send_active = MPIDI_CH3I_SendQ_head(vc); /* MT */
    if (poll_infos[elem].send_active != NULL)
    {
        MPIDI_DBG_PRINTF((75, FCNAME, "elem=%d, queued message, send active", elem));
        poll_fds[elem].events |= POLLOUT;
    }
    else
    {
        MPIDI_DBG_PRINTF((75, FCNAME, "elem=%d, queue empty, send deactivated", elem));
        poll_fds[elem].events &= ~POLLOUT;
    }

    MPIDI_FUNC_EXIT(MPID_STATE_POST_QUEUED_SEND);
}

#undef FUNCNAME
#define FUNCNAME handle_error
#undef FCNAME
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -