⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 4 页
字号:
		g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf);	}	return SA_OK;}intprocess_mqgroup_remove(struct ha_msg *msg){	const char *gname, *name;	mqueue_t *mqg, *mq;	notify_buffer_t buf;	if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq group name request");		return FALSE;	}	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	/*	 * Check carefully again here in case there are mess	 * message in cluster.	 */	if ((mqg = mqname_lookup(gname, NULL)) == NULL) {		cl_log(LOG_ERR		,	"group name [%s] doesn't exist in local database!"		,	gname);		return FALSE;	}	if (!IS_MQGROUP(mqg)) {		cl_log(LOG_ERR, "[%s] is a mq name instead of a mq group name"		,	gname);		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) == NULL) {		cl_log(LOG_ERR		,	"mq name [%s] doesn't exist in local database!"		,	name);		return FALSE;	}		/*	 * mqueue may already be removed from the group, i.e	 * this node is the master name node	 */	if (g_list_find(mqg->list, mq) != NULL)		mqg->list = g_list_remove(mqg->list, mq);	/*	 * remove the mqgroup from the mqueue list as well	 */	if (g_list_find(mq->list, mqg) != NULL)		mq->list = g_list_remove(mq->list, mqg);	/*	 * Notify my clients who care about the group	 * membership change message.	 */	if (mqg->notify_list != NULL) {		strcpy(buf.changeonly_buff.member.queueName.value, name);		buf.changeonly_buff.member.queueName.length = strlen(name) + 1;		buf.changeonly_buff.member.queueStatus = mq->status;		buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_REMOVED;		buf.policy = mqg->policy;		buf.number = 0;		strcpy(buf.name.value, gname);		buf.name.length = strlen(gname) + 1;			g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf);		g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf);	}#if DEBUG_CLUSTER	cl_log_message(msg);#endif	return SA_OK;}intprocess_mqname_ack(struct ha_msg *msg){	const char * qname, * error, * request;	const SaInvocationT * invocation;	size_t invocation_len, seq_len;	const unsigned long * seq; 	gpointer orig_seq, client;	mqueue_request_t ack;	gboolean found;	if ((request = ha_msg_value(msg, F_MQREQUEST)) == NULL	     || (error = ha_msg_value(msg, F_MQERROR)) == NULL	     || (seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL	     || (invocation = cl_get_binary(msg, F_MQINVOCATION, 			     &invocation_len)) == NULL	     ) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	qname = ha_msg_value(msg, F_MQNAME);	found = g_hash_table_lookup_extended(mq_ack_pending_hash, 			seq, &orig_seq, &client);	if (found) {		/*		 * we have clients waiting for acks, send out reply		 */		dprintf("%s: found client <%p>\n", __FUNCTION__, client);		if (qname) {			ack.qname = ha_strdup(qname);		}		ack.request_type = cmsrequest_string2type(request);		ack.invocation = *invocation;		/* we don't ack for sendreceive here because we are		   waiting for the CMS_MSG_RECEIVE */		if (ack.request_type != CMS_MSG_SEND_RECEIVE) {			client_send_ack_msg((IPC_Channel *) client, 				&ack, -1, saerror_string2type(error));		}		ha_free(ack.qname);		g_hash_table_remove(mq_open_pending_hash, seq); 		ha_free((unsigned long *) orig_seq);	} else {		cl_log(LOG_ERR, "client is not found. "			"nobody to send the ack to. mqname = %s, seq = %ld", qname, *seq);	}	return TRUE;}intprocess_mqname_update(struct ha_msg *msg, cms_data_t * cmsdata){	const struct mq_info * info;	size_t info_len;	if ((info = cl_get_binary(msg, F_MQUPDATE, &info_len)) == NULL) {		cl_log(LOG_INFO, "received NULL mq info update");		// cmsdata->cms_ready = 1;	}	/*	 * turn on the ready bit and start accepting client request	 */	if (mqueue_table_unpack(info, info_len) == HA_OK) {		cmsdata->cms_ready = 1;			}	return HA_OK;}intrequest_mqueue_status(mqueue_t * mqueue, cms_data_t * cmsdata){	struct ha_msg * msg;	ll_cluster_t * hb;	dprintf("in request status, used is [%d]\n"	,	mqueue->status.saMsgQueueUsage[3].queueUsed);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE,		mqname_type2string(MQNAME_TYPE_STATUS_REQUEST)) == HA_FAIL	||	ha_msg_add(msg, F_MQNAME, mqueue->name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	}	hb = cmsdata->hb_handle;	hb->llc_ops->sendnodemsg(hb, msg, mqueue->host);	ha_msg_del(msg);	return TRUE;}intreply_mqueue_status(struct ha_msg *msg, cms_data_t * cmsdata){	char usedstring[PACKSTRSIZE], numstring[PACKSTRSIZE];	const char *name, *host;	mqueue_t * mq;	struct ha_msg * m;	ll_cluster_t * hb;	const char * expire = "FALSE";	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__);		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) == NULL) {		cl_log(LOG_INFO, "%s: cannot find mqname [%s], return."		,	__FUNCTION__, name);		return FALSE;	}	if ((host = ha_msg_value(msg, F_ORIG)) == NULL) {		cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__);		return FALSE;	}	if (RETENTION_TIME_EXPIRES(mq)) {		cl_log(LOG_WARNING, "%s: retention time expires [%s]"		,	__FUNCTION__, name);		expire = "TRUE";		request_mqname_unlink(name, cmsdata);	}	sa_mqueue_usage_encode(NULL, usedstring, numstring	,	mq->status.saMsgQueueUsage);	dprintf("queueUsed [%s], numberOfMessages [%s]\n"	,	usedstring, numstring);	/*	 * send reply to the request node	 */	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, mqname_type2string(MQNAME_TYPE_STATUS_REPLY))			== HA_FAIL	||	ha_msg_add(m, F_MQNAME, name) == HA_FAIL	||	ha_msg_add(m, F_MQEXPIRE, expire) == HA_FAIL	||	ha_msg_add(m, F_MQUSED, usedstring) == HA_FAIL	||	ha_msg_add(m, F_MQMSGNUM, numstring) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	}	hb = cmsdata->hb_handle;	hb->llc_ops->sendnodemsg(hb, m, host);	ha_msg_del(m);	return TRUE;}intprocess_mqueue_status(struct ha_msg *msg){	const char *name, *expire = NULL, *usedstring = NULL, *numstring = NULL;	SaErrorT ret = SA_OK;	IPC_Channel * client;	mqueue_t * mq;	gpointer orig_key, value;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(expire = ha_msg_value(msg, F_MQEXPIRE)) == NULL	||	(usedstring = ha_msg_value(msg, F_MQUSED)) == NULL	||	(numstring = ha_msg_value(msg, F_MQMSGNUM)) == NULL) {		cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__);		ret = SA_ERR_LIBRARY;	}	if (!strncmp(expire, "TRUE", 4))		ret = SA_ERR_NOT_EXIST;	/* TODO: currently there can be only one client for a mqueue	 * name in the hash table. But there might be more clients	 * query the mqueue status at the same time. Need to make a	 * GList for each mqueue name.	 */	if ((g_hash_table_lookup_extended(mq_status_pending_hash,			name, &orig_key, &value)) == FALSE) {		cl_log(LOG_ALERT, "%s: cannot find mqname [%s]"		,	__FUNCTION__, name);		return FALSE;	}	g_hash_table_remove(mq_status_pending_hash, name);	client = value;	/*	 * update local mqueue hash table	 */	if ((mq = mqueue_table_lookup(name, NULL))) {		sa_mqueue_usage_decode(NULL, usedstring, numstring		,	mq->status.saMsgQueueUsage);	}	/*	 * response to my client	 */	dprintf("%s: before respond to my client, ret = [%d]\n"	,	__FUNCTION__, ret);	dprintf("queue status = %d\n", (mq->status).sendingState);	if (ret != SA_OK)		client_send_error_msg(client, name, CMS_QUEUE_STATUS, ret);	else		ret = client_send_qstatus(client, mq, SA_OK);	ha_free(orig_key);	return ret;}intrequest_mqinfo_update(cms_data_t * cmsdata){	int ret;	ll_cluster_t *hb;	const char * type;	struct ha_msg *msg;	type = mqname_type2string(MQNAME_TYPE_UPDATE_REQUEST);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ret = FALSE;	} else {		hb = cmsdata->hb_handle;		hb->llc_ops->sendclustermsg(hb, msg);		ret = TRUE;	}	ha_msg_del(msg);	return ret;}intprocess_mqinfo_update_request(struct ha_msg *msg, cms_data_t * cmsdata){	const char * host;	const char * node;	if ((node = ha_msg_value(msg, F_ORIG)) == NULL) {		cl_log(LOG_ERR, "%s: cannot find node name", __FUNCTION__);		return FALSE;	}	/* we ourselves just joined, no update needed. */	if (g_list_length(mqmember_list) <= 1) {		return HA_OK;	}	/*	 * always the first node in the list should send out the mq 	 * update.  in case that the new node is the first node, 	 * choose the second node to send out the mq update	 */	host = g_list_nth_data(mqmember_list, 0);	if (strcmp(host, node) == 0) {		host = g_list_nth_data(mqmember_list, 1);	}	/* are we the one that should send out the mq update? */	if (strcmp(host, cmsdata->my_nodeid) != 0) {		return HA_OK;	}	cl_log(LOG_INFO, "%s: host is %s", __FUNCTION__, host);	return reply_mqinfo_update(node, cmsdata);}intreply_mqinfo_update(const char * node, cms_data_t * cmsdata){	struct mq_info * mqinfo;	size_t mqinfo_len;	const char * type;	struct ha_msg *msg;	ll_cluster_t *hb;	if (mqueue_table_pack(&mqinfo, &mqinfo_len) != HA_OK) {		return HA_FAIL;	}	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return HA_FAIL;	}	type = mqname_type2string(MQNAME_TYPE_UPDATE);		if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL	||	ha_msg_addbin(msg, F_MQUPDATE, mqinfo, mqinfo_len) == HA_FAIL)	{		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return HA_FAIL;	} 	hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(msg);#endif	hb->llc_ops->sendnodemsg(hb, msg, node);	ha_free(mqinfo);	ha_msg_del(msg);	return HA_OK;}intprocess_mqsend_reply(struct ha_msg * msg, cms_data_t * cmsdata){	SaErrorT ret = SA_OK;	const char * request_type, * ack_type;	const void * data; 	const int * ack, * invocation, * msg_pri;	const unsigned long * seq, * reply_seq;	const SaSizeT * msg_type, * msg_ver, * msg_size, * sendreceive;	const char *node;	size_t data_len, ack_len, invocation_len, seq_len, sendreceive_len, reply_seq_len;	size_t type_len, ver_len, pri_len, size_len;	client_message_t * m;	gboolean found;	gpointer orig_seq, client;	ll_cluster_t *hb;	struct ha_msg * ack_msg;	if ((request_type = ha_msg_value(msg, F_MQREQUEST)) == NULL	||	(msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) 			== NULL	||	(msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) 			== NULL	||	(msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len))			== NULL	||	(msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len))			== NULL	||	(data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL	||	(invocation = cl_get_binary(msg, F_MQINVOCATION, 			&invocation_len)) == NULL	||	(sendreceive = cl_get_binary(msg, F_SENDRECEIVE, 			&sendreceive_len)) == NULL	||	(reply_seq = cl_get_binary(msg, F_MQMSGREPLYSEQ, 			&reply_seq_len)) == NULL	||	(seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL	||	(ack = cl_get_binary(msg, F_MQMSGACK, &ack_len)) == NULL	||	(node = ha_msg_value(msg, F_ORIG)) == NULL	||	*msg_size != data_len ) {		cl_log(LOG_ERR, "received bad mqname_send request.");		return FALSE;	}	found = g_hash_table_lookup_extended(mq_ack_pending_hash, 			seq, &orig_seq, &client);	if (found) {		/*		 * we have clients waiting for reply, send it out 		 */		dprintf("%s: found client <%p>\n", __FUNCTION__, client);		m = (client_message_t *) ha_malloc(sizeof(client_message_t) + data_len);		m->header.type = CMS_MSG_RECEIVE;		m->header.len = sizeof(client_message_t) + data_len;		m->header.flag = SA_OK;		m->header.name.length = 0;		m->handle = 0;		m->msg.type = *msg_type;		m->msg.version = *msg_ver;		m->msg.size = *msg_size;		m->msg.priority = * ((const SaUint8T *) msg_pri);		m->msg.data = m + 1;		m->invocation = 0;		m->ack = 0;		m->senderId = 0;		m->data = m + 1;		memcpy(m->data, data, data_len);		ret = client_send_msg((IPC_Channel *) client, sizeof(client_message_t) + data_len, m);		g_hash_table_remove(mq_open_pending_hash, seq); 		ha_free((unsigned long *) orig_seq);	} else {		cl_log(LOG_ERR, "client is not found. "			"nobody to send the reply msg to. ");	}	if (ret == SA_OK && ack) {		ack_type = mqname_type2string(MQNAME_TYPE_ACK);		if ((ack_msg = ha_msg_new(0)) == NULL) {			cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);			return FALSE;		}		if (ha_msg_add(ack_msg, F_TYPE, ack_type) == HA_FAIL		||  ha_msg_add(ack_msg, F_MQREQUEST, request_type) == HA_FAIL		|| ha_msg_addbin(ack_msg, F_MQINVOCATION, invocation, sizeof(int)) == HA_FAIL		|| ha_msg_addbin(ack_msg, F_MQMSGSEQ, reply_seq, sizeof(unsigned long)) == HA_FAIL		|| ha_msg_add(ack_msg, F_MQERROR, saerror_type2string(ret)) == HA_FAIL) {			cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);			return FALSE;		}		hb = cmsdata->hb_handle;		#if DEBUG_CLUSTER		cl_log_message(ack_msg);#endif		hb->llc_ops->sendnodemsg(hb, ack_msg, node);		ha_msg_del(ack_msg);		dprintf("send the ack, ret = %d\n", ret);	}	return TRUE;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -