📄 ibu_wait.ibal.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "ibu.h"
#ifdef HAVE_STDIO_H
#include <stdio.h>
#endif
#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif
#include "mpidi_ch3_impl.h"

#ifdef USE_IB_IBAL

#include "ibuimpl.ibal.h"

/*#define PRINT_IBU_WAIT*/
#ifdef PRINT_IBU_WAIT
#define MPIU_DBG_PRINTFX(a) MPIU_DBG_PRINTF(a)
#else
#define MPIU_DBG_PRINTFX(a)
#endif

/*
 * op2str - printable name of an IBAL work-completion type (diagnostics only).
 *
 * Known IB_WC_* values map to string literals.  Any other value is formatted
 * as a decimal number into a function-local static buffer, so that result is
 * overwritten by the next such call; the function is therefore not reentrant.
 */
char * op2str(int wc_type)
{
    static char str[20];

    switch (wc_type)
    {
    case IB_WC_SEND:
        return "IB_WC_SEND";
    case IB_WC_RDMA_WRITE:
        return "IB_WC_RDMA_WRITE";
    case IB_WC_RECV:
        return "IB_WC_RECV";
    case IB_WC_RDMA_READ:
        return "IB_WC_RDMA_READ";
    case IB_WC_MW_BIND:
        return "IB_WC_MW_BIND";
    case IB_WC_FETCH_ADD:
        return "IB_WC_FETCH_ADD";
    case IB_WC_COMPARE_SWAP:
        return "IB_WC_COMPARE_SWAP";
    case IB_WC_RECV_RDMA_WRITE:
        return "IB_WC_RECV_RDMA_WRITE";
    default:
        break;
    }
    MPIU_Snprintf(str, sizeof(str), "%d", wc_type);
    return str;
}

/*
 * PrintWC - dump the fields of an IBAL work completion via MPIU_Msg_printf
 * and flush stdout.
 *
 * p: the completion to print (not modified).
 */
void PrintWC(ib_wc_t *p)
{
    MPIU_Msg_printf("Work Completion Descriptor:\n");
    MPIU_Msg_printf(" id: %d\n", (int)p->wr_id);
    /* casts make each argument match its conversion specifier */
    MPIU_Msg_printf(" opcode: %u = %s\n", (unsigned int)p->wc_type, op2str(p->wc_type));
    MPIU_Msg_printf(" length: %d\n", (int)p->length);
    MPIU_Msg_printf(" imm_data_valid: %d\n", (IB_RECV_OPT_IMMEDIATE & (int)p->recv.conn.recv_opt));
    MPIU_Msg_printf(" imm_data: %u\n", (unsigned int)p->recv.conn.immediate_data);
    MPIU_Msg_printf(" remote_node_addr:\n");
    /* TODO: where in ibal remote_node_addr?  Until an IBAL equivalent is
       found, the " type" line (remote_node_addr.type and
       remote_node_addr_sym()) is not printed. */
    MPIU_Msg_printf(" slid: %d\n", (int)p->recv.ud.remote_lid);
    MPIU_Msg_printf(" sl: %d\n", (int)p->recv.ud.remote_sl);
    MPIU_Msg_printf(" qp: %d\n", (int)p->recv.ud.remote_qp);
    MPIU_Msg_printf(" loc_eecn: %d\n", 0 /*TODO: loc_eecn*/);
    MPIU_Msg_printf(" grh_flag: %d\n", (IB_RECV_OPT_GRH_VALID & (int)p->recv.ud.recv_opt));
    MPIU_Msg_printf(" pkey_ix: %d\n", p->recv.ud.pkey_index);
    MPIU_Msg_printf(" status: %u = %s\n", (unsigned int)p->status, ib_get_wc_status_str(p->status));
    MPIU_Msg_printf(" vendor_err_syndrome: %d\n", (IB_RECV_OPT_VEND_MASK & (int)p->recv.ud.recv_opt));
    fflush(stdout);
}

#undef FUNCNAME
#define FUNCNAME ibu_poll
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * ibu_poll - look for at most one completed operation.
 *
 * Two sources are checked:
 *   1. The completion queue of MPIDI_CH3I_Process.set (ib_poll_cq).  This
 *      only happens while IBU_Process.num_send_cqe is non-zero, or after the
 *      RDMA buffers have been polled IBU_CQ_POLL_WATER_MARK times since the
 *      CQ was last checked.
 *   2. The head eager-RDMA receive buffer of each peer VC, visited
 *      round-robin.  A buffer whose footer flag equals IBU_VALID_RDMA_BUF is
 *      consumed and reported as a synthesized IB_WC_RECV completion.
 *
 * For synthesized completions (a failed CQ entry, or an RDMA buffer with
 * payload), completion_data->wr_id holds an ibu_work_id_handle_t allocated
 * here from IBU_Process.workAllocator.  ibu_wait releases it with
 * ibuBlockFree.
 *
 * completion_data: caller-provided storage for the completion.
 * Returns IB_SUCCESS when completion_data holds a completion.  A failed CQ
 * entry is also IB_SUCCESS, with completion_data->status != IB_WCS_SUCCESS.
 * Returns IB_NOT_FOUND or IB_INSUFFICIENT_RESOURCES when nothing was found.
 * Any other ib_poll_cq status is returned unchanged.
 */
ib_api_status_t ibu_poll(ib_wc_t *completion_data)
{
    int i, RDMA_buf_water_mark;
    ibu_t ibu;
    MPIDI_VC_t *vc;
    ib_wc_t *p_in, *p_complete;
    ibu_work_id_handle_t *id_ptr;
    ibui_send_wqe_info_t entry;
    /* Persist across calls: the VC index where the next RDMA-buffer scan
       starts, and the number of buffer scans since the CQ was last polled. */
    static int cur_vc = 0;
    static int vc_cq = 0;
    volatile void* mem_ptr;
    volatile int valid_flag;
    ibu_rdma_buf_t *rdma_buf;
    ibu_set_t set = MPIDI_CH3I_Process.set;
    MPIDI_PG_t *pg = MPIDI_CH3I_Process.pg;
    ib_api_status_t status = IB_NOT_FOUND;
    MPIDI_STATE_DECL(MPID_STATE_IBU_POLL);

    MPIDI_FUNC_ENTER(MPID_STATE_IBU_POLL);
    MPIU_DBG_PRINTFX(("entering ibu_poll\n"));

    if (IBU_Process.num_send_cqe || (vc_cq == IBU_CQ_POLL_WATER_MARK))
    {
        vc_cq = 0;
        MPIU_DBG_PRINTF(("calling poll cq pg->ch.rank =%d \n", pg->ch.rank));
        p_complete = NULL;
        completion_data->p_next = NULL;
        p_in = completion_data;
        status = ib_poll_cq(set, &p_in, &p_complete);
        if (status == IB_SUCCESS)
        {
            if (completion_data->status != IB_WCS_SUCCESS)
            {
                /* Create id_ptr for this completion data, so that code outside will remain the same */
                id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);
                id_ptr->ibu = NULL;
                id_ptr->length = 0;
                id_ptr->mem = NULL;
                /* Set id for completion data */
                completion_data->wr_id = (uint64_t)id_ptr;
                /* Mellanox, dafna April 11th: set immd valid to 0.
                   Every option bit is cleared, not only IB_RECV_OPT_IMMEDIATE.
                   The former "recv_opt & (!IB_RECV_OPT_IMMEDIATE)" already did
                   this, because logical NOT of a non-zero flag is 0.  The
                   assignment states it explicitly. */
                completion_data->recv.ud.recv_opt = 0;
                MPIU_DBG_PRINTFX(("exiting ibu_poll upon bad completion status \n"));
                MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
                return status;
            }
            id_ptr = *((ibu_work_id_handle_t**)&completion_data->wr_id);
            ibu = id_ptr->ibu;
            mem_ptr = (void*)(id_ptr->mem);
            /* Pop send-WQE records until the one for this completion.  The
               buffers of the records popped before it are released here.
               Presumably those were unsignaled sends with no CQE of their own. */
            send_wqe_info_fifo_pop(ibu, &entry);
            while ((entry.mem_ptr) != mem_ptr)
            {
                ibuBlockFreeIB(ibu->allocator, entry.mem_ptr);
                send_wqe_info_fifo_pop(ibu, &entry);
            }
            if (IBU_RDMA_EAGER_SIGNALED == entry.RDMA_type)
            {
                /* Mellanox - always using IB_WC_SEND since there is never a real RECV */
                completion_data->wc_type = IB_WC_SEND;
            }
            MPIU_DBG_PRINTFX(("exiting ibu_poll \n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
            return status;
        }
        if (status != IB_INSUFFICIENT_RESOURCES && status != IB_NOT_FOUND)
        {
            MPIU_DBG_PRINTFX(("ib_poll_cq return != IB_SUCCESS/NOT_FOUND/INSUFFECIENT_RESOURCES\n"));
            MPIU_DBG_PRINTFX(("exiting ibu_poll 3\n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
            return status;
        }
    }

    MPIU_DBG_PRINTF(("calling poll buffers pg->ch.rank =%d \n", pg->ch.rank));
    vc_cq++;
    for (i=cur_vc; i<MPIDI_PG_Get_size(pg); i++)
    {
        /* NOTE(review): cur_vc becomes i, not i + 1, so a VC that just
           delivered a buffer is scanned first again on the next call; confirm
           this is intended. */
        cur_vc = (i==(MPIDI_PG_Get_size(pg)-1)) ? 0 : i;
        MPIDI_PG_Get_vc(pg, i, &vc);
        if (vc->pg_rank == pg->ch.rank)
        {
            MPIU_DBG_PRINTF(("cont. pg->ch.rank =%d vc->pg_rank = %d\n", pg->ch.rank,vc->pg_rank));
            continue;
        }
        /* get the RDMA buf valid flag address */
        mem_ptr = vc->ch.ibu->local_RDMA_buf_base + vc->ch.ibu->local_RDMA_head;
        rdma_buf = (ibu_rdma_buf_t*)mem_ptr;
        /* Read the flag through a volatile lvalue.  It changes outside this
           code's control (presumably by the peer's RDMA write), and the plain
           cast above drops mem_ptr's volatile qualifier. */
        valid_flag = ((volatile ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag;
        MPIU_DBG_PRINTF(("poll on buffer vc->pg_rank = %d mem_ptr = %p \n", vc->pg_rank,mem_ptr));
        MPIU_DBG_PRINTF(("poll on buffer local_RDMA_buf_base = %p local_RDMA_head = %d \n",vc->ch.ibu->local_RDMA_buf_base, vc->ch.ibu->local_RDMA_head ));
        if (IBU_VALID_RDMA_BUF == valid_flag)
        {
            /* consume the buffer so a later poll does not report it again */
            rdma_buf->footer.RDMA_buf_valid_flag = IBU_INVALID_RDMA_BUF;
            /* Set id_ptr.  footer.total_size counts the footer as well as the
               data, which starts (IBU_RDMA_BUF_SIZE - total_size) bytes into
               the buffer. */
            id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);
            id_ptr->mem = (void*)(rdma_buf->alignment + (IBU_RDMA_BUF_SIZE - rdma_buf->footer.total_size));
            id_ptr->ibu = vc->ch.ibu;
            id_ptr->length = rdma_buf->footer.total_size - sizeof(ibu_rdma_buf_footer_t);
            vc->ch.ibu->remote_RDMA_limit = rdma_buf->footer.updated_remote_RDMA_recv_limit;
            /* Set pseudo completion data */
            completion_data->wr_id = (uint64_t)id_ptr;
            /* Synthesized receive: no immediate data and no other option bits.
               Assign rather than mask, since nothing on this path has written
               recv_opt. */
            completion_data->recv.ud.recv_opt = 0;
            completion_data->length = rdma_buf->footer.total_size - sizeof(ibu_rdma_buf_footer_t);
            completion_data->status = IB_WCS_SUCCESS;
            completion_data->wc_type = IB_WC_RECV;
            if (completion_data->length)
            {
                status = IB_SUCCESS;
            }
            else
            {
                /* No payload; only the remote_RDMA_limit update above matters.
                   status keeps its IB_NOT_FOUND / IB_INSUFFICIENT_RESOURCES
                   value, for which ibu_wait never reads completion_data.  The
                   handle is therefore released here instead of being leaked. */
                ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr);
                completion_data->wr_id = 0;
            }
            /* Slots remaining before local_last_updated_RDMA_limit.  When fewer
               than half the ring remain, ibui_post_ack_write is called. */
            RDMA_buf_water_mark = (vc->ch.ibu->local_last_updated_RDMA_limit - vc->ch.ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS;
            if (RDMA_buf_water_mark < (IBU_NUM_OF_RDMA_BUFS /2))
            {
                ibui_post_ack_write(vc->ch.ibu);
            }
            /* update head to point to next buffer */
            vc->ch.ibu->local_RDMA_head = (vc->ch.ibu->local_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS;
            break; /* Finished this poll */
        }
    }
    MPIU_DBG_PRINTFX(("exiting ibu_poll 4\n"));
    /* the closing "return status; }" is on the following line */
    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
return status;}#undef FUNCNAME#define FUNCNAME ibu_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_wait(int millisecond_timeout, void **vc_pptr, int *num_bytes_ptr, ibu_op_t *op_ptr){ ib_api_status_t status; ib_wc_t completion_data; void *mem_ptr; char *iter_ptr; ibu_t ibu; int num_bytes; long offset; ibu_work_id_handle_t *id_ptr; int send_length; int ibu_reg_status = IBU_SUCCESS; MPIDI_VC_t *recv_vc_ptr; void *mem_ptr_orig; int mpi_errno; int pkt_offset;#ifdef MPIDI_CH3_CHANNEL_RNDV MPID_Request *sreq, *rreq; int complete; int i;#endif MPIDI_STATE_DECL(MPID_STATE_IBU_WAIT); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WAIT); MPIU_DBG_PRINTFX(("entering ibu_wait\n")); for (;;) { if (IBU_Process.unex_finished_list) { MPIDI_DBG_PRINTF((60, FCNAME, "returning previously received %d bytes", IBU_Process.unex_finished_list->read.total)); /* remove this ibu from the finished list */ ibu = IBU_Process.unex_finished_list; IBU_Process.unex_finished_list = IBU_Process.unex_finished_list->unex_finished_queue; ibu->unex_finished_queue = NULL; *num_bytes_ptr = ibu->read.total; *op_ptr = IBU_OP_READ; *vc_pptr = ibu->vc_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { ibu = IBU_INVALID_QP; } MPIU_DBG_PRINTFX(("exiting ibu_wait 1\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return MPI_SUCCESS; } status = ibu_poll(&completion_data); if (status == IB_INSUFFICIENT_RESOURCES || status == IB_NOT_FOUND) { /*usleep(1);*/ /* poll until there is something in the queue */ /* or the timeout has expired */ if (millisecond_timeout == 0) { *num_bytes_ptr = 0; *vc_pptr = NULL; *op_ptr = IBU_OP_TIMEOUT; MPIU_DBG_PRINTFX(("exiting ibu_wait 2\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return MPI_SUCCESS; } continue; } if (status != IB_SUCCESS) { /*MPIU_Internal_error_printf("%s: error: ib_poll_cq did not return IB_SUCCESS, %s\n", FCNAME, ib_get_err_str(status));*/ /*MPIU_dump_dbg_memlog_to_stdout();*/ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, 
FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", ib_get_err_str(status)); MPIU_DBG_PRINTFX(("exiting ibu_wait 3\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* if (completion_data.status != IB_WCS_SUCCESS) { MPIU_Internal_error_printf("%s: error: status = %s != IB_WCS_SUCCESS\n", FCNAME, ib_get_wc_status_str(completion_data.status)); MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_FAIL; } */ id_ptr = *((ibu_work_id_handle_t**)&completion_data.wr_id); ibu = id_ptr->ibu; mem_ptr = (void*)(id_ptr->mem); send_length = id_ptr->length; ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr); mem_ptr_orig = mem_ptr; switch (completion_data.wc_type) {#ifdef MPIDI_CH3_CHANNEL_RNDV case IB_WC_RDMA_WRITE: if (completion_data.status != IB_WCS_SUCCESS) { MPIU_Internal_error_printf("%s: send completion status = %s\n", FCNAME, ib_get_wc_status_str(completion_data.status)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", ib_get_wc_status_str(completion_data.status)); PrintWC(&completion_data); MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -