⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipcsocket.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 4 页
字号:
/* $Id: ipcsocket.c,v 1.123 2005/02/11 21:39:35 alan Exp $ *//* * ipcsocket unix domain socket implementation of IPC abstraction. * * Copyright (c) 2002 Xiaoxiang Liu <xiliu@ncsa.uiuc.edu> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA * */#include <portability.h>#include <clplumbing/ipc.h>#include <clplumbing/cl_log.h>#include <clplumbing/realtime.h>#include <clplumbing/cl_poll.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <syslog.h>#include <sched.h>#include <sys/types.h>#include <sys/stat.h>#include <sys/param.h>#include <sys/uio.h>#ifdef HAVE_SYS_FILIO_H#	include <sys/filio.h>#endif#ifdef HAVE_SYS_SYSLIMITS_H#	include <sys/syslimits.h>#endif#ifdef HAVE_SYS_CRED_H#	include <sys/cred.h>#endif#ifdef HAVE_SYS_UCRED_H#	include <sys/ucred.h>#endif#include <sys/socket.h>#include <sys/poll.h>#include <netinet/in.h>#include <sys/un.h>#include <sys/ioctl.h>#include <unistd.h>#include <errno.h>#include <fcntl.h>#ifndef UNIX_PATH_MAX#	define UNIX_PATH_MAX 108#endif#define MAX_LISTEN_NUM 10#ifndef SUN_LEN#    define SUN_LEN(ptr) ((size_t) (offsetof (sockaddr_un, sun_path) + strlen ((ptr)->sun_path))#endif#ifndef MSG_NOSIGNAL#define		MSG_NOSIGNAL	0#endif#ifndef AF_LOCAL#define         AF_LOCAL AF_UNIX#endif/*********************************************************************** * * Determine the IPC authentication scheme...  More machine dependent than * we'd like, but don't know any better way... * ***********************************************************************/#ifdef SO_PEERCRED#	define	USE_SO_PEERCRED#elif HAVE_GETPEEREID#	define USE_GETPEEREID#elif ON_DARWIN/* Darwin has SCM_CREDS but it has been crippled by Apple *  - force USE_BINDSTAT_CREDS instead */#	define	USE_BINDSTAT_CREDS#elif defined(SCM_CREDS)#	define	USE_SCM_CREDS#else#	define	USE_DUMMY_CREDS/* This will make it compile, but attempts to authenticate * will fail.  This is a stopgap measure ;-) */#endif/* wait connection private data. */struct SOCKET_WAIT_CONN_PRIVATE{  /* the path name wich the connection will be built on. */  char path_name[UNIX_PATH_MAX];  /* the domain socket. */  int s;};/* channel private data. */struct SOCKET_CH_PRIVATE{  /* the path name wich the connection will be built on. */  char path_name[UNIX_PATH_MAX];  /* the domain socket. */  int s;  /* the size of expecting data for below buffered message buf_msg */  int remaining_data;  /* The address of our peer - used by USE_BINDSTAT_CREDS version of   *   socket_verify_auth()   */  struct sockaddr_un *peer_addr;		  /* the buf used to save unfinished message */  struct IPC_MESSAGE *buf_msg;};struct IPC_Stats {	long	nsent;	long	noutqueued;	long	send_count;	long	nreceived;	long	ninqueued;	long	recv_count;	int	last_recv_errno;	int	last_recv_rc;	int	last_send_errno;	int	last_send_rc;};struct IPC_Stats	SocketIPCStats = {0,0,0,0};/* unix domain socket implementations of IPC functions. */static void socket_destroy_wait_conn(struct IPC_WAIT_CONNECTION * wait_conn);static int socket_wait_selectfd(struct IPC_WAIT_CONNECTION *wait_conn);static struct IPC_CHANNEL * socket_accept_connection(struct IPC_WAIT_CONNECTION * wait_conn, struct IPC_AUTH *auth_info);static void socket_destroy_channel(struct IPC_CHANNEL * ch);static int socket_initiate_connection(struct IPC_CHANNEL * ch);static int socket_send(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* message);static int socket_recv(struct IPC_CHANNEL * ch, struct IPC_MESSAGE** message);static int socket_resume_io(struct IPC_CHANNEL *ch);static gboolean socket_is_message_pending(struct IPC_CHANNEL *ch);static gboolean socket_is_output_pending(struct IPC_CHANNEL *ch);static int socket_assert_auth(struct IPC_CHANNEL *ch, GHashTable *auth);static int socket_verify_auth(struct IPC_CHANNEL*ch, struct IPC_AUTH*auth_info);/* for domain socket, reve_fd = send_fd. */static int socket_get_recv_fd(struct IPC_CHANNEL *ch);static int socket_get_send_fd(struct IPC_CHANNEL *ch);static int socket_set_send_qlen (struct IPC_CHANNEL* ch, int q_len);static int socket_set_recv_qlen (struct IPC_CHANNEL* ch, int q_len);/* helper functions. */static int socket_disconnect(struct IPC_CHANNEL* ch);static struct IPC_QUEUE* socket_queue_new(void);static void socket_destroy_queue(struct IPC_QUEUE * q);static struct IPC_MESSAGE* socket_message_new(struct IPC_CHANNEL*ch,	int msg_len);void socket_free_message(struct IPC_MESSAGE * msg);struct IPC_WAIT_CONNECTION *socket_wait_conn_new(GHashTable* ch_attrs);struct IPC_CHANNEL* socket_client_channel_new(GHashTable *attrs);struct IPC_CHANNEL* socket_server_channel_new(int sockfd);pid_t socket_get_farside_pid(int sockfd);extern int (*ipc_pollfunc_ptr)(struct pollfd *, nfds_t, int);static int socket_waitin(struct IPC_CHANNEL * ch);static int socket_waitout(struct IPC_CHANNEL * ch);static int socket_resume_io_read(struct IPC_CHANNEL *ch, int*, gboolean read1anyway);static void socket_set_high_flow_callback(IPC_Channel* ch,					  flow_callback_t callback,					  void* userdata);static void socket_set_low_flow_callback(IPC_Channel* ch,					 flow_callback_t callback,					 void* userdata);static IPC_Message* socket_new_ipcmsg(IPC_Channel* ch, 				      const void* data,				      int len,				      void* private);/* socket object of the function table */static struct IPC_OPS socket_ops = {  socket_destroy_channel,  socket_initiate_connection,  socket_verify_auth,  socket_assert_auth,  socket_send,  socket_recv,  socket_waitin,  socket_waitout,  socket_is_message_pending,  socket_is_output_pending,  socket_resume_io,  socket_get_send_fd,  socket_get_recv_fd,  socket_set_send_qlen,  socket_set_recv_qlen,  socket_set_high_flow_callback,  socket_set_low_flow_callback,  socket_new_ipcmsg,};void dump_ipc_info(const IPC_Channel* chan);#undef AUDIT_CHANNELS#ifndef AUDIT_CHANNELS#	define	CHANAUDIT(ch)	/*NOTHING */#else#	define CHANAUDIT(ch)	socket_chan_audit(ch)#	define MAXPID	65535static voidsocket_chan_audit(const struct IPC_CHANNEL* ch){	int	badch = FALSE;  	struct SOCKET_CH_PRIVATE *chp;	struct stat		b;		if ((chp = ch->ch_private) == NULL) {		cl_log(LOG_CRIT, "Bad ch_private");		badch = TRUE;	}	if (ch->ops != &socket_ops) {		cl_log(LOG_CRIT, "Bad socket_ops");		badch = TRUE;	}	if (ch->ch_status == IPC_DISCONNECT) {		return;	}	if (!IPC_ISRCONN(ch)) {		cl_log(LOG_CRIT, "Bad ch_status [%d]", ch->ch_status);		badch = TRUE;	}	if (ch->farside_pid < 0 || ch->farside_pid > MAXPID) {		cl_log(LOG_CRIT, "Bad farside_pid");		badch = TRUE;	}	if (fstat(chp->s, &b) < 0) {		badch = TRUE;	}else if ((b.st_mode & S_IFMT) != S_IFSOCK) {		cl_log(LOG_CRIT, "channel @ 0x%lx: not a socket"		,	(unsigned long)ch);		badch = TRUE;	}	if (chp->remaining_data < 0) {		cl_log(LOG_CRIT, "Negative remaining_data");		badch = TRUE;	}	if (chp->remaining_data < 0 || chp->remaining_data > MAXDATASIZE) {		cl_log(LOG_CRIT, "Excessive/bad remaining_data");		badch = TRUE;	}	if (chp->remaining_data && chp->buf_msg == NULL) {		cl_log(LOG_CRIT		,	"inconsistent remaining_data [%ld]/buf_msg[0x%lx]"		,	(long)chp->remaining_data, (unsigned long)chp->buf_msg);		badch = TRUE;	}	if (chp->remaining_data == 0 && chp->buf_msg != NULL) {		cl_log(LOG_CRIT		,	"inconsistent remaining_data [%ld]/buf_msg[0x%lx] (2)"		,	(long)chp->remaining_data, (unsigned long)chp->buf_msg);		badch = TRUE;	}	if (ch->send_queue == NULL || ch->recv_queue == NULL) {		cl_log(LOG_CRIT, "bad send/recv queue");		badch = TRUE;	}	if (ch->recv_queue->current_qlen < 0	||	ch->recv_queue->current_qlen > ch->recv_queue->max_qlen) {		cl_log(LOG_CRIT, "bad recv queue");		badch = TRUE;	}	if (ch->send_queue->current_qlen < 0	||	ch->send_queue->current_qlen > ch->send_queue->max_qlen) {		cl_log(LOG_CRIT, "bad send_queue");		badch = TRUE;	}	if (badch) {		cl_log(LOG_CRIT, "Bad channel @ 0x%lx", (unsigned long)ch);		dump_ipc_info(ch);		abort();	}}#endif#ifdef CHEAT_CHECKSlong	SeqNums[32];static longcheat_get_sequence(IPC_Message* msg){	const char header [] = "String-";	size_t header_len = sizeof(header)-1;	char *	body;	if (msg == NULL || msg->msg_len < sizeof(header)	||	msg->msg_len > sizeof(header) + 10	||	strncmp(msg->msg_body, header, header_len) != 0) {		return -1L;	}	body = msg->msg_body;	return atol(body+header_len);}static char SavedReadBody[32];static char SavedReceivedBody[32];static char SavedQueuedBody[32];static char SavedSentBody[32];#ifndef MIN#	define MIN(a,b)	(a < b ? a : b)#endifstatic voidsave_body(struct IPC_MESSAGE *msg, char * savearea, size_t length){	int mlen = strnlen(msg->msg_body, MIN(length, msg->msg_len));	memcpy(savearea, msg->msg_body, mlen);	savearea[mlen] = EOS;}static voidaudit_readmsgq_msg(gpointer msg, gpointer user_data){	long	cheatseq = cheat_get_sequence(msg);	if (cheatseq < SeqNums[1] || cheatseq > SeqNums[2]) {		cl_log(LOG_ERR		,	"Read Q Message %ld not in range [%ld:%ld]"		,	cheatseq, SeqNums[1], SeqNums[2]);	}}static void saveandcheck(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg, char * savearea,	size_t savesize, long* lastseq, const char * text){	long	cheatseq = cheat_get_sequence(msg);	save_body(msg, savearea, savesize);	if (*lastseq != 0 ) {		if (cheatseq != *lastseq +1) {			int	j;			cl_log(LOG_ERR			,	"%s packets out of sequence! %ld versus %ld [pid %d]"			,	text, cheatseq, *lastseq, (int)getpid());			dump_ipc_info(ch);			for (j=0; j < 4; ++j) {				cl_log(LOG_DEBUG				,	"SeqNums[%d] = %ld"				,	j, SeqNums[j]);			}			cl_log(LOG_ERR			,	"SocketIPCStats.nsent = %ld"			,	SocketIPCStats.nsent);			cl_log(LOG_ERR			,	"SocketIPCStats.noutqueued = %ld"			,	SocketIPCStats.noutqueued);			cl_log(LOG_ERR			,	"SocketIPCStats.nreceived = %ld"			,	SocketIPCStats.nreceived);			cl_log(LOG_ERR			,	"SocketIPCStats.ninqueued = %ld"			,	SocketIPCStats.ninqueued);		}			}	g_list_foreach(ch->recv_queue->queue, audit_readmsgq_msg, NULL);	if (cheatseq > 0) {		*lastseq = cheatseq;	}}#	define	CHECKFOO(which, ch, msg, area, text)	{			\		saveandcheck(ch,msg,area,sizeof(area),SeqNums+which,text);	\	}#else#	define	CHECKFOO(which, ch, msg, area, text)	/* Nothing */#endifstatic voiddump_msg(struct IPC_MESSAGE *msg, const char * label){#ifdef CHEAT_CHECKS	cl_log(LOG_DEBUG, "%s packet (length %d) [%s] %ld pid %d"	,	label,	(int)msg->msg_len, (char*)msg->msg_body	,	cheat_get_sequence(msg), (int)getpid());#else	cl_log(LOG_DEBUG, "%s length %d [%s] pid %d"	,	label,	(int)msg->msg_len, (char*)msg->msg_body	,	(int)getpid());#endif}static voiddump_msgq_msg(gpointer data, gpointer user_data){	dump_msg(data, user_data);}voiddump_ipc_info(const IPC_Channel* chan){	char squeue[] = "Send queue";	char rqueue[] = "Receive queue";#ifdef CHEAT_CHECKS	cl_log(LOG_DEBUG, "Saved Last Body read[%s]", SavedReadBody);	cl_log(LOG_DEBUG, "Saved Last Body received[%s]", SavedReceivedBody);	cl_log(LOG_DEBUG, "Saved Last Body Queued[%s]", SavedQueuedBody);	cl_log(LOG_DEBUG, "Saved Last Body Sent[%s]", SavedSentBody);#endif	g_list_foreach(chan->send_queue->queue, dump_msgq_msg, squeue);	g_list_foreach(chan->recv_queue->queue, dump_msgq_msg, rqueue);	CHANAUDIT(chan);}/* destroy socket wait channel */ static void socket_destroy_wait_conn(struct IPC_WAIT_CONNECTION * wait_conn){	struct SOCKET_WAIT_CONN_PRIVATE * wc = wait_conn->ch_private;	if (wc != NULL) {		close(wc->s);		cl_poll_ignore(wc->s);		unlink(wc->path_name);		g_free(wc);	}	g_free((void*) wait_conn);}/* return a fd which can be listened on for new connections. */static int socket_wait_selectfd(struct IPC_WAIT_CONNECTION *wait_conn){	struct SOCKET_WAIT_CONN_PRIVATE * wc = wait_conn->ch_private;	return (wc == NULL ? -1 : wc->s);}/* socket accept connection. */static struct IPC_CHANNEL* socket_accept_connection(struct IPC_WAIT_CONNECTION * wait_conn,	struct IPC_AUTH *auth_info){	/* make peer_addr a pointer so it can be used by the	 *   USE_BINDSTAT_CREDS implementation of socket_verify_auth()	 */	struct sockaddr_un *			peer_addr;	struct IPC_CHANNEL *			ch = NULL;	int					sin_size;	int					s;	int					new_sock;	struct SOCKET_WAIT_CONN_PRIVATE*	conn_private;	struct SOCKET_CH_PRIVATE *		ch_private ;	int auth_result = IPC_FAIL;	gboolean was_error = FALSE;		peer_addr = g_new(struct sockaddr_un, 1);	/* get select fd */	s = wait_conn->ops->get_select_fd(wait_conn); 	if (s < 0) {		cl_log(LOG_ERR, "get_select_fd: invalid fd");		g_free(peer_addr);		peer_addr = NULL;		return NULL;	}	/* Get client connection. */	sin_size = sizeof(struct sockaddr_un);	if ((new_sock = accept(s, (struct sockaddr *)peer_addr, &sin_size)) == -1){		if (errno != EAGAIN && errno != EWOULDBLOCK) {			cl_perror("socket_accept_connection: accept");		}		was_error = TRUE;			}else{		if ((ch = socket_server_channel_new(new_sock)) == NULL) {			cl_log(LOG_ERR			,	"socket_accept_connection:"			        " Can't create new channel");			was_error = TRUE;		}else{			conn_private=(struct SOCKET_WAIT_CONN_PRIVATE*)			(	wait_conn->ch_private);			ch_private = (struct SOCKET_CH_PRIVATE *)(ch->ch_private);			strncpy(ch_private->path_name,conn_private->path_name			,		sizeof(conn_private->path_name));			ch_private->peer_addr = peer_addr;		}	}	/* Verify the client authorization information. */	if(was_error == FALSE) {		auth_result = ch->ops->verify_auth(ch, auth_info);		if (auth_result == IPC_OK) {			ch->ch_status = IPC_CONNECT;			ch->farside_pid = socket_get_farside_pid(new_sock);			return ch;		}	}  	g_free(peer_addr);	peer_addr = NULL;	return NULL;}static voidsocket_destroy_channel(struct IPC_CHANNEL * ch){	while (ch->ch_status == IPC_CONNECT

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -