
sock_wait.i
From the "Fortran parallel computing package" collection
Page 1 of 2 (the listing below is truncated at the page boundary)
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/* Make sure that we can properly ensure atomic access to the poll routine */
#ifdef MPICH_IS_THREADED
#if (USE_THREAD_IMPL != MPICH_THREAD_IMPL_GLOBAL_MUTEX)
#error selected multi-threaded implementation is not supported
#endif
#endif

static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd,
				      struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd,
				      struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_read(struct pollfd * const pollfd,
				   struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_write(struct pollfd * const pollfd,
				    struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd,
				      struct pollinfo * const pollinfo);

/*
 * MPIDU_Sock_wait()
 *
 * NOTES:
 *
 * For fatal errors, the state of the connection progresses directly to the
 * failed state and the connection is marked inactive in the poll array.
 * Under normal conditions, the fatal error should result in the termination
 * of the process; but, if that doesn't happen, we try to leave the
 * implementation in a somewhat sane state.
 */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sock_wait
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set, int millisecond_timeout,
		    struct MPIDU_Sock_event * eventp)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_WAIT);
    MPIDI_STATE_DECL(MPID_STATE_POLL);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_WAIT);

    for (;;)
    {
	int elem = 0;   /* Keep compiler happy */
	int n_fds;
	int n_elems;
	int found_active_elem = FALSE;

	mpi_errno = MPIDU_Socki_event_dequeue(sock_set, &elem, eventp);
	if (mpi_errno == MPI_SUCCESS) {
	    struct pollinfo * pollinfo;
	    int flags;

	    if (eventp->op_type != MPIDU_SOCK_OP_CLOSE)
	    {
		break;
	    }

	    pollinfo = &sock_set->pollinfos[elem];

	    /*
	     * Attempt to set the socket back to blocking.  This *should*
	     * prevent any data in the socket send buffer from being
	     * discarded.  Instead, close() will block until the buffer is
	     * flushed or the connection times out and is considered lost.
	     * Theoretically, this could cause MPIDU_Sock_wait() to hang
	     * indefinitely; however, the calling code should ensure this
	     * will not happen by going through a shutdown protocol before
	     * posting a close operation.
	     *
	     * FIXME: If the attempt to set the socket back to blocking
	     * fails, we presently ignore it.  Should we return an error?
	     * We need to define acceptable data loss at close time.  MS
	     * Windows has worse problems with this, so it may not be
	     * possible to make any guarantees.
	     */
	    flags = fcntl(pollinfo->fd, F_GETFL, 0);
	    if (flags != -1)
	    {
		fcntl(pollinfo->fd, F_SETFL, flags & ~O_NONBLOCK);
	    }

	    /* FIXME: return code?  If an error occurs do we return it
	       instead of the error specified in the event? */
	    close(pollinfo->fd);

	    MPIDU_Socki_sock_free(pollinfo->sock);

	    break;
	}
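	/* No event was queued, so poll for activity on the descriptors in
	   this sock set.  The loop below retries the poll when it is
	   interrupted by a signal (EINTR) and an infinite timeout was
	   requested; otherwise an interrupted wait is reported as a
	   timeout. */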
	for(;;)
	{
#	    ifndef MPICH_IS_THREADED
	    {
		MPIDI_FUNC_ENTER(MPID_STATE_POLL);
		n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems,
			     millisecond_timeout);
		MPIDI_FUNC_EXIT(MPID_STATE_POLL);
	    }
#	    else /* MPICH_IS_THREADED */
	    {
		/* If we've enabled runtime checking of the thread level,
		   then test for that and if we are *not* multithreaded,
		   just use the same code as above.  Otherwise, use
		   multithreaded code (and we don't then need the
		   MPIU_THREAD_CHECK_BEGIN/END macros) */
#ifdef HAVE_RUNTIME_THREADCHECK
		if (!MPIR_ThreadInfo.isThreaded) {
		    MPIDI_FUNC_ENTER(MPID_STATE_POLL);
		    n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems,
				 millisecond_timeout);
		    MPIDI_FUNC_EXIT(MPID_STATE_POLL);
		}
		else
#endif
		{
		/*
		 * First try a non-blocking poll to see if any immediate
		 * progress can be made.  This avoids the lock manipulation
		 * overhead.
		 */
		MPIDI_FUNC_ENTER(MPID_STATE_POLL);
		n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, 0);
		MPIDI_FUNC_EXIT(MPID_STATE_POLL);

		if (n_fds == 0 && millisecond_timeout != 0)
		{
		    int pollfds_active_elems = sock_set->poll_array_elems;

		    sock_set->pollfds_active = sock_set->pollfds;

		    /* Release the lock so that other threads may make
		       progress while this thread waits for something to
		       do */
		    MPIU_DBG_MSG(THREAD,TYPICAL,"Exit global critical section");
		    MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex);

		    MPIDI_FUNC_ENTER(MPID_STATE_POLL);
		    n_fds = poll(sock_set->pollfds_active,
				 pollfds_active_elems, millisecond_timeout);
		    MPIDI_FUNC_EXIT(MPID_STATE_POLL);

		    /* Reacquire the lock before processing any of the
		       information returned from poll */
		    MPIU_DBG_MSG(THREAD,TYPICAL,"Enter global critical section");
		    MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex);

		    /*
		     * Update the pollfds array if changes were posted while
		     * we were blocked in poll
		     */
		    if (sock_set->pollfds_updated) {
			mpi_errno = MPIDI_Sock_update_sock_set(
				       sock_set, pollfds_active_elems );
		    }

		    sock_set->pollfds_active = NULL;
		    sock_set->wakeup_posted = FALSE;
		}
		} /* else !MPIR_ThreadInfo.isThreaded */
	    }
#	    endif /* MPICH_IS_THREADED */

	    if (n_fds > 0)
	    {
		break;
	    }
	    else if (n_fds == 0)
	    {
		mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
		goto fn_exit;
	    }
	    else if (errno == EINTR)
	    {
		if (millisecond_timeout != MPIDU_SOCK_INFINITE_TIME)
		{
		    mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
		    goto fn_exit;
		}

		continue;
	    }
	    /* --BEGIN ERROR HANDLING-- */
	    else if (errno == ENOMEM || errno == EAGAIN)
	    {
		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM,
						 "**sock|osnomem", NULL);
		goto fn_exit;
	    }
	    else
	    {
		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL,
						 "**sock|oserror", "**sock|poll|oserror %d %s", errno, MPIU_Strerror(errno));
		goto fn_exit;
	    }
	    /* --END ERROR HANDLING-- */
	}
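	/* poll() reported n_fds active descriptors.  Scan the pollfd array
	   for them, starting at starting_elem.  Because starting_elem is
	   advanced past the first active element found, successive calls
	   service descriptors in round-robin order rather than always
	   favoring low-numbered entries. */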
	elem = sock_set->starting_elem;
	n_elems = sock_set->poll_array_elems;
	while (n_fds > 0 && n_elems > 0)
	{
	    /*
	     * Acquire pointers to the pollfd and pollinfo structures for
	     * the next element.
	     *
	     * NOTE: These pointers could become stale if a new sock were to
	     * be allocated during the processing of the element.  At
	     * present, none of the handler routines allocate a sock, so the
	     * issue does not arise.
	     */
	    struct pollfd * const pollfd = &sock_set->pollfds[elem];
	    struct pollinfo * const pollinfo = &sock_set->pollinfos[elem];

	    MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1);
	    MPIU_Assert(pollfd->fd >= 0 || pollfd->fd == -1);

	    if (pollfd->fd < 0 || pollfd->revents == 0)
	    {
		/* This optimization assumes that most FDs will not have a
		   pending event. */
		n_elems -= 1;
		elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
		continue;
	    }

	    if (found_active_elem == FALSE)
	    {
		found_active_elem = TRUE;
		sock_set->starting_elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
	    }

	    if (pollfd->revents & POLLNVAL)
	    {
		mpi_errno = MPIR_Err_create_code(
		    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|badhandle",
		    "**sock|poll|badhandle %d %d %d %d", pollinfo->sock_set->id, pollinfo->sock_id, pollfd->fd, pollinfo->fd);
		goto fn_exit;
	    }

	    /* --BEGIN ERROR HANDLING-- */
	    if (pollfd->revents & POLLHUP)
	    {
		mpi_errno = MPIDU_Socki_handle_pollhup(pollfd, pollinfo);
		if (MPIR_Err_is_fatal(mpi_errno))
		{
		    goto fn_exit;
		}
	    }

	    /* According to Stevens, some errors are reported as normal data
	       (POLLIN) and some are reported with POLLERR. */
	    if (pollfd->revents & POLLERR)
	    {
		mpi_errno = MPIDU_Socki_handle_pollerr(pollfd, pollinfo);
		if (MPIR_Err_is_fatal(mpi_errno))
		{
		    goto fn_exit;
		}
	    }
	    /* --END ERROR HANDLING-- */

	    if (pollfd->revents & POLLIN)
	    {
		if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
		{
		    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW ||
			pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
		    {
			mpi_errno = MPIDU_Socki_handle_read(pollfd, pollinfo);
			/* --BEGIN ERROR HANDLING-- */
			if (MPIR_Err_is_fatal(mpi_errno))
			{
			    goto fn_exit;
			}
			/* --END ERROR HANDLING-- */
		    }
		    /* --BEGIN ERROR HANDLING-- */
		    else
		    {
			mpi_errno = MPIR_Err_create_code(
			    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
			    "**sock|poll|unhandledstate %d", pollinfo->state);
			goto fn_exit;
		    }
		    /* --END ERROR HANDLING-- */
		}
		else if (pollinfo->type == MPIDU_SOCKI_TYPE_LISTENER)
		{
		    pollfd->events &= ~POLLIN;
		    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_ACCEPT, 0, pollinfo->user_ptr,
					      MPI_SUCCESS, mpi_errno, fn_exit);
		}
		else if ((MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) && pollinfo->type == MPIDU_SOCKI_TYPE_INTERRUPTER)
		{
		    /* Drain the wakeup pipe; the bytes themselves carry no
		       information. */
		    char c[16];
		    int nb;

		    do
		    {
			nb = read(pollfd->fd, c, 16);
		    }
		    while (nb > 0 || (nb < 0 && errno == EINTR));
		}
		/* --BEGIN ERROR HANDLING-- */
		else
		{
		    mpi_errno = MPIR_Err_create_code(
			MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
			"**sock|poll|unhandledtype %d", pollinfo->type);
		    goto fn_exit;
		}
		/* --END ERROR HANDLING-- */
	    }
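	    /* Writability: flush a posted write on a connected socket, or
	       complete a non-blocking connect that was in progress. */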
	    if (pollfd->revents & POLLOUT)
	    {
		if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
		{
		    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
		    {
			mpi_errno = MPIDU_Socki_handle_write(pollfd, pollinfo);
			/* --BEGIN ERROR HANDLING-- */
			if (MPIR_Err_is_fatal(mpi_errno))
			{
			    goto fn_exit;
			}
			/* --END ERROR HANDLING-- */
		    }
		    else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
		    {
			mpi_errno = MPIDU_Socki_handle_connect(pollfd, pollinfo);
			/* --BEGIN ERROR HANDLING-- */
			if (MPIR_Err_is_fatal(mpi_errno))
			{
			    goto fn_exit;
			}
			/* --END ERROR HANDLING-- */
		    }
		    /* --BEGIN ERROR HANDLING-- */
		    else
		    {
			mpi_errno = MPIR_Err_create_code(
			    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
			    "**sock|poll|unhandledstate %d", pollinfo->state);
			goto fn_exit;
		    }
		    /* --END ERROR HANDLING-- */
		}
		/* --BEGIN ERROR HANDLING-- */
		else
		{
		    mpi_errno = MPIR_Err_create_code(
			MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
			"**sock|poll|unhandledtype %d", pollinfo->type);
		    goto fn_exit;
		}
		/* --END ERROR HANDLING-- */
	    }

	    n_fds -= 1;
	    n_elems -= 1;
	    elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
	}
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Socki_handle_pollhup
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);

    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
    {
	/*
	 * If a write was posted then cancel it and generate a connection
	 * closed event.  If a read is posted, it will be handled by the
	 * POLLIN handler.
	 */
	/* --BEGIN ERROR HANDLING-- */
	if (pollfd->events & POLLOUT)
	{
	    int event_mpi_errno;

	    event_mpi_errno = MPIR_Err_create_code(
		MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_CONN_CLOSED,
		"**sock|connclosed", "**sock|connclosed %d %d", pollinfo->sock_set->id, pollinfo->sock_id);
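
The threaded branch of MPIDU_Sock_wait() above first tries a zero-timeout poll so that pending work can be serviced without touching the mutex, and only then releases the global lock around a blocking poll so other threads can run while this one waits. The following is a minimal standalone sketch of that unlock/poll/relock pattern using plain POSIX primitives; all names in it (g_lock, g_fds, g_nfds, progress_wait) are illustrative and are not part of the MPICH source shown above.

/* Sketch of the unlock/poll/relock pattern from the threaded branch above.
 * Assumes a single global mutex guarding the fd table, as in the original. */
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;  /* hypothetical */
static struct pollfd g_fds[16];                             /* hypothetical */
static nfds_t g_nfds;

/* Caller holds g_lock on entry and on return, mirroring the global-mutex
 * discipline of MPIDU_Sock_wait(). */
static int progress_wait(int timeout_ms)
{
    int n;

    /* First try a zero-timeout poll: if events are already pending we can
       service them without ever dropping the lock. */
    n = poll(g_fds, g_nfds, 0);
    if (n != 0 || timeout_ms == 0)
	return n;

    /* Nothing pending: drop the lock so other threads can post new
       operations while this thread blocks, then reacquire it before
       touching any shared state. */
    pthread_mutex_unlock(&g_lock);
    do {
	n = poll(g_fds, g_nfds, timeout_ms);
    } while (n < 0 && errno == EINTR);
    pthread_mutex_lock(&g_lock);

    /* A real implementation must now assume the fd table may have changed
       while it was unlocked (cf. pollfds_updated in the code above). */
    return n;
}

int main(void)
{
    int pfd[2];

    if (pipe(pfd) != 0)
	return 1;
    g_fds[0].fd = pfd[0];
    g_fds[0].events = POLLIN;
    g_nfds = 1;

    write(pfd[1], "x", 1);            /* make the descriptor readable */
    pthread_mutex_lock(&g_lock);
    printf("poll returned %d\n", progress_wait(1000));
    pthread_mutex_unlock(&g_lock);
    return 0;
}

Note that the original goes one step further than this sketch: it publishes the array it will pass to poll() in pollfds_active before unlocking, so that a concurrent thread posting a new operation can detect the in-progress wait and write to the interrupter pipe to wake it.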
