📄 ch3_progress.c
字号:
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT); /*MPIDI_STATE_DECL(MPID_STATE_MPIDU_YIELD);*/ MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT); do { /* make progress on the shared memory queues */ if (sockIter != 0) goto skip_shm_loop; MPID_CPU_TICK(&start); for (; shmIter<shmReps; shmIter++, shmTotalReps++) { if (MPIDI_CH3I_Process.shm_reading_list) { rc = MPIDI_CH3I_SHM_read_progress( MPIDI_CH3I_Process.shm_reading_list, 0, &vc_ptr, &num_bytes); if (rc == MPI_SUCCESS) { MPIDI_DBG_PRINTF((50, FCNAME, "MPIDI_CH3I_SHM_read_progress reported %d bytes read", num_bytes)); mpi_errno = MPIDI_CH3I_Handle_shm_read(vc_ptr, num_bytes); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0); goto fn_exit; } } else { if (rc != SHM_WAIT_TIMEOUT) { /*MPIDI_err_printf("MPIDI_CH3_Progress", "MPIDI_CH3I_SHM_read_progress returned error %d\n", rc);*/ mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0); goto fn_exit; } /* if (rc == SHM_WAIT_TIMEOUT_WHILE_ACTIVE) { active = active | MPID_CH3I_SHM_BIT; } */ } if (MPIDI_CH3I_shm_read_active) spin_count = 1; if (completions != MPIDI_CH3I_progress_completion_count) { MPIDI_CH3I_active_flag |= MPID_CH3I_SHM_BIT; /* active = active | MPID_CH3I_SHM_BIT; spin_count = 1; */ shmIter++; shmTotalReps++; goto after_shm_loop; } } if (MPIDI_CH3I_Process.shm_writing_list) { vc_ptr = MPIDI_CH3I_Process.shm_writing_list; while (vc_ptr) { if (vc_ptr->ch.send_active != NULL) { rc = MPIDI_CH3I_SHM_write_progress(vc_ptr); if (rc == MPI_SUCCESS) { /*active = active | MPID_CH3I_SHM_BIT;*/ if (completions != MPIDI_CH3I_progress_completion_count) { MPIDI_CH3I_active_flag |= MPID_CH3I_SHM_BIT; spin_count = 1; shmIter++; shmTotalReps++; goto after_shm_loop; } } else if (rc != -1 /*SHM_WAIT_TIMEOUT*/) { mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0); goto fn_exit; } } vc_ptr = vc_ptr->ch.shm_next_writer; } } MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_YIELD); MPIDU_Yield(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_YIELD); }after_shm_loop: if (shmIter == shmReps) { shmIter = 0; } MPID_CPU_TICK(&end); shmTicks += end - start; if (shmIter != 0) goto skip_sock_loop;skip_shm_loop: MPID_CPU_TICK(&start); for (; sockIter<sockReps; sockIter++, sockTotalReps++) { /* make progress on the sockets */ mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event); if (mpi_errno == MPI_SUCCESS) { mpi_errno = MPIDI_CH3I_Progress_handle_sock_event(&event); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress_sock_wait", 0); goto fn_exit; } /*active = active | MPID_CH3I_SOCK_BIT;*/ MPIDI_CH3I_active_flag |= MPID_CH3I_SOCK_BIT; } else { if (MPIR_ERR_GET_CLASS(mpi_errno) != MPIDU_SOCK_ERR_TIMEOUT) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress_sock_wait", 0); goto fn_exit; } /* comment out this line to test the error functions */ mpi_errno = MPI_SUCCESS; MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_YIELD); MPIDU_Yield(); MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_YIELD); } if (completions != MPIDI_CH3I_progress_completion_count) { sockIter++; sockTotalReps++; goto after_sock_loop; } }after_sock_loop: if (sockIter == sockReps) { sockIter = 0; updateIter++; } MPID_CPU_TICK(&end); sockTicks += end - start;skip_sock_loop: if (updateIter == MPIDI_CH3I_UPDATE_ITERATIONS) { updateIter = 0; switch (MPIDI_CH3I_active_flag) { case MPID_CH3I_SHM_BIT: /* only shared memory has been active */ /* give shm MPID_SINGLE_ACTIVE_FACTOR cycles for every 1 sock cycle */ shmReps = (int) ( ( sockTicks * (MPID_CPU_Tick_t)MPID_SINGLE_ACTIVE_FACTOR * (MPID_CPU_Tick_t)shmTotalReps ) / (MPID_CPU_Tick_t)sockTotalReps / shmTicks); if (shmReps < 1) shmReps = 1; if (shmReps > 256) { shmReps = 256; } sockReps = 1; /*MPIU_DBG_PRINTF(("(SHM_BIT: shmReps = %d, sockReps = %d)", shmReps, sockReps));*/ break; case MPID_CH3I_SOCK_BIT: /* only sockets have been active */ /* give sock MPID_SINGLE_ACTIVE_FACTOR cycles for every 1 shm cycle */ sockReps = (int) ( ( shmTicks * (MPID_CPU_Tick_t)MPID_SINGLE_ACTIVE_FACTOR * (MPID_CPU_Tick_t)sockTotalReps ) / (MPID_CPU_Tick_t)shmTotalReps / sockTicks ); if (sockReps < 1) sockReps = 1; if (sockReps > 100) { sockReps = 100; } shmReps = 1; /*MPIU_DBG_PRINTF(("(SOCK_BIT: shmReps = %d, sockReps = %d)", shmReps, sockReps));*/ break; case MPID_CH3I_SHM_BIT + MPID_CH3I_SOCK_BIT: /* both channels have been active */ /* give each channel 50% of the spin cycles */ shmReps = (int) ( ( (MPID_CPU_Tick_t)((shmTicks + sockTicks) >> 1) * (MPID_CPU_Tick_t)shmTotalReps ) / shmTicks ) / MPIDI_CH3I_UPDATE_ITERATIONS; if (shmReps < 1) shmReps = 1; if (shmReps > 256) { shmReps = 256; } sockReps = (int) ( ( (MPID_CPU_Tick_t)((shmTicks + sockTicks) >> 1) * (MPID_CPU_Tick_t)sockTotalReps ) / sockTicks ) / MPIDI_CH3I_UPDATE_ITERATIONS; if (sockReps < 1) sockReps = 1; if (sockReps > 100) { sockReps = 100; } /*MPIU_DBG_PRINTF(("(BOTH_BITS: shmReps = %d, sockReps = %d)", shmReps, sockReps));*/ break; default: /* neither channel has been active */ /* leave the state in its previous state */ /*printf(".");*/ break; } /* if (shmReps > 500 || sockReps > 100) { printf("[%d] SHMREPS=%d, SOCKREPS=%d\n", MPIR_Process.comm_world->rank, shmReps, sockReps); } */ MPIDI_CH3I_active_flag = 0; shmTotalReps = 0; shmTicks = 0; shmIter = 0; sockTotalReps = 0; sockTicks = 0; sockIter = 0; } if (msgqIter++ == MPIDI_CH3I_MSGQ_ITERATIONS) { msgqIter = 0; /*printf("[%d] calling message queue progress\n", MPIR_Process.comm_world->rank);fflush(stdout);*/ /* printf("%d",2*MPIR_Process.comm_world->rank); */ mpi_errno = MPIDI_CH3I_Message_queue_progress(); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**mqp_failure", 0); goto fn_exit; } /* printf("%d",2*MPIR_Process.comm_world->rank+1); */ } } while (completions == MPIDI_CH3I_progress_completion_count);fn_exit: MPIDI_DBG_PRINTF((50, FCNAME, "exiting, count=%d", MPIDI_CH3I_progress_completion_count - completions)); MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PROGRESS_WAIT); return mpi_errno;}#endif /* USE_ADAPTIVE_PROGRESS */#ifdef USE_FIXED_ACTIVE_PROGRESS/**********************************************************//* *//* Fixed active progress engine *//* *//* Like the adaptive engine but active reps are fixed *//* instead of calculated from CPU time. *//* *//**********************************************************/#define MPIDI_CH3I_UPDATE_ITERATIONS 10#define MPID_SINGLE_ACTIVE_FACTOR 100/* Define this macro to include the definition of the message queue progress routine */#define NEEDS_MESSAGE_QUEUE_PROGRESSstatic int MPIDI_CH3I_Message_queue_progress( void );#undef FUNCNAME#define FUNCNAME MPIDI_CH3I_Progress#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPIDI_CH3I_Progress(int is_blocking, MPID_Progress_state *state){ int mpi_errno = MPI_SUCCESS; int rc; MPIDU_Sock_event_t event; unsigned completions = MPIDI_CH3I_progress_completion_count; int num_bytes; MPIDI_VC_t *vc_ptr; /*static int active = 0;*/ static int updateIter = 0; static int msgqIter = 0; MPID_CPU_Tick_t start, end; static MPID_CPU_Tick_t shmTicks = 0; static int shmIter = 0; static int shmReps = 1; static int shmTotalReps = 0; static int spin_count = 1; static MPID_CPU_Tick_t sockTicks = 0; static int sockIter = 0; static int sockReps = 1; static int sockTotalReps = 0; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PROGRESS); MPIDI_STATE_DECL(MPID_STATE_MPIDU_YIELD); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PROGRESS);#ifdef MPICH_DBG_OUTPUT if (is_blocking) { MPIDI_DBG_PRINTF((50, FCNAME, "entering, blocking=%s", is_blocking ? "true" : "false")); }#endif do { /* make progress on the shared memory queues */ if (sockIter != 0) goto skip_shm_loop; MPID_CPU_TICK(&start); for (; shmIter<shmReps; shmIter++, shmTotalReps++) { if (MPIDI_CH3I_Process.shm_reading_list) { rc = MPIDI_CH3I_SHM_read_progress( MPIDI_CH3I_Process.shm_reading_list, 0, &vc_ptr, &num_bytes); if (rc == MPI_SUCCESS) { MPIDI_DBG_PRINTF((50, FCNAME, "MPIDI_CH3I_SHM_read_progress reported %d bytes read", num_bytes)); mpi_errno = MPIDI_CH3I_Handle_shm_read(vc_ptr, num_bytes); if (mpi_errno != MPI_SUCCESS) { mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0); goto fn_exit; } } else { if (rc != SHM_WAIT_TIMEOUT) { MPIDI_err_printf("MPIDI_CH3_Progress", "MPIDI_CH3I_SHM_read_progress returned error %d\n", rc); mpi_errno = MPIR_Err_create_code(rc, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**progress", 0); goto fn_exit; } /* if (rc == SHM_WAIT_TIMEOUT_WHILE_ACTIVE) { active = active | MPID_CH3I_SHM_BIT; } */ } if (MPIDI_CH3I_shm_read_active) spin_count = 1; if (completions != MPIDI_CH3I_progress_completion_count) { MPIDI_CH3I_active_flag |= MPID_CH3I_SHM_BIT; /* active = active | MPID_CH3I_SHM_BIT; spin_count = 1; */ shmIter++; shmTotalReps++; goto after_shm_loop; } } if (MPIDI_CH3I_Process.shm_writing_list) { vc_ptr = MPIDI_CH3I_Process.shm_writing_list; while (vc_ptr) { if (vc_ptr->ch.send_active != NULL) { rc = MPIDI_CH3I_SHM_write_progress(vc_ptr); if (rc == MPI_SUCCESS) { /*active = active | MPID_CH3I_SHM_BIT;*/ if (completions != MPIDI_CH3I_progress_completion_count) { MPIDI_CH3I_active_flag |= MPID_CH3I_SHM_BIT; spin_count = 1; shmIter++; shmTotalReps++; goto after_shm_loop; } } else if (rc != -1 /*SHM_WAIT_TIMEOUT*/)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -