⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 join_dc.c

📁 linux集群服务器软件代码包
💻 C
字号:
/*  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net> *  * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. *  * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * General Public License for more details. *  * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <portability.h>#include <heartbeat.h>#include <crm/crm.h>#include <crm/cib.h>#include <crm/msg_xml.h>#include <crm/common/xml.h>#include <crmd_fsa.h>#include <crmd_messages.h>#include <crm/dmalloc_wrapper.h>int num_join_invites = 0;GHashTable *join_offers         = NULL;GHashTable *join_requests       = NULL;GHashTable *confirmed_nodes     = NULL;char *max_generation_from = NULL;void initialize_join(gboolean before);void finalize_join_for(gpointer key, gpointer value, gpointer user_data);void join_send_offer(gpointer key, gpointer value, gpointer user_data);/*	 A_DC_JOIN_OFFER_ALL	*/enum crmd_fsa_inputdo_dc_join_offer_all(long long action,		     enum crmd_fsa_cause cause,		     enum crmd_fsa_state cur_state,		     enum crmd_fsa_input current_input,		     fsa_data_t *msg_data){	/* reset everyones status back to down or in_ccm in the CIB */	crm_data_t *cib_copy   = get_cib_copy(fsa_cib_conn);	crm_data_t *fragment   = create_cib_fragment(NULL, NULL);	crm_data_t *update     = find_xml_node(fragment, XML_TAG_CIB, TRUE);	crm_data_t *tmp1       = get_object_root(XML_CIB_TAG_STATUS, cib_copy);	crm_data_t *tmp2       = NULL;	initialize_join(TRUE);		/* catch any nodes that are active in the CIB but not in the CCM list*/	update = get_object_root(XML_CIB_TAG_STATUS, update);	CRM_DEV_ASSERT(update != NULL);	xml_child_iter(		tmp1, node_entry, XML_CIB_TAG_STATE,		const char *node_id = crm_element_value(node_entry, XML_ATTR_UNAME);		gpointer a_node = g_hash_table_lookup(			fsa_membership_copy->members, node_id);		if(a_node != NULL || (safe_str_eq(fsa_our_uname, node_id))) {			/* handled by do_update_cib_node() */			continue;		}		tmp2 = create_node_state(			node_id, node_id, NULL,			XML_BOOLEAN_NO, NULL, CRMD_JOINSTATE_PENDING, NULL);		add_node_copy(update, tmp2);		free_xml(tmp2);		);		/* now process the CCM data */	do_update_cib_nodes(fragment, TRUE);	free_xml(cib_copy);/*  	free_xml(fragment);  *//* BUG!  This should be able to be freed. * I cant find any reason why it shouldnt be able to be, * but libxml still corrupts memory */	#if 0	/* Avoid ordered message delays caused when the CRMd proc	 * isnt running yet (ie. send as a broadcast msg which are never	 * sent ordered.	 */	send_request(NULL, NULL, CRM_OP_WELCOME,		     NULL, CRM_SYSTEM_CRMD, NULL);	#else	crm_debug("Offering membership to %d clients",		  fsa_membership_copy->members_size);		g_hash_table_foreach(fsa_membership_copy->members,			     join_send_offer, NULL);	#endif/* No point hanging around in S_INTEGRATION if we're the only ones here! */	if(g_hash_table_size(join_requests)	   >= fsa_membership_copy->members_size) {		crm_info("Not expecting any join acks");		register_fsa_input(C_FSA_INTERNAL, I_INTEGRATED, NULL);		return I_NULL;	}	/* dont waste time by invoking the pe yet; */	crm_debug("Still waiting on %d outstanding join acks",		  fsa_membership_copy->members_size		  - g_hash_table_size(join_requests));	return I_NULL;}/*	 A_DC_JOIN_OFFER_ONE	*/enum crmd_fsa_inputdo_dc_join_offer_one(long long action,		     enum crmd_fsa_cause cause,		     enum crmd_fsa_state cur_state,		     enum crmd_fsa_input current_input,		     fsa_data_t *msg_data){	gpointer a_node = NULL;	ha_msg_input_t *welcome = fsa_typed_data(fsa_dt_ha_msg);	const char *join_to = NULL;	if(welcome == NULL) {		crm_err("Attempt to send welcome message "			"without a message to reply to!");		return I_NULL;	}		join_to = cl_get_string(welcome->msg, F_CRM_HOST_FROM);	a_node = g_hash_table_lookup(join_requests, join_to);	if(a_node != NULL	   && (cur_state == S_INTEGRATION || cur_state == S_FINALIZE_JOIN)) {		/* note: it _is_ possible that a node will have been		 *  sick or starting up when the original offer was made.		 *  however, it will either re-announce itself in due course		 *  _or_ we can re-store the original offer on the client.		 */		crm_warn("Already offered membership to %s... discarding",			 join_to);		/* Make sure we end up in the correct state again */		if(g_hash_table_size(join_requests)		   >= fsa_membership_copy->members_size) {			crm_info("False alarm, returning to %s",				 fsa_state2string(S_FINALIZE_JOIN));						register_fsa_input(C_FSA_INTERNAL, I_INTEGRATED, NULL);			return I_NULL;		}					crm_debug("Still waiting on %d outstanding join acks",			  fsa_membership_copy->members_size			  - g_hash_table_size(join_requests));			} else {		oc_node_t member;		crm_debug("Processing annouce request from %s in state %s",			  join_to, fsa_state2string(cur_state));				member.node_uname = crm_strdup(join_to);		join_send_offer(NULL, &member, NULL);		crm_free(member.node_uname);		/* this was a genuine join request, cancel any existing		 * transition and invoke the PE		 */		register_fsa_input_w_actions(			msg_data->fsa_cause, I_NULL, NULL, A_TE_CANCEL);	}		return I_NULL;}/*	 A_DC_JOIN_PROCESS_REQ	*/enum crmd_fsa_inputdo_dc_join_req(long long action,	       enum crmd_fsa_cause cause,	       enum crmd_fsa_state cur_state,	       enum crmd_fsa_input current_input,	       fsa_data_t *msg_data){	crm_data_t *generation = NULL;	crm_data_t *our_generation = NULL;	gboolean is_a_member = FALSE;	const char *ack_nack = CRMD_JOINSTATE_MEMBER;	ha_msg_input_t *join_ack = fsa_typed_data(fsa_dt_ha_msg);	const char *join_from = cl_get_string(join_ack->msg,F_CRM_HOST_FROM);	const char *ref       = cl_get_string(join_ack->msg,XML_ATTR_REFERENCE);	const char *op	   = cl_get_string(join_ack->msg, F_CRM_TASK);	gpointer join_node =		g_hash_table_lookup(fsa_membership_copy->members, join_from);	if(safe_str_neq(op, CRM_OP_WELCOME)) {		crm_warn("Ignoring op=%s message", op);		return I_NULL;	}	crm_devel("Processing req from %s", join_from);		if(join_node != NULL) {		is_a_member = TRUE;	}		generation = join_ack->xml;	our_generation = cib_get_generation(fsa_cib_conn);	CRM_ASSERT(our_generation != NULL); /* what to do here? */	if(cib_compare_generation(our_generation, generation) <= 0) {		clear_bit_inplace(fsa_input_register, R_HAVE_CIB);		crm_devel("%s has a better generation number than us",			  join_from);		crm_xml_devel(our_generation, "Our generation");		crm_xml_devel(generation, "Their generation");		max_generation_from = crm_strdup(join_from);	}	free_xml(our_generation);		crm_devel("Welcoming node %s after ACK (ref %s)", join_from, ref);		if(is_a_member == FALSE) {		/* nack them now so they are not counted towards the		 * expected responses		 */		char *local_from = crm_strdup(join_from);		char *local_down = crm_strdup(CRMD_JOINSTATE_DOWN);		crm_err("Node %s is not known to us (ref %s)", join_from, ref);		finalize_join_for(local_from, local_down, NULL);		crm_free(local_from);		crm_free(local_down);				register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL);		return I_NULL;	} else if(/* some reason */ 0) {		/* NACK this client */		ack_nack = CRMD_JOINSTATE_DOWN;	}		/* add them to our list of CRMD_STATE_ACTIVE nodes	   TODO: check its not already there	*/	g_hash_table_insert(		join_requests, crm_strdup(join_from), crm_strdup(ack_nack));	if(g_hash_table_size(join_requests)	   >= fsa_membership_copy->members_size) {		crm_info("That was the last outstanding join ack");		register_fsa_input(C_FSA_INTERNAL, I_INTEGRATED, NULL);		return I_NULL;	}	/* dont waste time by invoking the PE yet; */	crm_debug("Still waiting on %d (of %d) outstanding join acks",		  fsa_membership_copy->members_size		  - g_hash_table_size(join_requests),		  fsa_membership_copy->members_size);		return I_NULL;}/*	A_DC_JOIN_FINALIZE	*/enum crmd_fsa_inputdo_dc_join_finalize(long long action,		    enum crmd_fsa_cause cause,		    enum crmd_fsa_state cur_state,		    enum crmd_fsa_input current_input,		    fsa_data_t *msg_data){	if(max_generation_from == NULL) {		crm_warn("There is no CIB to get..."			 " how did R_HAVE_CIB get unset?");		set_bit_inplace(fsa_input_register, R_HAVE_CIB);	}		if(! is_set(fsa_input_register, R_HAVE_CIB)) {		/* ask for the agreed best CIB */		enum cib_errors rc = cib_ok;		crm_info("Asking %s for its copy of the CIB",			 crm_str(max_generation_from));		CRM_DEV_ASSERT(cib_ok == fsa_cib_conn->cmds->is_master(fsa_cib_conn));					rc = fsa_cib_conn->cmds->sync_from(			fsa_cib_conn, max_generation_from, NULL, cib_sync_call);		if(rc != cib_ok) {			crm_err("Sync from %s resulted in an error: %s",				max_generation_from, cib_error2string(rc));			register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL);			return I_NULL;		}	}	crm_devel("Bumping the epoche and syncing to %d clients",		  g_hash_table_size(join_requests));	fsa_cib_conn->cmds->bump_epoch(		fsa_cib_conn, cib_scope_local|cib_sync_call);	CRM_DEV_ASSERT(cib_ok == fsa_cib_conn->cmds->sync(			       fsa_cib_conn, NULL, cib_sync_call));		num_join_invites = 0;	crm_debug("Notifying %d clients of join results",		  g_hash_table_size(join_requests));	g_hash_table_foreach(join_requests, finalize_join_for, NULL);		if(num_join_invites <= g_hash_table_size(confirmed_nodes)) {		crm_info("Not expecting any join confirmations");				register_fsa_input(C_FSA_INTERNAL, I_FINALIZED, NULL);		return I_NULL;	}	/* dont waste time by invoking the PE yet; */	crm_debug("Still waiting on %d outstanding join confirmations",		  num_join_invites - g_hash_table_size(confirmed_nodes));	return I_NULL;}/*	A_DC_JOIN_PROCESS_ACK	*/enum crmd_fsa_inputdo_dc_join_ack(long long action,	       enum crmd_fsa_cause cause,	       enum crmd_fsa_state cur_state,	       enum crmd_fsa_input current_input,	       fsa_data_t *msg_data){	/* now update them to "member" */	crm_data_t *update = NULL;	ha_msg_input_t *join_ack = fsa_typed_data(fsa_dt_ha_msg);	const char *join_from = cl_get_string(join_ack->msg, F_CRM_HOST_FROM);	const char *op = cl_get_string(join_ack->msg, F_CRM_TASK);	const char *type = cl_get_string(join_ack->msg, F_SUBTYPE);	const char *join_state = NULL;	if(safe_str_neq(op, CRM_OP_JOINACK)) {		crm_warn("Ignoring op=%s message", op);		return I_NULL;	} else if(safe_str_eq(type, XML_ATTR_REQUEST)) {		crm_verbose("Ignoring request");		crm_log_message(LOG_VERBOSE, join_ack->msg);		return I_NULL;	}		crm_debug("Processing ack from %s", join_from);	join_state = (const char *)		g_hash_table_lookup(join_requests, join_from);		if(join_state == NULL) {		crm_err("Join not in progress: ignoring join from %s",			join_from);		register_fsa_error(C_FSA_INTERNAL, I_FAIL, NULL);		return I_NULL;			} else if(safe_str_neq(join_state, CRMD_JOINSTATE_MEMBER)) {		crm_err("Node %s wasnt invited to join the cluster",join_from);		return I_NULL;	}		g_hash_table_insert(confirmed_nodes, crm_strdup(join_from),			    crm_strdup(CRMD_JOINSTATE_MEMBER));	/* the updates will actually occur in reverse order because of	 * the LIFO nature of the fsa input queue	 */		/* update CIB with the current LRM status from the node */	update_local_cib(copy_xml_node_recursive(join_ack->xml));	/* update node entry in the status section  */	crm_info("Updating node state to %s for %s", join_state, join_from);	update = create_node_state(		join_from, join_from,		ACTIVESTATUS, NULL, ONLINESTATUS, join_state, join_state);	set_xml_property_copy(update,XML_CIB_ATTR_EXPSTATE, CRMD_STATE_ACTIVE);	update_local_cib(create_cib_fragment(update, NULL));	if(num_join_invites <= g_hash_table_size(confirmed_nodes)) {		crm_info("That was the last outstanding join confirmation");		register_fsa_input_later(C_FSA_INTERNAL, I_FINALIZED, NULL);				return I_NULL;	}	/* dont waste time by invoking the pe yet; */	crm_debug("Still waiting on %d outstanding join confirmations",		  num_join_invites - g_hash_table_size(confirmed_nodes));		return I_NULL;}voidfinalize_join_for(gpointer key, gpointer value, gpointer user_data){	const char *join_to = NULL;	const char *join_state = NULL;	HA_Message *acknak = NULL;		if(key == NULL || value == NULL) {		return;	}	join_to    = (const char *)key;	join_state = (const char *)value;	/* make sure the node exists in the config section */	create_node_entry(join_to, join_to, CRMD_JOINSTATE_MEMBER);	/* send the ack/nack to the node */	acknak = create_request(		CRM_OP_JOINACK, NULL, join_to,		CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL);	/* set the ack/nack */	if(safe_str_eq(join_state, CRMD_JOINSTATE_MEMBER)) {		crm_info("ACK'ing join request from %s, state %s",			 join_to, join_state);		num_join_invites++;		ha_msg_add(acknak, CRM_OP_JOINACK, XML_BOOLEAN_TRUE);	} else {		crm_warn("NACK'ing join request from %s, state %s",			 join_to, join_state);				ha_msg_add(acknak, CRM_OP_JOINACK, XML_BOOLEAN_FALSE);	}		send_msg_via_ha(fsa_cluster_conn, acknak);}voidinitialize_join(gboolean before){	/* clear out/reset a bunch of stuff */	if(join_offers != NULL) {		g_hash_table_destroy(join_offers);	}	if(join_requests != NULL) {		g_hash_table_destroy(join_requests);	}	if(confirmed_nodes != NULL) {		g_hash_table_destroy(confirmed_nodes);	}	if(before) {		if(max_generation_from != NULL) {			crm_free(max_generation_from);			max_generation_from = NULL;		}		set_bit_inplace(fsa_input_register, R_HAVE_CIB);		clear_bit_inplace(fsa_input_register, R_CIB_ASKED);	}		join_offers     = g_hash_table_new(g_str_hash, g_str_equal);	join_requests   = g_hash_table_new(g_str_hash, g_str_equal);	confirmed_nodes = g_hash_table_new(g_str_hash, g_str_equal);}voidjoin_send_offer(gpointer key, gpointer value, gpointer user_data){	const char *join_to = NULL;	const oc_node_t *member = (const oc_node_t*)value;	crm_devel("Sending %s offer", CRM_OP_WELCOME);	if(member != NULL) {		join_to = member->node_uname;	}	if(join_to == NULL) {		crm_err("No recipient for welcome message");	} else {		HA_Message *offer = create_request(			CRM_OP_WELCOME, NULL, join_to,			CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL);		/* send the welcome */		crm_devel("Sending %s to %s", CRM_OP_WELCOME, join_to);		send_msg_via_ha(fsa_cluster_conn, offer);		g_hash_table_insert(join_offers, crm_strdup(join_to),				    crm_strdup(CRMD_JOINSTATE_PENDING));	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -