⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipcsocket.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 4 页
字号:
	&&	ch->send_queue->current_qlen > 0){		if (socket_resume_io(ch) != IPC_OK){			break;		}	}	socket_disconnect(ch);	socket_destroy_queue(ch->send_queue);	socket_destroy_queue(ch->recv_queue);	if (ch->ch_private != NULL) {		struct SOCKET_CH_PRIVATE *priv = (struct SOCKET_CH_PRIVATE *)			ch->ch_private;		if(priv->peer_addr != NULL) {			unlink(priv->peer_addr->sun_path);			g_free((void*)(priv->peer_addr));		}    		g_free((void*)(ch->ch_private));			}	memset(ch, 0xff, sizeof(*ch));	g_free((void*)ch);}/*  * Called by socket_destory(). Disconnect the connection  * and set ch_status to IPC_DISCONNECT.  * * parameters : *     ch (IN) the pointer to the channel. * * return values :  *     IPC_OK   the connection is disconnected successfully. *      IPC_FAIL     operation fails.*/static intsocket_disconnect(struct IPC_CHANNEL* ch){	struct SOCKET_CH_PRIVATE* conn_info;	conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;#if 0	if (ch->ch_status != IPC_DISCONNECT) {  		cl_log(LOG_INFO, "forced disconnect for fd %d", conn_info->s);	}#endif	close(conn_info->s);	cl_poll_ignore(conn_info->s);	conn_info->s = -1;	ch->ch_status = IPC_DISCONNECT;	return IPC_OK;}static intsocket_check_disc_pending(struct IPC_CHANNEL* ch){	int		rc;	struct pollfd	sockpoll;	if (ch->ch_status == IPC_DISCONNECT) {		cl_log(LOG_ERR, "check_disc_pending() already disconnected");		return IPC_BROKEN;	}	if (ch->recv_queue->current_qlen > 0) {		return IPC_OK;	}	sockpoll.fd = ch->ops->get_recv_select_fd(ch);	sockpoll.events = POLLIN;	rc = ipc_pollfunc_ptr(&sockpoll, 1, 0); 	if (rc < 0) {		cl_log(LOG_INFO		,	"socket_check_disc_pending() bad poll call");		ch->ch_status = IPC_DISCONNECT; 		return IPC_BROKEN;	}		if (sockpoll.revents & POLLHUP) {		if (sockpoll.revents & POLLIN) {			ch->ch_status = IPC_DISC_PENDING;		}else{#if 1			cl_log(LOG_INFO, "HUP without input");#endif			ch->ch_status = IPC_DISCONNECT;			return IPC_BROKEN;		}	}	if (sockpoll.revents & POLLIN) {		int dummy;		socket_resume_io_read(ch, &dummy, FALSE);	}	return IPC_OK;}static int socket_initiate_connection(struct IPC_CHANNEL * ch){	struct SOCKET_CH_PRIVATE* conn_info;  	struct sockaddr_un peer_addr; /* connector's address information */  	conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;  	/* Prepare the socket */	memset(&peer_addr, 0, sizeof(peer_addr));	peer_addr.sun_family = AF_LOCAL;    /* host byte order */ 	if (strlen(conn_info->path_name) >= sizeof(peer_addr.sun_path)) {		return IPC_FAIL;	}	strncpy(peer_addr.sun_path, conn_info->path_name, sizeof(peer_addr.sun_path));	/* Send connection request */	if (connect(conn_info->s, (struct sockaddr *)&peer_addr	, 	sizeof(struct sockaddr_un)) == -1) {		cl_log(LOG_WARNING, "initiate_connection: connect failure: %s", strerror(errno) );		return IPC_FAIL;	}	ch->ch_status = IPC_CONNECT;	ch->farside_pid = socket_get_farside_pid(conn_info->s);	return IPC_OK;}static voidsocket_set_high_flow_callback(IPC_Channel* ch,			      flow_callback_t callback,			      void* userdata){		ch->high_flow_callback = callback;	ch->high_flow_userdata = userdata;	}static voidsocket_set_low_flow_callback(IPC_Channel* ch,			     flow_callback_t callback,			     void* userdata){		ch->low_flow_callback = callback;	ch->low_flow_userdata = userdata;	}static voidsocket_check_flow_control(struct IPC_CHANNEL* ch, 			  int orig_qlen, 			  int curr_qlen){	if (!IPC_ISRCONN(ch)) {		return;	}	if (curr_qlen >= ch->high_flow_mark	    && ch->high_flow_callback){			ch->high_flow_callback(ch, ch->high_flow_userdata);	} 		if (curr_qlen <= ch->low_flow_mark	    && orig_qlen > ch->low_flow_mark	    && ch->low_flow_callback){		ch->low_flow_callback(ch, ch->low_flow_userdata);	       	}					return;	}static int socket_send(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg){	int orig_qlen;		if (msg->msg_len < 0 || msg->msg_len > MAXDATASIZE) {		cl_log(LOG_ERR, "socket_send: "		       "invalid message");		       		return IPC_FAIL;	}		if (ch->ch_status != IPC_CONNECT){		return IPC_FAIL;	}		ch->ops->resume_io(ch);	if ( !ch->should_send_blocking &&	    ch->send_queue->current_qlen >= ch->send_queue->max_qlen) {		/*cl_log(LOG_WARNING, "send queue maximum length(%d) exceeded",		  ch->send_queue->max_qlen );*/		return IPC_FAIL;	}		while (ch->send_queue->current_qlen >= ch->send_queue->max_qlen){		cl_shortsleep();		ch->ops->resume_io(ch);	}		/* add the message into the send queue */	CHECKFOO(0,ch, msg, SavedQueuedBody, "queued message");	SocketIPCStats.noutqueued++;	ch->send_queue->queue = g_list_append(ch->send_queue->queue,					      msg);	orig_qlen = ch->send_queue->current_qlen++;		socket_check_flow_control(ch, orig_qlen, orig_qlen +1 );		/* resume io */	ch->ops->resume_io(ch);	return IPC_OK;  }static int socket_recv(struct IPC_CHANNEL * ch, struct IPC_MESSAGE** message){	GList *element;	int		nbytes;	int		result;		socket_resume_io(ch);	result = socket_resume_io_read(ch, &nbytes, TRUE);	*message = NULL;	if (ch->recv_queue->current_qlen == 0) {		return result != IPC_OK ? result : IPC_FAIL;		/*return IPC_OK;*/	}	element = g_list_first(ch->recv_queue->queue);	if (element == NULL) {		/* Internal accounting error, but correctable */		cl_log(LOG_ERR		, "recv failure: qlen (%d) > 0, but no message found."		,	ch->recv_queue->current_qlen);		ch->recv_queue->current_qlen = 0;		return IPC_FAIL;	}	*message = (struct IPC_MESSAGE *) (element->data);	CHECKFOO(1,ch, *message, SavedReadBody, "read message");	SocketIPCStats.nreceived++;	ch->recv_queue->queue =	g_list_remove(ch->recv_queue->queue	,	element->data);	ch->recv_queue->current_qlen--;	return IPC_OK;}static intsocket_check_poll(struct IPC_CHANNEL * ch,		struct pollfd * sockpoll){	if (ch->ch_status == IPC_DISCONNECT) {		return IPC_OK;	}	if (sockpoll->revents & POLLHUP) {		/* If input present, or this is an output-only poll... */		if (sockpoll->revents & POLLIN		|| (sockpoll-> events & POLLIN) == 0 ) {			ch->ch_status = IPC_DISC_PENDING;			return IPC_OK;		}#if 1		cl_log(LOG_INFO, "socket_check_poll(): HUP without input");#endif		ch->ch_status = IPC_DISCONNECT;		return IPC_BROKEN;	}else if (sockpoll->revents & (POLLNVAL|POLLERR)) {		/* Have we already closed the socket? */		if (fcntl(sockpoll->fd, F_GETFL) < 0) {			cl_perror("socket_check_poll(pid %d): bad fd [%d]"			,	(int) getpid(), sockpoll->fd);			ch->ch_status = IPC_DISCONNECT;			return IPC_OK;		}		cl_log(LOG_ERR		,	"revents failure: fd %d, flags 0x%x"		,	sockpoll->fd, sockpoll->revents);		errno = EINVAL;		return IPC_FAIL;	}	return IPC_OK;}static intsocket_waitfor(struct IPC_CHANNEL * ch,	gboolean (*finished)(struct IPC_CHANNEL * ch)){	struct pollfd sockpoll;	CHANAUDIT(ch);	if (finished(ch)) {		return IPC_OK;	} 	if (ch->ch_status == IPC_DISCONNECT) { 		return IPC_BROKEN;	}	sockpoll.fd = ch->ops->get_recv_select_fd(ch);		while (!finished(ch) &&	IPC_ISRCONN(ch)) {		int	rc;		sockpoll.events = POLLIN;				/* Cannot call resume_io after the call to finished()		 * and before the call to poll because we might		 * change the state of the thing finished() is		 * waiting for.		 * This means that the poll call below would be		 * not only pointless, but might		 * make us hang forever waiting for this		 * event which has already happened		 */		if (ch->send_queue->current_qlen > 0) {			sockpoll.events |= POLLOUT;		}				rc = ipc_pollfunc_ptr(&sockpoll, 1, -1);				if (rc < 0) {			return (errno == EINTR ? IPC_INTR : IPC_FAIL);		}		rc = socket_check_poll(ch, &sockpoll);		if (sockpoll.revents & POLLIN) {			socket_resume_io(ch);		}		if (rc != IPC_OK) {			CHANAUDIT(ch);			return rc;		}	}	CHANAUDIT(ch);	return IPC_OK;}static intsocket_waitin(struct IPC_CHANNEL * ch){	return socket_waitfor(ch, ch->ops->is_message_pending);}static gbooleansocket_is_output_flushed(struct IPC_CHANNEL * ch){	return ! ch->ops->is_sending_blocked(ch);}static intsocket_waitout(struct IPC_CHANNEL * ch){	int	rc;	CHANAUDIT(ch);	rc = socket_waitfor(ch, socket_is_output_flushed);	if (rc != IPC_OK) {		cl_log(LOG_ERR		,	"socket_waitout failure: rc = %d", rc);	}else if (ch->ops->is_sending_blocked(ch)) {		cl_log(LOG_ERR, "socket_waitout output still blocked");	}	CHANAUDIT(ch);	return rc;}static gbooleansocket_is_message_pending(struct IPC_CHANNEL * ch){	ch->ops->resume_io(ch);	if (ch->recv_queue->current_qlen > 0) {		return TRUE;	}	return !IPC_ISRCONN(ch);}static gbooleansocket_is_output_pending(struct IPC_CHANNEL * ch){	socket_resume_io(ch);	return 	ch->ch_status == IPC_CONNECT	&&	 ch->send_queue->current_qlen > 0;}static int socket_assert_auth(struct IPC_CHANNEL *ch, GHashTable *auth){	cl_log(LOG_ERR	, "the assert_auth function for domain socket is not implemented");	return IPC_FAIL;}static intsocket_resume_io_read(struct IPC_CHANNEL *ch, int* nbytes, gboolean read1anyway){	struct SOCKET_CH_PRIVATE*	conn_info;	int				retcode = IPC_OK;	struct pollfd			sockpoll;	int				debug_loopcount = 0;	int				debug_bytecount = 0;	int				maxqlen = ch->recv_queue->max_qlen;	struct ipc_bufpool*		pool = ch->pool;	int				nmsgs = 0;	int				spaceneeded;	*nbytes = 0;		CHANAUDIT(ch);	conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;	if (ch->ch_status == IPC_DISCONNECT) {		return IPC_BROKEN;	}		if (pool == NULL){		ch->pool = pool = ipc_bufpool_new(0);		if (pool == NULL){						cl_log(LOG_ERR, "socket_resume_io_read: "			       "memory allocation for ipc pool failed");			exit(1);		}	}		if (ipc_bufpool_full(pool, ch, &spaceneeded)){				struct ipc_bufpool*	newpool;				newpool = ipc_bufpool_new(spaceneeded);		if (newpool == NULL){						cl_log(LOG_ERR, "socket_resume_io_read: "			       "memory allocation for a new ipc pool failed");			exit(1);		}				ipc_bufpool_partial_copy(newpool, pool);		ipc_bufpool_unref(pool);		ch->pool = pool = newpool;	}			if (maxqlen <= 0 && read1anyway) {		maxqlen = 1;	}  	if (ch->recv_queue->current_qlen < maxqlen && retcode == IPC_OK) {				void *				msg_begin;		int				msg_len;		int				len;		CHANAUDIT(ch);		++debug_loopcount;		len = ipc_bufpool_spaceleft(pool);		msg_begin = pool->currpos;				CHANAUDIT(ch);				/* Now try to receive some data */		msg_len = recv(conn_info->s, msg_begin, len, MSG_DONTWAIT);		SocketIPCStats.last_recv_rc = msg_len;		SocketIPCStats.last_recv_errno = errno;		++SocketIPCStats.recv_count;				/* Did we get an error? */		if (msg_len < 0) {			switch (errno) {			case EAGAIN:				if (ch->ch_status==IPC_DISC_PENDING){					ch->ch_status =IPC_DISCONNECT;					retcode = IPC_BROKEN;				}				break;									case ECONNREFUSED:			case ECONNRESET:				retcode= socket_check_disc_pending(ch);				break;							default:				cl_perror("socket_resume_io_read"					  ": unknown recv error");				ch->ch_status = IPC_DISCONNECT;				retcode = IPC_FAIL;				break;			}					}else if (msg_len == 0) {			if (ch->ch_status == IPC_DISC_PENDING			    &&	ch->recv_queue->current_qlen <= 0) {				ch->ch_status = IPC_DISCONNECT;				retcode = IPC_FAIL;			}		}else {			/* We read something! */			/* Note that all previous cases break out of the loop */			debug_bytecount += msg_len;			*nbytes = msg_len;			nmsgs = ipc_bufpool_update(pool, ch, msg_len, ch->recv_queue) ;						SocketIPCStats.ninqueued += nmsgs;					}	}	/* Check for errors uncaught by recv() */	/* NOTE: It doesn't seem right we have to do this every time */	/* FIXME?? */		memset(&sockpoll,0, sizeof(struct pollfd));		if ((retcode == IPC_OK) 	&&	(sockpoll.fd = conn_info->s) >= 0) {		/* Just check for errors, not for data */		sockpoll.events = 0;		ipc_pollfunc_ptr(&sockpoll, 1, 0);		retcode = socket_check_poll(ch, &sockpoll);	}		CHANAUDIT(ch);	if (retcode != IPC_OK) {		return retcode;	}	return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io_write(struct IPC_CHANNEL *ch, int* nmsg){	int				retcode = IPC_OK;	struct SOCKET_CH_PRIVATE*	conn_info;			CHANAUDIT(ch);	*nmsg = 0;	conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;   	while (ch->ch_status == IPC_CONNECT	&&		retcode == IPC_OK	&&		ch->send_queue->current_qlen > 0) {		GList *				element;		struct IPC_MESSAGE *		msg;		struct SOCKET_MSG_HEAD*		head;                struct IPC_MESSAGE* 		oldmsg = NULL;		int				sendrc = 0;                struct IPC_MESSAGE* 		newmsg;		char*				p;		unsigned int			bytes_remaining;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -