📄 ch3_progress.c
字号:
MPIDI_CH3I_Hash_entry lresult; MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SCTP_EVENT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_HANDLE_SCTP_EVENT); switch (event->op_type) { case MPIDU_SCTP_OP_READ: { MPIDI_VC_t * vc; int stream = event-> sri.sinfo_stream; int num_bytes = event-> num_bytes; /* see if this is a new association */ result = hash_find(MPIDI_CH3I_assocID_table, (int4) event->sri.sinfo_assoc_id); if((result == NULL) && (MPIDI_CH3I_onetomany_fd == event->fd) ) { /* new association */ result = &lresult; result->assoc_id = event->sri.sinfo_assoc_id; /* check flags for MSG_EOR (i.e. a complete message) */ MPIU_Assert(event->user_value & MSG_EOR); MPIDI_PG_t * pg; MPIDI_CH3_Pkt_t * pkt = event->user_ptr; char * data_ptr = event->user_ptr; if(pkt->type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT) { /* a connect is being initiated by the other side, and we've never seen * this association. */ /* so that this is done instantaneously, change the type to ACCEPT and call * this recursively (this way we can put the accept code in its own place) */ event->op_type = MPIDU_SCTP_OP_ACCEPT; mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(event); goto fn_exit; } /* a new association so the first message must contain the pg_id, or * our protocol has been broken... */ MPIU_Assert(pkt->type == MPIDI_CH3I_PKT_SC_OPEN_REQ); /* read pg_id from event->user_ptr and lookup VC using pg_id */ data_ptr += sizeof(MPIDI_CH3_Pkt_t); /* assumed to be NULL terminated */ mpi_errno = MPIDI_PG_Find(data_ptr, &pg); if (pg == NULL) { MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**pglookup", "**pglookup %s", data_ptr); } /* get VC from an existing PG */ MPIDI_PG_Get_vc(pg, pkt->sc_open_req.pg_rank, &vc); MPIU_Assert(vc->pg_rank == pkt->sc_open_req.pg_rank); result->vc = vc; vc->ch.sinfo_assoc_id = result->assoc_id; /* will insert fully populated hash_entry below */ hash_insert(MPIDI_CH3I_assocID_table, result); } else { /* we have seen this association before */ MPIDI_VC_t * vc; MPIDI_CH3_Pkt_t * pkt = event->user_ptr; MPIDI_msg_sz_t buflen = sizeof (MPIDI_CH3_Pkt_t); if(MPIDI_CH3I_dynamic_tmp_fd == event->fd) { if(MPIDI_CH3I_dynamic_tmp_vc == NULL && pkt-> type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT) { /* the other side is ack'ing our connect. */ /* TODO assert received on control stream? */ /* process this instantaneously. Change the type to ACCEPT * and call this recursively (this way we can put the * accept code in its own place) */ event->op_type = MPIDU_SCTP_OP_ACCEPT; mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(event); goto fn_exit; } else { /* FIXME is it possible for tmp_vc to be non-NULL and pkt * to be an accept pkt? */ /* in the middle of an MPI_Connect/Accept, so use tmp_vc */ vc = MPIDI_CH3I_dynamic_tmp_vc; } } else { /* business as usual */ vc = result-> vc; } char* adv_buffer = (char*) event-> user_ptr; /* VC cannot be NULL */ MPIU_Assert(vc != NULL); /* whatever is in the adv_buffer is not control packet */ MPID_Request* rreq = RECV_ACTIVE(vc, stream); MPID_IOV* iovp; int actual_bytes_read = event-> num_bytes; /* if recv active is NULL, we are waiting for a pkt/(pkt+msg) */ if(rreq == NULL) { /* FIXME what if not a full pkt (!MSG_EOR)? */ MPIU_Assert(num_bytes >= sizeof(MPIDI_CH3_Pkt_t)); /* if it's connection PKT, discard */ if(pkt-> type == MPIDI_CH3I_PKT_SC_OPEN_REQ) break; /* need to do ACCEPT stuff if this is an initial pkt */ if(pkt-> type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT) { /* don't know if this can happen here... */ event->op_type = MPIDU_SCTP_OP_ACCEPT; mpi_errno = MPIDI_CH3I_Progress_handle_sctp_event(event); goto fn_exit; } mpi_errno = MPIDI_CH3U_Handle_recv_pkt(vc, pkt, &buflen, &rreq); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } /* rreq can equal NULL (e.g. in close protocol), so need to * break cases apart to avoid dereferencing a NULL ptr */ if(rreq) { /* minus the size of envelope */ actual_bytes_read -= sizeof(*pkt); adv_buffer += sizeof(*pkt); } } mpi_errno = read_from_advbuf_and_adjust(vc, stream, actual_bytes_read, adv_buffer, rreq); } break; } case MPIDU_SCTP_OP_WRITE: { MPID_IOV* iovp; MPIU_Size_t nb; int iov_cnt; SCTP_IOV* post_ptr = NULL; /* retrieve the information from the event */ int event_fd = event->fd; MPIDI_VC_t* vc = (MPIDI_VC_t*) event-> user_ptr; /* points to VC for writes */ int stream_no = event->user_value; MPID_Request* nextReq = NULL; /* --BEGIN ERROR HANDLING-- */ if (event->error != MPI_SUCCESS) { mpi_errno = event->error; MPIU_ERR_POP(mpi_errno); } /* --END ERROR HANDLING-- */ MPIU_Assert(SEND_ACTIVE(vc, stream_no) != NULL); MPID_Request* sreq = SEND_ACTIVE(vc, stream_no); mpi_errno = MPIDI_CH3U_Handle_send_req(vc, sreq, &complete); if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); } if (complete){ if(MPIDI_CH3I_VC_STATE_CONNECTING == SEND_CONNECTED(vc, stream_no)) { SEND_CONNECTED(vc, stream_no) = MPIDI_CH3I_VC_STATE_CONNECTED;#if 0 /* this code sets a threshold for the number of connection pkts sent * before setting them all to connected. Connection pkts are required * so that the assoc_id -> VC hash can be appropriately populated. To * be 100% sure, a separate connection pkt should be sent for each * stream. This code is an optimization to only send connection pkts * up to a particular threshold. */ vc->ch.send_init_count++; if(vc->ch.send_init_count >= CONNECT_THRESHOLD) { int i = 0; for(i = 0; i < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; i++) { if(SEND_CONNECTED(vc, i) != MPIDI_CH3I_VC_STATE_CONNECTING) SEND_CONNECTED(vc, i) = MPIDI_CH3I_VC_STATE_CONNECTED; } vc->ch.send_init_count = -1; }#endif } stream_post_sendq_req(vc, stream_no); } else /* more data to send */ { MPIDU_Sctp_post_writev(vc, sreq, 0, NULL, stream_no); } break; } case MPIDU_SCTP_OP_CLOSE: { MPIDI_VC_t* vc = (MPIDI_VC_t*) event-> user_ptr; /* points to VC for close */ MPIU_Assert(vc->state == MPIDI_VC_STATE_CLOSE_ACKED); if(vc == MPIDI_CH3I_dynamic_tmp_vc) { MPIU_Assert(MPIDI_CH3I_dynamic_tmp_fd == vc->ch.fd); close(MPIDI_CH3I_dynamic_tmp_fd); MPIDI_CH3I_dynamic_tmp_fd = -1; MPIDI_CH3I_dynamic_tmp_vc = NULL; /* free vc->ch.pkt, unneeded so far as initial assertion above holds */ if(vc->ch.pkt) MPIU_Free(vc->ch.pkt); vc->ch.pkt = NULL; } else { /* not a temporary VC */ if(vc->ref_count == 0 && vc->pg != NULL && vc->pg->ref_count == 1 ) { /* MPIDI_PG_Destroy will be called in the upcall below, so do the * necessary steps since this VC will be destroyed. */ /* this can happen either in finalize or MPI_Comm_disconnect (or both with an * intermediate MPI_Connect/Accept!) */ /* free vc->ch.pkt, unneeded so far as initial assertion above holds */ if(vc->ch.pkt) MPIU_Free(vc->ch.pkt); vc->ch.pkt = NULL; } else { /* reset the VC as if it is new */ int i,loop; MPIDI_CH3I_SCTP_Stream_t* str_ptr = NULL; MPID_IOV* iov_ptr = NULL; MPIDI_CH3_Pkt_t* temp = vc->ch.pkt; /* reset state in stream table for sending connection pkts */ for(i = 0; i < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; i++) { str_ptr = &(vc->ch.stream_table[i]); /* requires nothing outstanding on this VC */ MPIU_Assert(str_ptr->sendQ_head == NULL); MPIU_Assert(str_ptr->send_active == NULL); MPIU_Assert(str_ptr->recv_active == NULL); STREAM_INIT(str_ptr); } /* reset relevant channel fields so connection pkts will be resent */ vc->ch.state = MPIDI_CH3I_VC_STATE_UNCONNECTED; vc->ch.send_init_count = 0; /* if we've called MPIDI_CH3I_VC_post_connect, reset iov's for connection pkt */ if(temp) { /* FIXME done elsewhere so add this to a function */ for(loop=0; loop < MPICH_SCTP_NUM_REQS_ACTIVE_TO_INIT; loop++) { iov_ptr = VC_IOV(vc, loop); iov_ptr[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) temp; iov_ptr[0].MPID_IOV_LEN = (int) sizeof(*temp); iov_ptr[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) MPIDI_Process.my_pg->id; iov_ptr[1].MPID_IOV_LEN = temp-> sc_open_req.pg_id_len; } } } /* nothing left is sent/recv'd on vc in upcall. remove from assoc_id -> VC hash */ hash_delete(MPIDI_CH3I_assocID_table, vc->ch.sinfo_assoc_id); } mpi_errno = MPIDI_CH3U_Handle_connection(vc, MPIDI_VC_EVENT_TERMINATED); if(mpi_errno != MPI_SUCCESS) goto fn_fail; break; } /* case MPIDU_SOCK_OP_WAKEUP: *//* { *//* MPIDI_CH3_Progress_signal_completion(); *//* /\* MPIDI_CH3I_progress_completion_count++; *\/ *//* break; *//* } */ case MPIDU_SCTP_OP_ACCEPT: { MPIDI_VC_t *vc; MPIDI_CH3_Pkt_t * pkt = event->user_ptr; char * data_ptr = event->user_ptr; char host_description[MAX_HOST_DESCRIPTION_LEN]; int port; MPIDU_Sock_ifaddr_t ifaddr; int hasIfaddr = 0; MPIU_Assert(pkt->type == MPIDI_CH3I_PKT_SC_CONN_ACCEPT); /* allocate tmp VC. this VC is used to exchange PG info w/ dynamic procs */ vc = (MPIDI_VC_t *) MPIU_Malloc(sizeof(MPIDI_VC_t)); /* --BEGIN ERROR HANDLING-- */ if (vc == NULL) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", NULL); goto fn_exit; } /* VC marked as temporary (that what a NULL PG means) */ MPIDI_VC_Init(vc, NULL, 0); MPIDI_CH3_VC_Init(vc); /* set the port_name_tag on this VC for verification later */ vc->ch.port_name_tag = pkt->sc_conn_accept.port_name_tag; /* data after pkt contains bizcard */ data_ptr += sizeof(MPIDI_CH3_Pkt_t); mpi_errno = MPIDU_Sctp_get_conninfo_from_bc( data_ptr, host_description, sizeof(host_description), &port, &ifaddr, &hasIfaddr ); if(mpi_errno) { MPIU_ERR_POP(mpi_errno); } /* save the sockaddr_in of new VC */ giveMeSockAddr(ifaddr.ifaddr, port, &vc->ch.to_address); if(pkt->sc_conn_accept.ack) { /* this is the connect side */ /* the fd was opened already, so that we could pass the SCTP port # to * the accept side in the initial bizcard. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -