⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ipc.c

📁 linux集群服务器软件代码包
💻 C
字号:
/* $Id: ipc.c,v 1.21 2005/02/19 18:11:03 andrew Exp $ *//*  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net> *  * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. *  * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * General Public License for more details. *  * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <sys/param.h>#include <stdio.h>#include <sys/types.h>#include <sys/stat.h>#include <unistd.h>#include <errno.h>#include <fcntl.h>#include <crm/crm.h>#include <clplumbing/ipc.h>#include <clplumbing/Gmain_timeout.h>#include <clplumbing/cl_log.h>#include <clplumbing/cl_signal.h>#include <clplumbing/lsb_exitcodes.h>#include <clplumbing/uids.h>#include <clplumbing/realtime.h>#include <clplumbing/GSource.h>#include <clplumbing/cl_poll.h>#ifdef USE_LIBXML#  include <libxml/xmlmemory.h>#  include <libxml/parser.h>#  include <libxml/xpath.h>#endif#include <crm/common/ipc.h>#include <crm/msg_xml.h>#include <ha_msg.h>#include <crm/dmalloc_wrapper.h>/* frees msg */gboolean send_ipc_message(IPC_Channel *ipc_client, HA_Message *msg){	gboolean all_is_good = TRUE;	if (msg == NULL) {		crm_err("cant send NULL message");		all_is_good = FALSE;	} else if (ipc_client == NULL) {		crm_err("cant send message without an IPC Channel");		all_is_good = FALSE;	} else if(ipc_client->ch_status != IPC_CONNECT) {		crm_err("IPC Channel is not connected");		all_is_good = FALSE;	} else if(ipc_client->should_send_blocking == FALSE) {		crm_verbose("Setting IPC Channel to blocking..."			    " least some messages get lost");		ipc_client->should_send_blocking = TRUE;	}	if(all_is_good && msg2ipcchan(msg, ipc_client) != HA_OK) {		crm_err("Could not send IPC, message");		all_is_good = FALSE;		if(ipc_client->ch_status != IPC_CONNECT) {			crm_err("IPC Channel is no longer connected");		}			}		crm_log_message(all_is_good?LOG_DEV:LOG_ERR, msg);#ifdef MSG_LOG	crm_log_message_adv(all_is_good?LOG_DEV:LOG_ERR,			    DEVEL_DIR"/outbound.ipc.log", msg);#endif		crm_msg_del(msg);		return all_is_good;}voiddefault_ipc_connection_destroy(gpointer user_data){	return;}intinit_server_ipc_comms(	char *channel_name,	gboolean (*channel_client_connect)(IPC_Channel *newclient,gpointer user_data),	void (*channel_connection_destroy)(gpointer user_data)){	/* the clients wait channel is the other source of events.	 * This source delivers the clients connection events.	 * listen to this source at a relatively lower priority.	 */    	char    commpath[SOCKET_LEN];	IPC_WaitConnection *wait_ch;		sprintf(commpath, WORKING_DIR "/%s", channel_name);	wait_ch = wait_channel_init(commpath);	if (wait_ch == NULL) {		return 1;	}		G_main_add_IPC_WaitConnection(		G_PRIORITY_LOW, wait_ch, NULL, FALSE,		channel_client_connect, channel_name,		channel_connection_destroy);	crm_devel("Listening on: %s", commpath);	return 0;}GCHSource*init_client_ipc_comms(const char *channel_name,		      gboolean (*dispatch)(			      IPC_Channel* source_data, gpointer user_data),		      void *client_data, IPC_Channel **ch){	IPC_Channel *a_ch = NULL;	GCHSource *the_source = NULL;	void *callback_data = client_data;	a_ch = init_client_ipc_comms_nodispatch(channel_name);	if(ch != NULL) {		*ch = a_ch;		if(callback_data == NULL) {			callback_data = a_ch;		}	}	if(a_ch == NULL) {		crm_err("Setup of client connection failed,"			" not adding channel to mainloop");				return NULL;	}	if(dispatch == NULL) {		crm_warn("No dispatch method specified..."			 "maybe you meant init_client_ipc_comms_nodispatch()?");	} else {		crm_devel("Adding dispatch method to channel");		the_source = G_main_add_IPC_Channel(			G_PRIORITY_LOW, a_ch, FALSE, dispatch, callback_data, 			default_ipc_connection_destroy);	}		return the_source;}IPC_Channel *init_client_ipc_comms_nodispatch(const char *channel_name){	IPC_Channel *ch;	GHashTable  *attrs;	static char  path[] = IPC_PATH_ATTR;	char *commpath = NULL;	int local_socket_len = 2; /* 2 = '/' + '\0' */		local_socket_len += strlen(channel_name);	local_socket_len += strlen(WORKING_DIR);	crm_malloc(commpath, sizeof(char)*local_socket_len);	if(commpath != NULL) {		sprintf(commpath, WORKING_DIR "/%s", channel_name);		commpath[local_socket_len - 1] = '\0';		crm_devel("Attempting to talk on: %s", commpath);	}		attrs = g_hash_table_new(g_str_hash,g_str_equal);	g_hash_table_insert(attrs, path, commpath);	ch = ipc_channel_constructor(IPC_ANYTYPE, attrs);	g_hash_table_destroy(attrs);	if (ch == NULL) {		crm_err("Could not access channel on: %s", commpath);		return NULL;			} else if (ch->ops->initiate_connection(ch) != IPC_OK) {		crm_debug("Could not init comms on: %s", commpath);		return NULL;	}	ch->ops->set_recv_qlen(ch, 100);	ch->ops->set_send_qlen(ch, 100);	ch->should_send_blocking = TRUE;	crm_devel("Processing of %s complete", commpath);	return ch;}IPC_WaitConnection *wait_channel_init(char daemonsocket[]){	IPC_WaitConnection *wait_ch;	mode_t mask;	char path[] = IPC_PATH_ATTR;	GHashTable * attrs;		attrs = g_hash_table_new(g_str_hash,g_str_equal);	g_hash_table_insert(attrs, path, daemonsocket);    	mask = umask(0);	wait_ch = ipc_wait_conn_constructor(IPC_ANYTYPE, attrs);	if (wait_ch == NULL) {		cl_perror("Can't create wait channel of type %s",			  IPC_ANYTYPE);		exit(1);	}	mask = umask(mask);    	g_hash_table_destroy(attrs);    	return wait_ch;}gbooleansubsystem_msg_dispatch(IPC_Channel *sender, void *user_data){	int lpc = 0;	IPC_Message *msg = NULL;	ha_msg_input_t *new_input = NULL;	gboolean all_is_well = TRUE;	const char *sys_to;	const char *task;	while(sender->ops->is_message_pending(sender)) {		gboolean process = FALSE;		if (sender->ch_status == IPC_DISCONNECT) {			/* The message which was pending for us is that			 * the IPC status is now IPC_DISCONNECT */			break;		}		if (sender->ops->recv(sender, &msg) != IPC_OK) {			perror("Receive failure:");			return !all_is_well;		}		if (msg == NULL) {			crm_err("No message this time");			continue;		}		lpc++;		new_input = new_ipc_msg_input(msg);		msg->msg_done(msg);		crm_log_message(LOG_MSG, new_input->msg);		sys_to = cl_get_string(new_input->msg, F_CRM_SYS_TO);		task   = cl_get_string(new_input->msg, F_CRM_TASK);		if(safe_str_eq(task, CRM_OP_HELLO)) {			process = TRUE;		} else if(sys_to == NULL) {			crm_err("Value of %s was NULL!!", F_CRM_SYS_TO);					} else if(task == NULL) {			crm_err("Value of %s was NULL!!", F_CRM_TASK);					} else {			process = TRUE;		}		if(process){			gboolean (*process_function)				(HA_Message *msg, crm_data_t *data, IPC_Channel *sender) = NULL;			process_function = user_data;#ifdef MSG_LOG			crm_log_message_adv(				LOG_MSG, NULL, new_input->msg);#endif			if(FALSE == process_function(				   new_input->msg, new_input->xml, sender)) {				crm_warn("Received a message destined for %s"					 " by mistake", sys_to);			}		} else {#ifdef MSG_LOG			crm_log_message_adv(				LOG_ERR, NULL, new_input->msg);#endif		}				delete_ha_msg_input(new_input);		msg = NULL;	}	/* clean up after a break */	if(msg != NULL)		msg->msg_done(msg);	crm_verbose("Processed %d messages", lpc);	if (sender->ch_status == IPC_DISCONNECT) {		crm_err("The server has left us: Shutting down...NOW");		exit(1); /* shutdown properly later */				return !all_is_well;	}	return all_is_well;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -