📄 ibu.ibal.c
字号:
buf += len; if (cur_len == len) { cur_index++; cur_len = iov[cur_index].MPID_IOV_LEN; cur_buf = iov[cur_index].MPID_IOV_BUF; } else { cur_len -= len; cur_buf += len; } } } /*((ibu_rdma_buf_t*)mem_ptr)->footer.cur_offset = 0;*/ ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag = IBU_VALID_RDMA_BUF; ((ibu_rdma_buf_t*)mem_ptr)->footer.updated_remote_RDMA_recv_limit = (( ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) - 1) % IBU_NUM_OF_RDMA_BUFS; /* Piggybacked update of remote Q state */ ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size = msg_size; /* Already added size of additional header*/ /* Mellanox END copying data */ data.length = msg_size; MPIU_Assert(data.length); /* Data.vaddr points to beginning of original buffer */ data.vaddr = (uint64_t)(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE - msg_size)); data.lkey = GETLKEY(mem_ptr); work_req.p_next = NULL; work_req.wr_type = WR_RDMA_WRITE; /* replaced WR_SEND by Mellanox, dafna April 11th */ signaled_type = ibui_signaled_completion(ibu); work_req.send_opt = signaled_type; work_req.num_ds = 1; work_req.ds_array = &data; work_req.immediate_data = 0; /* Added and updated remote ops by Mellanox, dafna April 11th */ work_req.remote_ops.vaddr = (uint64_t)(ibu->remote_RDMA_buf_base + (ibu->remote_RDMA_head + 1)); work_req.remote_ops.vaddr -= msg_size; work_req.remote_ops.rkey = ibu->remote_RDMA_buf_hndl.rkey; work_req.remote_ops.atomic1 = 0x0; work_req.remote_ops.atomic2 = 0x0; MPIU_DBG_PRINTF((" work_req remote_addr = %p \n",work_req.remote_ops.vaddr )); if (signaled_type == IB_SEND_OPT_SIGNALED) { id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator); *((ibu_work_id_handle_t**)&work_req.wr_id) = id_ptr; if (id_ptr == NULL) { MPIDI_DBG_PRINTF((60, FCNAME, "ibuBlocAlloc returned NULL")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV); MPIU_Assert(0); return IBU_FAIL; } id_ptr->ibu = ibu; id_ptr->mem = (void*)mem_ptr; id_ptr->length = msg_size; } if (msg_size < (unsigned int)ibu->max_inline_size) { MPIDI_DBG_PRINTF((60, FCNAME, "ib_post_inline_sr(%d bytes)", msg_size)); work_req.send_opt |= IB_SEND_OPT_INLINE; } else { MPIDI_DBG_PRINTF((60, FCNAME, "ib_post_send(%d bytes)", msg_size)); } status = ib_post_send(ibu->qp_handle, &work_req, NULL); if (status != IB_SUCCESS) { /* Free id_ptr if was signaled and VAPI posting failed */ if (signaled_type == IB_SEND_OPT_SIGNALED) { ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr); } MPIU_Internal_error_printf("%s: Error: failed to post ib send, status = %s\n", FCNAME, ib_get_err_str(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV); return IBU_FAIL; } /* Mellanox push entry to send_wqe_fifo */ entry_type = (signaled_type == IB_SEND_OPT_SIGNALED)? IBU_RDMA_EAGER_SIGNALED : IBU_RDMA_EAGER_UNSIGNALED; send_wqe_info_fifo_push(ibu, entry_type , mem_ptr, msg_size); /* Mellanox update head of remote RDMA buffer to write to */ ibu->remote_RDMA_head = (ibu->remote_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS; /* Mellanox update remote RDMA limit to what was sent in the packet */ ibu->local_last_updated_RDMA_limit = (( ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS )- 1) % IBU_NUM_OF_RDMA_BUFS; /* Mellanox change of print. use remote_RDMA_head/remote_RMDA_limit instead of nAvailRemote use new local RDMA limit for nUnacked */ MPIU_DBG_PRINTF(("send posted, nAvailRemote: %d, local_last_updated_RDMA_limit: %d\n", (ibu->remote_RDMA_limit - ibu->remote_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS, ((ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS)- 1) % IBU_NUM_OF_RDMA_BUFS)); } while (cur_index < n); *num_bytes_ptr = total; MPIU_DBG_PRINTF(("exiting ibu_writev\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV); return IBU_SUCCESS;}#ifdef HAVE_WINDOWS_H#ifdef USE_DEBUG_ALLOCATION_HOOKint __cdecl ibu_allocation_hook(int nAllocType, void * pvData, size_t nSize, int nBlockUse, long lRequest, const unsigned char * szFileName, int nLine) { /*nBlockUse = _FREE_BLOCK, _NORMAL_BLOCK, _CRT_BLOCK, _IGNORE_BLOCK, _CLIENT_BLOCK */ if ( nBlockUse == _CRT_BLOCK ) /* Ignore internal C runtime library allocations */ return( TRUE ); /*nAllocType = _HOOK_ALLOC, _HOOK_REALLOC, _HOOK_FREE */ if (nAllocType == _HOOK_FREE) { /* remove from cache */ if ( pvData != NULL ) { ibu_invalidate_memory(pvData, nSize); } } return( TRUE ); /* Allow the memory operation to proceed */}#endif#endif#undef FUNCNAME#define FUNCNAME ibui_get_first_active_ca#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int ibui_get_first_active_ca(){ int mpi_errno; ib_api_status_t status; intn_t guid_count; intn_t i; uint32_t port; ib_net64_t p_ca_guid_array[12]; ib_ca_attr_t *p_ca_attr; size_t bsize; ib_port_attr_t *p_port_attr; ib_ca_handle_t hca_handle; MPIDI_STATE_DECL(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); MPIDI_FUNC_ENTER(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); MPIU_DBG_PRINTF(("entering ibui_get_first_active_ca\n")); status = ib_get_ca_guids( IBU_Process.al_handle, NULL, &guid_count ); if (status != IB_INSUFFICIENT_MEMORY) { MPIU_Internal_error_printf( "[%d] ib_get_ca_guids failed [%s]\n", __LINE__, ib_get_err_str(status)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**get_guids", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); return mpi_errno; } /*printf("Total number of CA's = %d\n", (uint32_t)guid_count);fflush(stdout);*/ if (guid_count == 0) { MPIU_Internal_error_printf("no channel adapters available.\n"); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**noca", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); return mpi_errno; } if (guid_count > 12) { guid_count = 12; } status = ib_get_ca_guids(IBU_Process.al_handle, p_ca_guid_array, &guid_count); if ( status != IB_SUCCESS ) { MPIU_Internal_error_printf("[%d] ib_get_ca_guids failed [%s]\n", __LINE__, ib_get_err_str(status)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**ca_guids", "**ca_guids %s", ib_get_err_str(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); return mpi_errno; } /* walk guid table */ for ( i = 0; i < guid_count; i++ ) { status = ib_open_ca( IBU_Process.al_handle, p_ca_guid_array[i], NULL, NULL, &hca_handle ); if (status != IB_SUCCESS) { MPIU_Internal_error_printf( "[%d] ib_open_ca failed [%s]\n", __LINE__, ib_get_err_str(status)); continue; } /*printf( "GUID = %"PRIx64"\n", p_ca_guid_array[i]);fflush(stdout);*/ /* Query the CA */ bsize = 0; status = ib_query_ca( hca_handle, NULL, &bsize ); if (status != IB_INSUFFICIENT_MEMORY) { MPIU_Internal_error_printf( "[%d] ib_query_ca failed [%s]\n", __LINE__, ib_get_err_str(status)); ib_close_ca(hca_handle, NULL); continue; } /* Allocate the memory needed for query_ca */ p_ca_attr = (ib_ca_attr_t *)cl_zalloc( bsize ); if ( !p_ca_attr ) { MPIU_Internal_error_printf( "[%d] not enough memory\n", __LINE__); ib_close_ca(hca_handle, NULL); continue; } status = ib_query_ca( hca_handle, p_ca_attr, &bsize ); if (status != IB_SUCCESS) { MPIU_Internal_error_printf( "[%d] ib_query_ca failed [%s]\n", __LINE__, ib_get_err_str(status)); ib_close_ca(hca_handle, NULL); cl_free( p_ca_attr ); continue; } /* scan for active port */ for( port = 0; port < p_ca_attr->num_ports; port++ ) { p_port_attr = &p_ca_attr->p_port_attr[port]; /* is there an active port? */ if ( p_port_attr->link_state == IB_LINK_ACTIVE ) { /* yes, is there a port_guid or lid we should attach to? */ /* printf("port %d active with lid %d\n", p_port_attr->port_num, cl_ntoh16(p_port_attr->lid)); fflush(stdout); */ /* get a protection domain handle */ status = ib_alloc_pd(hca_handle, IB_PDT_NORMAL, NULL, &IBU_Process.pd_handle); if (status != IB_SUCCESS) { MPIU_Internal_error_printf("get_first_ca: ib_alloc_pd failed, status %s\n", ib_get_err_str(status)); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**pd_alloc", "**pd_alloc %s", ib_get_err_str(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); return mpi_errno; } IBU_Process.port = p_port_attr->port_num; IBU_Process.port_static_rate = ib_port_info_compute_rate(p_port_attr); /* Mellanox dafna April 11th, compute port's static rate according to link width*/ IBU_Process.lid = p_port_attr->lid; MPIU_DBG_PRINTF(("port = %d, lid = %d, mtu = %d, max_cqes = %d, maxmsg = %d, link = %s, static_rate = %d\n", p_port_attr->port_num, cl_ntoh16(p_port_attr->lid), p_port_attr->mtu, p_ca_attr->max_cqes, p_port_attr->max_msg_size, ib_get_port_state_str(p_port_attr->link_state), IBU_Process.port_static_rate)); IBU_Process.hca_handle = hca_handle; IBU_Process.dev_id = p_ca_attr->dev_id; /* Mellanox dafna April 11th, added dev_id */ IBU_Process.cq_size = p_ca_attr->max_cqes; cl_free( p_ca_attr ); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); return MPI_SUCCESS; } } /* free allocated mem */ cl_free( p_ca_attr ); ib_close_ca(hca_handle, NULL); } MPIU_Internal_error_printf("no channel adapters available.\n"); mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**noca", 0); MPIDI_FUNC_EXIT(MPID_STATE_IBUI_GET_FIRST_ACTIVE_CA); return mpi_errno;}#undef FUNCNAME#define FUNCNAME ibu_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_init(){ ib_api_status_t status; MPIDI_STATE_DECL(MPID_STATE_IBU_INIT); MPIDI_FUNC_ENTER(MPID_STATE_IBU_INIT); MPIU_DBG_PRINTF(("entering ibu_init\n")); /* FIXME: This is a temporary solution to prevent cached pointers from pointing to old physical memory pages. A better solution might be to add a user hook to free() to remove cached pointers at that time. */#ifdef MPIDI_CH3_CHANNEL_RNDV /* taken from the OSU mvapich source: */ /* Set glibc/stdlib malloc options to prevent handing * memory back to the system (brk) upon free. * Also, dont allow MMAP memory for large allocations. */#ifdef M_TRIM_THRESHOLD mallopt(M_TRIM_THRESHOLD, -1);#endif#ifdef M_MMAP_MAX mallopt(M_MMAP_MAX, 0);#endif /* End of OSU code */#ifdef HAVE_WINDOWS_H#ifdef USE_DEBUG_ALLOCATION_HOOK _CrtSetAllocHook(ibu_allocation_hook);#endif#endif#endif /* Initialize globals */ IBU_Process.num_send_cqe = 0; /* Added by Mellanox, dafna April 11th */ /* get a handle to the infiniband access layer */ status = ib_open_al(&IBU_Process.al_handle); if (status != IB_SUCCESS) { MPIU_Internal_error_printf("ibu_init: ib_open_al failed, status %s\n", ib_get_err_str(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return status; } /* wait for 50 ms before querying al. This fixes a potential race condition in al where ib_query is not ready with port information on faster systems */ cl_thread_suspend(50); status = ibui_get_first_active_ca(); if (status != MPI_SUCCESS) { MPIU_Internal_error_printf("ibu_init: get_first_active_ca failed.\n"); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return status; } /* non infiniband initialization */ IBU_Process.unex_finished_list = NULL; IBU_Process.workAllocator = ibuBlockAllocInit(sizeof(ibu_work_id_handle_t), 256, 256, malloc, free); MPIU_DBG_PRINTF(("exiting ibu_init\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return IBU_SUCCESS;}#undef FUNCNAME#define FUNCNAME ibu_finalize#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_finalize(){ MPIDI_STATE_DECL(MPID_STATE_IBU_FINALIZE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_FINALIZE); MPIU_DBG_PRINTF(("entering ibu_finalize\n")); ibuBlockAllocFinalize(&IBU_Process.workAllocator); /* replaced g_allocator by Mellanox, dafna April 11th */ ib_close_ca(IBU_Process.hca_handle, NULL); ib_close_al(IBU_Process.al_handle); MPIU_DBG_PRINTF(("exiting ibu_finalize\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_FINALIZE); return IBU_SUCCESS;}void AL_API FooBar(const ib_cq_handle_t h_cq, void *p){ MPIU_Internal_error_printf("FooBar\n");}#undef FUNCNAME#define FUNCNAME ibu_create_set#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_create_set(ibu_set_t *set){ ib_api_status_t status; ib_cq_create_t cq_attr; MPIDI_STATE_DECL(MPID_STATE_IBU_CREATE_SET); MPIDI_FUNC_ENTER(MPID_STATE_IBU_CREATE_SET); MPIU_DBG_PRINTF(("entering ibu_create_set\n")); /* create the completion queue */ cq_attr.size = IBU_Process.cq_size; cq_attr.pfn_comp_cb = FooBar; /* completion routine */ cq_attr.h_wait_obj = NULL; /* client specific wait object */ status = ib_create_cq(IBU_Process.hca_handle, &cq_attr, NULL, NULL, set); if (status != IB_SUCCESS) { MPIU_Internal_error_printf("ibu_create_set: ib_create_cq failed, error %s\n", ib_get_err_str(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBU_CREATE_SET); return status; } /* status = ib_rearm_cq(*set, TRUE); if (status != IB_SUCCESS) { MPIU_Internal_error_printf("%s: error: ib_rearm_cq failed, %s\n", FCNAME, ib_get_err_str(status)); MPIDI_FUNC_EXIT(MPID_STATE_IBU_CREATE_SET); return IBU_FAIL; } */ MPIU_DBG_PRINTF(("exiting ibu_create_set\n")); MPIDI_FUNC_EXIT(MPID_STATE_IBU_CREATE_SET); return status;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -