⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 3 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
#include "ch3i_progress.h"

/*
 * This file contains multiple implementations of the progress routine;
 * they are selected based on the style of polling that is desired.
 */

/* Activity flags: set elsewhere in the channel to record which transport
   (shared memory vs. socket) most recently made read/write progress. */
int MPIDI_CH3I_shm_read_active = 0;
int MPIDI_CH3I_shm_write_active = 0;
int MPIDI_CH3I_sock_read_active = 0;
int MPIDI_CH3I_sock_write_active = 0;
int MPIDI_CH3I_active_flag = 0;

/* The single socket set on which all of this channel's sockets are waited. */
MPIDU_Sock_set_t MPIDI_CH3I_sock_set = NULL;

/* We initialize this array with a zero to prevent problems with systems
   that insist on making uninitialized global variables unshared common
   symbols */
MPIDI_CH3_PktHandler_Fcn *MPIDI_pktArray[MPIDI_CH3_PKT_END_CH3+1] = { 0 };

#if defined(USE_FIXED_SPIN_WAITS) || !defined(MPID_CPU_TICK)
/****************************************/
/*                                      */
/*   Fixed spin waits progress engine   */
/*                                      */
/****************************************/
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* MPIDI_CH3I_Progress - fixed-spin-wait progress engine.
 *
 * Polls, in order: (1) the shared-memory read queues, (2) the
 * shared-memory write queues, (3) the socket set (only once spin_count
 * exceeds half of nShmWaitSpinCount), and (4) the bootstrap message
 * queue for new shmem connection requests (every
 * MPIDI_CH3I_MSGQ_ITERATIONS passes, or on every non-blocking call).
 *
 * Parameters:
 *   is_blocking - if nonzero, loop until the global completion counter
 *                 (MPIDI_CH3I_progress_completion_count) advances; if
 *                 zero, make a single pass.
 *   state       - progress-state handle (unused directly in this body).
 *
 * Returns MPI_SUCCESS or an error code built via MPIR_Err_create_code;
 * all failure paths exit through fn_exit.
 *
 * NOTE(review): spin_count and msg_queue_count are function-static, so
 * this engine presumably assumes single-threaded progress -- confirm
 * against the channel's threading model.
 */
int MPIDI_CH3I_Progress(int is_blocking, MPID_Progress_state *state)
{
    int mpi_errno = MPI_SUCCESS;
    int rc;
#ifdef MPICH_DBG_OUTPUT
    int count;
#endif
    int bShmProgressMade;
    MPIDU_Sock_event_t event;
    /* Snapshot of the completion counter; the blocking loop below runs
       until some handler increments the global past this value. */
    unsigned completions = MPIDI_CH3I_progress_completion_count;
    MPIDI_CH3I_Shmem_queue_info info;
    int num_bytes;
    shm_wait_t wait_result;
    MPIDI_VC_t *vc_ptr;
    MPIDI_CH3I_VC *vcch;
    MPIDI_CH3I_PG *pgch;
    static int spin_count = 1;
    static int msg_queue_count = 0;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS);
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_YIELD);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS);
#ifdef MPICH_DBG_OUTPUT
    if (is_blocking)
    {
        MPIDI_DBG_PRINTF((50, FCNAME, "entering, blocking=%s", is_blocking ?
"true" : "false"));
    }
#endif
    do
    {
        /* make progress on the shared memory queues */
        bShmProgressMade = FALSE;
        if (MPIDI_CH3I_Process.shm_reading_list)
        {
            /* NOTE(review): this variant passes a 5th argument
               (&wait_result); the adaptive engine later in this file
               calls the same function with 4 arguments -- the two
               engines presumably build against different signatures;
               verify against mpidi_ch3_impl.h. */
            rc = MPIDI_CH3I_SHM_read_progress(MPIDI_CH3I_Process.shm_reading_list, 0, &vc_ptr, &num_bytes,&wait_result);
            if (rc == MPI_SUCCESS)
            {
                /* SHM_WAIT_TIMEOUT means nothing was read; anything
                   else means num_bytes arrived on vc_ptr. */
                if (wait_result != SHM_WAIT_TIMEOUT) {
                    MPIDI_DBG_PRINTF((50, FCNAME, "MPIDI_CH3I_SHM_read_progress reported %d bytes read", num_bytes));
                    mpi_errno = MPIDI_CH3I_Handle_shm_read(vc_ptr, num_bytes);
                    if (mpi_errno != MPI_SUCCESS)
                    {
                        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0);
                        goto fn_exit;
                    }
                    bShmProgressMade = TRUE;
                }
            }
            else
            {
                /*MPIDI_err_printf("MPIDI_CH3_Progress", "MPIDI_CH3I_SHM_read_progress returned error %d\n", rc);*/
                mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**shm_read_progress", 0);
                goto fn_exit;
            }
        }
        if (MPIDI_CH3I_Process.shm_writing_list)
        {
            /* Walk the list of VCs with pending shared-memory sends. */
            vc_ptr = MPIDI_CH3I_Process.shm_writing_list;
            while (vc_ptr)
            {
                MPIDI_CH3I_VC *vcch = (MPIDI_CH3I_VC *)vc_ptr->channel_private;
                if (vcch->send_active != NULL)
                {
                    rc = MPIDI_CH3I_SHM_write_progress(vc_ptr);
                    if (rc == MPI_SUCCESS)
                    {
                        bShmProgressMade = TRUE;
                    }
                    else if (rc != -1 /* SHM_WAIT_TIMEOUT */)
                    {
                        mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0);
                        goto fn_exit;
                    }
                }
                vc_ptr = vcch->shm_next_writer;
            }
        }
        /* FIXME: It looks like this is a relic of code to occassional
         call sleep to implement a yield.  As no code defined the
         use sleep yield macro, that code was removed, exposing this odd
         construction.
*/
        /* NOTE(review): both branches of this if/else perform the same
           MPIDU_Yield(); only the spin_count reset differs.  Per the
           FIXME above, the else branch presumably once spun (or slept)
           without yielding; as written, every pass yields regardless of
           spin_count -- confirm intent before changing. */
        pgch = (MPIDI_CH3I_PG *)MPIDI_Process.my_pg->channel_private;
        if (spin_count >= pgch->nShmWaitSpinCount)
        {
            MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_YIELD);
            MPIDU_Yield();
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_YIELD);
            spin_count = 1;
        }
        else {
            MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_YIELD);
            MPIDU_Yield();
            MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_YIELD);
        }
        spin_count++;
        /* Poll the sockets only after spinning on shmem for half the
           configured spin budget, to keep the shmem fast path hot. */
        if (spin_count > (pgch->nShmWaitSpinCount >> 1) )
        {
            /* make progress on the sockets */
            mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event);
            if (mpi_errno == MPI_SUCCESS)
            {
                mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);
                if (mpi_errno != MPI_SUCCESS)
                {
                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress_handle_sock_op", 0);
                    goto fn_exit;
                }
            }
            else
            {
                /* A timeout from a zero-timeout wait just means no
                   socket activity; any other class is a real error. */
                if (MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT)
                {
                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress_sock_wait", 0);
                    goto fn_exit;
                }
                mpi_errno = MPI_SUCCESS;
                MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_YIELD);
                MPIDU_Yield();
                MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_YIELD);
            }
        }
        /* Probe the bootstrap queue periodically (every
           MPIDI_CH3I_MSGQ_ITERATIONS passes) when blocking, but on
           every call when non-blocking. */
        if (((msg_queue_count++ % MPIDI_CH3I_MSGQ_ITERATIONS) == 0) ||
            !is_blocking)
        {
            /* check for new shmem queue connection requests */
            rc = MPIDI_CH3I_BootstrapQ_recv_msg(
                   pgch->bootstrapQ, &info,
                   sizeof(info), &num_bytes, FALSE);
            if (rc != MPI_SUCCESS)
            {
                mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**boot_recv", 0);
                goto fn_exit;
            }
#ifdef MPICH_DBG_OUTPUT
            if (num_bytes != 0 && num_bytes != sizeof(info))
            {
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**bootqmsg", "**bootqmsg %d", num_bytes);
                goto fn_exit;
            }
#endif
            /* num_bytes != 0 means a peer sent us its shmem queue info:
               attach to that memory and register the VC for reading. */
            if (num_bytes)
            {
                MPIDI_PG_t *pg;
                MPIDI_PG_Find(info.pg_id, &pg);
                MPIDI_PG_Get_vc(pg, info.pg_rank, &vc_ptr);
                /*
printf("attaching to shared memory queue:\nVC.rank %d\nVC.pg_id <%s>\nPG.id <%s>\n",		    vc_ptr->pg_rank, vc_ptr->pg->id, pg->id);		fflush(stdout);		*/
                /*vc_ptr = &pgch->vc_table[info.pg_rank];*/
                vcch = (MPIDI_CH3I_VC *)vc_ptr->channel_private;
                rc = MPIDI_CH3I_SHM_Attach_to_mem(&info.info,
                                            &vcch->shm_read_queue_info);
                if (rc != MPI_SUCCESS)
                {
                    mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**attach_to_mem", "**attach_to_mem %d", vcch->shm_read_queue_info.error);
                    goto fn_exit;
                }
                MPIU_DBG_PRINTF(("attached to queue from process %d\n", info.pg_rank));
#ifdef HAVE_SHARED_PROCESS_READ
#ifdef HAVE_WINDOWS_H
                mpi_errno = MPIDI_SHM_InitRWProc( info.pid,
                                          &vcch->hSharedProcessHandle );
#else
                vcch->nSharedProcessID = info.pid;
                mpi_errno = MPIDI_SHM_InitRWProc( info.pid,
                                   &vcch->nSharedProcessFileDescriptor );
#endif
                /* NOTE(review): this is the one failure path that
                   returns directly instead of going through fn_exit,
                   so MPIDI_FUNC_EXIT is skipped here. */
                if (mpi_errno) { return mpi_errno; }
#endif
                /*vc_ptr->ch.state = MPIDI_CH3I_VC_STATE_CONNECTED;*/
                /* we are read connected but not write connected */
                vcch->shm_read_connected = 1;
                vcch->bShm = TRUE;
                vcch->read_shmq = vcch->shm_read_queue_info.addr;/*info.info.addr;*/
                MPIU_DBG_PRINTF(("read_shmq = %p\n", vcch->read_shmq));
                vcch->shm_reading_pkt = TRUE;
                /* add this VC to the global list to be shm_waited on */
                MPIDI_CH3I_SHM_Add_to_reader_list(vc_ptr);
            }
        }
    }
    while (completions == MPIDI_CH3I_progress_completion_count && is_blocking);
fn_exit:
#ifdef MPICH_DBG_OUTPUT
    count = MPIDI_CH3I_progress_completion_count - completions;
    if (is_blocking) {
        MPIDI_DBG_PRINTF((50, FCNAME, "exiting, count=%d", count));
    }
    else {
        if (count > 0) {
            MPIDI_DBG_PRINTF((50, FCNAME, "exiting (non-blocking), count=%d", count));
        }
    }
#endif
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS);
    return mpi_errno;
}
#endif /* USE_FIXED_SPIN_WAITS */
#if defined(USE_ADAPTIVE_PROGRESS) && defined(MPID_CPU_TICK)
/********************************/
/*                              */
/*   Adaptive
progress engine   */
/*                              */
/********************************/
#define MPIDI_CH3I_UPDATE_ITERATIONS    10
#define MPID_SINGLE_ACTIVE_FACTOR      100
/* Define this macro to include the definition of the message queue
   progress routine */
#define NEEDS_MESSAGE_QUEUE_PROGRESS
static int MPIDI_CH3I_Message_queue_progress( void );
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_test
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* MPIDI_CH3_Progress_test - single non-blocking progress pass for the
 * adaptive engine.
 *
 * Makes one pass over the shared-memory read list, the shared-memory
 * write list, the socket set (zero-timeout wait), and then the message
 * queue progress routine.  Returns MPI_SUCCESS or an error code built
 * via MPIR_Err_create_code.
 */
int MPIDI_CH3_Progress_test()
{
    /* This function has a problem that is #if 0'd out.
     * The commented out code causes test to only probe the message queue for
     * connection attempts
     * every MPIDI_CH3I_MSGQ_ITERATIONS iterations.  This can delay shm
     * connection formation for codes that call test infrequently.
     * But the uncommented code also has the problem that the overhead of
     * checking the message queue is incurred with every call to test.
     */
    int mpi_errno = MPI_SUCCESS;
    int rc;
    MPIDU_Sock_event_t event;
    int num_bytes;
    MPIDI_VC_t *vc_ptr;
#if 0
    static int msgqIter = 0;
#endif
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);
    if (MPIDI_CH3I_Process.shm_reading_list)
    {
        /* NOTE(review): 4-argument call here vs. the 5-argument
           (&wait_result) call in the fixed-spin engine above; the two
           engines are never compiled together, but presumably only one
           signature exists -- verify which is current.  Also note there
           is no wait_result/timeout check before handling the read. */
        rc = MPIDI_CH3I_SHM_read_progress(
            MPIDI_CH3I_Process.shm_reading_list,
            0, &vc_ptr, &num_bytes);
        if (rc == MPI_SUCCESS)
        {
            MPIDI_DBG_PRINTF((50, FCNAME, "MPIDI_CH3I_SHM_read_progress reported %d bytes read", num_bytes));
            mpi_errno = MPIDI_CH3I_Handle_shm_read(vc_ptr, num_bytes);
            if (mpi_errno != MPI_SUCCESS)
            {
                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress_test", 0);
                goto fn_exit;
            }
        }
    }
    if (MPIDI_CH3I_Process.shm_writing_list)
    {
        /* Walk the VCs with pending shared-memory sends.  NOTE(review):
           this engine reaches channel state via vc_ptr->ch.* while the
           fixed-spin engine casts vc_ptr->channel_private -- presumably
           equivalent accessors from different code vintages; confirm. */
        vc_ptr = MPIDI_CH3I_Process.shm_writing_list;
        while (vc_ptr)
        {
            if (vc_ptr->ch.send_active != NULL)
            {
                rc = MPIDI_CH3I_SHM_write_progress(vc_ptr);
                if (rc != MPI_SUCCESS && rc != -1 /*SHM_WAIT_TIMEOUT*/)
                {
                    mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0);
                    goto fn_exit;
                }
            }
            vc_ptr = vc_ptr->ch.shm_next_writer;
        }
    }
    /* Zero-timeout poll of the socket set. */
    mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event);
    if (mpi_errno == MPI_SUCCESS)
    {
        mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event);
        if (mpi_errno != MPI_SUCCESS)
        {
            /* NOTE(review): direct return here bypasses fn_exit and so
               skips MPIDI_FUNC_EXIT. */
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**handle_sock_op", 0);
            return mpi_errno;
        }
        /*active = active | MPID_CH3I_SOCK_BIT;*/
        MPIDI_CH3I_active_flag |= MPID_CH3I_SOCK_BIT;
    }
    else
    {
        /* Timeout just means no socket activity on this pass. */
        if (MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT)
        {
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**test_sock_wait", 0);
            goto fn_exit;
        }
        mpi_errno = MPI_SUCCESS;
    }
#if 0
    if (msgqIter++ == MPIDI_CH3I_MSGQ_ITERATIONS)
    {
        msgqIter = 0;
        /*printf("[%d] calling message queue progress from test.\n",
          MPIR_Process.comm_world->rank);fflush(stdout);*/
        /* NOTE(review): this disabled branch names
           MPID_CH3I_Message_queue_progress, but the live branch and the
           forward declaration above use MPIDI_CH3I_... -- looks like a
           typo that would break the build if this branch were enabled. */
        mpi_errno = MPID_CH3I_Message_queue_progress();
        if (mpi_errno != MPI_SUCCESS)
        {
            mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mqp_failure", 0);
        }
    }
#else
    mpi_errno = MPIDI_CH3I_Message_queue_progress();
    if (mpi_errno != MPI_SUCCESS)
    {
        mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mqp_failure", 0);
    }
#endif
fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_TEST);
    return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Progress_wait
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* MPIDI_CH3_Progress_wait - blocking progress for the adaptive engine.
   (Definition continues beyond this view; left as-is.) */
int MPIDI_CH3_Progress_wait(MPID_Progress_state *state)
{
    int mpi_errno = MPI_SUCCESS;
    int rc;
    MPIDU_Sock_event_t event;
    unsigned completions = MPIDI_CH3I_progress_completion_count;
    int num_bytes;
    MPIDI_VC_t *vc_ptr;
    /*static int active = 0;*/
    static int updateIter = 0;
    static int msgqIter =
MPIDI_CH3I_MSGQ_ITERATIONS;    MPID_CPU_Tick_t start, end;    static MPID_CPU_Tick_t shmTicks = 0;    static int shmIter = 0;    static int shmReps = 1;    static int shmTotalReps = 0;    static int spin_count = 1;    static MPID_CPU_Tick_t sockTicks = 0;    static int sockIter = 0;    static int sockReps = 1;    static int sockTotalReps = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -