📄 tcp_module_poll.c
字号:
fprintf(stderr,"[%i] --- Got cell[%i] : %i\n",MPID_nem_mem_region.rank,index,((int *)&(cell->pkt.mpich2))[index] ); } }#endif if (offset != -1) { nodes[grank].left2read_head += offset; if (nodes[grank].left2read_head != 0) { if (nodes[grank].left2read_head < MPID_NEM_OPT_HEAD_LEN) {#ifdef TRACE fprintf(stderr,"[%i] -- RECV TCP READ : got PARTIAL header [%i bytes/ %i total] \n", MPID_nem_mem_region.rank, offset, MPID_NEM_OPT_HEAD_LEN);#endif if (strncmp( (char *)((MPID_nem_pkt_mpich2_t *)&(cell->pkt.mpich2)),TCP_END_STRING,strlen(TCP_END_STRING)) == 0) {#ifdef TRACE fprintf(stderr,"[%i] -- RECV TCP READ : got TERM MSG (PARTIAL) : %s \n", MPID_nem_mem_region.rank,(char *)((MPID_nem_pkt_mpich2_t *)&(cell->pkt.mpich2)));#endif --MPID_nem_tcp_internal_vars.nb_procs; MPID_nem_queue_enqueue (MPID_nem_module_tcp_free_queue, cell); goto end; } else { MPID_nem_tcp_internal_queue_enqueue (&nodes[grank].internal_free_queue, cell); MPID_nem_tcp_internal_vars.n_pending_recv++; } } else { #ifdef TRACE { int index; fprintf(stderr,"[%i] -- RECV TCP READ : got FULL header [%i bytes/ %i total] [src %i -- dest %i -- dlen %i -- seqno %i]\n", MPID_nem_mem_region.rank, offset, MPID_NEM_OPT_HEAD_LEN, cell->pkt.mpich2.source, cell->pkt.mpich2.dest, cell->pkt.mpich2.datalen, cell->pkt.mpich2.seqno); }#endif if (strncmp( (char *)((MPID_nem_pkt_mpich2_t *)&(cell->pkt.mpich2)),TCP_END_STRING,strlen(TCP_END_STRING)) == 0) {#ifdef TRACE fprintf(stderr,"[%i] -- RECV TCP READ : got TERM MSG (FULL) : %s, cell @ %p \n", MPID_nem_mem_region.rank,(char *)((MPID_nem_pkt_mpich2_t *)&(cell->pkt.mpich2)),cell);#endif --MPID_nem_tcp_internal_vars.nb_procs; MPID_nem_queue_enqueue (MPID_nem_module_tcp_free_queue, cell); goto end; } if ( (cell->pkt.mpich2.datalen) > (MPID_NEM_OPT_SIZE) ) { nodes[grank].left2read = ((cell->pkt.mpich2.datalen) - (MPID_NEM_OPT_SIZE)); } else { nodes[grank].left2read = 0; } if (nodes[grank].left2read != 0) { do { offset = read(nodes[grank].desc, ((char *)&(cell->pkt.mpich2) + (MPID_NEM_OPT_HEAD_LEN)), nodes[grank].left2read ); } while (offset == -1 && errno == EINTR); if (offset != -1) {#ifdef TRACE { int index; fprintf(stderr,"[%i] -- RECV TCP READ : got [%i bytes/ %i total] \n", MPID_nem_mem_region.rank, offset, nodes[grank].left2read); }#endif nodes[grank].left2read_head = 0; nodes[grank].left2read -= offset; if (nodes[grank].left2read == 0) { MPID_nem_queue_enqueue (MPID_nem_process_recv_queue, cell); } else { MPID_nem_tcp_internal_queue_enqueue (&nodes[grank].internal_free_queue, cell); MPID_nem_tcp_internal_vars.n_pending_recv++; } } else { if (errno == EAGAIN) { MPID_nem_tcp_internal_queue_enqueue (&nodes[grank].internal_free_queue, cell); MPID_nem_tcp_internal_vars.n_pending_recv++; } else { /* read() returned an error */ MPIU_ERR_SETANDJUMP1 (mpi_errno, MPI_ERR_OTHER, "**read", "**read %s", strerror (errno)); } } } else { MPID_nem_queue_enqueue (MPID_nem_process_recv_queue, cell); end: nodes[grank].left2read = 0; nodes[grank].left2read_head = 0; } } } else { /* eof i guess */ MPID_nem_queue_enqueue (MPID_nem_module_tcp_free_queue, cell);#ifdef TRACE perror("EOF SOCK");#endif } } else { if (errno == EAGAIN) { /* why is eagain handled differently? */ MPIU_ERR_SETANDJUMP1 (mpi_errno, MPI_ERR_OTHER, "**intern", "**intern %s", strerror (errno)); } else { /* read() returned an error */ MPIU_ERR_SETANDJUMP1 (mpi_errno, MPI_ERR_OTHER, "**read", "**read %s", strerror (errno)); } } } else { /* Q is empty !!! */ nodes[grank].toread++; outstanding2++; } } } else {#ifdef TRACE fprintf(stderr,"[%i] -- RECV TCP READ NO desc (%i) (index %i) \n", MPID_nem_mem_region.rank, nodes[index].desc,grank);#endif if (nodes[grank].toread > 0 ) { nodes[grank].toread--; MPID_nem_tcp_internal_vars.outstanding--; goto main_routine; } } } } MPID_nem_tcp_internal_vars.outstanding = outstanding2; outstanding2 = 0; fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_MODULE_POLL_RECV); return mpi_errno; fn_fail: goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_alt_tcp_module_poll#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_alt_tcp_module_poll (MPID_nem_poll_dir_t in_or_out){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_ALT_TCP_MODULE_POLL); MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ALT_TCP_MODULE_POLL); if(MPID_nem_tcp_internal_vars.poll_freq >= 0) { if (in_or_out == MPID_NEM_POLL_OUT) { if( MPID_nem_tcp_internal_vars.n_pending_send > 0 ) { mpi_errno = MPID_nem_tcp_module_poll_send(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); if (MPID_nem_tcp_internal_vars.n_pending_recv > 0) { mpi_errno = MPID_nem_tcp_module_poll_recv(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); } else if (--(MPID_nem_tcp_internal_vars.poll_freq) == 0) { mpi_errno = MPID_nem_tcp_module_poll_recv(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); MPID_nem_tcp_internal_vars.poll_freq = MPID_nem_tcp_internal_vars.old_poll_freq; } } else if (--(MPID_nem_tcp_internal_vars.poll_freq) == 0) { mpi_errno = MPID_nem_tcp_module_poll_send(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); mpi_errno = MPID_nem_tcp_module_poll_recv(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); MPID_nem_tcp_internal_vars.poll_freq = MPID_nem_tcp_internal_vars.old_poll_freq; } } else { if( MPID_nem_tcp_internal_vars.n_pending_recv > 0 ) { mpi_errno = MPID_nem_tcp_module_poll_recv(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); if (MPID_nem_tcp_internal_vars.n_pending_send > 0) { mpi_errno = MPID_nem_tcp_module_poll_send(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); } else if (--(MPID_nem_tcp_internal_vars.poll_freq) == 0) { mpi_errno = MPID_nem_tcp_module_poll_send(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); MPID_nem_tcp_internal_vars.poll_freq = MPID_nem_tcp_internal_vars.old_poll_freq; } } else if (--(MPID_nem_tcp_internal_vars.poll_freq) == 0) { mpi_errno = MPID_nem_tcp_module_poll_recv(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); mpi_errno = MPID_nem_tcp_module_poll_send(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); MPID_nem_tcp_internal_vars.poll_freq = MPID_nem_tcp_internal_vars.old_poll_freq; } } } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ALT_TCP_MODULE_POLL); return mpi_errno; fn_fail: goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_tcp_module_poll#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_tcp_module_poll (MPID_nem_poll_dir_t in_or_out){ int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_MODULE_POLL); MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_MODULE_POLL); if (MPID_nem_tcp_internal_vars.poll_freq >= 0) { if (in_or_out == MPID_NEM_POLL_OUT) { mpi_errno = MPID_nem_tcp_module_poll_send(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); mpi_errno = MPID_nem_tcp_module_poll_recv(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); } else { mpi_errno = MPID_nem_tcp_module_poll_recv(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); mpi_errno = MPID_nem_tcp_module_poll_send(); if (mpi_errno) MPIU_ERR_POP (mpi_errno); } } fn_exit: MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_MODULE_POLL); return mpi_errno; fn_fail: goto fn_exit;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -