⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipcsocket.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
			(	wait_conn->ch_private);			ch_private = (struct SOCKET_CH_PRIVATE *)(ch->ch_private);			strncpy(ch_private->path_name,conn_private->path_name			,		sizeof(conn_private->path_name));		}	}	/* Verify the client authorization information. */	if (ch->ops->verify_auth(ch, auth_info) == IPC_OK) {		ch->ch_status = IPC_CONNECT;		ch->farside_pid = socket_get_farside_pid(new_sock);		return ch;	}  	return NULL;}static voidsocket_destroy_channel(struct IPC_CHANNEL * ch){	socket_disconnect(ch);	socket_destroy_queue(ch->send_queue);	socket_destroy_queue(ch->recv_queue);	if (ch->ch_private != NULL) {    		g_free((void*)(ch->ch_private));	}	memset(ch, 0xff, sizeof(*ch));	g_free((void*)ch);}/*  * Called by socket_destory(). Disconnect the connection  * and set ch_status to IPC_DISCONNECT.  * * parameters : *     ch (IN) the pointer to the channel. * * return values :  *     IPC_OK   the connection is disconnected successfully. *      IPC_FAIL     operation fails.*/static intsocket_disconnect(struct IPC_CHANNEL* ch){	struct SOCKET_CH_PRIVATE* conn_info;	conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;#if 0	if (ch->ch_status != IPC_DISCONNECT) {  		cl_log(LOG_INFO, "forced disconnect for fd %d", conn_info->s);	}#endif	close(conn_info->s);	cl_poll_ignore(conn_info->s);	conn_info->s = -1;	ch->ch_status = IPC_DISCONNECT;	return IPC_OK;}static intsocket_check_disc_pending(struct IPC_CHANNEL* ch){	int		rc;	struct pollfd	sockpoll;	if (ch->ch_status == IPC_DISCONNECT) {		cl_log(LOG_ERR, "check_disc_pending() already disconnected");		return IPC_BROKEN;	}	if (ch->recv_queue->current_qlen > 0) {		return IPC_OK;	}	sockpoll.fd = ch->ops->get_recv_select_fd(ch);	sockpoll.events = POLLIN;	rc = ipc_pollfunc_ptr(&sockpoll, 1, 0); 	if (rc < 0) {		cl_log(LOG_INFO		,	"socket_check_disc_pending() bad poll call");		ch->ch_status = IPC_DISCONNECT; 		return IPC_BROKEN;	}		if (sockpoll.revents & POLLHUP) {		if (sockpoll.revents & POLLIN) {			ch->ch_status = IPC_DISC_PENDING;		}else{#if 0			cl_log(LOG_INFO, "HUP without input");#endif			ch->ch_status = IPC_DISCONNECT;			return IPC_BROKEN;		}	}	if (sockpoll.revents & POLLIN) {		int dummy;		socket_resume_io_read(ch, &dummy, FALSE);	}	return IPC_OK;}static int socket_initiate_connection(struct IPC_CHANNEL * ch){	struct SOCKET_CH_PRIVATE* conn_info;  	struct sockaddr_un peer_addr; /* connector's address information */  	conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;  	/* Prepare the socket */	memset(&peer_addr, 0, sizeof(peer_addr));	peer_addr.sun_family = AF_LOCAL;    /* host byte order */ 	if (strlen(conn_info->path_name) >= sizeof(peer_addr.sun_path)) {		return IPC_FAIL;	}	strncpy(peer_addr.sun_path, conn_info->path_name, sizeof(peer_addr.sun_path));	/* Send connection request */	if (connect(conn_info->s, (struct sockaddr *)&peer_addr	, 	sizeof(struct sockaddr_un)) == -1) {		cl_perror("initiate_connection: connect failure");		return IPC_FAIL;	}	ch->ch_status = IPC_CONNECT;	ch->farside_pid = socket_get_farside_pid(conn_info->s);	return IPC_OK;}static int socket_send(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg){	if (msg->msg_len < 0 || msg->msg_len > MAXDATASIZE) {		return IPC_FAIL;	}	if (ch->ch_status != IPC_CONNECT){                return IPC_FAIL;        }		if ( !ch->is_send_blocking &&	     ch->send_queue->current_qlen >= ch->send_queue->max_qlen) {                cl_log(LOG_ERR, "send queue maximum length(%d) exceeded",                       ch->send_queue->max_qlen );                return IPC_FAIL;        }        do{                ch->ops->resume_io(ch);        }        while (ch->send_queue->current_qlen >= ch->send_queue->max_qlen);	        /* add the message into the send queue */        CHECKFOO(0,ch, msg, SavedQueuedBody, "queued message");        SocketIPCStats.noutqueued++;        ch->send_queue->queue = g_list_append(ch->send_queue->queue,                                              msg);        ch->send_queue->current_qlen++;        /* resume io */        ch->ops->resume_io(ch);        return IPC_OK;		}static int socket_recv(struct IPC_CHANNEL * ch, struct IPC_MESSAGE** message){	GList *element;	gboolean	started;	int		result;	(void)socket_resume_io(ch);	result = socket_resume_io_read(ch, &started, TRUE);	*message = NULL;	if (ch->recv_queue->current_qlen == 0) {		return result != IPC_OK ? result : IPC_FAIL;	}	element = g_list_first(ch->recv_queue->queue);	if (element == NULL) {		/* Internal accounting error, but correctable */		cl_log(LOG_ERR		, "recv failure: qlen (%d) > 0, but no message found."		,	ch->recv_queue->current_qlen);		ch->recv_queue->current_qlen = 0;		return IPC_FAIL;	}	*message = (struct IPC_MESSAGE *) (element->data);	CHECKFOO(1,ch, *message, SavedReadBody, "read message");	SocketIPCStats.nreceived++;	ch->recv_queue->queue =	g_list_remove(ch->recv_queue->queue	,	element->data);	ch->recv_queue->current_qlen--;	return IPC_OK;}static intsocket_check_poll(struct IPC_CHANNEL * ch,		struct pollfd * sockpoll){	if (ch->ch_status == IPC_DISCONNECT) {		return IPC_OK;	}	if (sockpoll->revents & POLLHUP) {		/* If input present, or this is an output-only poll... */		if (sockpoll->revents & POLLIN		|| (sockpoll-> events & POLLIN) == 0 ) {			ch->ch_status = IPC_DISC_PENDING;			return IPC_OK;		}#if 0		cl_log(LOG_INFO, "socket_check_poll(): HUP without input");#endif		ch->ch_status = IPC_DISCONNECT;		return IPC_BROKEN;	}else if (sockpoll->revents & (POLLNVAL|POLLERR)) {		/* Have we already closed the socket? */		if (fcntl(sockpoll->fd, F_GETFL) < 0) {			cl_perror("socket_check_poll(pid %d): bad fd [%d]"			,	(int) getpid(), sockpoll->fd);			ch->ch_status = IPC_DISCONNECT;			return IPC_OK;		}		cl_log(LOG_ERR		,	"revents failure: fd %d, flags 0x%x"		,	sockpoll->fd, sockpoll->revents);		errno = EINVAL;		return IPC_FAIL;	}	return IPC_OK;}static intsocket_waitfor(struct IPC_CHANNEL * ch,	gboolean (*finished)(struct IPC_CHANNEL * ch)){	struct pollfd sockpoll;	CHANAUDIT(ch);	if (finished(ch)) {		return IPC_OK;	} 	if (ch->ch_status == IPC_DISCONNECT) { 		return IPC_BROKEN;	}	sockpoll.fd = ch->ops->get_recv_select_fd(ch);		while (!finished(ch) &&	IPC_ISRCONN(ch)) {		int	rc;		sockpoll.events = POLLIN;				/* Cannot call resume_io after the call to finished()		 * and before the call to poll because we might		 * change the state of the thing finished() is		 * waiting for.		 * This means that the poll call below would be		 * not only pointless, but might		 * make us hang forever waiting for this		 * event which has already happened		 */		if (ch->send_queue->current_qlen > 0) {			sockpoll.events |= POLLOUT;		}				rc = ipc_pollfunc_ptr(&sockpoll, 1, -1);				if (rc < 0) {			return (errno == EINTR ? IPC_INTR : IPC_FAIL);		}		rc = socket_check_poll(ch, &sockpoll);		if (sockpoll.revents & POLLIN) {			socket_resume_io(ch);		}		if (rc != IPC_OK) {			CHANAUDIT(ch);			return rc;		}	}	CHANAUDIT(ch);	return IPC_OK;}static intsocket_waitin(struct IPC_CHANNEL * ch){	return socket_waitfor(ch, ch->ops->is_message_pending);}static gbooleansocket_is_output_flushed(struct IPC_CHANNEL * ch){	return ! ch->ops->is_sending_blocked(ch);}static intsocket_waitout(struct IPC_CHANNEL * ch){	int	rc;	CHANAUDIT(ch);	rc = socket_waitfor(ch, socket_is_output_flushed);	if (rc != IPC_OK) {		cl_log(LOG_ERR		,	"socket_waitout failure: rc = %d", rc);	}else if (ch->ops->is_sending_blocked(ch)) {		cl_log(LOG_ERR, "socket_waitout output still blocked");	}	CHANAUDIT(ch);	return rc;}static gbooleansocket_is_message_pending(struct IPC_CHANNEL * ch){	ch->ops->resume_io(ch);	if (ch->recv_queue->current_qlen > 0) {		return TRUE;	}	return !IPC_ISRCONN(ch);}static gbooleansocket_is_output_pending(struct IPC_CHANNEL * ch){	socket_resume_io(ch);	return 	ch->ch_status == IPC_CONNECT	&&	 ch->send_queue->current_qlen > 0;}static int socket_assert_auth(struct IPC_CHANNEL *ch, GHashTable *auth){	cl_log(LOG_ERR	, "the assert_auth function for domain socket is not implemented");	return IPC_FAIL;}static intsocket_resume_io_read(struct IPC_CHANNEL *ch, gboolean* started, gboolean read1anyway){	struct SOCKET_CH_PRIVATE*	conn_info;	int				retcode = IPC_OK;	struct pollfd			sockpoll;	int				debug_loopcount = 0;	int				debug_bytecount = 0;	int				maxqlen = ch->recv_queue->max_qlen;	CHANAUDIT(ch);	conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;	*started = FALSE; 	if (maxqlen <= 0 && read1anyway) {		maxqlen = 1;	}  	while (ch->recv_queue->current_qlen < maxqlen && retcode == IPC_OK) {		gboolean			new_msg;		void *				msg_begin;		int				msg_len;		struct SOCKET_MSG_HEAD		head;		int				len;		CHANAUDIT(ch);		++debug_loopcount;		new_msg = (conn_info->remaining_data == 0);		if (new_msg) {			len = sizeof(struct SOCKET_MSG_HEAD);			msg_begin = &head;		}else{			struct IPC_MESSAGE * msg = conn_info->buf_msg;			len = conn_info->remaining_data;			msg_begin = ((char*)msg->msg_body)			+	(msg->msg_len - len);		}		if (len <= 0 || len > MAXDATASIZE) {			cl_log(LOG_ERR			,	"socket_resume_io_read()"			": bad packet length [%d]", len);			ch->ch_status = IPC_DISCONNECT;			retcode = IPC_BROKEN;			break;		}		CHANAUDIT(ch);		/* Now try to receive some data */		msg_len = recv(conn_info->s, msg_begin, len, MSG_DONTWAIT);		SocketIPCStats.last_recv_rc = msg_len;		SocketIPCStats.last_recv_errno = errno;		++SocketIPCStats.recv_count;#ifdef DEBUG		cl_perror("recv() => %d, errno = %d loopcount = %d, %s"		,	msg_len, errno, debug_loopcount		,	(new_msg ? "msg head": "msg body"));#endif		CHANAUDIT(ch);		/* Did we get an error? */		if (msg_len < 0) {			/* What kind of error did we get? */			switch (errno) {				case EAGAIN:					if (ch->ch_status==IPC_DISC_PENDING){						ch->ch_status =IPC_DISCONNECT;						retcode = IPC_BROKEN;					}					break;										case ECONNREFUSED:				case ECONNRESET:					retcode					= socket_check_disc_pending(ch);					break;				default:					cl_perror("socket_resume_io_read"					": unknown recv error");					ch->ch_status = IPC_DISCONNECT;					retcode = IPC_FAIL;					break;			}			break; /* out of loop */    		}		if (msg_len == 0) {			if (ch->ch_status == IPC_DISC_PENDING			&&	ch->recv_queue->current_qlen <= 0) {				ch->ch_status = IPC_DISCONNECT;				retcode = IPC_FAIL;			}			break;		}		/* How about that!  We read something! */		/* Note that all previous cases break out of the loop */		debug_bytecount += msg_len;		*started=TRUE;#if 0		cl_log(LOG_DEBUG, "Got %d byte message", msg_len);		cl_log(LOG_DEBUG, "Contents: %s", (char*)msg_begin);#endif		/* Is this data for the start of a new message? */		if (new_msg){			/* We assume we read 'len' bytes */			if (head.msg_len <= 0			||	head.msg_len > MAXDATASIZE) {				cl_log(LOG_CRIT				,	"socket_resume_io_read()"				": Invalid msg len [%d]"				,	head.msg_len);				ch->ch_status = IPC_DISCONNECT;				retcode = IPC_FAIL;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -