📄 ch3_progress.c
字号:
conn->state = CONN_STATE_OPEN_CSEND; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_OPEN_REQ); conn->pkt.sc_open_req.pg_id_len = (int) strlen(MPIDI_Process.my_pg->id) + 1; conn->pkt.sc_open_req.pg_rank = MPIR_Process.comm_world->rank; connection_post_send_pkt_and_pgid(conn); } else { /* CONN_STATE_CONNECT_ACCEPT */ int port_name_tag; MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_OPEN_CSEND"); conn->state = CONN_STATE_OPEN_CSEND; /* pkt contains port name tag. In memory debugging mode, MPIDI_Pkt_init resets the packet contents. Therefore, save the port name tag and then add it back. */ port_name_tag = conn->pkt.sc_conn_accept.port_name_tag; MPIDI_Pkt_init(&conn->pkt, MPIDI_CH3I_PKT_SC_CONN_ACCEPT); conn->pkt.sc_conn_accept.port_name_tag = port_name_tag; mpi_errno = connection_post_send_pkt(conn); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_INTERN, "**ch3|sock|scconnaccept"); } } break; } case MPIDU_SOCK_OP_CLOSE: { MPIDI_CH3I_Connection_t * conn = (MPIDI_CH3I_Connection_t *) event->user_ptr; /* If the conn pointer is NULL then the close was intentional */ if (conn != NULL) { if (conn->state == CONN_STATE_CLOSING) { MPIU_Assert(conn->send_active == NULL); MPIU_Assert(conn->recv_active == NULL); if (conn->vc != NULL) { MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_UNCONNECTED"); conn->vc->ch.state = MPIDI_CH3I_VC_STATE_UNCONNECTED; conn->vc->ch.sock = MPIDU_SOCK_INVALID_SOCK; MPIDI_CH3U_Handle_connection(conn->vc, MPIDI_VC_EVENT_TERMINATED); } } else { MPIU_Assert(conn->state == CONN_STATE_LISTENING); MPIDI_CH3I_listener_conn = NULL; MPIDI_CH3I_listener_port = 0; MPIDI_CH3_Progress_signal_completion(); /* MPIDI_CH3I_progress_completion_count++; */ } conn->sock = MPIDU_SOCK_INVALID_SOCK; MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CLOSED"); conn->state = CONN_STATE_CLOSED; connection_free(conn); } break; } case MPIDU_SOCK_OP_WAKEUP: { MPIDI_CH3_Progress_signal_completion(); /* MPIDI_CH3I_progress_completion_count++; */ break; } } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SOCK_EVENT); return mpi_errno; fn_fail: goto fn_exit;}/* end MPIDI_CH3I_Progress_handle_sock_event() */#if (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE)#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_delay#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_delay(unsigned int completion_count){ int mpi_errno = MPI_SUCCESS; # if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MONITOR) {# error This is so not right. But what is the correct technique? if (MPIU_Monitor_closet_get_occupancy_count(MPIR_Process.global_closet) > 0) { MPIU_Monitor_continue(MPIR_Process.global_monitor, MPIR_Process.global_closet); MPIU_Monitor_enter(MPIR_Process.global_monitor); if (completion_count != MPIDI_CH3I_progress_completion_count) { goto impl_exit; } } MPIU_Monitor_delay(MPIR_Process.global_monitor, MPIR_Process.global_closet); impl_exit: { } }# elif (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) {# if defined(USE_CH3I_PROGRESS_DELAY_QUEUE) { int rc; struct MPIDI_CH3I_Progress_delay_queue_elem dq_elem; dq_elem.count = completion_count; dq_elem.flag = FALSE; dq_elem.next = NULL; MPIDI_CH3I_Progress_delay_queue_tail->next = &dq_elem; MPIDI_CH3I_Progress_delay_queue_tail = &dq_elem; if (MPIDI_CH3I_Progress_delay_queue_head == NULL) { MPIDI_CH3I_Progress_delay_queue_head = &dq_elem; } rc = MPID_Thread_cond_create(&dq_elem.cond, NULL); /* --BEGIN ERROR HANDLING-- */ if (rc != 0) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); goto impl_exit; } /* --END ERROR HANDLING-- */ do { MPID_Thread_cond_wait(&dq_elem.cond, &MPIR_Process.global_mutex); } while(dq_elem.flag == FALSE); MPID_Thread_cond_destroy(&dq_elem.cond, NULL); impl_exit: { } }# else { while (completion_count == MPIDI_CH3I_progress_completion_count) { MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_Process.global_mutex); } }# endif }# endif return mpi_errno;}/* end MPIDI_CH3I_Progress_delay() */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_continue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Progress_continue(unsigned int completion_count){ int mpi_errno = MPI_SUCCESS;# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MONITOR) {# error This is so not right. But what is the correct technique? if (MPIU_Monitor_closet_get_occupancy(MPIR_Process.global_closet) > 0) { MPIU_Monitor_continue(MPIR_Process.global_monitor, MPIR_Process.global_closet); } else { MPIU_Monitor_exit(MPIR_Process.global_monitor); } }# elif (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) {# if defined(USE_CH3I_PROGRESS_DELAY_QUEUE) { struct MPIDI_CH3I_Progress_delay_queue_elem * dq_elem; dq_elem = MPIDI_CH3I_Progress_delay_queue_head; while(dq_elem != NULL && dq_elem->count != completion_count) { dq_elem->flag = TRUE; MPID_Thread_cond_signal(&dq_elem->cond); dq_elem = dq_elem->next; } MPIDI_CH3I_Progress_delay_queue_head = dq_elem; }# else { MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond); }# endif }# endif return mpi_errno;}/* end MPIDI_CH3I_Progress_continue() */#endif /* (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL) */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_VC_post_connect#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_VC_post_connect(MPIDI_VC_t * vc){ int mpi_errno = MPI_SUCCESS; char key[MPIDI_MAX_KVS_KEY_LEN]; char val[MPIDI_MAX_KVS_VALUE_LEN]; char host_description[MAX_HOST_DESCRIPTION_LEN]; int port; unsigned char ifaddr[4]; int hasIfaddr = 0; int rc; MPIDI_CH3I_Connection_t * conn = 0; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT); MPIDI_DBG_PRINTF((60, FCNAME, "entering")); MPIU_Assert(vc->ch.state == MPIDI_CH3I_VC_STATE_UNCONNECTED); MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_CONNECTING"); vc->ch.state = MPIDI_CH3I_VC_STATE_CONNECTING; rc = MPIU_Snprintf(key, MPIDI_MAX_KVS_KEY_LEN, "P%d-businesscard", vc->pg_rank); /* --BEGIN ERROR HANDLING-- */ if (rc < 0 || rc > MPIDI_MAX_KVS_KEY_LEN) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", NULL); goto fn_exit; } /* --END ERROR HANDLING-- */ mpi_errno = MPIDI_KVS_Get(vc->pg->ch.kvs_name, key, val); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } mpi_errno = MPIDU_Sock_get_conninfo_from_bc( val, host_description, sizeof(host_description), &port, ifaddr, &hasIfaddr ); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } mpi_errno = MPIDI_CH3I_Connection_alloc(&conn); if (mpi_errno == MPI_SUCCESS) { /* FIXME: This is a hack to allow Windows to continue to use the host description string instead of the interface address bytes when posting a socket connection. This should be fixed by changing the Sock_post_connect to only accept interface address. See also channels/ssm/ch3_progress_connect.c */#ifndef HAVE_WINDOWS_H if (hasIfaddr) { mpi_errno = MPIDU_Sock_post_connect_ifaddr(MPIDI_CH3I_sock_set, conn, ifaddr, port, &conn->sock); } else #endif { mpi_errno = MPIDU_Sock_post_connect(MPIDI_CH3I_sock_set, conn, host_description, port, &conn->sock); } if (mpi_errno == MPI_SUCCESS) { MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to CONN_STATE_CONNECTING"); vc->ch.sock = conn->sock; vc->ch.conn = conn; conn->vc = vc; conn->state = CONN_STATE_CONNECTING; conn->send_active = NULL; conn->recv_active = NULL; } /* --BEGIN ERROR HANDLING-- */ else { MPIU_DBG_MSG(CH3_CONNECT,TYPICAL,"Setting state to VC_STATE_FAILED"); vc->ch.state = MPIDI_CH3I_VC_STATE_FAILED; mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ch3|sock|postconnect", "**ch3|sock|postconnect %d %d %s", MPIR_Process.comm_world->rank, vc->pg_rank, val); goto fn_fail; } /* --END ERROR HANDLING-- */ } else { MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER, "**ch3|sock|connalloc"); } fn_exit: MPIDI_DBG_PRINTF((60, FCNAME, "exiting")); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_VC_POST_CONNECT); return mpi_errno; fn_fail: /* --BEGIN ERROR HANDLING-- */ if (conn) { connection_free(conn); } goto fn_exit; /* --END ERROR HANDLING-- */}/* end MPIDI_CH3I_VC_post_connect() */#undef FUNCNAME#define FUNCNAME connection_free#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline void connection_free(MPIDI_CH3I_Connection_t * conn){ MPIDI_STATE_DECL(MPID_STATE_CONNECTION_FREE); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_FREE); MPIU_Free(conn->pg_id); MPIU_Free(conn); MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_FREE);}#undef FUNCNAME#define FUNCNAME connection_post_sendq_req#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int connection_post_sendq_req(MPIDI_CH3I_Connection_t * conn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SENDQ_REQ); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SENDQ_REQ); /* post send of next request on the send queue */ conn->send_active = MPIDI_CH3I_SendQ_head(conn->vc); /* MT */ if (conn->send_active != NULL) { mpi_errno = MPIDU_Sock_post_writev(conn->sock, conn->send_active->dev.iov, conn->send_active->dev.iov_count, NULL); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", NULL); } /* --END ERROR HANDLING-- */ } MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SENDQ_REQ); return mpi_errno;}#undef FUNCNAME#define FUNCNAME connection_post_send_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int connection_post_send_pkt(MPIDI_CH3I_Connection_t * conn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SEND_PKT); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SEND_PKT); mpi_errno = MPIDU_Sock_post_write(conn->sock, &conn->pkt, sizeof(conn->pkt), sizeof(conn->pkt), NULL); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER, "**fail"); } MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SEND_PKT); return mpi_errno;}#undef FUNCNAME#define FUNCNAME connection_post_recv_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_RECV_PKT); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_RECV_PKT); mpi_errno = MPIDU_Sock_post_read(conn->sock, &conn->pkt, sizeof(conn->pkt), sizeof(conn->pkt), NULL); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER, "**fail"); } MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_RECV_PKT); return mpi_errno;}#undef FUNCNAME#define FUNCNAME connection_post_send_pkt_and_pgid#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline void connection_post_send_pkt_and_pgid(MPIDI_CH3I_Connection_t * conn){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID); MPIDI_FUNC_ENTER(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID); conn->iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) &conn->pkt; conn->iov[0].MPID_IOV_LEN = (int) sizeof(conn->pkt); conn->iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) MPIDI_Process.my_pg->id; conn->iov[1].MPID_IOV_LEN = (int) strlen(MPIDI_Process.my_pg->id) + 1; mpi_errno = MPIDU_Sock_post_writev(conn->sock, conn->iov, 2, NULL); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER, "**fail"); } MPIDI_FUNC_EXIT(MPID_STATE_CONNECTION_POST_SEND_PKT_AND_PGID);}/* FIXME: What is this routine for? */#undef FUNCNAME#define FUNCNAME adjust_iov#undef FCNAME#define FCNAME MPIU_QUOTE(FUNCNAME)static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb){ MPID_IOV * const iov = *iovp; const int count = *countp; int offset = 0; while (offset < count) { if (iov[offset].MPID_IOV_LEN <= nb) { nb -= iov[offset].MPID_IOV_LEN; offset++; } else { iov[offset].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)((char *) iov[offset].MPID_IOV_BUF + nb); iov[offset].MPID_IOV_LEN -= nb; break; } } *iovp += offset; *countp -= offset; return (*countp == 0);}/* end adjust_iov() */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -