📄 ch3_progress.c
字号:
{ mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0); goto fn_exit; } } vc_ptr = vc_ptr->ch.shm_next_writer; } } MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_YIELD); MPIDU_Yield(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_YIELD); }after_shm_loop: if (shmIter == shmReps) { shmIter = 0; } MPID_CPU_TICK(&end); shmTicks += end - start; if (shmIter != 0) goto skip_sock_loop;skip_shm_loop: MPID_CPU_TICK(&start); for (; sockIter<sockReps; sockIter++, sockTotalReps++) { /* make progress on the sockets */ mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event); if (mpi_errno == MPI_SUCCESS) { mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**handle_sock_op", 0); goto fn_exit; } /*active = active | MPID_CH3I_SOCK_BIT;*/ MPIDI_CH3I_active_flag |= MPID_CH3I_SOCK_BIT; } else { if (MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**handle_sock_op", 0); goto fn_exit; } mpi_errno = MPI_SUCCESS; MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_YIELD); MPIDU_Yield(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_YIELD); } if (completions != MPIDI_CH3I_progress_completion_count) { sockIter++; sockTotalReps++; goto after_sock_loop; } }after_sock_loop: if (sockIter == sockReps) { sockIter = 0; updateIter++; } MPID_CPU_TICK(&end); sockTicks += end - start;skip_sock_loop: if (updateIter == MPIDI_CH3I_UPDATE_ITERATIONS) { updateIter = 0; switch (MPIDI_CH3I_active_flag) { case MPID_CH3I_SHM_BIT: /* only shared memory has been active */ /* give shm MPID_SINGLE_ACTIVE_FACTOR cycles for every 1 sock cycle */ shmReps = (int) ( ( sockTicks * (MPID_CPU_Tick_t)MPID_SINGLE_ACTIVE_FACTOR * (MPID_CPU_Tick_t)shmTotalReps ) / (MPID_CPU_Tick_t)sockTotalReps / shmTicks); if (shmReps < 1) shmReps = 1; sockReps = 1; /*MPIU_DBG_PRINTF(("(SHM_BIT: shmReps = %d, sockReps = %d)", shmReps, sockReps));*/ break; case MPID_CH3I_SOCK_BIT: /* only sockets have been active */ /* give sock MPID_SINGLE_ACTIVE_FACTOR cycles for every 1 shm cycle */ sockReps = (int) ( ( shmTicks * (MPID_CPU_Tick_t)MPID_SINGLE_ACTIVE_FACTOR * (MPID_CPU_Tick_t)sockTotalReps ) / (MPID_CPU_Tick_t)shmTotalReps / sockTicks ); if (sockReps < 1) sockReps = 1; shmReps = 1; /*MPIU_DBG_PRINTF(("(SOCK_BIT: shmReps = %d, sockReps = %d)", shmReps, sockReps));*/ break; case MPID_CH3I_SHM_BIT + MPID_CH3I_SOCK_BIT: /* both channels have been active */ /* give each channel 50% of the spin cycles */ shmReps = (int) ( ( (MPID_CPU_Tick_t)((shmTicks + sockTicks) >> 1) * (MPID_CPU_Tick_t)shmTotalReps ) / shmTicks ) / MPIDI_CH3I_UPDATE_ITERATIONS; if (shmReps < 1) shmReps = 1; sockReps = (int) ( ( (MPID_CPU_Tick_t)((shmTicks + sockTicks) >> 1) * (MPID_CPU_Tick_t)sockTotalReps ) / sockTicks ) / MPIDI_CH3I_UPDATE_ITERATIONS; if (sockReps < 1) sockReps = 1; /*MPIU_DBG_PRINTF(("(BOTH_BITS: shmReps = %d, sockReps = %d)", shmReps, sockReps));*/ break; default: /* neither channel has been active */ /* leave the state in its previous state */ /*printf(".");*/ break; } MPIDI_CH3I_active_flag = 0; shmTotalReps = 0; shmTicks = 0; shmIter = 0; sockTotalReps = 0; sockTicks = 0; sockIter = 0; } if (msgqIter++ == MPIDI_CH3I_MSGQ_ITERATIONS) { msgqIter = 0; mpi_errno = MPIDI_CH3I_Message_queue_progress(); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mqp_failure", 0); goto fn_exit; } } } while (completions == MPIDI_CH3I_progress_completion_count && is_blocking);fn_exit:#ifdef MPICH_DBG_OUTPUT if (is_blocking) { MPIDI_DBG_PRINTF((50, FCNAME, "exiting, count=%d", MPIDI_CH3I_progress_completion_count - completions)); } else { if (MPIDI_CH3I_progress_completion_count - completions > 0) { MPIDI_DBG_PRINTF((50, FCNAME, "exiting (non-blocking), count=%d", MPIDI_CH3I_progress_completion_count - completions)); } }#endif MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS); /*return MPIDI_CH3I_progress_completion_count - completions;*/ return mpi_errno;}#endif /* USE_FIXED_ACTIVE_PROGRESS */#if !defined(MPIDI_CH3_Progress_poke)#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_poke#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3_Progress_poke(){ int mpi_errno; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_POKE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_POKE); mpi_errno = MPIDI_CH3_Progress_test(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_POKE); return mpi_errno;}#endif#if !defined(MPIDI_CH3_Progress_start)#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_start#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void MPIDI_CH3_Progress_start(MPID_Progress_state *state){ /* MT - This function is empty for the single-threaded implementation */ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_START); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_START); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_START);}#endif#if !defined(MPIDI_CH3_Progress_end)#undef FUNCNAME#define FUNCNAME MPIDI_CH3_Progress_end#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)void MPIDI_CH3_Progress_end(MPID_Progress_state *state){ /* MT: This function is empty for the single-threaded implementation */ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_END); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_END); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_END);}#endif#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_init(){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); /* FIXME: copied from sock# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, NULL); }# endif */ mpi_errno = MPIDU_Sock_init(); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress_init", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ /* create sock set */ mpi_errno = MPIDU_Sock_create_set(&MPIDI_CH3I_sock_set); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress_init", 0); goto fn_exit; } /* --END ERROR HANDLING-- */ /* establish non-blocking listener */ mpi_errno = MPIDU_CH3I_SetupListener( MPIDI_CH3I_sock_set ); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } /* Initialize the code to handle incoming packets */ mpi_errno = MPIDI_CH3_PktHandler_Init( MPIDI_pktArray, MPIDI_CH3_PKT_END_CH3+1 ); if (mpi_errno) { MPIU_ERR_POP(mpi_errno); } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT); return mpi_errno; fn_fail: goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress_finalize#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress_finalize(){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); /* Shut down the listener */ MPIDU_CH3I_ShutdownListener(); /* FIXME: Cleanly shutdown other socks and MPIU_Free connection structures. (close protocol?) */ MPIDU_Sock_destroy_set(MPIDI_CH3I_sock_set); MPIDU_Sock_finalize(); /* FIXME: copied from sock# if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX) { MPID_Thread_cond_destroy(&MPIDI_CH3I_progress_completion_cond, NULL); }# endif */ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE); return mpi_errno;}#ifdef NEEDS_MESSAGE_QUEUE_PROGRESS/* FIXME: What is this routine for? */#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Message_queue_progress#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static int MPIDI_CH3I_Message_queue_progress( void ){ MPIDI_CH3I_Shmem_queue_info info; int num_bytes; MPIDI_VC_t *vc_ptr; int mpi_errno; /* check for new shmem queue connection requests */ /*printf("<%dR>", MPIR_Process.comm_world->rank);fflush(stdout);*/ mpi_errno = MPIDI_CH3I_BootstrapQ_recv_msg( MPIDI_Process.my_pg->ch.bootstrapQ, &info, sizeof(info), &num_bytes, FALSE); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SET(mpi_errno,MPI_ERR_OTHER,"**boot_recv"); return mpi_errno; }#ifdef MPICH_DBG_OUTPUT if (num_bytes != 0 && num_bytes != sizeof(info)) { MPIU_ERR_SETFATAL1(mpi_errno,MPI_ERR_OTHER, "**bootqmsg", "**bootqmsg %d", num_bytes); return mpi_errno; }#endif if (num_bytes) { MPIDI_PG_t *pg; MPIDI_PG_Find(info.pg_id, &pg); MPIDI_PG_Get_vc(pg, info.pg_rank, &vc_ptr); /*vc_ptr = &MPIDI_Process.my_pg->ch.vc_table[info.pg_rank];*/ mpi_errno = MPIDI_CH3I_SHM_Attach_to_mem( &info.info, &vc_ptr->ch.shm_read_queue_info); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_SETFATAL1(mpi_errno,MPI_ERR_OTHER, "**MPIDI_CH3I_SHM_Attach_to_mem", "**MPIDI_CH3I_SHM_Attach_to_mem %d", vc_ptr->ch.shm_read_queue_info.error); return mpi_errno; } MPIU_DBG_PRINTF(("attached to queue from process %d\n", info.pg_rank));#ifdef HAVE_SHARED_PROCESS_READ#ifdef HAVE_WINDOWS_H mpi_errno = MPIDI_SHM_InitRWProc( info.pid, &vc_ptr->ch.hSharedProcessHandle );#else vc_ptr->ch.nSharedProcessID = info.pid; mpi_errno = MPIDI_SHM_InitRWProc( info.pid, &vc_ptr->ch.nSharedProcessFileDescriptor );#endif if (mpi_errno) { return mpi_errno; }#endif /*vc_ptr->ch.state = MPIDI_CH3I_VC_STATE_CONNECTED;*/ /* we are read connected but not write connected */ vc_ptr->vc_ptr->ch.shm_read_connected = 1; vc_ptr->ch.bShm = TRUE; vc_ptr->ch.read_shmq = vc_ptr->ch.shm_read_queue_info.addr; MPIU_DBG_PRINTF(("read_shmq = %p\n", vc_ptr->ch.read_shmq)); vc_ptr->ch.shm_reading_pkt = TRUE; /* add this VC to the global list to be shm_waited on */ /*printf("vc added to reading list.\n");fflush(stdout);*/ MPIDI_CH3I_SHM_Add_to_reader_list(vc_ptr); } return MPI_SUCCESS;}#endif/* FIXME: This is a temp to free memory allocated in this file */int MPIDI_CH3U_Finalize_ssm_memory( void ){ MPIDI_CH3I_VC *vcch; /* Free resources allocated in CH3_Init() */ while (MPIDI_CH3I_Process.shm_reading_list) { vcch = (MPIDI_CH3I_VC *)MPIDI_CH3I_Process.shm_reading_list->channel_private; MPIDI_CH3I_SHM_Release_mem(&vcch->shm_read_queue_info); MPIDI_CH3I_Process.shm_reading_list = vcch->shm_next_reader; } while (MPIDI_CH3I_Process.shm_writing_list) { vcch = (MPIDI_CH3I_VC *)MPIDI_CH3I_Process.shm_writing_list->channel_private; MPIDI_CH3I_SHM_Release_mem(&vcch->shm_write_queue_info); MPIDI_CH3I_Process.shm_writing_list = vcch->shm_next_writer; } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -