⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ch3_progress.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 4 页
字号:
    MPIDI_CH3I_Hash_entry  lresult;        MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SCTP_EVENT);    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SCTP_EVENT);    switch (event->op_type)    {	case MPIDU_SCTP_OP_READ:	{            MPIDI_VC_t * vc;            int stream = event-> sri.sinfo_stream;	    int num_bytes = event-> num_bytes;            /* see if this is a new association */            result = hash_find(MPIDI_CH3I_assocID_table, (int4) event->sri.sinfo_assoc_id);            if((result == NULL) && (MPIDI_CH3I_onetomany_fd == event->fd) )            {                /* new association */                                result = &lresult;                result->assoc_id = event->sri.sinfo_assoc_id;                                /* check flags for MSG_EOR (i.e. a complete message) */		MPIU_Assert(event->user_value & MSG_EOR);                		MPIDI_PG_t * pg;		MPIDI_CH3_Pkt_t * pkt = event->user_ptr;		char * data_ptr = event->user_ptr;                              if(pkt->type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT)                {                    /*  a connect is being initiated by the other side, and we've never seen                     *   this association.                     */                                        /* so that this is done instantaneously, change the type to ACCEPT and call                     *  this recursively (this way we can put the accept code in its own place)                     */                    event->op_type = MPIDU_SCTP_OP_ACCEPT;                    mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(event);                    goto fn_exit;                }		/* a new association so the first message must contain the pg_id, or		 *  our protocol has been broken...		 */		MPIU_Assert(pkt->type == MPIDI_CH3I_PKT_SC_OPEN_REQ);                                		/* read pg_id from event->user_ptr and lookup VC using pg_id */		data_ptr += sizeof(MPIDI_CH3_Pkt_t);   /* assumed to be NULL terminated */		mpi_errno = MPIDI_PG_Find(data_ptr, &pg);		if (pg == NULL) {                    		    MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER,					 "**pglookup",					 "**pglookup %s", data_ptr);		}                /* get VC from an existing PG */		MPIDI_PG_Get_vc(pg, pkt->sc_open_req.pg_rank, &vc);		MPIU_Assert(vc->pg_rank == pkt->sc_open_req.pg_rank);		result->vc = vc;                vc->ch.sinfo_assoc_id = result->assoc_id;                    		/* will insert fully populated hash_entry below */                hash_insert(MPIDI_CH3I_assocID_table, result);            }            else            {                /* we have seen this association before */                MPIDI_VC_t * vc;                MPIDI_CH3_Pkt_t * pkt =  event->user_ptr;                MPIDI_msg_sz_t buflen = sizeof (MPIDI_CH3_Pkt_t);                if(MPIDI_CH3I_dynamic_tmp_fd == event->fd)                {                    if(MPIDI_CH3I_dynamic_tmp_vc == NULL &&                       pkt-> type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT)                    {                        /*  the other side is ack'ing our connect.  */                        /* TODO assert received on control stream? */                                                /*  process this instantaneously. Change the type to ACCEPT                         *  and call this recursively (this way we can put the                         *  accept code in its own place)                         */                        event->op_type = MPIDU_SCTP_OP_ACCEPT;                        mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(event);                        goto fn_exit;                    }                    else                    {                        /* FIXME is it possible for tmp_vc to be non-NULL and pkt                         *         to be an accept pkt?                         */                                                /* in the middle of an MPI_Connect/Accept, so use tmp_vc */                        vc = MPIDI_CH3I_dynamic_tmp_vc;                    }                }                                else                {                    /* business as usual */                    vc = result-> vc;                }                char* adv_buffer = (char*) event-> user_ptr;		/* VC cannot be NULL */		MPIU_Assert(vc != NULL);                        		/* whatever is in the adv_buffer is not control packet */		MPID_Request* rreq = RECV_ACTIVE(vc, stream);		MPID_IOV* iovp;					int actual_bytes_read = event-> num_bytes;		/* if recv active is NULL, we are waiting for a pkt/(pkt+msg) */		if(rreq == NULL) {		    /* FIXME  what if not a full pkt (!MSG_EOR)? */		    		    MPIU_Assert(num_bytes >= sizeof(MPIDI_CH3_Pkt_t));					    /* if it's connection PKT, discard */		    if(pkt-> type == MPIDI_CH3I_PKT_SC_OPEN_REQ) 			break;                    /* need to do ACCEPT stuff if this is an initial pkt */                    if(pkt-> type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT)                    {                        /* don't know if this can happen here... */                        event->op_type = MPIDU_SCTP_OP_ACCEPT;                        mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(event);                        goto fn_exit;                    }		    		    mpi_errno = MPIDI_CH3U_Handle_recv_pkt(vc, pkt, &buflen, &rreq);		    		    if (mpi_errno != MPI_SUCCESS) {			MPIU_ERR_POP(mpi_errno);		    }		    		    /*  rreq can equal NULL (e.g. in close protocol), so need to		     *    break cases apart to avoid dereferencing a NULL ptr		     */		    if(rreq) {				/* minus the size of envelope */			actual_bytes_read -= sizeof(*pkt); 			adv_buffer += sizeof(*pkt);		    }		} 		mpi_errno = read_from_advbuf_and_adjust(vc, stream, actual_bytes_read,							adv_buffer, rreq);		            }	    	    break;	}    case MPIDU_SCTP_OP_WRITE:	{	    MPID_IOV* iovp;            MPIU_Size_t nb;	    int iov_cnt;	    SCTP_IOV* post_ptr = NULL;            /* retrieve the information from the event */	    int event_fd = event->fd;	    MPIDI_VC_t* vc = (MPIDI_VC_t*) event-> user_ptr;  /* points to VC for writes */	    int stream_no = event->user_value;	    MPID_Request* nextReq = NULL;	    	    /* --BEGIN ERROR HANDLING-- */	    if (event->error != MPI_SUCCESS) {		mpi_errno = event->error;		MPIU_ERR_POP(mpi_errno);	    }	    /* --END ERROR HANDLING-- */	    	    MPIU_Assert(SEND_ACTIVE(vc, stream_no) != NULL);	    	    MPID_Request* sreq = SEND_ACTIVE(vc, stream_no);	    	    mpi_errno = MPIDI_CH3U_Handle_send_req(vc, sreq, &complete);	    if (mpi_errno != MPI_SUCCESS) {		MPIU_ERR_POP(mpi_errno);	    }	    	    if (complete){				if(MPIDI_CH3I_VC_STATE_CONNECTING == SEND_CONNECTED(vc, stream_no)) {		    SEND_CONNECTED(vc, stream_no) = MPIDI_CH3I_VC_STATE_CONNECTED;#if 0                    /* this code sets a threshold for the number of connection pkts sent                     *   before setting them all to connected.  Connection pkts are required                     *   so that the assoc_id -> VC hash can be appropriately populated.  To                     *   be 100% sure, a separate connection pkt should be sent for each                     *   stream.  This code is an optimization to only send connection pkts                     *   up to a particular threshold.                     */                                         		    vc->ch.send_init_count++;		    if(vc->ch.send_init_count >= CONNECT_THRESHOLD) {			int i = 0;			for(i = 0; i < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; i++) {			    if(SEND_CONNECTED(vc, i) != MPIDI_CH3I_VC_STATE_CONNECTING) 				SEND_CONNECTED(vc, i) = MPIDI_CH3I_VC_STATE_CONNECTED;			}			vc->ch.send_init_count = -1;		    }#endif		}				stream_post_sendq_req(vc, stream_no);	    } else /* more data to send */ {			MPIDU_Sctp_post_writev(vc, sreq, 0, NULL, stream_no); 		    }                         	    break;	}	    	    	case MPIDU_SCTP_OP_CLOSE:	{            MPIDI_VC_t* vc = (MPIDI_VC_t*) event-> user_ptr;  /* points to VC for close */                        MPIU_Assert(vc->state == MPIDI_VC_STATE_CLOSE_ACKED);            if(vc == MPIDI_CH3I_dynamic_tmp_vc) {                MPIU_Assert(MPIDI_CH3I_dynamic_tmp_fd == vc->ch.fd);                close(MPIDI_CH3I_dynamic_tmp_fd);                MPIDI_CH3I_dynamic_tmp_fd = -1;                MPIDI_CH3I_dynamic_tmp_vc = NULL;                /* free vc->ch.pkt, unneeded so far as initial assertion above holds */                            if(vc->ch.pkt)                    MPIU_Free(vc->ch.pkt);                vc->ch.pkt = NULL;                            } else {                /* not a temporary VC */                if(vc->ref_count == 0 && vc->pg != NULL  && vc->pg->ref_count == 1 ) {                    /* MPIDI_PG_Destroy will be called in the upcall below, so do the                     *  necessary steps since this VC will be destroyed.                     */                    /* this can happen either in finalize or MPI_Comm_disconnect (or both with an                     *  intermediate MPI_Connect/Accept!)                     */                                    /* free vc->ch.pkt, unneeded so far as initial assertion above holds */                                if(vc->ch.pkt)                        MPIU_Free(vc->ch.pkt);                    vc->ch.pkt = NULL;                                    } else {                    /* reset the VC as if it is new */                    int i,loop;                    MPIDI_CH3I_SCTP_Stream_t* str_ptr = NULL;                    MPID_IOV* iov_ptr = NULL;                    MPIDI_CH3_Pkt_t* temp = vc->ch.pkt;                        /* reset state in stream table for sending connection pkts */                    for(i = 0; i < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; i++) {                        str_ptr = &(vc->ch.stream_table[i]);                        /* requires nothing outstanding on this VC */                        MPIU_Assert(str_ptr->sendQ_head == NULL);                        MPIU_Assert(str_ptr->send_active == NULL);                        MPIU_Assert(str_ptr->recv_active == NULL);                        STREAM_INIT(str_ptr);                    }                    /* reset relevant channel fields so connection pkts will be resent */                    vc->ch.state =  MPIDI_CH3I_VC_STATE_UNCONNECTED;                    vc->ch.send_init_count = 0;                    /* if we've called MPIDI_CH3I_VC_post_connect, reset iov's for connection pkt */                    if(temp) {                        /* FIXME done elsewhere so add this to a function */                        for(loop=0; loop < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; loop++) {                            iov_ptr = VC_IOV(vc, loop);                            iov_ptr[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) temp;                            iov_ptr[0].MPID_IOV_LEN = (int) sizeof(*temp);                                                        iov_ptr[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) MPIDI_Process.my_pg->id;                            iov_ptr[1].MPID_IOV_LEN = temp-> sc_open_req.pg_id_len;		       	                        }                    }                                    }                                /* nothing left is sent/recv'd on vc in upcall. remove from assoc_id -> VC hash  */                hash_delete(MPIDI_CH3I_assocID_table, vc->ch.sinfo_assoc_id);            }                            mpi_errno = MPIDI_CH3U_Handle_connection(vc, MPIDI_VC_EVENT_TERMINATED);            if(mpi_errno != MPI_SUCCESS)                goto fn_fail;	    break;	}	/* 	case MPIDU_SOCK_OP_WAKEUP: *//* 	{ *//* 	    MPIDI_CH3_Progress_signal_completion(); *//* 	    /\* MPIDI_CH3I_progress_completion_count++; *\/ *//* 	    break; *//* 	} */    case MPIDU_SCTP_OP_ACCEPT:    {        MPIDI_VC_t *vc;         MPIDI_CH3_Pkt_t * pkt = event->user_ptr;        char * data_ptr = event->user_ptr;                char host_description[MAX_HOST_DESCRIPTION_LEN];        int port;        MPIDU_Sock_ifaddr_t ifaddr;        int hasIfaddr = 0;                MPIU_Assert(pkt->type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT);        /* allocate tmp VC.  this VC is used to exchange PG info w/ dynamic procs */        vc = (MPIDI_VC_t *) MPIU_Malloc(sizeof(MPIDI_VC_t));        /* --BEGIN ERROR HANDLING-- */        if (vc == NULL)        {            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME,                                             __LINE__, MPI_ERR_OTHER,                                             "**nomem", NULL);            goto fn_exit;        }        /* VC marked as temporary (that what a NULL PG means) */        MPIDI_VC_Init(vc, NULL, 0);        MPIDI_CH3_VC_Init(vc);        /* set the port_name_tag on this VC for verification later */        vc->ch.port_name_tag = pkt->sc_conn_accept.port_name_tag;                            /* data after pkt contains bizcard */        data_ptr += sizeof(MPIDI_CH3_Pkt_t);                    mpi_errno = MPIDU_Sctp_get_conninfo_from_bc( data_ptr, host_description,                                                        sizeof(host_description),                                                        &port, &ifaddr, &hasIfaddr );        if(mpi_errno) {            MPIU_ERR_POP(mpi_errno);        }        /*  save the sockaddr_in of new VC */        giveMeSockAddr(ifaddr.ifaddr, port, &vc->ch.to_address);                               if(pkt->sc_conn_accept.ack) {            /* this is the connect side */                        /* the fd was opened already, so that we could pass the SCTP port # to             *  the accept side in the initial bizcard.             */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -