⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipcsocket.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
				break;			}			conn_info->buf_msg			= socket_message_new(ch, head.msg_len);			conn_info->remaining_data = head.msg_len;			/* Next time we'll read the message body */			continue;		}		/* No, not the start of a new message. Therefore we */		/* must have received (more) data from an old message */		conn_info->remaining_data = conn_info->remaining_data		-	msg_len;		if (conn_info->remaining_data < 0){			cl_log(LOG_CRIT			,	"received more data than expected");			conn_info->remaining_data = 0;			retcode = IPC_FAIL;		}else if (conn_info->remaining_data == 0){#if 0			cl_log(LOG_DEBUG, "channel: 0x%lx"			,	(unsigned long)ch);			cl_log(LOG_DEBUG, "New recv_queue = 0x%lx"			,	(unsigned long)ch->recv_queue);			cl_log(LOG_DEBUG, "buf_msg: len = %ld, body =  0x%lx"			,	(unsigned long)conn_info->buf_msg->msg_len			,	(unsigned long)conn_info->buf_msg->msg_body);			cl_log(LOG_DEBUG, "buf_msg: contents: %s"			,	(char *)conn_info->buf_msg->msg_body);#endif			/* Got the last of the message! */			/* Append gotten message to receive queue */			CHECKFOO(2,ch, conn_info->buf_msg, SavedReceivedBody			,	"received message");			ch->recv_queue->queue =	g_list_append			(	ch->recv_queue->queue, conn_info->buf_msg);			ch->recv_queue->current_qlen++;			SocketIPCStats.ninqueued++;			conn_info->buf_msg = NULL;		}	}	/* Check for errors uncaught by recv() */	/* NOTE: It doesn't seem right we have to do this every time */	/* FIXME?? */	if ((retcode == IPC_OK) 	&&	(sockpoll.fd = conn_info->s) >= 0) {		/* Just check for errors, not for data */		sockpoll.events = 0;		ipc_pollfunc_ptr(&sockpoll, 1, 0);		retcode = socket_check_poll(ch, &sockpoll);	}		CHANAUDIT(ch);	if (retcode != IPC_OK) {		return retcode;	}		return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io_write(struct IPC_CHANNEL *ch, int* nmsg){	int				retcode = IPC_OK;	struct SOCKET_CH_PRIVATE*	conn_info;			CHANAUDIT(ch);	conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private;	*nmsg = 0;   	while (ch->ch_status == IPC_CONNECT	&&		retcode == IPC_OK	&&		ch->send_queue->current_qlen > 0) {				GList *				element;		struct IPC_MESSAGE *		msg;		struct SOCKET_MSG_HEAD		head;		int				sendrc = 0;		char*				p;		int				len;		int				head_len = sizeof(struct SOCKET_MSG_HEAD);		int				bytes_send;		int				i;				CHANAUDIT(ch);		element = g_list_first(ch->send_queue->queue);		if (element == NULL) {			/* OOPS!  - correct consistency problem */			ch->send_queue->current_qlen = 0;			break;		}		msg = (struct IPC_MESSAGE *) (element->data);		head.msg_len = msg->msg_len;				if (ch->bytes_remaining  == 0){			/*start to send a new message*/			bytes_send = 0;		}else {			bytes_send = head.msg_len + head_len - ch->bytes_remaining;		}		if (bytes_send < 0){			cl_log(LOG_ERR, "socket_resume_io_write: wrong length"			       "head.msg_len =%d, head_len =%d, ch->bytes_remaining=%d",			       head.msg_len, head_len, ch->bytes_remaining);			return IPC_FAIL;		}				for ( i = 0 ; i < 2; i++){			switch (i){			case 0:				/* for sending out head*/				if (bytes_send <  head_len ){					/*we need send head or partial head*/					p= ((char*)&head) + bytes_send;					len = head_len - bytes_send;				}else{					/*head is already sent out*/					continue;				}								break;			default:				/* for sending out the message body*/				p = (char*)msg->msg_body + bytes_send - head_len;				len = msg->msg_len + head_len - bytes_send;							}						do{				sendrc = send(conn_info->s, p, len, (MSG_DONTWAIT|MSG_NOSIGNAL));				SocketIPCStats.last_send_rc = sendrc;				SocketIPCStats.last_send_errno = errno;				++SocketIPCStats.send_count;								if (sendrc < 0) {					switch (errno) {					case EAGAIN:						retcode = IPC_OK;						break;					case EPIPE:						socket_check_disc_pending(ch);						retcode = IPC_BROKEN;						break;					default:						cl_perror("socket_resume_io_write: send1 bad errno");						ch->ch_status = IPC_DISCONNECT;						retcode = IPC_FAIL;						break;					}					ch->bytes_remaining = msg->msg_len + head_len - bytes_send;						goto out;				}								p += sendrc;				len -= sendrc;								bytes_send += sendrc;							}while( len > 0);					}				if (bytes_send > head_len + msg->msg_len ){			cl_log(LOG_INFO, "socket_resme_io_write:"			       " sending out more bytes then the message has");			return IPC_FAIL;		}				if (bytes_send == head_len + msg->msg_len ){			ch->send_queue->queue = 				g_list_remove(ch->send_queue->queue,	msg);						if (msg->msg_done != NULL) {				msg->msg_done(msg);			}			SocketIPCStats.nsent++;			ch->send_queue->current_qlen--;			(*nmsg)++;			ch->bytes_remaining = 0;		}else {			ch->bytes_remaining = msg->msg_len + head_len - bytes_send;						break;		}	} out:	CHANAUDIT(ch);	if (retcode != IPC_OK) {		return retcode;	}	return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io(struct IPC_CHANNEL *ch){	int		rc1 = IPC_OK;	int		rc2 = IPC_OK;	gboolean	rstarted;	gboolean	wstarted;	gboolean	OKonce = FALSE;	int		nwmsg = 1;#ifdef DEBUG	int		count = 0;#endif	CHANAUDIT(ch);	if (!IPC_ISRCONN(ch)) {		return IPC_BROKEN;	}	do {		rc1 = socket_resume_io_read(ch, &rstarted, FALSE);		CHANAUDIT(ch);		if (ch->ch_status == IPC_DISC_PENDING) {			rc2 = IPC_OK;			wstarted = FALSE;		}				if (nwmsg > 0){			nwmsg = 0;			rc2 = socket_resume_io_write(ch, &nwmsg);		}				CHANAUDIT(ch);		if (rc1 == IPC_OK || rc2 == IPC_OK) {			OKonce = TRUE;		}#ifdef DEBUG		++count;		if (rc1 == IPC_OK && rc2 == IPC_OK && (rstarted||wstarted)) {			cl_log(LOG_DEBUG			,	"continuing: rstarted = %d wstarted = %d count: %d"			,	rstarted, wstarted, count);		}#endif	} while ((rstarted||nwmsg > 0) && IPC_ISRCONN(ch));	if (IPC_ISRCONN(ch)) {		if (rc1 != IPC_OK) {			cl_log(LOG_ERR			,	"socket_resume_io_read() failure");		}		if (rc2 != IPC_OK) {			cl_log(LOG_ERR			,	"socket_resume_io_write() failure");		}	}else{		return (OKonce ? IPC_OK : IPC_BROKEN);	}	return (rc1 != IPC_OK ? rc1 : rc2);}static intsocket_get_recv_fd(struct IPC_CHANNEL *ch){	struct SOCKET_CH_PRIVATE* chp = ch ->ch_private;	return (chp == NULL ? -1 : chp->s);}static intsocket_get_send_fd(struct IPC_CHANNEL *ch){	return socket_get_recv_fd(ch);}static intsocket_set_send_qlen (struct IPC_CHANNEL* ch, int q_len){  /* This seems more like an assertion failure than a normal error */  if (ch->send_queue == NULL) {    return IPC_FAIL;  }  ch->send_queue->max_qlen = q_len;  return IPC_OK;   }static intsocket_set_recv_qlen (struct IPC_CHANNEL* ch, int q_len){  /* This seems more like an assertion failure than a normal error */  if (ch->recv_queue == NULL) {    return IPC_FAIL;  }    ch->recv_queue->max_qlen = q_len;  return IPC_OK;}/* socket object of the function table */static struct IPC_WAIT_OPS socket_wait_ops = {  socket_destroy_wait_conn,  socket_wait_selectfd,  socket_accept_connection,};/*  * create a new ipc queue whose length = 0 and inner queue = NULL. * return the pointer to a new ipc queue or NULL is the queue can't be created. */static struct IPC_QUEUE*socket_queue_new(void){  struct IPC_QUEUE *temp_queue;    /* temp queue with length = 0 and inner queue = NULL. */  temp_queue =  g_new(struct IPC_QUEUE, 1);  temp_queue->current_qlen = 0;  temp_queue->max_qlen = DEFAULT_MAX_QLEN;  temp_queue->queue = NULL;  return temp_queue;}/*  * destory a ipc queue and clean all memory space assigned to this queue. * parameters: *      q  (IN) the pointer to the queue which should be destroied. * *	FIXME:  This function does not free up messages that might *	be in the queue. */ voidsocket_destroy_queue(struct IPC_QUEUE * q){  g_list_free(q->queue);  g_free((void *) q);}/*  * socket_wait_conn_new: * Called by ipc_wait_conn_constructor to get a new socket * waiting connection. * (better explanation of this role might be nice) *  * Parameters : *     ch_attrs (IN) the attributes used to create this connection. * * Return : *	the pointer to the new waiting connection or NULL if the connection *	can't be created. *  * NOTE : *   for domain socket implementation, the only attribute needed is path name. *	so the user should  *   create the hash table like this:  *     GHashTable * attrs;  *     attrs = g_hash_table_new(g_str_hash, g_str_equal);  *     g_hash_table_insert(attrs, PATH_ATTR, path_name);    *     here PATH_ATTR is defined as "path".  */struct IPC_WAIT_CONNECTION *socket_wait_conn_new(GHashTable *ch_attrs){  struct IPC_WAIT_CONNECTION * temp_ch;  char *path_name;  char *mode_attr;  struct sockaddr_un my_addr;  int s;  struct SOCKET_WAIT_CONN_PRIVATE *wait_private;  mode_t s_mode;    path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR);  mode_attr = (char *) g_hash_table_lookup(ch_attrs, IPC_MODE_ATTR);  if (mode_attr != NULL) {    s_mode = (mode_t)strtoul((const char *)mode_attr, NULL, 8);  }else{    s_mode = 0777;  }  if (path_name == NULL) {    return NULL;  }  /* prepare the unix domain socket */  if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) {    cl_perror("socket_wait_conn_new: socket() failure");    return NULL;  }  if (unlink(path_name) < 0 && errno != ENOENT) {    cl_perror("socket_wait_conn_new: unlink failure");  }  memset(&my_addr, 0, sizeof(my_addr));  my_addr.sun_family = AF_LOCAL;         /* host byte order */  if (strlen(path_name) >= sizeof(my_addr.sun_path)) {    close(s);    return NULL;  }      strncpy(my_addr.sun_path, path_name, sizeof(my_addr.sun_path));      if (bind(s, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {    cl_perror("socket_wait_conn_new: trying to create in %s bind:"    ,	path_name);    close(s);    return NULL;  }  /* Change the permission of the socket */  if (chmod(path_name,s_mode) < 0){    cl_perror("socket_wait_conn_new: failure trying to chmod %s"    ,	path_name);    close(s);    return NULL;  }  /* listen to the socket */  if (listen(s, MAX_LISTEN_NUM) == -1) {    cl_perror("socket_wait_conn_new: listen(MAX_LISTEN_NUM)");    close(s);    return NULL;  }  if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) {    cl_perror("socket_wait_conn_new: cannot set O_NONBLOCK");    close(s);    return NULL;  }    wait_private =  g_new(struct SOCKET_WAIT_CONN_PRIVATE, 1);  wait_private->s = s;  strncpy(wait_private->path_name, path_name, sizeof(wait_private->path_name));  temp_ch = g_new(struct IPC_WAIT_CONNECTION, 1);  temp_ch->ch_private = (void *) wait_private;  temp_ch->ch_status = IPC_WAIT;  temp_ch->ops = (struct IPC_WAIT_OPS *)&socket_wait_ops;    return temp_ch;}/*  * will be called by ipc_channel_constructor to create a new socket channel. * parameters : *      attrs (IN) the hash table of the attributes used to create this channel. * * return: *      the pointer to the new waiting channel or NULL if the channel can't be created.*/struct IPC_CHANNEL * socket_client_channel_new(GHashTable *ch_attrs) {  struct IPC_CHANNEL * temp_ch;  struct SOCKET_CH_PRIVATE* conn_info;  char *path_name;  int sockfd;  /*   * I don't really understand why the client and the server use different   * parameter names...   *   * It's a really bad idea to store both integers and strings   * in the same table.   *   * Maybe we need an internal function with a different set of parameters?   */   /*   * if we want to seperate them. I suggest

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -