📄 ch3_progress.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"
#include "pmi.h"

/* Set to 1 while one of the packet handlers below is running and reset to 0
 * when it finishes; MPIDI_CH3I_Progress asserts that it is 0. */
int MPIDI_CH3I_inside_handler;
/* Token passed to the packet handler that ran most recently. */
gasnet_token_t MPIDI_CH3I_gasnet_token;

/* MPIDI_CH3I_Progress keeps looping (when blocking) until this counter
 * differs from the value it had on entry. */
volatile unsigned int MPIDI_CH3I_progress_completions = 0;

/* Per-queue send queue state, indexed by queue id (CH3_NORMAL_QUEUE,
 * CH3_RNDV_QUEUE, ...). */
struct MPID_Request *MPIDI_CH3I_sendq_head[CH3_NUM_QUEUES];
struct MPID_Request *MPIDI_CH3I_sendq_tail[CH3_NUM_QUEUES];
struct MPID_Request *MPIDI_CH3I_active_send[CH3_NUM_QUEUES];

static int send_enqueuedv (MPIDI_VC_t *vc, MPID_Request *sreq);
static int do_put (MPIDI_VC_t * vc, MPID_Request *sreq);

#if !defined(MPIDI_CH3_Progress_start)
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_start
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_Progress_start - begin a progress epoch.
 *
 * Only compiled when MPIDI_CH3_Progress_start is not already defined as a
 * macro.  The body does nothing beyond the function enter/exit logging
 * macros; 'state' is not used.
 */
void MPIDI_CH3_Progress_start(MPID_Progress_state * state)
{
    /* MT - This function is empty for the single-threaded implementation */
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_START);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_START);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_START);
}
#endif

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3I_Progress - poll GASNet and push pending sends forward.
 *
 * Each pass of the loop:
 *   1. calls gasnet_AMPoll();
 *   2. continues the active send on CH3_NORMAL_QUEUE, or, if there is none,
 *      starts the request at the head of that queue;
 *   3. advances the rendezvous state machine for the request at the head of
 *      CH3_RNDV_QUEUE.
 *
 * is_blocking - when non-zero, keep looping until
 *               MPIDI_CH3I_progress_completions differs from its value on
 *               entry; when zero, make exactly one pass.
 *
 * Returns the last MPI error code recorded.  Every checked failure inside
 * the loop is reported through MPID_Abort rather than returned to the caller.
 *
 * Must not be called from inside a packet handler (asserted below).
 */
int MPIDI_CH3I_Progress(int is_blocking)
{
    unsigned completions = MPIDI_CH3I_progress_completions;
    int mpi_errno = MPI_SUCCESS;
    int complete;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS);

#if defined(MPICH_DBG_OUTPUT)
    {
        if (is_blocking)
        {
            MPIDI_DBG_PRINTF((50, FCNAME, "entering, blocking=%s",
                              is_blocking ? "true" : "false"));
        }
    }
#endif

    /* we better not be inside the handler: we can't send from inside
     * the handler, so we wouldn't be able to make progress */
    MPIU_Assert (!MPIDI_CH3I_inside_handler);

    do
    {
        MPID_Request *sreq;

        gasnet_AMPoll ();

        sreq = MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE];
        if (sreq)
        {
            /* A send is already in flight on the normal queue: keep
             * writing it. */
            complete = 0;
            mpi_errno = MPIDI_CH3_iWrite (sreq->gasnet.vc, sreq);
            if (mpi_errno != MPI_SUCCESS)
            {
                /* currently iwrite always returns MPI_SUCCESS */
                MPID_Abort(NULL, mpi_errno, -1, "Internal error");
            }

            if (sreq->dev.iov_count == 0)
            {
                /* The current IOV is drained; let the upper layer decide
                 * whether the request is complete. */
                mpi_errno = MPIDI_CH3U_Handle_send_req (sreq->gasnet.vc, sreq,
                                                        &complete);
                if (mpi_errno != MPI_SUCCESS)
                {
                    MPID_Abort(NULL, mpi_errno, -1, "Internal error");
                }
            }

            if (complete)
            {
                MPIDI_CH3I_SendQ_dequeue (CH3_NORMAL_QUEUE);
                MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = NULL;
            }
        }
        else
        {
            /* Nothing in flight: try to start the request at the head of
             * the normal queue. */
            sreq = MPIDI_CH3I_SendQ_head (CH3_NORMAL_QUEUE);

            if (sreq)
            {
                complete = 0;
                mpi_errno = send_enqueuedv (sreq->gasnet.vc, sreq);
                if (mpi_errno != MPI_SUCCESS)
                {
                    MPID_Abort(NULL, mpi_errno, -1, "Internal error");
                }

                if (sreq->dev.iov_count == 0)
                {
                    mpi_errno = MPIDI_CH3U_Handle_send_req (sreq->gasnet.vc,
                                                            sreq, &complete);
                    if (mpi_errno != MPI_SUCCESS)
                    {
                        MPID_Abort(NULL, mpi_errno, -1, "Internal error");
                    }
                }

                if (complete)
                {
                    MPIDI_CH3I_SendQ_dequeue (CH3_NORMAL_QUEUE);
                }
                else
                {
                    /* Not finished: remember it so the next pass resumes
                     * it through MPIDI_CH3_iWrite. */
                    MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
                }
            }
        }

        /* handle rendezvous puts */
        sreq = MPIDI_CH3I_SendQ_head (CH3_RNDV_QUEUE);

        if (sreq)
        {
            MPIDI_DBG_PRINTF((55, FCNAME, "handle rendezvous puts\n"));
            DUMP_REQUEST(sreq);

            switch (sreq->gasnet.rndv_state)
            {
            case MPIDI_CH3_RNDV_NEW:
                /* First visit: open the non-blocking access region, then
                 * issue puts exactly as for a request already in
                 * progress. */
                gasnet_begin_nbi_accessregion ();
                /* fallthrough */
            case MPIDI_CH3_RNDV_CURRENT:
                /* NOTE(review): do_put is declared to return int, but its
                 * result is discarded here - confirm that is intended. */
                do_put (sreq->gasnet.vc, sreq);
                if (sreq->gasnet.remote_iov_count == 0
                    || sreq->dev.iov_count == 0)
                {
                    /* One side has run out of IOV entries: close the access
                     * region and wait for the outstanding puts. */
                    sreq->gasnet.rndv_state = MPIDI_CH3_RNDV_WAIT;
                    sreq->gasnet.rndv_handle = gasnet_end_nbi_accessregion ();
                }
                else
                {
                    sreq->gasnet.rndv_state = MPIDI_CH3_RNDV_CURRENT;
                }
                break;
            case MPIDI_CH3_RNDV_WAIT:
                if (gasnet_try_syncnb (sreq->gasnet.rndv_handle) == GASNET_OK)
                {
                    if (sreq->gasnet.remote_iov_count == 0)
                    {
                        int gn_errno;

                        /* Remote IOV is used up: notify the peer and take
                         * the request off the rendezvous queue. */
                        gn_errno = gasnet_AMRequestShort2 (
                            sreq->gasnet.vc->lpid,
                            MPIDI_CH3_reload_IOV_or_done_handler_id,
                            sreq->gasnet.remote_req_id, sreq->handle);
                        if (gn_errno != GASNET_OK)
                        {
                            MPID_Abort (NULL, MPI_SUCCESS, -1,
                                        "GASNet send failed");
                        }
                        MPIDI_CH3I_SendQ_dequeue (CH3_RNDV_QUEUE);
                    }
                    if (sreq->dev.iov_count == 0)
                    {
                        /* Check the result like the other two call sites in
                         * this function instead of dropping it. */
                        mpi_errno = MPIDI_CH3U_Handle_send_req (
                            sreq->gasnet.vc, sreq, &complete);
                        if (mpi_errno != MPI_SUCCESS)
                        {
                            MPID_Abort(NULL, mpi_errno, -1, "Internal error");
                        }
                        sreq->gasnet.iov_offset = 0;
                        /* NOTE(review): rndv_state is not changed on this
                         * path; confirm where it is reset when more data
                         * remains to be put. */
                    }
                }
                break;
            default:
                MPID_Abort (NULL, MPI_SUCCESS, -1, "Internal error");
            }
        }
    }
    while (completions == MPIDI_CH3I_progress_completions && is_blocking);

#if defined(MPICH_DBG_OUTPUT)
    {
        if (is_blocking)
        {
            MPIDI_DBG_PRINTF((50, FCNAME, "exiting"));
        }
    }
#endif

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_start_packet_handler
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_start_packet_handler - handler for the first chunk of a message.
 *
 * token   - GASNet token for this message; used to look up the sender.
 * buf     - start of the payload; treated as an MPIDI_CH3_Pkt_t followed
 *           by data.
 * data_sz - size of buf in bytes.
 *
 * Records the data that follows the packet header in the sender's VC,
 * passes the header to MPIDI_CH3U_Handle_recv_pkt and, if that yields a
 * receive request, calls MPIDI_CH3_iRead on it.  Any failure aborts.
 */
void
MPIDI_CH3_start_packet_handler (gasnet_token_t token, void* buf, size_t data_sz)
{
    int mpi_errno;
    int gn_errno;
    gasnet_node_t sender;
    MPIDI_VC_t *vc;
    MPID_Request *rreq;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_START_PACKET_HANDLER);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_START_PACKET_HANDLER);
    MPIDI_DBG_PRINTF((50, FCNAME, "entering"));

    MPIDI_CH3I_inside_handler = 1;
    MPIDI_CH3I_gasnet_token = token;

    gn_errno = gasnet_AMGetMsgSource (token, &sender);
    if (gn_errno != GASNET_OK)
    {
        MPID_Abort(NULL, MPI_SUCCESS, -1, "GASNet AMGetMsgSource failed");
    }

    MPIDI_DBG_PRINTF((55, FCNAME, "  sender = %d\n", sender));

    MPIDI_PG_Get_vc(MPIDI_Process.my_pg, sender, &vc);

    /* The data starts right after the fixed-size packet header.
     * NOTE(review): assumes data_sz >= sizeof (MPIDI_CH3_Pkt_t); otherwise
     * the unsigned subtraction wraps - confirm the sender always transmits
     * a full header. */
    vc->gasnet.data = (void *) ((char *)buf + sizeof (MPIDI_CH3_Pkt_t));
    vc->gasnet.data_sz = data_sz - sizeof (MPIDI_CH3_Pkt_t);

    mpi_errno = MPIDI_CH3U_Handle_recv_pkt (vc, (MPIDI_CH3_Pkt_t *)buf, &rreq);
    if (mpi_errno != MPI_SUCCESS)
    {
        MPID_Abort(NULL, mpi_errno, -1, "Internal error");
    }

    if (rreq)
    {
        mpi_errno = MPIDI_CH3_iRead (vc, rreq);
        if (mpi_errno != MPI_SUCCESS)
        {
            MPID_Abort(NULL, mpi_errno, -1, "Internal error");
        }
    }

    MPIDI_CH3I_inside_handler = 0;
    MPIDI_DBG_PRINTF((50, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_START_PACKET_HANDLER);
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_continue_packet_handler
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_continue_packet_handler - handler for a follow-on chunk.
 *
 * token   - GASNet token for this message; used to look up the sender.
 * buf     - payload; used in full as data (no packet header is skipped).
 * data_sz - size of buf in bytes.
 *
 * Stores buf/data_sz in the sender's VC and continues the receive already
 * recorded in vc->gasnet.recv_active, which must be non-NULL (asserted).
 * A failing MPIDI_CH3_iRead aborts.
 */
void
MPIDI_CH3_continue_packet_handler (gasnet_token_t token, void* buf,
                                   size_t data_sz)
{
    int mpi_errno;
    int gn_errno;
    gasnet_node_t sender;
    MPIDI_VC_t *vc;
    MPID_Request * rreq;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_CONTINUE_PACKET_HANDLER);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_CONTINUE_PACKET_HANDLER);
    MPIDI_DBG_PRINTF((50, FCNAME, "entering"));

    MPIDI_CH3I_inside_handler = 1;
    MPIDI_CH3I_gasnet_token = token;

    gn_errno = gasnet_AMGetMsgSource (token, &sender);
    if (gn_errno != GASNET_OK)
    {
        MPID_Abort(NULL, MPI_SUCCESS, -1, "GASNet AMGetMsgSource failed");
    }

    MPIDI_DBG_PRINTF((55, FCNAME, "  sender = %d\n", sender));

    MPIDI_PG_Get_vc(MPIDI_Process.my_pg, sender, &vc);

    vc->gasnet.data = buf;
    vc->gasnet.data_sz = data_sz;

    rreq = vc->gasnet.recv_active;
    MPIU_Assert (rreq);

    mpi_errno = MPIDI_CH3_iRead (vc, rreq);
    if (mpi_errno != MPI_SUCCESS)
    {
        MPID_Abort(NULL, mpi_errno, -1, "Internal error");
    }

    MPIDI_CH3I_inside_handler = 0;
    MPIDI_DBG_PRINTF((50, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_CONTINUE_PACKET_HANDLER);
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_CTS_packet_handler
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_CTS_packet_handler - handler for a clear-to-send message.
 *
 * The implementation is compiled out with "#if 0"; as built, the function
 * ignores all of its arguments and calls abort().
 *
 * The disabled code copies n_iov remote IOV entries from buf into the send
 * request named by sreq_id, re-loads the send IOV when remote_data_sz is
 * smaller than the request's segment size, resets the rendezvous
 * bookkeeping and enqueues the request on CH3_RNDV_QUEUE.
 */
void
MPIDI_CH3_CTS_packet_handler (gasnet_token_t token, void* buf, size_t buf_sz,
                              MPI_Request sreq_id, MPI_Request rreq_id,
                              int remote_data_sz, int n_iov)
{
#if 0
    int mpi_errno;
    int gn_errno;
    gasnet_node_t sender;
    MPIDI_VC_t *vc;
    MPID_Request *sreq;
    MPID_IOV *iov = (MPID_IOV *)buf;
    int i;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_CTS_PACKET_HANDLER);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_CTS_PACKET_HANDLER);

    MPIU_Assert (n_iov * sizeof (MPID_IOV) == buf_sz);

    MPIDI_DBG_PRINTF((50, FCNAME, "entering"));

    MPIDI_CH3I_inside_handler = 1;
    MPIDI_CH3I_gasnet_token = token;

    MPID_Request_get_ptr(sreq_id, sreq);

    /* check if message needs to be truncated */
    if (remote_data_sz < sreq->dev.segment_size)
    {
        MPID_Segment_init(sreq->dev.user_buf, sreq->dev.user_count,
                          sreq->dev.datatype, &sreq->dev.segment, 0);
        sreq->dev.iov_count = MPID_IOV_LIMIT;
        sreq->dev.segment_first = 0;
        sreq->dev.segment_size = remote_data_sz;
        mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &sreq->dev.iov[0],
                                                     &sreq->dev.iov_count);
        if (mpi_errno != MPI_SUCCESS)
        {
            MPID_Abort(NULL, mpi_errno, -1, "Internal error");
        }
    }

    for (i = 0; i < n_iov; ++i)
    {
        sreq->gasnet.remote_iov[i] = iov[i];
    }
    sreq->gasnet.remote_iov_count = n_iov;
    sreq->gasnet.remote_req_id = rreq_id;
    sreq->gasnet.iov_bytes = 0;
    sreq->gasnet.remote_iov_bytes = 0;
    sreq->gasnet.iov_offset = 0;
    sreq->gasnet.remote_iov_offset = 0;

    sreq->gasnet.rndv_state = MPIDI_CH3_RNDV_NEW;
    MPIDI_CH3I_SendQ_enqueue(sreq, CH3_RNDV_QUEUE);

    MPIDI_CH3I_inside_handler = 0;
    MPIDI_DBG_PRINTF((50, FCNAME, "exiting"));
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_CTS_PACKET_HANDLER);
#else
    abort ();
#endif
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -