📄 ibu_wait.vapi.c
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "ibu.h"
#ifdef HAVE_STDIO_H
#include <stdio.h>
#endif
#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif
#include "mpidi_ch3_impl.h"

#ifdef USE_IB_VAPI

#include "ibuimpl.vapi.h"

/*#define PRINT_IBU_WAIT*/
/* MPIU_DBG_PRINTFX traces are compiled in only when PRINT_IBU_WAIT is defined. */
#ifdef PRINT_IBU_WAIT
#define MPIU_DBG_PRINTFX(a) MPIU_DBG_PRINTF(a)
#else
#define MPIU_DBG_PRINTFX(a)
#endif

/*
 * op2str - map a VAPI completion-queue-entry opcode to a printable name.
 *
 * For the known VAPI_CQE_* opcodes a string literal is returned.  Any other
 * value is formatted as a decimal number into a function-local static
 * buffer, so that result is overwritten by the next such call and this
 * function is not reentrant.  Callers must not modify or free the result.
 */
char * op2str(int opcode)
{
    static char str[20];

    switch (opcode)
    {
    case VAPI_CQE_SQ_SEND_DATA:
        return "VAPI_CQE_SQ_SEND_DATA";
    case VAPI_CQE_SQ_RDMA_WRITE:
        return "VAPI_CQE_SQ_RDMA_WRITE";
    case VAPI_CQE_SQ_RDMA_READ:
        return "VAPI_CQE_SQ_RDMA_READ";
    case VAPI_CQE_SQ_COMP_SWAP:
        return "VAPI_CQE_SQ_COMP_SWAP";
    case VAPI_CQE_SQ_FETCH_ADD:
        return "VAPI_CQE_SQ_FETCH_ADD";
    case VAPI_CQE_SQ_BIND_MRW:
        return "VAPI_CQE_SQ_BIND_MRW";
    case VAPI_CQE_RQ_SEND_DATA:
        return "VAPI_CQE_RQ_SEND_DATA";
    case VAPI_CQE_RQ_RDMA_WITH_IMM:
        return "VAPI_CQE_RQ_RDMA_WITH_IMM";
    case VAPI_CQE_INVAL_OPCODE:
        return "VAPI_CQE_INVAL_OPCODE";
    default:
        /* unknown opcode: fall through to the numeric rendering below */
        break;
    }
    MPIU_Snprintf(str, sizeof(str), "%d", opcode);
    return str;
}

/*
 * PrintWC - dump every field of a VAPI work-completion descriptor with
 * MPIU_Msg_printf and then flush stdout.  ibu_wait calls it when a send
 * completion reports a non-success status.
 *
 * Each argument is cast explicitly so it matches its conversion specifier
 * (%d <-> int, %u <-> unsigned int); passing a mismatched type to a
 * variadic printf is undefined behavior.
 */
void PrintWC(VAPI_wc_desc_t *p)
{
    MPIU_Msg_printf("Work Completion Descriptor:\n");
    MPIU_Msg_printf(" id: %d\n", (int)p->id);
    MPIU_Msg_printf(" opcode: %u = %s\n", (unsigned int)p->opcode, VAPI_cqe_opcode_sym(p->opcode));
    MPIU_Msg_printf(" byte_len: %u\n", (unsigned int)p->byte_len);
    MPIU_Msg_printf(" imm_data_valid: %d\n", (int)p->imm_data_valid);
    MPIU_Msg_printf(" imm_data: %u\n", (unsigned int)p->imm_data);
    MPIU_Msg_printf(" remote_node_addr:\n");
    MPIU_Msg_printf(" type: %u = %s\n", (unsigned int)p->remote_node_addr.type, VAPI_remote_node_addr_sym(p->remote_node_addr.type));
    MPIU_Msg_printf(" slid: %d\n", (int)p->remote_node_addr.slid);
    MPIU_Msg_printf(" sl: %d\n", (int)p->remote_node_addr.sl);
    MPIU_Msg_printf(" qp: %d\n", (int)p->remote_node_addr.qp_ety.qp);
    MPIU_Msg_printf(" loc_eecn: %d\n", (int)p->remote_node_addr.ee_dlid.loc_eecn);
    MPIU_Msg_printf(" grh_flag: %d\n", (int)p->grh_flag);
    MPIU_Msg_printf(" pkey_ix: %d\n", (int)p->pkey_ix);
    MPIU_Msg_printf(" status: %u = %s\n", (unsigned int)p->status, VAPI_wc_status_sym(p->status));
    MPIU_Msg_printf(" vendor_err_syndrome: %d\n", (int)p->vendor_err_syndrome);
    MPIU_Msg_printf(" free_res_count: %d\n", (int)p->free_res_count);
    fflush(stdout);
}

#undef FUNCNAME
#define FUNCNAME ibu_poll
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * ibu_poll - make one non-blocking attempt to obtain a completion.
 *
 * Two sources are checked, in this order:
 *
 *  1. The VAPI completion queue (VAPI_poll_cq on MPIDI_CH3I_Process.set).
 *     It is polled only when IBU_Process.num_send_cqe is non-zero, or after
 *     IBU_CQ_POLL_WATER_MARK consecutive passes that polled only the RDMA
 *     buffers.
 *     - Failed entry (status != VAPI_SUCCESS): a placeholder handle with
 *       ibu == NULL, mem == NULL and length == 0 is attached as the id, and
 *       VAPI_OK is returned so the caller can inspect the failure.
 *     - Successful entry: records are popped from the connection's send-WQE
 *       FIFO, and every record whose mem_ptr differs from the completion's
 *       buffer is freed with ibuBlockFreeIB, until the matching record is
 *       reached.  IBU_RDMA_EAGER_SIGNALED entries are reported as
 *       VAPI_CQE_SQ_SEND_DATA.
 *
 *  2. The per-peer RDMA eager buffers.  The scan starts at rank cur_vc and
 *     skips the local rank.  The first buffer whose footer flag equals
 *     IBU_VALID_RDMA_BUF is consumed:
 *     - its flag is cleared and remote_RDMA_limit is updated;
 *     - it is described in *completion_data as a VAPI_CQE_RQ_SEND_DATA
 *       completion;
 *     - ibui_post_ack_write may be called;
 *     - local_RDMA_head is advanced.
 *     A buffer with no payload is consumed too, but it produces no
 *     completion: the status is left unchanged.
 *
 * Returns:
 *   VAPI_OK      *completion_data holds a completion.  completion_data->id is
 *                an ibu_work_id_handle_t*.  The caller releases it; ibu_wait
 *                does so with ibuBlockFree(IBU_Process.workAllocator, ...).
 *   VAPI_CQ_EMPTY / VAPI_EAGAIN
 *                nothing to report.  No handle is owned by the caller.
 *   other        error returned by VAPI_poll_cq.
 *
 * Not reentrant: the scan position and the CQ-poll counter are
 * function-local statics.
 */
VAPI_ret_t ibu_poll(VAPI_wc_desc_t *completion_data)
{
    int i, RDMA_buf_water_mark;
    ibu_t ibu;
    MPIDI_VC_t *vc;
    ibu_work_id_handle_t *id_ptr;
    ibui_send_wqe_info_t entry;
    ibu_rdma_buf_t *rdma_buf;
    /* Persist across calls: cur_vc is the rank where the RDMA-buffer scan
       starts; vc_cq counts buffer-only passes since the CQ was last polled.
       Explicit 'int' is required - implicit int is invalid since C99. */
    static int cur_vc = 0;
    static int vc_cq = 0;
    volatile void* mem_ptr;
    volatile int valid_flag;
    ibu_set_t set = MPIDI_CH3I_Process.set;
    MPIDI_PG_t *pg = MPIDI_CH3I_Process.pg;
    VAPI_ret_t status = VAPI_CQ_EMPTY;
    MPIDI_STATE_DECL(MPID_STATE_IBU_POLL);

    MPIDI_FUNC_ENTER(MPID_STATE_IBU_POLL);
    MPIU_DBG_PRINTFX(("entering ibu_poll\n"));

    if (IBU_Process.num_send_cqe || (vc_cq == IBU_CQ_POLL_WATER_MARK))
    {
        vc_cq = 0;
        MPIU_DBG_PRINTF(("calling poll cq pg->ch.rank =%d \n", pg->ch.rank));
        status = VAPI_poll_cq(IBU_Process.hca_handle, set, completion_data);
        if (status == VAPI_OK)
        {
            if (completion_data->status != VAPI_SUCCESS)
            {
                /* Create id_ptr for this completion data, so that code outside will remain the same.
                   NOTE(review): the id the CQ returned is overwritten here without being
                   released - confirm that is intended on this error path. */
                id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);
                id_ptr->ibu = NULL;
                id_ptr->length = 0;
                id_ptr->mem = NULL;
                /* Set id for completion data */
                completion_data->id = (VAPI_wr_id_t)id_ptr;
                completion_data->imm_data_valid = 0;
                MPIU_DBG_PRINTFX(("exiting ibu_poll upon bad completion status \n"));
                MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
                return status;
            }
            /* NOTE(review): this reads the 64-bit wr id through a pointer-typed
               lvalue.  On a 32-bit big-endian host that yields the wrong half
               unless the posting side stored it the same way - confirm. */
            id_ptr = *((ibu_work_id_handle_t**)&completion_data->id);
            ibu = id_ptr->ibu;
            mem_ptr = (void*)(id_ptr->mem);
            /* Retire queued send records up to the one matching this completion;
               earlier records' buffers are released along the way. */
            send_wqe_info_fifo_pop(ibu, &entry);
            while ((entry.mem_ptr) != mem_ptr)
            {
                ibuBlockFreeIB(ibu->allocator, entry.mem_ptr);
                send_wqe_info_fifo_pop(ibu, &entry);
            }
            if (IBU_RDMA_EAGER_SIGNALED == entry.RDMA_type)
            {
                /* Mellanox - always using VAPI_CQE_SQ_SEND_DATA since there is never a real RECV */
                completion_data->opcode = VAPI_CQE_SQ_SEND_DATA;
            }
            MPIU_DBG_PRINTFX(("exiting ibu_poll \n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
            return status;
        }
        if (status != VAPI_EAGAIN && status != VAPI_CQ_EMPTY)
        {
            MPIU_DBG_PRINTFX(("VAPI_poll_cq return != VAPI_OK/EMPTY/EAGAIN\n"));
            MPIU_DBG_PRINTFX(("exiting ibu_poll 3\n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
            return status;
        }
    }

    MPIU_DBG_PRINTF(("calling poll buffers pg->ch.rank =%d \n", pg->ch.rank));
    vc_cq++;
    for (i = cur_vc; i < MPIDI_PG_Get_size(pg); i++)
    {
        /* Remember where the next call resumes; wraps to 0 after the last rank.
           NOTE(review): this stores i, not i + 1, so a peer that just delivered
           is scanned first again next time - confirm this fairness policy. */
        cur_vc = (i == (MPIDI_PG_Get_size(pg) - 1)) ? 0 : i;
        MPIDI_PG_Get_vc(pg, i, &vc);
        if (vc->pg_rank == pg->ch.rank)
        {
            MPIU_DBG_PRINTF(("cont. pg->ch.rank =%d vc->pg_rank = %d\n", pg->ch.rank, vc->pg_rank));
            continue;
        }
        /* get the RDMA buf valid flag address */
        mem_ptr = vc->ch.ibu->local_RDMA_buf_base + vc->ch.ibu->local_RDMA_head;
        rdma_buf = (ibu_rdma_buf_t*)mem_ptr;
        valid_flag = rdma_buf->footer.RDMA_buf_valid_flag;
        MPIU_DBG_PRINTF(("poll on buffer vc->pg_rank = %d mem_ptr = %p \n", vc->pg_rank, mem_ptr));
        MPIU_DBG_PRINTF(("poll on buffer local_RDMA_buf_base = %p local_RDMA_head = %d \n", vc->ch.ibu->local_RDMA_buf_base, vc->ch.ibu->local_RDMA_head));
        if (IBU_VALID_RDMA_BUF == valid_flag)
        {
            rdma_buf->footer.RDMA_buf_valid_flag = IBU_INVALID_RDMA_BUF;
            vc->ch.ibu->remote_RDMA_limit = rdma_buf->footer.updated_remote_RDMA_recv_limit;

            /* Set pseudo completion data */
            completion_data->imm_data_valid = 0;
            completion_data->byte_len = rdma_buf->footer.total_size - sizeof(ibu_rdma_buf_footer_t);
            completion_data->status = VAPI_SUCCESS;
            completion_data->opcode = VAPI_CQE_RQ_SEND_DATA;
            if (completion_data->byte_len)
            {
                /* Payload present: hand the caller a work-id handle that
                   points at the payload inside the RDMA buffer. */
                id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);
                id_ptr->mem = (void*)(rdma_buf->alignment + (IBU_RDMA_BUF_SIZE - rdma_buf->footer.total_size));
                id_ptr->ibu = vc->ch.ibu;
                id_ptr->length = rdma_buf->footer.total_size - sizeof(ibu_rdma_buf_footer_t);
                completion_data->id = (VAPI_wr_id_t)id_ptr;
                status = VAPI_OK;
            }
            else
            {
                /* No payload: status remains VAPI_CQ_EMPTY/VAPI_EAGAIN, so the
                   caller never reads (or frees) completion_data->id.  Allocating
                   a handle here would leak one block per such buffer. */
                completion_data->id = 0;
            }

            /* Buffers between the head and local_last_updated_RDMA_limit
               (presumably the last limit advertised to the peer - confirm).
               When fewer than half remain, call ibui_post_ack_write. */
            RDMA_buf_water_mark = (vc->ch.ibu->local_last_updated_RDMA_limit - vc->ch.ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS;
            if (RDMA_buf_water_mark < (IBU_NUM_OF_RDMA_BUFS / 2))
            {
                ibui_post_ack_write(vc->ch.ibu);
            }
            /* update head to point to next buffer */
            vc->ch.ibu->local_RDMA_head = (vc->ch.ibu->local_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS;
            break; /* Finished this poll */
        }
    }
    MPIU_DBG_PRINTFX(("exiting ibu_poll 4\n"));
    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
    return status;
}

#undef FUNCNAME
#define FUNCNAME ibu_wait
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int ibu_wait(int millisecond_timeout, void **vc_pptr, int *num_bytes_ptr, ibu_op_t *op_ptr)
{
    VAPI_ret_t status;
    VAPI_wc_desc_t completion_data;
    void *mem_ptr;
    char *iter_ptr;
    ibu_t ibu;
    int num_bytes;
    long offset;
    ibu_work_id_handle_t *id_ptr;
    int send_length;
    int ibu_reg_status = IBU_SUCCESS;
    MPIDI_VC_t *recv_vc_ptr;
    void *mem_ptr_orig;
    int mpi_errno;
    int pkt_offset;
#ifdef MPIDI_CH3_CHANNEL_RNDV
    MPID_Request *sreq, *rreq;
    int complete;
    int i;
#endif
    MPIDI_STATE_DECL(MPID_STATE_IBU_WAIT);

    MPIDI_FUNC_ENTER(MPID_STATE_IBU_WAIT);
    MPIU_DBG_PRINTFX(("entering ibu_wait\n"));

    for (;;)
    {
        if (IBU_Process.unex_finished_list)
        {
            MPIDI_DBG_PRINTF((60, FCNAME, "returning previously received %d bytes", IBU_Process.unex_finished_list->read.total));
            /* remove this ibu from the finished list */
            ibu = IBU_Process.unex_finished_list;
            IBU_Process.unex_finished_list = IBU_Process.unex_finished_list->unex_finished_queue;
            ibu->unex_finished_queue = NULL;
            *num_bytes_ptr = ibu->read.total;
            *op_ptr = IBU_OP_READ;
            *vc_pptr = ibu->vc_ptr;
            ibu->pending_operations--;
            if (ibu->closing && ibu->pending_operations == 0)
            {
                ibu = IBU_INVALID_QP;
            }
            MPIU_DBG_PRINTFX(("exiting ibu_wait 1\n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);
            return MPI_SUCCESS;
        }

        status = ibu_poll(&completion_data);
        if (status == VAPI_EAGAIN
|| status == VAPI_CQ_EMPTY) { /*usleep(1);*/ /* poll until there is something in the queue */ /* or the timeout has expired */ if (millisecond_timeout == 0) { *num_bytes_ptr = 0; *vc_pptr = NULL; *op_ptr = IBU_OP_TIMEOUT; MPIU_DBG_PRINTFX(("exiting ibu_wait 2\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return MPI_SUCCESS; } continue; } if (status != VAPI_OK) { /*MPIU_Internal_error_printf("%s: error: VAPI_poll_cq did not return VAPI_OK, %s\n", FCNAME, VAPI_strerror(status));*/ /*MPIU_dump_dbg_memlog_to_stdout();*/ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", VAPI_strerror(status)); MPIU_DBG_PRINTFX(("exiting ibu_wait 3\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; } /* if (completion_data.status != VAPI_SUCCESS) { MPIU_Internal_error_printf("%s: error: status = %s != VAPI_SUCCESS\n", FCNAME, VAPI_strerror(completion_data.status)); MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_FAIL; } */ id_ptr = *((ibu_work_id_handle_t**)&completion_data.id); ibu = id_ptr->ibu; mem_ptr = (void*)(id_ptr->mem); send_length = id_ptr->length; ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr); mem_ptr_orig = mem_ptr; switch (completion_data.opcode) {#ifdef MPIDI_CH3_CHANNEL_RNDV case VAPI_CQE_SQ_RDMA_WRITE: if (completion_data.status != VAPI_SUCCESS) { MPIU_Internal_error_printf("%s: send completion status = %s\n", FCNAME, VAPI_wc_status_sym(completion_data.status)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", VAPI_wc_status_sym(completion_data.status)); PrintWC(&completion_data); MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return mpi_errno; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -