📄 gm_module_send.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* * (C) 2006 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */#include "gm_module_impl.h"typedef struct sendbuf{ struct sendbuf *next; int node_id; int port_id; MPIDI_msg_sz_t datalen; packet_t pkt;} sendbuf_t;static struct {sendbuf_t *top;} sendbuf_stack;static struct {MPID_Request *head, *tail;} send_queue;static MPID_Request *active_send;static sendbuf_t *sendbufs;#define SENDBUF_S_EMPTY() GENERIC_S_EMPTY(sendbuf_stack)#define SENDBUF_S_PUSH(sbp) GENERIC_S_PUSH(&sendbuf_stack, sbp, next)#define SENDBUF_S_POP(sbpp) GENERIC_S_POP(&sendbuf_stack, sbpp, next)#define SEND_Q_EMPTY() GENERIC_Q_EMPTY(send_queue)#define SEND_Q_ENQUEUE(reqp) GENERIC_Q_ENQUEUE(&send_queue, reqp, dev.next)#define SEND_Q_DEQUEUE(reqpp) GENERIC_Q_DEQUEUE(&send_queue, reqpp, dev.next)int MPID_nem_gm_module_send(MPIDI_VC_t *vc, MPID_nem_cell_ptr_t cell, int datalen){ MPIU_Assert(0); return -1;}#undef FUNCNAME#define FUNCNAME MPID_nem_gm_module_send_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_gm_module_send_init(){ int mpi_errno = MPI_SUCCESS; int i; gm_status_t status; MPIU_CHKPMEM_DECL(1); active_send = NULL; send_queue.head = send_queue.tail = NULL; sendbuf_stack.top = NULL; MPIU_CHKPMEM_MALLOC(sendbufs, sendbuf_t *, MPID_nem_module_gm_num_send_tokens * sizeof(sendbuf_t), mpi_errno, "sendbufs"); status = gm_register_memory(MPID_nem_module_gm_port, (void *)sendbufs, MPID_nem_module_gm_num_send_tokens * sizeof (sendbuf_t)); MPIU_ERR_CHKANDJUMP1(status != GM_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**gm_regmem", "**gm_regmem %d", status); for (i = 0; i < MPID_nem_module_gm_num_send_tokens; ++i) { sendbufs[i].next = NULL; SENDBUF_S_PUSH(&sendbufs[i]); } MPIU_CHKPMEM_COMMIT(); fn_exit: return mpi_errno; fn_fail: MPIU_CHKPMEM_REAP(); goto fn_exit;}static void send_callback(struct gm_port *p, void *context, gm_status_t status){ sendbuf_t *sb = (sendbuf_t *)context; printf_d ("send_callback()\n"); if (status != GM_SUCCESS) { gm_perror ("Send error", status); } ++MPID_nem_module_gm_num_send_tokens; SENDBUF_S_PUSH(sb);}/* sends a packet consisting of a header and some of the data pointed to by *dataptr. *dataptr and *dataleft will be updated to reflect the portion of the data not yet sent. */#undef FUNCNAME#define FUNCNAME send_header_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline void send_header_pkt(int node_id, int port_id, int source_id, void *hdr, MPIDI_msg_sz_t hdr_sz, char **dataptr, MPIDI_msg_sz_t *dataleft){ sendbuf_t *sb; MPIDI_msg_sz_t payload_len; MPIDI_STATE_DECL(MPID_STATE_SEND_HEADER_PKT); MPIDI_FUNC_ENTER(MPID_STATE_SEND_HEADER_PKT); MPIU_Assert(MPID_nem_module_gm_num_send_tokens); MPIU_Assert(!SENDBUF_S_EMPTY()); MPIU_Assert(SENDPKT_DATALEN > sizeof(MPIDI_CH3_Pkt_t)); SENDBUF_S_POP(&sb); payload_len = (*dataleft > SENDPKT_DATALEN - sizeof(MPIDI_CH3_Pkt_t)) ? SENDPKT_DATALEN - sizeof(MPIDI_CH3_Pkt_t) : *dataleft; sb->node_id = node_id; sb->port_id = port_id; sb->datalen = PKT_HEADER_LEN + sizeof(MPIDI_CH3_Pkt_t) + payload_len; sb->pkt.source_id = source_id; /* copy header, then copy data starting at max header length */ MPID_NEM_MEMCPY(&sb->pkt.buf, hdr, hdr_sz); MPID_NEM_MEMCPY(((char *)&sb->pkt.buf) + sizeof(MPIDI_CH3_Pkt_t), *dataptr, payload_len); gm_send_with_callback(MPID_nem_module_gm_port, &sb->pkt, PACKET_SIZE, sb->datalen, GM_LOW_PRIORITY, node_id, port_id, send_callback, (void *)sb); --MPID_nem_module_gm_num_send_tokens; *dataleft -= payload_len; *dataptr += payload_len; MPIDI_FUNC_EXIT(MPID_STATE_SEND_HEADER_PKT);}/* sends a packet consisting of some of the data pointed to by *dataptr. *dataptr and *dataleft will be updated to reflect the portion of the data not yet sent. */#undef FUNCNAME#define FUNCNAME send_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline void send_pkt(int node_id, int port_id, int source_id, char **dataptr, MPIDI_msg_sz_t *dataleft){ sendbuf_t *sb; MPIDI_msg_sz_t payload_len; MPIDI_STATE_DECL(MPID_STATE_SEND_PKT); MPIDI_FUNC_ENTER(MPID_STATE_SEND_PKT); MPIU_Assert(!SENDBUF_S_EMPTY()); if (*dataleft == 0) goto fn_exit; payload_len = ((*dataleft > SENDPKT_DATALEN) ? SENDPKT_DATALEN : *dataleft); SENDBUF_S_POP(&sb); sb->node_id = node_id; sb->port_id = port_id; sb->datalen = PKT_HEADER_LEN + payload_len; sb->pkt.source_id = source_id; MPID_NEM_MEMCPY(&sb->pkt.buf, *dataptr, payload_len); gm_send_with_callback (MPID_nem_module_gm_port, &sb->pkt, PACKET_SIZE, sb->datalen, GM_LOW_PRIORITY, sb->node_id, sb->port_id, send_callback, (void *)sb); --MPID_nem_module_gm_num_send_tokens; *dataleft -= payload_len; *dataptr += payload_len; fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_SEND_PKT);}/* sends a packet consisting of an optional header and noncontiguous data described by a segment. */#undef FUNCNAME#define FUNCNAME send_noncontig_pkt#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static inline void send_noncontig_pkt(int node_id, int port_id, int source_id, void *hdr, MPIDI_msg_sz_t hdr_sz, MPID_Segment *segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size){ sendbuf_t *sb; MPIDI_msg_sz_t payload_len; MPIDI_msg_sz_t last; MPIDI_msg_sz_t h_len; MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT); MPIDI_FUNC_ENTER(MPID_STATE_SEND_NONCONTIG_PKT); MPIU_Assert(MPID_nem_module_gm_num_send_tokens); MPIU_Assert(!SENDBUF_S_EMPTY()); MPIU_Assert(SENDPKT_DATALEN > sizeof(MPIDI_CH3_Pkt_t)); SENDBUF_S_POP(&sb); if (hdr) { /* copy header */ MPID_NEM_MEMCPY(&sb->pkt.buf, hdr, hdr_sz); h_len = sizeof(MPIDI_CH3_Pkt_t); } else h_len = 0; /* copy data */ if (h_len + segment_size - *segment_first <= SENDPKT_DATALEN) last = segment_size; else last = *segment_first + SENDPKT_DATALEN - h_len; MPID_Segment_pack(segment, *segment_first, &last, ((char *)&sb->pkt.buf) + h_len); payload_len = h_len + last - *segment_first; *segment_first = last; sb->node_id = node_id; sb->port_id = port_id; sb->datalen = PKT_HEADER_LEN + payload_len; sb->pkt.source_id = source_id; gm_send_with_callback(MPID_nem_module_gm_port, &sb->pkt, PACKET_SIZE, sb->datalen, GM_LOW_PRIORITY, node_id, port_id, send_callback, (void *)sb); --MPID_nem_module_gm_num_send_tokens; MPIDI_FUNC_EXIT(MPID_STATE_SEND_NONCONTIG_PKT);}#undef FUNCNAME#define FUNCNAME MPID_nem_send_from_queue#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_send_from_queue(){ int mpi_errno = MPI_SUCCESS; char *dataptr; MPIDI_msg_sz_t datalen; int complete; while (active_send || !SEND_Q_EMPTY()) { /* handle the partially sent requests */ while (active_send) { MPIDI_VC_t *vc = active_send->ch.vc; if (MPID_nem_module_gm_num_send_tokens == 0) goto fn_exit; if (active_send->ch.noncontig) { /* send only if there's something left to send */ if (active_send->dev.segment_first != active_send->dev.segment_size) send_noncontig_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), NULL, 0, active_send->dev.segment_ptr, &active_send->dev.segment_first, active_send->dev.segment_size); /* have we finished sending the whole message? */ complete = active_send->dev.segment_first == active_send->dev.segment_size; } else { dataptr = active_send->dev.iov[0].MPID_IOV_BUF; datalen = active_send->dev.iov[0].MPID_IOV_LEN; /* send only if there's something left to send */ if (datalen != 0) send_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), &dataptr, &datalen); active_send->dev.iov[0].MPID_IOV_BUF = dataptr; active_send->dev.iov[0].MPID_IOV_LEN = datalen; /* have we finished sending the whole message? */ complete = datalen == 0; } if (complete) { /* finished sending message */ int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *); reqFn = active_send->dev.OnDataAvail; if (!reqFn) { MPIDI_CH3U_Request_complete(active_send); //MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete"); active_send = NULL; } else { complete = 0; mpi_errno = reqFn(vc, active_send, &complete); if (mpi_errno) MPIU_ERR_POP(mpi_errno); if (complete) { MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete"); active_send = NULL; } } } } if (!SEND_Q_EMPTY())
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -