⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ibu_wait.ibal.c

📁 mpi并行计算的c++代码 可用vc或gcc编译通过 可以用来搭建并行计算试验环境
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//**  (C) 2001 by Argonne National Laboratory.*      See COPYRIGHT in top-level directory.*/#include "mpidimpl.h"#include "ibu.h"#ifdef HAVE_STDIO_H#include <stdio.h>#endif#ifdef HAVE_MALLOC_H#include <malloc.h>#endif#include "mpidi_ch3_impl.h"#ifdef USE_IB_IBAL#include "ibuimpl.ibal.h"/*#define PRINT_IBU_WAIT*/#ifdef PRINT_IBU_WAIT#define MPIU_DBG_PRINTFX(a) MPIU_DBG_PRINTF(a)#else#define MPIU_DBG_PRINTFX(a)#endifchar * op2str(int wc_type){    static char str[20];    switch(wc_type)    {    case IB_WC_SEND:	return "IB_WC_SEND";    case IB_WC_RDMA_WRITE:	return "IB_WC_RDMA_WRITE";    case IB_WC_RECV:	return "IB_WC_RECV";    case IB_WC_RDMA_READ:	return "IB_WC_RDMA_READ";    case IB_WC_MW_BIND:	return "IB_WC_MW_BIND";    case IB_WC_FETCH_ADD:	return "IB_WC_FETCH_ADD";    case IB_WC_COMPARE_SWAP:	return "IB_WC_COMPARE_SWAP";    case IB_WC_RECV_RDMA_WRITE:	return "IB_WC_RECV_RDMA_WRITE";    }    MPIU_Snprintf(str, 20, "%d", wc_type);    return str;}void PrintWC(ib_wc_t *p){    MPIU_Msg_printf("Work Completion Descriptor:\n");    MPIU_Msg_printf(" id: %d\n", (int)p->wr_id);    MPIU_Msg_printf(" opcode: %u = %s\n",	   p->wc_type, op2str(p->wc_type));    MPIU_Msg_printf(" opcode: %u \n",  p->wc_type);    MPIU_Msg_printf(" length: %d\n", p->length);    MPIU_Msg_printf(" imm_data_valid: %d\n", (IB_RECV_OPT_IMMEDIATE & (int)p->recv.conn.recv_opt));     MPIU_Msg_printf(" imm_data: %u\n", (unsigned int)p->recv.conn.immediate_data);    MPIU_Msg_printf(" remote_node_addr:\n");    // TODO where in ibal remote_node_addr?    MPIU_Msg_printf("  type: %u = %s\n",    //	   p->remote_node_addr.type,    //	   remote_node_addr_sym(p->remote_node_addr.type));    MPIU_Msg_printf("  slid: %d\n", (int)p->recv.ud.remote_lid);    MPIU_Msg_printf("  sl: %d\n", (int)p->recv.ud.remote_sl);    MPIU_Msg_printf("  qp: %d\n", (int)p->recv.ud.remote_qp);    MPIU_Msg_printf("  loc_eecn: %d\n", 0 /*TODO: loc_eecn*/);    MPIU_Msg_printf(" grh_flag: %d\n", (IB_RECV_OPT_GRH_VALID & (int)p->recv.ud.recv_opt));    MPIU_Msg_printf(" pkey_ix: %d\n", p->recv.ud.pkey_index);    MPIU_Msg_printf(" status: %u = %s\n",	(int)p->status, ib_get_wc_status_str(p->status));    MPIU_Msg_printf(" vendor_err_syndrome: %d\n", (IB_RECV_OPT_VEND_MASK & (int)p->recv.ud.recv_opt));    fflush(stdout);}#undef FUNCNAME#define FUNCNAME ibu_poll#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)ib_api_status_t ibu_poll(ib_wc_t *completion_data){    int i, RDMA_buf_water_mark;    ibu_t ibu;    MPIDI_VC_t *vc;    ib_wc_t *p_in, *p_complete;    ibu_work_id_handle_t *id_ptr;    ibui_send_wqe_info_t entry;    static cur_vc = 0;    static vc_cq  = 0;    volatile void* mem_ptr;    volatile int valid_flag;    ibu_set_t set = MPIDI_CH3I_Process.set;    MPIDI_PG_t  *pg = MPIDI_CH3I_Process.pg;    ib_api_status_t status = IB_NOT_FOUND;    MPIDI_STATE_DECL(MPID_STATE_IBU_POLL);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_POLL);    MPIU_DBG_PRINTFX(("entering ibu_poll\n"));    if (IBU_Process.num_send_cqe || (vc_cq == IBU_CQ_POLL_WATER_MARK))    {			vc_cq = 0;	MPIU_DBG_PRINTF(("calling poll cq pg->ch.rank =%d \n", pg->ch.rank));	p_complete = NULL;	completion_data->p_next = NULL;	p_in = completion_data;	status = ib_poll_cq(set, &p_in, &p_complete); 	if (status == IB_SUCCESS)	{	    if (completion_data->status != IB_WCS_SUCCESS) 	    {		/* Create id_ptr for this completion data, so that code outside will remain the same */		id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);		id_ptr->ibu = NULL;		id_ptr->length = 0;		id_ptr->mem = NULL;		/* Set id for completion data */		completion_data->wr_id = (uint64_t)id_ptr;		completion_data->recv.ud.recv_opt = completion_data->recv.ud.recv_opt & (!IB_RECV_OPT_IMMEDIATE); /* Mellanox, dafna April 11th: set immd valid to 0 */		MPIU_DBG_PRINTFX(("exiting ibu_poll upon bad completion status \n"));		MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);		return status;	    }	    id_ptr = *((ibu_work_id_handle_t**)&completion_data->wr_id);	    ibu = id_ptr->ibu;	    mem_ptr = (void*)(id_ptr->mem);			    send_wqe_info_fifo_pop(ibu, &entry);	    while ((entry.mem_ptr) != mem_ptr)	    {		ibuBlockFreeIB(ibu->allocator, entry.mem_ptr);			send_wqe_info_fifo_pop(ibu, &entry);	    }	    if (IBU_RDMA_EAGER_SIGNALED == entry.RDMA_type)	    {		/* Mellanox - always using IB_WC_SEND since there is never a real RECV */		completion_data->wc_type = IB_WC_SEND;	    }	    MPIU_DBG_PRINTFX(("exiting ibu_poll \n"));	    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);	    return status;	}	if (status != IB_INSUFFICIENT_RESOURCES && status != IB_NOT_FOUND)	{	    MPIU_DBG_PRINTFX(("ib_poll_cq return  != IB_SUCCESS/NOT_FOUND/INSUFFECIENT_RESOURCES\n"));	    MPIU_DBG_PRINTFX(("exiting ibu_poll 3\n"));	    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);	    return status;	}    }    MPIU_DBG_PRINTF(("calling poll buffers pg->ch.rank =%d \n", pg->ch.rank));    vc_cq++;    for (i=cur_vc; i<MPIDI_PG_Get_size(pg); i++)    {    	cur_vc = (i==(MPIDI_PG_Get_size(pg)-1)) ? 0 : i;	MPIDI_PG_Get_vc(pg, i, &vc);	if (vc->pg_rank == pg->ch.rank)	{	    MPIU_DBG_PRINTF(("cont. pg->ch.rank =%d vc->pg_rank = %d\n", pg->ch.rank,vc->pg_rank));	    continue;	}		    			/* get the RDMA buf valid flag address */	mem_ptr = vc->ch.ibu->local_RDMA_buf_base + vc->ch.ibu->local_RDMA_head;	valid_flag = ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag;	MPIU_DBG_PRINTF(("poll on buffer vc->pg_rank = %d  mem_ptr = %p \n", vc->pg_rank,mem_ptr));	MPIU_DBG_PRINTF(("poll on buffer local_RDMA_buf_base = %p  local_RDMA_head = %d \n",vc->ch.ibu->local_RDMA_buf_base, vc->ch.ibu->local_RDMA_head ));	if (IBU_VALID_RDMA_BUF == valid_flag)	{	    ((ibu_rdma_buf_t*)mem_ptr)->footer.RDMA_buf_valid_flag = IBU_INVALID_RDMA_BUF;	    /* Set id_ptr*/	    id_ptr = (ibu_work_id_handle_t*)ibuBlockAlloc(IBU_Process.workAllocator);	    id_ptr->mem = (void*)(((ibu_rdma_buf_t*)mem_ptr)->alignment + (IBU_RDMA_BUF_SIZE -((ibu_rdma_buf_t*)mem_ptr)->footer.total_size));	    id_ptr->ibu = vc->ch.ibu;	    id_ptr->length = ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size - sizeof(ibu_rdma_buf_footer_t); 	    vc->ch.ibu->remote_RDMA_limit = ((ibu_rdma_buf_t*)mem_ptr)->footer.updated_remote_RDMA_recv_limit;	    /* Set pseudo completion data */	    completion_data->wr_id = (uint64_t)id_ptr;	    completion_data->recv.ud.recv_opt  = completion_data->recv.ud.recv_opt & (!IB_RECV_OPT_IMMEDIATE); 	    completion_data->length = ((ibu_rdma_buf_t*)mem_ptr)->footer.total_size - sizeof(ibu_rdma_buf_footer_t);	    completion_data->status	= IB_WCS_SUCCESS;	    completion_data->wc_type = IB_WC_RECV;	    if (completion_data->length) 	    {		status = IB_SUCCESS;	    } 	    /* else status remains IB_NOT_FOUND */	    RDMA_buf_water_mark = (vc->ch.ibu->local_last_updated_RDMA_limit - vc->ch.ibu->local_RDMA_head + IBU_NUM_OF_RDMA_BUFS) % IBU_NUM_OF_RDMA_BUFS;	    if (RDMA_buf_water_mark < (IBU_NUM_OF_RDMA_BUFS /2))	    {		ibui_post_ack_write(vc->ch.ibu);	    }	    /* update head to point to next buffer */	    vc->ch.ibu->local_RDMA_head = (vc->ch.ibu->local_RDMA_head + 1) % IBU_NUM_OF_RDMA_BUFS;	    break; /* Finished this poll */	}    }    MPIU_DBG_PRINTFX(("exiting ibu_poll 4\n"));    MPIDI_FUNC_EXIT(MPID_STATE_IBU_POLL);    return status;}#undef FUNCNAME#define FUNCNAME ibu_wait#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int ibu_wait(int millisecond_timeout, void **vc_pptr, int *num_bytes_ptr, ibu_op_t *op_ptr){    ib_api_status_t status;    ib_wc_t completion_data;    void *mem_ptr;    char *iter_ptr;    ibu_t ibu;    int num_bytes;    long offset;    ibu_work_id_handle_t *id_ptr;    int send_length;    int ibu_reg_status = IBU_SUCCESS;    MPIDI_VC_t *recv_vc_ptr;    void *mem_ptr_orig;    int mpi_errno;    int pkt_offset;#ifdef MPIDI_CH3_CHANNEL_RNDV    MPID_Request *sreq, *rreq;    int complete;    int i;#endif    MPIDI_STATE_DECL(MPID_STATE_IBU_WAIT);    MPIDI_FUNC_ENTER(MPID_STATE_IBU_WAIT);    MPIU_DBG_PRINTFX(("entering ibu_wait\n"));    for (;;)    {	if (IBU_Process.unex_finished_list)	{	    MPIDI_DBG_PRINTF((60, FCNAME, "returning previously received %d bytes",		IBU_Process.unex_finished_list->read.total));	    /* remove this ibu from the finished list */	    ibu = IBU_Process.unex_finished_list;	    IBU_Process.unex_finished_list = IBU_Process.unex_finished_list->unex_finished_queue;	    ibu->unex_finished_queue = NULL;	    *num_bytes_ptr = ibu->read.total;	    *op_ptr = IBU_OP_READ;	    *vc_pptr = ibu->vc_ptr;	    ibu->pending_operations--;	    if (ibu->closing && ibu->pending_operations == 0)	    {		ibu = IBU_INVALID_QP;	    }	    MPIU_DBG_PRINTFX(("exiting ibu_wait 1\n"));	    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);	    return MPI_SUCCESS;	}	status = ibu_poll(&completion_data);	if (status == IB_INSUFFICIENT_RESOURCES || status == IB_NOT_FOUND)	{	    /*usleep(1);*/	    /* poll until there is something in the queue */	    /* or the timeout has expired */	    if (millisecond_timeout == 0)	    {		*num_bytes_ptr = 0;		*vc_pptr = NULL;		*op_ptr = IBU_OP_TIMEOUT;		MPIU_DBG_PRINTFX(("exiting ibu_wait 2\n"));		MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);		return MPI_SUCCESS;	    }	    continue;	}	if (status != IB_SUCCESS)	{	    /*MPIU_Internal_error_printf("%s: error: ib_poll_cq did not return IB_SUCCESS, %s\n", FCNAME, ib_get_err_str(status));*/	    /*MPIU_dump_dbg_memlog_to_stdout();*/	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", ib_get_err_str(status));	    MPIU_DBG_PRINTFX(("exiting ibu_wait 3\n"));	    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);	    return mpi_errno;	}	/*	if (completion_data.status != IB_WCS_SUCCESS)	{	    MPIU_Internal_error_printf("%s: error: status = %s != IB_WCS_SUCCESS\n", 		FCNAME, ib_get_wc_status_str(completion_data.status));	    MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n"));	    MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);	    return IBU_FAIL;	}	*/	id_ptr = *((ibu_work_id_handle_t**)&completion_data.wr_id);	ibu = id_ptr->ibu;	mem_ptr = (void*)(id_ptr->mem);	send_length = id_ptr->length;	ibuBlockFree(IBU_Process.workAllocator, (void*)id_ptr);	mem_ptr_orig = mem_ptr;	switch (completion_data.wc_type)	{#ifdef MPIDI_CH3_CHANNEL_RNDV	case IB_WC_RDMA_WRITE:	    if (completion_data.status != IB_WCS_SUCCESS)	    {		MPIU_Internal_error_printf("%s: send completion status = %s\n",		    FCNAME, ib_get_wc_status_str(completion_data.status));		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", "**fail %s", ib_get_wc_status_str(completion_data.status));		PrintWC(&completion_data);		MPIU_DBG_PRINTFX(("exiting ibu_wait 4\n"));		MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);		return mpi_errno;	    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -