⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipcsocket.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 4 页
字号:
		int				diff; 		CHANAUDIT(ch);		element = g_list_first(ch->send_queue->queue);		if (element == NULL) {			/* OOPS!  - correct consistency problem */			ch->send_queue->current_qlen = 0;			break;		}		msg = (struct IPC_MESSAGE *) (element->data);				diff = 0;		if (msg->msg_buf ){			diff = (char*)msg->msg_body - (char*)msg->msg_buf;						}		if ( diff < sizeof(struct SOCKET_MSG_HEAD) ){			/* either we don't have msg->msg_buf set			 * or we don't have enough bytes for socket head			 * we delete this message and creates 			 * a new one and delete the old one			 */						newmsg= socket_message_new(ch, msg->msg_len);			if (newmsg == NULL){				cl_log(LOG_ERR, "socket_resume_io_write: "					"allocating memory for new ipc msg failed");                		return IPC_FAIL;			}                	memcpy(newmsg->msg_body, msg->msg_body, msg->msg_len);                	oldmsg = msg;			msg = newmsg;		}				head = (struct SOCKET_MSG_HEAD*) msg->msg_buf;                head->msg_len = msg->msg_len;		head->magic = HEADMAGIC;				if (ch->bytes_remaining == 0){			bytes_remaining = msg->msg_len + ch->msgpad;			p = msg->msg_buf;		}else {			bytes_remaining = ch->bytes_remaining;			p = ((char*)msg->msg_buf) + msg->msg_len + ch->msgpad				- bytes_remaining;					}				sendrc = 0;		                do {                        CHANAUDIT(ch);			                       sendrc = send(conn_info->s, p                       ,       bytes_remaining, (MSG_DONTWAIT|MSG_NOSIGNAL));                        SocketIPCStats.last_send_rc = sendrc;                        SocketIPCStats.last_send_errno = errno;                        ++SocketIPCStats.send_count;						if (sendrc <= 0){				break;			}else {								p = p + sendrc;				bytes_remaining -= sendrc;			}                } while(bytes_remaining > 0 );		ch->bytes_remaining = bytes_remaining;				if (sendrc < 0) {			switch (errno) {			case EAGAIN:				retcode = IPC_OK;				break;			case EPIPE:				socket_check_disc_pending(ch);				retcode = IPC_BROKEN;				break;			default:				cl_perror("socket_resume_io_write"					  ": send2 bad errno");				ch->ch_status = IPC_DISCONNECT;				retcode = IPC_FAIL;				break;			}			break;		}else{			int orig_qlen;			CHECKFOO(3,ch, msg, SavedSentBody, "sent message")			if (oldmsg){		                if (msg->msg_done != NULL) {                                	msg->msg_done(msg);                        	}				msg=oldmsg;			}						if(ch->bytes_remaining ==0){				ch->send_queue->queue = g_list_remove(ch->send_queue->queue,	msg);								if (msg->msg_done != NULL) {					msg->msg_done(msg);				}								SocketIPCStats.nsent++;				orig_qlen = ch->send_queue->current_qlen--;				socket_check_flow_control(ch, orig_qlen, orig_qlen -1 );				(*nmsg)++;			}		}	}	CHANAUDIT(ch);	if (retcode != IPC_OK) {		return retcode;	}	return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io(struct IPC_CHANNEL *ch){	int		rc1 = IPC_OK;	int		rc2 = IPC_OK;	int		nwmsg = 1;	int		nbytes_r = 1;	gboolean	OKonce = FALSE;	CHANAUDIT(ch);	if (!IPC_ISRCONN(ch)) {		return IPC_BROKEN;	}				do {		if (nbytes_r > 0){			rc1 = socket_resume_io_read(ch, &nbytes_r, FALSE);		}				if (nwmsg > 0){			nwmsg = 0;			rc2 = socket_resume_io_write(ch, &nwmsg);		}				if (rc1 == IPC_OK || rc2 == IPC_OK) {			OKonce = TRUE;		}			} while ((nbytes_r > 0  || nwmsg > 0) && IPC_ISRCONN(ch));		if (IPC_ISRCONN(ch)) {		if (rc1 != IPC_OK) {			cl_log(LOG_ERR			       ,	"socket_resume_io_read() failure");		}		if (rc2 != IPC_OK) {			cl_log(LOG_ERR			,	"socket_resume_io_write() failure");		}	}else{		return (OKonce ? IPC_OK : IPC_BROKEN);	}	return (rc1 != IPC_OK ? rc1 : rc2);}static intsocket_get_recv_fd(struct IPC_CHANNEL *ch){	struct SOCKET_CH_PRIVATE* chp = ch ->ch_private;	return (chp == NULL ? -1 : chp->s);}static intsocket_get_send_fd(struct IPC_CHANNEL *ch){	return socket_get_recv_fd(ch);}static intsocket_set_send_qlen (struct IPC_CHANNEL* ch, int q_len){  /* This seems more like an assertion failure than a normal error */  if (ch->send_queue == NULL) {    return IPC_FAIL;  }  ch->send_queue->max_qlen = q_len;  return IPC_OK;   }static intsocket_set_recv_qlen (struct IPC_CHANNEL* ch, int q_len){  /* This seems more like an assertion failure than a normal error */  if (ch->recv_queue == NULL) {    return IPC_FAIL;  }    ch->recv_queue->max_qlen = q_len;  return IPC_OK;}static voidsocket_del_ipcmsg(IPC_Message* m){	if (m == NULL){		cl_log(LOG_ERR, "socket_del_ipcmsg:"		       "msg is NULL");		return;	}		if (m->msg_body){		memset(m->msg_body, 0, m->msg_len);	}	if (m->msg_buf){		g_free(m->msg_buf);	}		memset(m, 0, sizeof(*m));	g_free(m);		return;}static IPC_Message*socket_new_ipcmsg(IPC_Channel* ch, const void* data, int len, void* private){	IPC_Message*	hdr;	char*	copy = NULL;	char*	buf;	char*	body;	if (ch == NULL || len < 0){		cl_log(LOG_ERR, "socket_new_ipcmsg:"		       " invalid parameter");		return NULL;	}		if (ch->msgpad > MAX_MSGPAD){		cl_log(LOG_ERR, "socket_new_ipcmsg: too many pads "		       "something is wrong");		return NULL;	}		if ((hdr = (IPC_Message*)g_malloc(sizeof(*hdr)))  == NULL) {		return NULL;	}		memset(hdr, 0, sizeof(*hdr));		if (len > 0){		if ((copy = (char*)g_malloc(ch->msgpad + len)) == NULL) {			g_free(hdr);			return NULL;		}				if (data){			memcpy(copy + ch->msgpad, data, len);		}				buf = copy;		body = copy + ch->msgpad;;	}else {		len = 0;		buf = body = NULL;	}	hdr->msg_len = len;	hdr->msg_buf = buf;	hdr->msg_body = body;	hdr->msg_ch = ch;	hdr->msg_done = socket_del_ipcmsg;	hdr->msg_private = private;		return hdr;}/* socket object of the function table */static struct IPC_WAIT_OPS socket_wait_ops = {  socket_destroy_wait_conn,  socket_wait_selectfd,  socket_accept_connection,};/*  * create a new ipc queue whose length = 0 and inner queue = NULL. * return the pointer to a new ipc queue or NULL is the queue can't be created. */static struct IPC_QUEUE*socket_queue_new(void){  struct IPC_QUEUE *temp_queue;    /* temp queue with length = 0 and inner queue = NULL. */  temp_queue =  g_new(struct IPC_QUEUE, 1);  temp_queue->current_qlen = 0;  temp_queue->max_qlen = DEFAULT_MAX_QLEN;  temp_queue->queue = NULL;  return temp_queue;}/*  * destory a ipc queue and clean all memory space assigned to this queue. * parameters: *      q  (IN) the pointer to the queue which should be destroied. * *	FIXME:  This function does not free up messages that might *	be in the queue. */ voidsocket_destroy_queue(struct IPC_QUEUE * q){  g_list_free(q->queue);  g_free((void *) q);}/*  * socket_wait_conn_new: * Called by ipc_wait_conn_constructor to get a new socket * waiting connection. * (better explanation of this role might be nice) *  * Parameters : *     ch_attrs (IN) the attributes used to create this connection. * * Return : *	the pointer to the new waiting connection or NULL if the connection *	can't be created. *  * NOTE : *   for domain socket implementation, the only attribute needed is path name. *	so the user should  *   create the hash table like this:  *     GHashTable * attrs;  *     attrs = g_hash_table_new(g_str_hash, g_str_equal);  *     g_hash_table_insert(attrs, PATH_ATTR, path_name);    *     here PATH_ATTR is defined as "path".  */struct IPC_WAIT_CONNECTION *socket_wait_conn_new(GHashTable *ch_attrs){  struct IPC_WAIT_CONNECTION * temp_ch;  char *path_name;  char *mode_attr;  struct sockaddr_un my_addr;  int s, flags;  struct SOCKET_WAIT_CONN_PRIVATE *wait_private;  mode_t s_mode;    path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR);  mode_attr = (char *) g_hash_table_lookup(ch_attrs, IPC_MODE_ATTR);  if (mode_attr != NULL) {    s_mode = (mode_t)strtoul((const char *)mode_attr, NULL, 8);  }else{    s_mode = 0777;  }  if (path_name == NULL) {    return NULL;  }  /* prepare the unix domain socket */  if ((s = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) {    cl_perror("socket_wait_conn_new: socket() failure");    return NULL;  }  if (unlink(path_name) < 0 && errno != ENOENT) {    cl_perror("socket_wait_conn_new: unlink failure");  }  memset(&my_addr, 0, sizeof(my_addr));  my_addr.sun_family = AF_LOCAL;         /* host byte order */  if (strlen(path_name) >= sizeof(my_addr.sun_path)) {    close(s);    return NULL;  }      strncpy(my_addr.sun_path, path_name, sizeof(my_addr.sun_path));      if (bind(s, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1) {    cl_perror("socket_wait_conn_new: trying to create in %s bind:"    ,	path_name);    close(s);    return NULL;  }  /* Change the permission of the socket */  if (chmod(path_name,s_mode) < 0){    cl_perror("socket_wait_conn_new: failure trying to chmod %s"    ,	path_name);    close(s);    return NULL;  }  /* listen to the socket */  if (listen(s, MAX_LISTEN_NUM) == -1) {    cl_perror("socket_wait_conn_new: listen(MAX_LISTEN_NUM)");    close(s);    return NULL;  }  flags = fcntl(s, F_GETFL, O_NONBLOCK);  if (flags == -1) {    cl_perror("socket_wait_conn_new: cannot read file descriptor flags");    close(s);    return NULL;  }  flags |= O_NONBLOCK;  if (fcntl(s, F_SETFL, flags) < 0) {    cl_perror("socket_wait_conn_new: cannot set O_NONBLOCK");    close(s);    return NULL;  }    wait_private =  g_new(struct SOCKET_WAIT_CONN_PRIVATE, 1);  wait_private->s = s;  strncpy(wait_private->path_name, path_name, sizeof(wait_private->path_name));  temp_ch = g_new(struct IPC_WAIT_CONNECTION, 1);  temp_ch->ch_private = (void *) wait_private;  temp_ch->ch_status = IPC_WAIT;  temp_ch->ops = (struct IPC_WAIT_OPS *)&socket_wait_ops;    return temp_ch;}/*  * will be called by ipc_channel_constructor to create a new socket channel. * parameters : *      attrs (IN) the hash table of the attributes used to create this channel. * * return: *      the pointer to the new waiting channel or NULL if the channel can't be created.*/struct IPC_CHANNEL * socket_client_channel_new(GHashTable *ch_attrs) {  struct IPC_CHANNEL * temp_ch;  struct SOCKET_CH_PRIVATE* conn_info;  char *path_name;  int sockfd, flags;#ifdef USE_BINDSTAT_CREDS  char rand_id[16];  char uuid_str_tmp[40];  struct sockaddr_un sock_addr;#endif  /*   * I don't really understand why the client and the server use different   * parameter names...   *   * It's a really bad idea to store both integers and strings   * in the same table.   *   * Maybe we need an internal function with a different set of parameters?   */   /*   * if we want to seperate them. I suggest   * <client side>   * user call ipc_channel_constructor(ch_type,attrs) to create a new channel.   * ipc_channel_constructor() call socket_channel_new(GHashTable*)to   * create a new socket channel.   * <server side>   * wait_conn->accept_connection() will call another function to create a   * new channel.  This function will take socketfd as the parameter to   * create a socket channel.   */  path_name = (char *) g_hash_table_lookup(ch_attrs, IPC_PATH_ATTR);  if (path_name != NULL) { 	  if (strlen(path_name) >= sizeof(conn_info->path_name)) {	    return NULL;    }    /* prepare the socket */    if ((sockfd = socket(AF_LOCAL, SOCK_STREAM, 0)) == -1) {      cl_perror("socket_client_channel_new: socket");      return NULL;    }  }else{    return NULL;  }    temp_ch = g_new(struct IPC_CHANNEL, 1);  if (temp_ch == NULL){	  cl_log(LOG_ERR, "socket_server_channel_new:"		 " allocating memory for channel failed");	  return NULL;	    }  memset(temp_ch, 0, sizeof(struct IPC_CHANNEL));        conn_info = g_new(struct SOCKET_CH_PRIVATE, 1);  conn_info->peer_addr = NULL;  #ifdef USE_BINDSTAT_CREDS  /* Prepare the socket */  memset(&sock_addr, 0, sizeof(sock_addr));  sock_addr.sun_family = AF_UNIX;  /* make sure socket paths never clash */  uuid_generate(rand_id);  uuid_unparse(rand_id, uuid_str_tmp);    snprintf(sock_addr.sun_path, sizeof(sock_addr.sun_path),	   "%s/%s/%s", HA_VARLIBDIR, PACKAGE, uuid_str_tmp);    unlink(sock_addr.sun_path);  if(bind(sockfd, (struct sockaddr*)&sock_addr, SUN_LEN(&sock_addr)) < 0) {	  perror("Client bind() failure");	  close(sockfd);	  g_free(conn_info); conn_info = NULL;	  g_free(temp_ch);	  return NULL;  }#endif    flags = fcntl(sockfd, F_GETFL, O_NONBLOCK);  if (flags == -1) {	  cl_perror("socket_client_channel_new: cannot read file descriptor flags");	  g_free(conn_info); conn_info = NULL;	  g_free(temp_ch);	  close(sockfd);    return NULL;  }  flags |= O_NONBLOCK;  if (fcntl(sockfd, F_SETFL, flags) < 0) {    cl_perror("socket_client_channel_new: cannot set O_NONBLOCK");    close(sockfd);    g_free(conn_info);    g_free(temp_ch);    return NULL;  }  conn_info->s = sockfd;  conn_info->remaining_data = 0;  conn_info->buf_msg = NULL;    strncpy(conn_info->path_name, path_name, sizeof(conn_info->path_name));  temp_ch->ch_status = IPC_DISCONNECT;#ifdef DEBUG  cl_log(LOG_INFO, "Initializing client socket %d to DISCONNECT", sockfd);#endif  temp_ch->ch_private = (void*) conn_info;  temp_ch->ops = (struct IPC_OPS *)&socket_ops;  temp_ch->msgpad = sizeof(struct SOCKET_MSG_HEAD);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -