
📄 sock_wait.i

📁 C/C++ code for MPI parallel computing. It compiles with VC or gcc and can be used to set up a parallel-computing test environment.
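For reference, a minimal MPI test program for such an environment might look like the sketch below. The file name hello_mpi.c and the mpicc/mpiexec commands are illustrative and assume an MPICH-style toolchain with gcc; building with VC requires the corresponding MPI compiler and linker settings instead.

/* hello_mpi.c -- minimal MPI smoke test (illustrative).
 * Build and run with an MPICH-style toolchain, for example:
 *     mpicc hello_mpi.c -o hello_mpi
 *     mpiexec -n 4 ./hello_mpi
 */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, size;

    MPI_Init(&argc, &argv);                 /* start the MPI runtime */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);   /* rank of this process */
    MPI_Comm_size(MPI_COMM_WORLD, &size);   /* total number of processes */

    printf("Hello from rank %d of %d\n", rank, size);

    MPI_Finalize();
    return 0;
}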
📖 Page 1 of 2
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_pollerr(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_read(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_write(struct pollfd * const pollfd, struct pollinfo * const pollinfo);
static int MPIDU_Socki_handle_connect(struct pollfd * const pollfd, struct pollinfo * const pollinfo);

/*
 * MPIDU_Sock_wait()
 *
 * NOTES:
 *
 * For fatal errors, the state of the connection progresses directly to the failed state and the connection is marked inactive
 * in the poll array.  Under normal conditions, the fatal error should result in the termination of the process; but, if that
 * doesn't happen, we try to leave the implementation in a somewhat sane state.
 */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sock_wait
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDU_Sock_wait(struct MPIDU_Sock_set * sock_set, int millisecond_timeout, struct MPIDU_Sock_event * eventp)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_WAIT);
    MPIDI_STATE_DECL(MPID_STATE_POLL);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_WAIT);

    for (;;)
    {
        int elem;
        int n_fds;
        int n_elems;
        int found_active_elem = FALSE;

        if (MPIDU_Socki_event_dequeue(sock_set, &elem, eventp) == MPI_SUCCESS)
        {
            struct pollinfo * pollinfo;
            int flags;

            if (eventp->op_type != MPIDU_SOCK_OP_CLOSE)
            {
                break;
            }

            pollinfo = &sock_set->pollinfos[elem];

            /*
             * Attempt to set the socket back to blocking.  This *should* prevent any data in the socket send buffer from being
             * discarded.  Instead, close() will block until the buffer is flushed or the connection times out and is considered
             * lost.  Theoretically, this could cause MPIDU_Sock_wait() to hang indefinitely; however, the calling code
             * should ensure this will not happen by going through a shutdown protocol before posting a close operation.
             *
             * FIXME: If the attempt to set the socket back to blocking fails, we presently ignore it.  Should we return an
             * error?  We need to define acceptable data loss at close time.  MS Windows has worse problems with this, so it
             * may not be possible to make any guarantees.
             */
            flags = fcntl(pollinfo->fd, F_GETFL, 0);
            if (flags != -1)
            {
                fcntl(pollinfo->fd, F_SETFL, flags & ~O_NONBLOCK);
            }

            /* FIXME: return code?  If an error occurs, do we return it instead of the error specified in the event? */
            close(pollinfo->fd);

            MPIDU_Socki_sock_free(pollinfo->sock);

            break;
        }

        for (;;)
        {
#           if (MPICH_THREAD_LEVEL < MPI_THREAD_MULTIPLE)
            {
                MPIDI_FUNC_ENTER(MPID_STATE_POLL);
                n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, millisecond_timeout);
                MPIDI_FUNC_EXIT(MPID_STATE_POLL);
            }
#           else /* (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) */
            {
                /*
                 * First try a non-blocking poll to see if any immediate progress can be made.  This avoids the lock
                 * manipulation overhead.
                 */
                MPIDI_FUNC_ENTER(MPID_STATE_POLL);
                n_fds = poll(sock_set->pollfds, sock_set->poll_array_elems, 0);
                MPIDI_FUNC_EXIT(MPID_STATE_POLL);

                if (n_fds == 0 && millisecond_timeout != 0)
                {
                    int pollfds_active_elems = sock_set->poll_array_elems;

                    sock_set->pollfds_active = sock_set->pollfds;

#                   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
                    {
                        /* Release the lock so that other threads may make progress while this thread waits for something to do */
                        MPIU_DBG_MSG(THREAD,TYPICAL,"Exit global critical section");
                        MPID_Thread_mutex_unlock(&MPIR_Process.global_mutex);
                    }
#                   elif (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MONITOR)
                    {
                        /* FIXME: this code is an experiment and is not even close to correct. */
                        if (MPIU_Monitor_closet_get_occupany_count(MPIR_Process.global_closet) == 0)
                        {
                            MPIU_Monitor_exit(&MPIR_Process.global_monitor);
                        }
                        else
                        {
                            MPIU_Monitor_continue(&MPIR_Process.global_monitor, &MPIR_Process.global_closet);
                        }
                    }
#                   else
#                       error selected multi-threaded implementation is not supported
#                   endif

                    MPIDI_FUNC_ENTER(MPID_STATE_POLL);
                    n_fds = poll(sock_set->pollfds_active, pollfds_active_elems, millisecond_timeout);
                    MPIDI_FUNC_EXIT(MPID_STATE_POLL);

#                   if (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MUTEX)
                    {
                        /* Reacquire the lock before processing any of the information returned from poll */
                        MPIU_DBG_MSG(THREAD,TYPICAL,"Enter global critical section");
                        MPID_Thread_mutex_lock(&MPIR_Process.global_mutex);
                    }
#                   elif (USE_THREAD_IMPL == MPICH_THREAD_IMPL_GLOBAL_MONITOR)
                    {
                        MPIU_Monitor_enter(&MPIR_Process.global_monitor);
                    }
#                   else
#                       error selected multi-threaded implementation is not supported
#                   endif

                    /*
                     * Update the pollfds array if changes were posted while we were blocked in poll
                     */
                    if (sock_set->pollfds_updated)
                    {
                        for (elem = 0; elem < sock_set->poll_array_elems; elem++)
                        {
                            sock_set->pollfds[elem].events = sock_set->pollinfos[elem].pollfd_events;
                            if ((sock_set->pollfds[elem].events & (POLLIN | POLLOUT)) != 0)
                            {
                                sock_set->pollfds[elem].fd = sock_set->pollinfos[elem].fd;
                            }
                            else
                            {
                                sock_set->pollfds[elem].fd = -1;
                            }

                            if (elem < pollfds_active_elems)
                            {
                                if (sock_set->pollfds_active == sock_set->pollfds)
                                {
                                    sock_set->pollfds[elem].revents &= ~(POLLIN | POLLOUT) | sock_set->pollfds[elem].events;
                                }
                                else
                                {
                                    sock_set->pollfds[elem].revents = sock_set->pollfds_active[elem].revents &
                                        (~(POLLIN | POLLOUT) | sock_set->pollfds[elem].events);
                                }
                            }
                            else
                            {
                                sock_set->pollfds[elem].revents = 0;
                            }
                        }

                        if (sock_set->pollfds_active != sock_set->pollfds)
                        {
                            MPIU_Free(sock_set->pollfds_active);
                        }

                        sock_set->pollfds_updated = FALSE;
                    }

                    sock_set->pollfds_active = NULL;
                    sock_set->wakeup_posted = FALSE;
                }
            }
#           endif /* (MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) */

            if (n_fds > 0)
            {
                break;
            }
            else if (n_fds == 0)
            {
                mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
                goto fn_exit;
            }
            else if (errno == EINTR)
            {
                if (millisecond_timeout != MPIDU_SOCK_INFINITE_TIME)
                {
                    mpi_errno = MPIDU_SOCK_ERR_TIMEOUT;
                    goto fn_exit;
                }

                continue;
            }
            /* --BEGIN ERROR HANDLING-- */
            else if (errno == ENOMEM || errno == EAGAIN)
            {
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM,
                                                 "**sock|osnomem", NULL);
                goto fn_exit;
            }
            else
            {
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL,
                                                 "**sock|oserror", "**sock|poll|oserror %d %s", errno, MPIU_Strerror(errno));
                goto fn_exit;
            }
            /* --END ERROR HANDLING-- */
        }

        elem = sock_set->starting_elem;
        n_elems = sock_set->poll_array_elems;
        while (n_fds > 0 && n_elems > 0)
        {
            /*
             * Acquire pointers to the pollfd and pollinfo structures for the next element.
             *
             * NOTE: These pointers could become stale if a new sock were to be allocated during the processing of the element.
             * At present, none of the handler routines allocate a sock, so the issue does not arise.
             */
            struct pollfd * const pollfd = &sock_set->pollfds[elem];
            struct pollinfo * const pollinfo = &sock_set->pollinfos[elem];

            MPIU_Assert((pollfd->events & (POLLIN | POLLOUT)) || pollfd->fd == -1);
            MPIU_Assert(pollfd->fd >= 0 || pollfd->fd == -1);

            if (pollfd->fd < 0 || pollfd->revents == 0)
            {
                /* This optimization assumes that most FDs will not have a pending event. */
                n_elems -= 1;
                elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
                continue;
            }

            if (found_active_elem == FALSE)
            {
                found_active_elem = TRUE;
                sock_set->starting_elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
            }

            if (pollfd->revents & POLLNVAL)
            {
                mpi_errno = MPIR_Err_create_code(
                    MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|badhandle",
                    "**sock|poll|badhandle %d %d %d %d", pollinfo->sock_set->id, pollinfo->sock_id, pollfd->fd, pollinfo->fd);
                goto fn_exit;
            }

            /* --BEGIN ERROR HANDLING-- */
            if (pollfd->revents & POLLHUP)
            {
                mpi_errno = MPIDU_Socki_handle_pollhup(pollfd, pollinfo);
                if (MPIR_Err_is_fatal(mpi_errno))
                {
                    goto fn_exit;
                }
            }

            /* According to Stevens, some errors are reported as normal data (POLLIN) and some are reported with POLLERR. */
            if (pollfd->revents & POLLERR)
            {
                mpi_errno = MPIDU_Socki_handle_pollerr(pollfd, pollinfo);
                if (MPIR_Err_is_fatal(mpi_errno))
                {
                    goto fn_exit;
                }
            }
            /* --END ERROR HANDLING-- */

            if (pollfd->revents & POLLIN)
            {
                if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
                {
                    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW || pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RO)
                    {
                        mpi_errno = MPIDU_Socki_handle_read(pollfd, pollinfo);
                        /* --BEGIN ERROR HANDLING-- */
                        if (MPIR_Err_is_fatal(mpi_errno))
                        {
                            goto fn_exit;
                        }
                        /* --END ERROR HANDLING-- */
                    }
                    /* --BEGIN ERROR HANDLING-- */
                    else
                    {
                        mpi_errno = MPIR_Err_create_code(
                            MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
                            "**sock|poll|unhandledstate %d", pollinfo->state);
                        goto fn_exit;
                    }
                    /* --END ERROR HANDLING-- */
                }
                else if (pollinfo->type == MPIDU_SOCKI_TYPE_LISTENER)
                {
                    pollfd->events &= ~POLLIN;
                    MPIDU_SOCKI_EVENT_ENQUEUE(pollinfo, MPIDU_SOCK_OP_ACCEPT, 0, pollinfo->user_ptr,
                                              MPI_SUCCESS, mpi_errno, fn_exit);
                }
                else if ((MPICH_THREAD_LEVEL == MPI_THREAD_MULTIPLE) && pollinfo->type == MPIDU_SOCKI_TYPE_INTERRUPTER)
                {
                    char c[16];
                    int nb;

                    do
                    {
                        nb = read(pollfd->fd, c, 16);
                    }
                    while (nb > 0 || (nb < 0 && errno == EINTR));
                }
                /* --BEGIN ERROR HANDLING-- */
                else
                {
                    mpi_errno = MPIR_Err_create_code(
                        MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
                        "**sock|poll|unhandledtype %d", pollinfo->type);
                    goto fn_exit;
                }
                /* --END ERROR HANDLING-- */
            }

            if (pollfd->revents & POLLOUT)
            {
                if (pollinfo->type == MPIDU_SOCKI_TYPE_COMMUNICATION)
                {
                    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
                    {
                        mpi_errno = MPIDU_Socki_handle_write(pollfd, pollinfo);
                        /* --BEGIN ERROR HANDLING-- */
                        if (MPIR_Err_is_fatal(mpi_errno))
                        {
                            goto fn_exit;
                        }
                        /* --END ERROR HANDLING-- */
                    }
                    else if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTING)
                    {
                        mpi_errno = MPIDU_Socki_handle_connect(pollfd, pollinfo);
                        /* --BEGIN ERROR HANDLING-- */
                        if (MPIR_Err_is_fatal(mpi_errno))
                        {
                            goto fn_exit;
                        }
                        /* --END ERROR HANDLING-- */
                    }
                    /* --BEGIN ERROR HANDLING-- */
                    else
                    {
                        mpi_errno = MPIR_Err_create_code(
                            MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledstate",
                            "**sock|poll|unhandledstate %d", pollinfo->state);
                        goto fn_exit;
                    }
                    /* --END ERROR HANDLING-- */
                }
                /* --BEGIN ERROR HANDLING-- */
                else
                {
                    mpi_errno = MPIR_Err_create_code(
                        MPI_SUCCESS, MPIR_ERR_FATAL, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL, "**sock|poll|unhandledtype",
                        "**sock|poll|unhandledtype %d", pollinfo->type);
                    goto fn_exit;
                }
                /* --END ERROR HANDLING-- */
            }

            n_fds -= 1;
            n_elems -= 1;
            elem = (elem + 1 < sock_set->poll_array_elems) ? elem + 1 : 0;
        }
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_WAIT);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Socki_handle_pollhup
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int MPIDU_Socki_handle_pollhup(struct pollfd * const pollfd, struct pollinfo * const pollinfo)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCKI_HANDLE_POLLHUP);

    if (pollinfo->state == MPIDU_SOCKI_STATE_CONNECTED_RW)
    {
        /*
         * If a write was posted then cancel it and generate a connection-closed event.  If a read is posted, it will be
         * handled by the POLLIN handler.
         */
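The listing continues on page 2. The heart of MPIDU_Sock_wait() above is a two-stage poll: a zero-timeout poll() picks up already-pending events without releasing and reacquiring the global lock, and only if nothing is pending does the thread block in poll() with the caller's timeout. The following standalone sketch illustrates that pattern with plain POSIX poll(); the helper name wait_for_events and the pipe used as a test descriptor are illustrative only and are not part of the MPICH code.

/*
 * Standalone sketch (not part of sock_wait.i): two-stage poll wait.
 * 1) zero-timeout poll() to pick up events that are already pending,
 * 2) blocking poll() with the caller's timeout, retrying on EINTR.
 * Note: MPIDU_Sock_wait() instead reports a timeout on EINTR when the
 * timeout is finite; this sketch simply retries.
 */
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

static int wait_for_events(struct pollfd *fds, nfds_t nfds, int timeout_ms)
{
    int n_fds;

    /* Non-blocking probe: returns immediately if an event is already pending. */
    n_fds = poll(fds, nfds, 0);
    if (n_fds != 0 || timeout_ms == 0)
        return n_fds;

    /* Nothing pending: block for up to timeout_ms, retrying on EINTR. */
    for (;;)
    {
        n_fds = poll(fds, nfds, timeout_ms);
        if (n_fds >= 0 || errno != EINTR)
            return n_fds;
    }
}

int main(void)
{
    int pipefds[2];
    struct pollfd pfd;
    char buf[16];

    if (pipe(pipefds) != 0)
        return 1;
    (void) write(pipefds[1], "x", 1);   /* make the read end immediately readable */

    pfd.fd = pipefds[0];
    pfd.events = POLLIN;
    pfd.revents = 0;

    if (wait_for_events(&pfd, 1, 1000) > 0 && (pfd.revents & POLLIN))
        printf("readable, read %zd byte(s)\n", read(pfd.fd, buf, sizeof(buf)));
    else
        printf("timed out or error\n");

    close(pipefds[0]);
    close(pipefds[1]);
    return 0;
}

In MPIDU_Sock_wait() the same idea is what allows the global mutex (or monitor) to be released only when the thread is actually about to block.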
