⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ibu_wait.vapi.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "ibu.h"
#ifdef HAVE_STDIO_H
#include <stdio.h>
#endif
#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif
#include "mpidi_ch3_impl.h"

#ifdef USE_IB_VAPI

#include "ibuimpl.vapi.h"

/* Define PRINT_IBU_WAIT to turn the MPIU_DBG_PRINTFX trace calls in this
 * file into MPIU_DBG_PRINTF calls; otherwise they expand to nothing. */
/*#define PRINT_IBU_WAIT*/
#ifdef PRINT_IBU_WAIT
#define MPIU_DBG_PRINTFX(a) MPIU_DBG_PRINTF(a)
#else
#define MPIU_DBG_PRINTFX(a)
#endif

/*
 * op2str - map a VAPI completion-queue-entry opcode to its symbolic name.
 *
 * opcode: one of the VAPI_CQE_* opcode values.
 *
 * Returns a string literal for the opcodes listed below.  Any other value is
 * formatted as a decimal number into a function-local static buffer, so that
 * result is overwritten by the next call that takes the fallback path.
 */
char * op2str(int opcode)
{
    static char str[20];

    switch (opcode)
    {
    case VAPI_CQE_SQ_SEND_DATA:
        return "VAPI_CQE_SQ_SEND_DATA";
    case VAPI_CQE_SQ_RDMA_WRITE:
        return "VAPI_CQE_SQ_RDMA_WRITE";
    case VAPI_CQE_SQ_RDMA_READ:
        return "VAPI_CQE_SQ_RDMA_READ";
    case VAPI_CQE_SQ_COMP_SWAP:
        return "VAPI_CQE_SQ_COMP_SWAP";
    case VAPI_CQE_SQ_FETCH_ADD:
        return "VAPI_CQE_SQ_FETCH_ADD";
    case VAPI_CQE_SQ_BIND_MRW:
        return "VAPI_CQE_SQ_BIND_MRW";
    case VAPI_CQE_RQ_SEND_DATA:
        return "VAPI_CQE_RQ_SEND_DATA";
    case VAPI_CQE_RQ_RDMA_WITH_IMM:
        return "VAPI_CQE_RQ_RDMA_WITH_IMM";
    case VAPI_CQE_INVAL_OPCODE:
        return "VAPI_CQE_INVAL_OPCODE";
    default:
        break;
    }
    /* Unknown opcode: fall back to its numeric value. */
    MPIU_Snprintf(str, sizeof(str), "%d", opcode);
    return str;
}

/*
 * PrintWC - dump the fields of a work completion descriptor.
 *
 * p: descriptor to print; it is dereferenced unconditionally.
 *
 * Each field is written with MPIU_Msg_printf and stdout is flushed at the end.
 */
void PrintWC(VAPI_wc_desc_t *p)
{
    MPIU_Msg_printf("Work Completion Descriptor:\n");
    MPIU_Msg_printf(" id: %d\n", (int)p->id);
    /* Arguments for %u are cast to unsigned int so they match the conversion. */
    MPIU_Msg_printf(" opcode: %u = %s\n",
        (unsigned int)p->opcode, VAPI_cqe_opcode_sym(p->opcode));
    MPIU_Msg_printf(" byte_len: %d\n", p->byte_len);
    MPIU_Msg_printf(" imm_data_valid: %d\n", (int)p->imm_data_valid);
    MPIU_Msg_printf(" imm_data: %u\n", (unsigned int)p->imm_data);
    MPIU_Msg_printf(" remote_node_addr:\n");
    MPIU_Msg_printf("  type: %u = %s\n",
        (unsigned int)p->remote_node_addr.type,
        VAPI_remote_node_addr_sym(p->remote_node_addr.type));
    MPIU_Msg_printf("  slid: %d\n", (int)p->remote_node_addr.slid);
    MPIU_Msg_printf("  sl: %d\n", (int)p->remote_node_addr.sl);
    MPIU_Msg_printf("  qp: %d\n", (int)p->remote_node_addr.qp_ety.qp);
    MPIU_Msg_printf("  loc_eecn: %d\n", (int)p->remote_node_addr.ee_dlid.loc_eecn);
    MPIU_Msg_printf(" grh_flag: %d\n", (int)p->grh_flag);
    MPIU_Msg_printf(" pkey_ix: %d\n", p->pkey_ix);
    MPIU_Msg_printf(" status: %u = %s\n",
        (unsigned int)p->status, VAPI_wc_status_sym(p->status));
    MPIU_Msg_printf(" vendor_err_syndrome: %d\n", p->vendor_err_syndrome);
    MPIU_Msg_printf(" free_res_count: %d\n", p->free_res_count);
    fflush(stdout);
}

#undef FUNCNAME
#define FUNCNAME ibu_poll
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * ibu_poll - look for one completed operation and describe it in
 * *completion_data.
 *
 * Two sources are checked, in this order:
 *
 *  1. The completion queue, via VAPI_poll_cq(), but only when
 *     IBU_Process.num_send_cqe is non-zero or the RDMA buffers have been
 *     polled IBU_CQ_POLL_WATER_MARK times since the queue was last polled.
 *  2. The local RDMA receive buffer at the head position of each virtual
 *     connection in the process group (skipping the connection to this
 *     process itself).  A buffer whose footer carries IBU_VALID_RDMA_BUF is
 *     consumed and a pseudo completion (opcode VAPI_CQE_RQ_SEND_DATA, status
 *     VAPI_SUCCESS) is written into *completion_data.
 *
 * completion_data: output descriptor.  When VAPI_OK is returned, its id field
 *     holds a pointer to an ibu_work_id_handle_t; the visible caller
 *     (ibu_wait) releases it with ibuBlockFree() on IBU_Process.workAllocator.
 *
 * Returns VAPI_OK when *completion_data describes a completion, the status
 * left by the queue poll (VAPI_CQ_EMPTY or VAPI_EAGAIN) when nothing was
 * found, or any other value returned by VAPI_poll_cq() unchanged.
 *
 * NOTE(review): the result of ibuBlockAlloc() is dereferenced without a NULL
 * check; confirm the allocator cannot fail or decide how exhaustion should be
 * reported.
 */
VAPI_ret_t ibu_poll(VAPI_wc_desc_t *completion_data)
{
    int i, RDMA_buf_water_mark;
    ibu_t ibu;
    MPIDI_VC_t *vc;
    ibu_work_id_handle_t *id_ptr;
    ibui_send_wqe_info_t entry;
    /* Index at which the next RDMA buffer scan starts (kept across calls). */
    static int cur_vc = 0;
    /* Number of RDMA buffer scans since the completion queue was last polled. */
    static int vc_cq  = 0;
    volatile void* mem_ptr;
    volatile int valid_flag;
    ibu_set_t set = MPIDI_CH3I_Process.set;
    MPIDI_PG_t  *pg = MPIDI_CH3I_Process.pg;
    VAPI_ret_t status = VAPI_CQ_EMPTY;
    MPIDI_STATE_DECL(MPID_STATE_IBU_POLL);

    MPIDI_FUNC_ENTER(MPID_STATE_IBU_POLL);
    MPIU_DBG_PRINTFX(("entering ibu_poll\n"));

    if (IBU_Process.num_send_cqe || (vc_cq == IBU_CQ_POLL_WATER_MARK))
    {
        vc_cq = 0;
        MPIU_DBG_PRINTF(("calling poll cq pg->ch.rank =%d \n", pg->ch.rank));
        status = VAPI_poll_cq(IBU_Process.hca_handle,set,completion_data);
        if (status == VAPI_OK)
        {
            if (completion_data->status != VAPI_SUCCESS)
            {
                /* Create id_ptr for this completion data, so that code outside will remain the same */
                id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);
                id_ptr->ibu = NULL;
                id_ptr->length = 0;
                id_ptr->mem = NULL;
                /* Set id for completion data */
                completion_data->id = (VAPI_wr_id_t)id_ptr;
                completion_data->imm_data_valid = 0;
                MPIU_DBG_PRINTFX(("exiting ibu_poll upon bad completion status \n"));
                MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
                return status;
            }
            id_ptr = *((ibu_work_id_handle_t**)&completion_data->id);
            ibu = id_ptr->ibu;
            mem_ptr = (void*)(id_ptr->mem);
            /* Pop send WQE records until the one for this completion's buffer
             * is found, returning the buffers of the skipped records to the
             * connection's allocator (presumably sends that completed without
             * generating their own completion entry -- TODO confirm). */
            send_wqe_info_fifo_pop(ibu, &entry);
            while ((entry.mem_ptr) != mem_ptr)
            {
                ibuBlockFreeIB(ibu->allocator, entry.mem_ptr);
                send_wqe_info_fifo_pop(ibu, &entry);
            }
            if (IBU_RDMA_EAGER_SIGNALED == entry.RDMA_type)
            {
                /* Mellanox - always using VAPI_CQE_SQ_SEND_DATA since there is never a real RECV */
                completion_data->opcode = VAPI_CQE_SQ_SEND_DATA;
            }
            MPIU_DBG_PRINTFX(("exiting ibu_poll \n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
            return status;
        }
        if (status != VAPI_EAGAIN && status != VAPI_CQ_EMPTY)
        {
            MPIU_DBG_PRINTFX(("VAPI_poll_cq return  != VAPI_OK/EMPTY/EAGAIN\n"));
            MPIU_DBG_PRINTFX(("exiting ibu_poll 3\n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
            return status;
        }
    }

    MPIU_DBG_PRINTF(("calling poll buffers pg->ch.rank =%d \n", pg->ch.rank));
    vc_cq++;
    for (i=cur_vc; i<MPIDI_PG_Get_size(pg); i++)
    {
        /* Remember where to resume; wrap to 0 once the last index is reached. */
        cur_vc = (i==(MPIDI_PG_Get_size(pg)-1)) ? 0 : i;
        MPIDI_PG_Get_vc(pg, i, &vc);
        if (vc->pg_rank == pg->ch.rank)
        {
            MPIU_DBG_PRINTF(("cont. pg->ch.rank =%d vc->pg_rank = %d\n", pg->ch.rank,vc->pg_rank));
            continue;
        }

        /* get the RDMA buf valid flag address */
        mem_ptr = vc->ch.ibu->local_RDMA_buf_base + vc->ch.ibu->local_RDMA_head;
        valid_flag = ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag;
        MPIU_DBG_PRINTF(("poll on buffer vc->pg_rank = %d  mem_ptr = %p \n", vc->pg_rank,mem_ptr));
        MPIU_DBG_PRINTF(("poll on buffer local_RDMA_buf_base = %p  local_RDMA_head = %d \n",vc->ch.ibu->local_RDMA_buf_base, vc->ch.ibu->local_RDMA_head ));
        if (IBU_VALID_RDMA_BUF == valid_flag)
        {
            ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag = IBU_INVALID_RDMA_BUF;
            /* Set id_ptr*/
            id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);
            id_ptr->mem = (void*)(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE -((ibu_rdma_buf_t*)mem_ptr)->footer.total_size));
            id_ptr->ibu = vc->ch.ibu;
            id_ptr->length = ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size - sizeof(ibu_rdma_buf_footer_t);
            vc->ch.ibu->remote_RDMA_limit = ((ibu_rdma_buf_t*)mem_ptr)->footer.updated_remote_RDMA_recv_limit;
            /* Set pseudo completion data */
            completion_data->id = (VAPI_wr_id_t)id_ptr;
            completion_data->imm_data_valid = 0;
            completion_data->byte_len = ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size - sizeof(ibu_rdma_buf_footer_t);
            completion_data->status = VAPI_SUCCESS;
            completion_data->opcode = VAPI_CQE_RQ_SEND_DATA;
            if (completion_data->byte_len)
            {
                status = VAPI_OK;
            }
            else
            {
                /* Footer-only buffer: status is left unchanged (not VAPI_OK),
                 * so the caller never sees this handle.  Release it here
                 * instead of leaking one handle per empty buffer, and do not
                 * leave a dangling pointer in the descriptor. */
                ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr);
                completion_data->id = (VAPI_wr_id_t)0;
            }
            RDMA_buf_water_mark = (vc->ch.ibu->local_last_updated_RDMA_limit - vc->ch.ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS;
            if (RDMA_buf_water_mark < (IBU_NUM_OF_RDMA_BUFS /2))
            {
                ibui_post_ack_write(vc->ch.ibu);
            }
            /* update head to point to next buffer */
            vc->ch.ibu->local_RDMA_head = (vc->ch.ibu->local_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS;
            break; /* Finished this poll */
        }
    }

    MPIU_DBG_PRINTFX(("exiting ibu_poll 4\n"));
    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);
    return status;
}

#undef FUNCNAME
#define FUNCNAME ibu_wait
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int ibu_wait(int millisecond_timeout, void **vc_pptr, int *num_bytes_ptr, ibu_op_t *op_ptr)
{
    VAPI_ret_t status;
    VAPI_wc_desc_t completion_data;
    void *mem_ptr;
    char *iter_ptr;
    ibu_t ibu;
    int num_bytes;
    long offset;
    ibu_work_id_handle_t *id_ptr;
    int send_length;
    int ibu_reg_status = IBU_SUCCESS;
    MPIDI_VC_t *recv_vc_ptr;
    void *mem_ptr_orig;
    int mpi_errno;
    int pkt_offset;
#ifdef MPIDI_CH3_CHANNEL_RNDV
    MPID_Request *sreq, *rreq;
    int complete;
    int i;
#endif
    MPIDI_STATE_DECL(MPID_STATE_IBU_WAIT);

    MPIDI_FUNC_ENTER(MPID_STATE_IBU_WAIT);
    MPIU_DBG_PRINTFX(("entering ibu_wait\n"));
    for (;;)
    {
        if (IBU_Process.unex_finished_list)
        {
            MPIDI_DBG_PRINTF((60, FCNAME, "returning previously received %d bytes",
                IBU_Process.unex_finished_list->read.total));
            /* remove this ibu from the finished list */
            ibu = IBU_Process.unex_finished_list;
            IBU_Process.unex_finished_list = IBU_Process.unex_finished_list->unex_finished_queue;
            ibu->unex_finished_queue = NULL;
            *num_bytes_ptr = ibu->read.total;
            *op_ptr = IBU_OP_READ;
            *vc_pptr = ibu->vc_ptr;
            ibu->pending_operations--;
            if (ibu->closing && ibu->pending_operations == 0)
            {
                ibu = IBU_INVALID_QP;
            }
            MPIU_DBG_PRINTFX(("exiting ibu_wait 1\n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);
            return MPI_SUCCESS;
        }
        status = ibu_poll(&completion_data);
        if (status == VAPI_EAGAIN || status == VAPI_CQ_EMPTY)
        {
            /*usleep(1);*/
            /* poll until there is something in the queue */
            /* or the timeout has expired */
            if (millisecond_timeout == 0)
            {
                *num_bytes_ptr = 0;
                *vc_pptr = NULL;
                *op_ptr = IBU_OP_TIMEOUT;
                MPIU_DBG_PRINTFX(("exiting ibu_wait 2\n"));
                MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);
                return MPI_SUCCESS;
            }
            continue;
        }
        if (status != VAPI_OK)
        {
            /*MPIU_Internal_error_printf("%s: error: VAPI_poll_cq did not return VAPI_OK, %s\n", FCNAME, VAPI_strerror(status));*/
            /*MPIU_dump_dbg_memlog_to_stdout();*/
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", VAPI_strerror(status));
            MPIU_DBG_PRINTFX(("exiting ibu_wait 3\n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);
            return mpi_errno;
        }
        /*
        if (completion_data.status != VAPI_SUCCESS)
        {
            MPIU_Internal_error_printf("%s: error: status = %s != VAPI_SUCCESS\n", 
                FCNAME, VAPI_strerror(completion_data.status));
            MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n"));
            MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);
            return IBU_FAIL;
        }
        */
        id_ptr = *((ibu_work_id_handle_t**)&completion_data.id);
        ibu = id_ptr->ibu;
        mem_ptr = (void*)(id_ptr->mem);
        send_length = id_ptr->length;
        ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr);
        mem_ptr_orig = mem_ptr;
        switch (completion_data.opcode)
        {
#ifdef MPIDI_CH3_CHANNEL_RNDV
        case VAPI_CQE_SQ_RDMA_WRITE:
            if (completion_data.status != VAPI_SUCCESS)
            {
                MPIU_Internal_error_printf("%s: send completion status = %s\n",
                    FCNAME, VAPI_wc_status_sym(completion_data.status));
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", VAPI_wc_status_sym(completion_data.status));
                PrintWC(&completion_data);
                MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n"));
                MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);
                return mpi_errno;
            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -