⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 callbacks.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* $Id: callbacks.c,v 1.23 2005/02/19 18:11:03 andrew Exp $ *//*  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net> *  * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. *  * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * General Public License for more details. *  * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <sys/param.h>#include <stdio.h>#include <sys/types.h>#include <unistd.h>#include <stdlib.h>#include <errno.h>#include <fcntl.h>#include <hb_api.h>#include <clplumbing/uids.h>#include <crm/crm.h>#include <crm/cib.h>#include <crm/msg_xml.h>#include <crm/common/ipc.h>#include <crm/common/ctrl.h>#include <crm/common/xml.h>#include <crm/common/msg.h>#include <cibio.h>#include <callbacks.h>#include <cibmessages.h>#include <crm/dmalloc_wrapper.h>gint cib_GCompareFunc(gconstpointer a, gconstpointer b);gboolean cib_msg_timeout(gpointer data);void cib_GHFunc(gpointer key, gpointer value, gpointer user_data);GHashTable *peer_hash = NULL;int        next_client_id  = 0;gboolean   cib_is_master   = FALSE;gboolean   cib_have_quorum = FALSE;GHashTable *client_list    = NULL;extern const char *cib_our_uname;extern ll_cluster_t *hb_conn;/* technically bump does modify the cib... * but we want to split the "bump" from the "sync" */cib_operation_t cib_server_ops[] = {	{NULL,		     FALSE, FALSE, FALSE, FALSE, cib_process_default},	{CRM_OP_NOOP,	     FALSE, FALSE, FALSE, FALSE, cib_process_default},	{CRM_OP_RETRIVE_CIB, FALSE, FALSE, FALSE, FALSE, cib_process_query},	{CRM_OP_CIB_SLAVE,   FALSE, TRUE,  FALSE, FALSE, cib_process_readwrite},	{CRM_OP_CIB_SLAVEALL,TRUE,  TRUE,  FALSE, FALSE, cib_process_readwrite},	{CRM_OP_CIB_MASTER,  FALSE, TRUE,  FALSE, FALSE, cib_process_readwrite},	{CRM_OP_CIB_ISMASTER,FALSE, TRUE,  FALSE, FALSE, cib_process_readwrite},	{CRM_OP_CIB_BUMP,    FALSE, TRUE,  TRUE,  FALSE, cib_process_bump},	{CRM_OP_CIB_REPLACE, TRUE,  TRUE,  TRUE,  TRUE,  cib_process_replace},	{CRM_OP_CIB_CREATE,  TRUE,  TRUE,  TRUE,  TRUE,  cib_process_modify},	{CRM_OP_CIB_UPDATE,  TRUE,  TRUE,  TRUE,  TRUE,  cib_process_modify},	{CRM_OP_JOINACK,     TRUE,  TRUE,  TRUE,  TRUE,  cib_process_modify},	{CRM_OP_SHUTDOWN_REQ,TRUE,  TRUE,  TRUE,  TRUE,  cib_process_modify},	{CRM_OP_CIB_DELETE,  TRUE,  TRUE,  TRUE,  TRUE,  cib_process_modify},	{CRM_OP_CIB_QUERY,   FALSE, FALSE, TRUE,  FALSE, cib_process_query},	{CRM_OP_QUIT,	     FALSE, TRUE,  FALSE, FALSE, cib_process_quit},	{CRM_OP_PING,	     FALSE, FALSE, FALSE, FALSE, cib_process_ping},	{CRM_OP_CIB_ERASE,   TRUE,  TRUE,  TRUE,  FALSE, cib_process_erase}};int send_via_callback_channel(HA_Message *msg, const char *token);enum cib_errors cib_process_command(	const HA_Message *request, HA_Message **reply, gboolean privileged);gboolean cib_common_callback(	IPC_Channel *channel, gpointer user_data, gboolean privileged);enum cib_errors cib_get_operation_id(const HA_Message * msg, int *operation);gboolean cib_process_disconnect(IPC_Channel *channel, cib_client_t *cib_client);gbooleancib_client_connect(IPC_Channel *channel, gpointer user_data){	gboolean auth_failed = FALSE;	gboolean can_connect = TRUE;	gboolean (*client_callback)(IPC_Channel *channel, gpointer user_data) = NULL;	cib_client_t *new_client = NULL;	crm_devel("Connecting channel");	if (channel == NULL) {		crm_err("Channel was NULL");		can_connect = FALSE;			} else if (channel->ch_status == IPC_DISCONNECT) {		crm_err("Channel was disconnected");		can_connect = FALSE;			} else if(user_data == NULL) {		crm_err("user_data must contain channel name");		can_connect = FALSE;			} else {		crm_malloc(new_client, sizeof(cib_client_t));		new_client->id          = NULL;		new_client->callback_id = NULL;		new_client->source      = NULL;		new_client->channel     = channel;		new_client->channel_name = user_data;		new_client->delegated_calls = NULL;		crm_devel("Created channel %p for channel %s",			  new_client, new_client->channel_name);				client_callback = NULL;				/* choose callback and do auth based on channel_name */		if(safe_str_eq(new_client->channel_name, cib_channel_callback)) {			client_callback = cib_null_callback;		} else {			uuid_t client_id;			uuid_generate(client_id);			crm_malloc(new_client->id, sizeof(char)*36);			uuid_unparse(client_id, new_client->id);			new_client->id[35] = EOS;						uuid_generate(client_id);			crm_malloc(new_client->callback_id, sizeof(char)*36);			uuid_unparse(client_id, new_client->callback_id);			new_client->callback_id[35] = EOS;						client_callback = cib_ro_callback;			if(safe_str_eq(new_client->channel_name, cib_channel_rw)) {				client_callback = cib_rw_callback;			} 		}	}	if(auth_failed) {		crm_err("Connection to %s channel failed authentication",			(char *)user_data);		can_connect = FALSE;	}	if(can_connect == FALSE) {		if(new_client) {			crm_free(new_client->id);			crm_free(new_client->callback_id);		}		crm_free(new_client);		return FALSE;	}		channel->ops->set_recv_qlen(channel, 100);	channel->ops->set_send_qlen(channel, 100);	if(client_callback != NULL) {		new_client->source = G_main_add_IPC_Channel(			G_PRIORITY_LOW, channel, FALSE, client_callback,			new_client, default_ipc_connection_destroy);	}	if(client_callback != cib_null_callback) {		/* send msg to client with uuid to use when signing up for		 * callback channel		 */		HA_Message *reg_msg = ha_msg_new(3);		ha_msg_add(reg_msg, F_CIB_OPERATION, CRM_OP_REGISTER);		ha_msg_add(reg_msg, F_CIB_CLIENTID,  new_client->id);		ha_msg_add(			reg_msg, F_CIB_CALLBACK_TOKEN, new_client->callback_id);				msg2ipcchan(reg_msg, channel);		crm_msg_del(reg_msg);				/* make sure we can find ourselves later for sync calls		 * redirected to the master instance		 */		g_hash_table_insert(client_list, new_client->id, new_client);	}	crm_devel("Channel %s connected for client %s",		  new_client->channel_name, new_client->id);		return TRUE;}gbooleancib_rw_callback(IPC_Channel *channel, gpointer user_data){	return cib_common_callback(channel, user_data, TRUE);}gbooleancib_ro_callback(IPC_Channel *channel, gpointer user_data){	return cib_common_callback(channel, user_data, FALSE);}gbooleancib_null_callback(IPC_Channel *channel, gpointer user_data){	gboolean did_disconnect = TRUE;	HA_Message *op_request = NULL;	cib_client_t *cib_client = user_data;	cib_client_t *hash_client = NULL;	const char *type = NULL;	const char *uuid_ticket = NULL;	if(cib_client == NULL) {		crm_err("Discarding IPC message from unknown source"			" on callback channel.");		return FALSE;	}		while(channel->ops->is_message_pending(channel)) {		if (channel->ch_status == IPC_DISCONNECT) {			/* The message which was pending for us is that			 * the IPC status is now IPC_DISCONNECT			 */			break;		}		op_request = msgfromIPC_noauth(channel);		type = cl_get_string(op_request, F_CIB_OPERATION);		if(safe_str_neq(type, CRM_OP_REGISTER) ) {			crm_warn("Discarding IPC message from %s on callback channel",				 cib_client->id);			crm_msg_del(op_request);			continue;		}				uuid_ticket = cl_get_string(op_request, F_CIB_CALLBACK_TOKEN);		hash_client = g_hash_table_lookup(client_list, uuid_ticket);		if(hash_client != NULL) {			crm_err("Duplicate registration request... disconnecting");			crm_msg_del(op_request);			return FALSE;		}		cib_client->id = crm_strdup(uuid_ticket);		g_hash_table_insert(client_list, cib_client->id, cib_client);		crm_info("Registered %s on %s channel",			    cib_client->id, cib_client->channel_name);		crm_msg_del(op_request);		op_request = ha_msg_new(2);		ha_msg_add(op_request, F_CIB_OPERATION, CRM_OP_REGISTER);		ha_msg_add(op_request, F_CIB_CLIENTID,  cib_client->id);				msg2ipcchan(op_request, channel);		crm_msg_del(op_request);	}	did_disconnect = cib_process_disconnect(channel, cib_client);		if(did_disconnect) {		crm_info("Client disconnected");	}		return did_disconnect;}gbooleancib_common_callback(	IPC_Channel *channel, gpointer user_data, gboolean privileged){	int rc = cib_ok;	int lpc = 0;	int call_type = 0;	int call_options = 0;	const char *op = NULL;	const char *host = NULL;		HA_Message *op_request = NULL;	HA_Message *op_reply   = NULL;	gboolean needs_processing = FALSE;	cib_client_t *cib_client = user_data;	if(cib_client == NULL) {		crm_err("Receieved call from unknown source. Discarding.");		return FALSE;	}		crm_verbose("Callback for %s on %s channel",		    cib_client->id, cib_client->channel_name);	while(channel->ops->is_message_pending(channel)) {		if (channel->ch_status == IPC_DISCONNECT) {			/* The message which was pending for us is that			 * the IPC status is now IPC_DISCONNECT */			break;		}		op_request = msgfromIPC(channel);		if (op_request == NULL) {			perror("Receive failure:");			break;		}		crm_verbose("Processing IPC message from %s on %s channel",			    cib_client->id, cib_client->channel_name); 		crm_log_message(LOG_MSG, op_request);		crm_log_message_adv(LOG_DEV, "cib.client-in.log", op_request);				lpc++;		rc = cib_ok;				if(HA_OK != ha_msg_add(			   op_request, F_CIB_CLIENTID, cib_client->id)) {			crm_err("Couldnt add F_CIB_CLIENTID to message");			rc = cib_msg_field_add;		}		if(rc == cib_ok) {			ha_msg_value_int(				op_request, F_CIB_CALLOPTS, &call_options);			crm_devel("Call options: %.8lx", (long)call_options);						host = cl_get_string(op_request, F_CIB_HOST);			op = cl_get_string(op_request, F_CIB_OPERATION);			rc = cib_get_operation_id(op_request, &call_type);		}				if(rc == cib_ok		   && cib_server_ops[call_type].needs_privileges		   && privileged == FALSE) {			rc = cib_not_authorized;		}		needs_processing = FALSE;		if(rc != cib_ok) {			/* TODO: construct error reply */			crm_err("Pre-processing of command failed: %s",				cib_error2string(rc));					} else if(host == NULL && cib_is_master			&& !(call_options & cib_scope_local)) { 			crm_devel("Processing master %s op locally", op);			needs_processing = TRUE;		} else if(			(host == NULL && (call_options & cib_scope_local))			  || safe_str_eq(host, cib_our_uname)) { 			crm_devel("Processing %s op locally", op);			needs_processing = TRUE;		} else {			/* send via HA to other nodes */			ha_msg_add(op_request, F_CIB_DELEGATED, cib_our_uname);			crm_log_message(LOG_MSG, op_request);			if(host != NULL) {				crm_devel("Forwarding %s op to %s", op, host);				hb_conn->llc_ops->send_ordered_nodemsg(					hb_conn, op_request, host);			} else {				crm_info("Forwarding %s op to master instance",					 op);				hb_conn->llc_ops->sendclustermsg(					hb_conn, op_request);			}			if(call_options & cib_discard_reply) {				crm_trace("Client not interested in reply");			} else if(call_options & cib_sync_call) {				/* keep track of the request so we can time it				 * out if required				 */				HA_Message *saved = ha_msg_copy(op_request);				crm_devel("Registering delegated call from %s",					  cib_client->id);				cib_client->delegated_calls = g_list_append(					cib_client->delegated_calls, saved);			}			crm_msg_del(op_request);			op_request = NULL;			continue;		}		if(needs_processing) { 			crm_verbose("Processing %s op", op);			rc = cib_process_command(				op_request, &op_reply, privileged);			crm_devel("Performing local processing: op=%s origin=%s/%s,%s (update=%s)",				  op, cib_our_uname, cib_client->id,				  cl_get_string(op_request, F_CIB_CALLID),				  (rc==cib_ok && cib_server_ops[call_type].modifies_cib)?"true":"false"); 			crm_devel("Processing complete");		}						crm_devel("processing response cases");		if(rc != cib_ok) {			crm_err("Input message");			crm_log_message(LOG_ERR, op_request);			crm_err("Output message");			crm_log_message(LOG_ERR, op_reply);			crm_log_message_adv(LOG_ERR, DEVEL_DIR"/cib.out.log", op_reply);		}				if(op_reply == NULL) {			crm_trace("No reply is required for op %s", crm_str(op));					} else if(call_options & cib_sync_call) { 			crm_devel("Sending sync reply to %s op", crm_str(op));			crm_log_message(LOG_MSG, op_reply);			if(msg2ipcchan(op_reply, channel) != HA_OK) {				crm_err("Sync reply failed: %s",					cib_error2string(cib_reply_failed));				if(rc == cib_ok) {					rc = cib_reply_failed;				}			}		} else {			enum cib_errors local_rc = cib_ok;			/* send reply via client's callback channel */ 			crm_devel("Sending async reply %p to %s op",				  op_reply, crm_str(op));			crm_log_message(LOG_MSG, op_reply);			local_rc = send_via_callback_channel(				op_reply, cib_client->callback_id);			if(local_rc != cib_ok) {				crm_err("ASync reply failed: %s",					cib_error2string(local_rc));				if(rc != cib_ok) {					local_rc = cib_reply_failed;				}			}		}				crm_devel("Cleaning up reply");		crm_msg_del(op_reply);		op_reply = NULL;		crm_devel("Processing forward cases");		if(rc == cib_ok		   && cib_server_ops[call_type].modifies_cib		   && !(call_options & cib_scope_local)) {			/* send via HA to other nodes */ 			crm_info("Forwarding %s op to all instances", op);			ha_msg_add(op_request,				   F_CIB_GLOBAL_UPDATE, XML_BOOLEAN_TRUE);			cl_log_message(LOG_DEV, op_request);			CRM_DEV_ASSERT(hb_conn->llc_ops->sendclustermsg(					       hb_conn, op_request) == HA_OK);		} else {			if(call_options & cib_scope_local ) {				crm_devel("Request not broadcast : local scope");			}			if(cib_server_ops[call_type].modifies_cib == FALSE) {				crm_devel("Request not broadcast : R/O call");			}			if(rc != cib_ok) {				crm_devel("Request not broadcast : call failed : %s",					  cib_error2string(rc));			}		}		crm_devel("Cleaning up request");		crm_msg_del(op_request);		op_request = NULL;	}	crm_verbose("Processed %d messages", lpc);    	return cib_process_disconnect(channel, cib_client);}enum cib_errorscib_process_command(	const HA_Message *request, HA_Message **reply, gboolean privileged){	crm_data_t *output   = NULL;	crm_data_t *input    = NULL;	int call_type      = 0;	int call_options   = 0;	enum cib_errors rc = cib_ok;	const char *op = NULL;	const char *call_id = NULL;	const char *section = NULL;	const char *tmp = NULL;	if(reply) { *reply = NULL; }		/* Start processing the request... */	op = cl_get_string(request, F_CIB_OPERATION);	call_id = cl_get_string(request, F_CIB_CALLID);	ha_msg_value_int(request, F_CIB_CALLOPTS, &call_options);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -