⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp_module_poll.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2006 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "tcp_module_impl.h"int MPID_nem_alt_tcp_module_poll (MPID_nem_poll_dir_t in_or_out);/* #define TRACE */#undef FUNCNAME#define FUNCNAME MPID_nem_tcp_module_poll_send#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_tcp_module_poll_send( void ){    int                  mpi_errno = MPI_SUCCESS;    MPID_nem_cell_ptr_t  cell;    MPID_nem_pkt_t      *pkt;    int                  offset;    int                  dest;    int                  len;    int                  index;    int                  grank;    node_t              *nodes     = MPID_nem_tcp_internal_vars.nodes;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_MODULE_POLL_SEND);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_MODULE_POLL_SEND);    /* first, handle pending sends */    if  (MPID_nem_tcp_internal_vars.n_pending_send > 0)    {        for(index = 0 ; index < MPID_nem_mem_region.ext_procs ;index++)	{            grank = MPID_nem_mem_region.ext_ranks[index];            if((grank != MPID_nem_mem_region.rank ) && (!MPID_nem_tcp_internal_queue_empty (nodes[grank].internal_recv_queue)))	    {#ifdef TRACE                fprintf(stderr,"[%i] -- TCP RETRY SEND for %i ... \n",MPID_nem_mem_region.rank,grank);                /*MPID_nem_dump_queue( nodes[grank].internal_recv_queue ); */#endif                pkt  = (MPID_nem_pkt_t *)MPID_NEM_CELL_TO_PACKET ( nodes[grank].internal_recv_queue.head ); /* cast away volatile */                dest = pkt->mpich2.dest;                len  = (MPID_NEM_PACKET_OPT_LEN(pkt)) - nodes[dest].left2write;                do                {                    offset = write(nodes[dest].desc,(char *)pkt + nodes[dest].left2write, len);                }                while (offset == -1 && errno == EINTR);#ifdef TRACE                fprintf(stderr,"[%i] -- TCP RETRY SEND for %i/offset %i/remaining %i \n/pkt len : %i/curr offset : %i \n",                        MPID_nem_mem_region.rank,                        grank,                        offset,                        len,                        MPID_NEM_PACKET_OPT_LEN(pkt),                        nodes[dest].left2write);#endif                if(offset != -1)		{                    nodes[dest].left2write += offset;                    if(nodes[dest].left2write == (MPID_NEM_PACKET_OPT_LEN(pkt)))		    {#ifdef TRACE                        fprintf(stderr,"[%i] -- TCP SEND : sent PARTIAL MSG 2 %i len, [%i total/%i payload]\n",                                MPID_nem_mem_region.rank,                                offset,                                nodes[dest].left2write,                                pkt->mpich2.datalen);#endif                        nodes[dest].left2write = 0;                        MPID_nem_tcp_internal_queue_dequeue (&nodes[dest].internal_recv_queue, &cell);                        MPID_nem_queue_enqueue (MPID_nem_process_free_queue, cell);                        MPID_nem_tcp_internal_vars.n_pending_send--;                        MPID_nem_tcp_internal_vars.n_pending_sends[dest]--;		    }		}                else		{                    /* write() returned an error */                    MPIU_ERR_CHKANDJUMP1 (errno != EAGAIN, mpi_errno, MPI_ERR_OTHER, "**write", "**write %s", strerror (errno));		}	    }	}    } fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_MODULE_POLL_SEND);    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_tcp_module_poll_recv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_tcp_module_poll_recv( void ){    int                  mpi_errno    = MPI_SUCCESS;    MPID_nem_cell_ptr_t  cell         = NULL;    fd_set               read_set     = MPID_nem_tcp_internal_vars.set;    int                  ret          = 0;    int                  index;    int                  grank;    int                  outstanding2 = 0 ;    int                  offset;    MPID_nem_pkt_t      *pkt          = NULL;    struct timeval       time;    node_t              *nodes        = MPID_nem_tcp_internal_vars.nodes ;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_MODULE_POLL_RECV);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_MODULE_POLL_RECV);    time.tv_sec  = 0;    time.tv_usec = 0;    ret = select (MPID_nem_tcp_internal_vars.max_fd, &read_set, NULL, NULL, &time);    MPIU_ERR_CHKANDJUMP1 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**select", "**select %s", strerror (errno));#ifdef TRACE    if(ret)	fprintf(stderr,		"[%i] -- RECV TCP select with ret : %i \n",		MPID_nem_mem_region.rank,		ret);#endif    while( (ret > 0) || (MPID_nem_tcp_internal_vars.outstanding > 0))    {	for(index = 0 ; index < MPID_nem_mem_region.ext_procs ; index++)	{	    grank = MPID_nem_mem_region.ext_ranks[index];	    if(FD_ISSET(nodes[grank].desc,&read_set))	    {		FD_CLR(nodes[grank].desc,&read_set);		ret--;		nodes[grank].toread       = 0;		MPID_nem_tcp_internal_vars.outstanding = 0;#ifdef TRACE		fprintf(stderr,"[%i] -- RECV TCP READ : desc is %i (index %i)\n",			MPID_nem_mem_region.rank,			nodes[grank].desc,			index);		/*MPID_nem_dump_queue( nodes[grank].internal_free_queue );*/#endif            main_routine :                /* handle pending recvs */                if(!MPID_nem_tcp_internal_queue_empty (nodes[grank].internal_free_queue))                {                    pkt = (MPID_nem_pkt_t *)MPID_NEM_CELL_TO_PACKET (nodes[grank].internal_free_queue.head); /* cast away volatile */                    if ((nodes[grank].left2read_head > 0) && (nodes[grank].left2read == 0))                    {                        do                        {                            offset = read(nodes[grank].desc,                                          (char *)pkt + nodes[grank].left2read_head,                                          MPID_NEM_OPT_HEAD_LEN - nodes[grank].left2read_head);                        }                        while (offset == -1 && errno == EINTR);                        if(offset != -1)                        {                            nodes[grank].left2read_head += offset;                            if(nodes[grank].left2read_head == MPID_NEM_OPT_HEAD_LEN)                            {                                nodes[grank].left2read_head = 0;                                if (pkt->mpich2.datalen > MPID_NEM_OPT_SIZE)                                {                                    nodes[grank].left2read = pkt->mpich2.datalen - MPID_NEM_OPT_SIZE;                                    do                                    {                                        offset =  read(nodes[grank].desc,                                                       (pkt->mpich2.payload + MPID_NEM_OPT_SIZE),                                                       nodes[grank].left2read);                                    }                                    while (offset == -1 && errno == EINTR);                                    if (offset != -1)                                    {                                        nodes[grank].left2read -= offset;                                        if (nodes[grank].left2read == 0)                                        {                                            MPID_nem_tcp_internal_queue_dequeue (&nodes[grank].internal_free_queue, &cell);                                            MPID_nem_queue_enqueue (MPID_nem_process_recv_queue, cell);                                            MPID_nem_tcp_internal_vars.n_pending_recv--;                                        }                                    }                                    else if (errno != EAGAIN)                                    {                                        /* read() returned an error *//*                                         printf ("read (fd=%d buf=%p len=%d) grank=%d\n", nodes[grank].desc, /\*DARIUS*\/ *//*                                                 (pkt->mpich2.payload + MPID_NEM_OPT_SIZE), /\*DARIUS*\/ *//*                                                 nodes[grank].left2read, grank); /\*DARIUS*\/ */                                        MPIU_ERR_SETANDJUMP1 (mpi_errno, MPI_ERR_OTHER, "**read", "**read %s", strerror (errno));                                    }                                    continue;                                }                                else                                {                                    nodes[grank].left2read = 0;                                    MPID_nem_tcp_internal_queue_dequeue (&nodes[grank].internal_free_queue, &cell);                                    MPID_nem_queue_enqueue (MPID_nem_process_recv_queue, cell);                                    MPID_nem_tcp_internal_vars.n_pending_recv--;                                    continue;                                }                            }#ifdef TRACE                            else                            {                                fprintf(stderr,"[%i] -- RECV TCP READ : NOT FULL HEAD YET !!!\n",MPID_nem_mem_region.rank);                            }#endif                        }                        else                        {                            if(errno != EAGAIN)                            {                                /* read() returned an error */                                MPIU_ERR_SETANDJUMP1 (mpi_errno, MPI_ERR_OTHER, "**read", "**read %s", strerror (errno));                            }                        }                    }                    else if (nodes[grank].left2read > 0)                    {                        do                        {                            offset = read(nodes[grank].desc,                                          (char *)&(pkt->mpich2.payload) + (pkt->mpich2.datalen - nodes[grank].left2read),                                          nodes[grank].left2read);                        }                        while (offset == -1 && errno == EINTR);                        if (offset != -1)                        {                            nodes[grank].left2read -= offset;#ifdef TRACE                            {                                int index;                                fprintf(stderr,                                        "[%i] -- RECV TCP READ : RETRY 3 for %i, got %i bytes [%i current/ %i total] \n",                                        MPID_nem_mem_region.rank,                                        grank,                                        offset,                                        (pkt->mpich2.datalen - nodes[grank].left2read),                                        pkt->mpich2.datalen);                            }#endif	                            if (nodes[grank].left2read == 0)                            {                                nodes[grank].left2read_head = 0;                                MPID_nem_tcp_internal_queue_dequeue (&nodes[grank].internal_free_queue, &cell);                                MPID_nem_queue_enqueue (MPID_nem_process_recv_queue, cell);                                MPID_nem_tcp_internal_vars.n_pending_recv--;                            }                        }                        else                        {                            if (errno != EAGAIN)                            {                                /* read() returned an error */                                MPIU_ERR_SETANDJUMP1 (mpi_errno, MPI_ERR_OTHER, "**read", "**read %s", strerror (errno));                            }                        }                    }                }                else                {                    /* handle regular Net Q */                    if (!MPID_nem_queue_empty(MPID_nem_module_tcp_free_queue))                    {                        MPID_nem_queue_dequeue (MPID_nem_module_tcp_free_queue, &cell);                        do                        {                            offset = read (nodes[grank].desc,                                           (MPID_nem_pkt_mpich2_t *)&(cell->pkt.mpich2), /* cast away volatile */                                           MPID_NEM_OPT_HEAD_LEN);                        }                        while (offset == -1 && errno == EINTR);#ifdef TRACE                        {				                            int index ;                            for(index = 0 ; index < ((cell->pkt.mpich2.datalen)/sizeof(int)); index ++)                            {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -