⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
		client_send_client_qopen(client, &reply, -1, flag);		ha_free(reply.qname);		g_hash_table_remove(mq_open_pending_hash, name);		ha_free(mq_pending->name);		ha_free(mq_pending);	}	return TRUE;}static voidgroup_mem_dispatch(gpointer data, gpointer user_data){	IPC_Message msg;	client_mqgroup_notify_t * cmg = NULL;	int size;	client_mqgroup_track_t * track = (client_mqgroup_track_t *)data;	notify_buffer_t * notify = (notify_buffer_t *)user_data;	cmg = (client_mqgroup_notify_t *)			ha_malloc(sizeof(client_mqgroup_notify_t));	if (cmg == NULL) {		cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);		return;	}	cmg->header.type = CMS_QUEUEGROUP_NOTIFY;	cmg->header.name = notify->name;	cmg->policy = notify->policy;	cmg->group_name = notify->name;	switch (track->flag) {	case SA_TRACK_CHANGES:		size = notify->number * sizeof(SaMsgQueueGroupNotificationT);		msg.msg_len = sizeof(client_mqgroup_notify_t) + size;		cmg->number = notify->number;		cmg = realloc(cmg, msg.msg_len);		cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t);		memcpy(cmg->data, notify->change_buff, size);		break;	case SA_TRACK_CHANGES_ONLY:		size = sizeof(SaMsgQueueGroupNotificationT);		msg.msg_len = sizeof(client_mqgroup_notify_t) + size;		cmg->number = 1;		cmg = realloc(cmg, msg.msg_len);		cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t);		*(SaMsgQueueGroupNotificationT *)(cmg->data)				= notify->changeonly_buff;		break;	default:		cl_log(LOG_ERR, "Unknown track flag [%d]", track->flag);		ha_free(cmg);		return;	}	msg.msg_body = cmg;	msg.msg_private = &msg;	msg.msg_done = NULL;	/* TODO: msg.msg_done to free memory here */	dprintf("%s: Send Track information to my clients...\n", __FUNCTION__);	track->ch->ops->send(track->ch, &msg);}intprocess_mqgroup_insert(struct ha_msg *msg){	const char *gname, *name;	mqueue_t *mqg, *mq;	notify_buffer_t buf;	if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq group name request");		return FALSE;	}	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	/*	 * Check carefully again here in case there are mess	 * message in cluster.	 */	if ((mqg = mqname_lookup(gname, NULL)) == NULL) {		cl_log(LOG_ERR		,	"group name [%s] doesn't exist in local database!"		,	gname);		return FALSE;	}	if (mqg->policy == 0) {		cl_log(LOG_ERR, "[%s] is a mq group name instead of a mq name"		,	gname);		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) == NULL) {		cl_log(LOG_ERR		,	"mq name [%s] doesn't exist in local database!"		,	name);		return FALSE;	}		/*	 * The mqueue may already in the group, i.e this	 * node is the master name node.	 */	if (g_list_find(mqg->list, mq) == NULL) {		mqg->list = g_list_append(mqg->list, mq);		cl_log(LOG_INFO, "Adding mq <%p> to [%s] list", mq, gname);	}	/*	 * Update the mqueue list to point to append the group.	 */	if (g_list_find(mq->list, mqg) == NULL) {		mq->list = g_list_append(mq->list, mqg);		cl_log(LOG_INFO, "Adding mqg <%p> to [%s] list", mqg, name);	}	/*	 * Current Round Robin counter set to the first list,	 * we may want to set it as a random index to gain	 * more load balance.	 */	if (mqg->current == NULL)		mqg->current = g_list_first(mqg->list);	/*	 * Notify my clients who care about the group	 * membership change message.	 */	if (mqg->notify_list != NULL) {		strcpy(buf.changeonly_buff.member.queueName.value, name);		buf.changeonly_buff.member.queueName.length = strlen(name) + 1;		buf.changeonly_buff.member.queueStatus = mq->status;		buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_ADDED;		buf.policy = mqg->policy;		buf.number = 0;		strcpy(buf.name.value, gname);		buf.name.length = strlen(gname) + 1;		buf.change_buff = NULL;		g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf);		g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf);	}	return SA_OK;}intprocess_mqgroup_remove(struct ha_msg *msg){	const char *gname, *name;	mqueue_t *mqg, *mq;	notify_buffer_t buf;	if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq group name request");		return FALSE;	}	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	/*	 * Check carefully again here in case there are mess	 * message in cluster.	 */	if ((mqg = mqname_lookup(gname, NULL)) == NULL) {		cl_log(LOG_ERR		,	"group name [%s] doesn't exist in local database!"		,	gname);		return FALSE;	}	if (!IS_MQGROUP(mqg)) {		cl_log(LOG_ERR, "[%s] is a mq name instead of a mq group name"		,	gname);		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) == NULL) {		cl_log(LOG_ERR		,	"mq name [%s] doesn't exist in local database!"		,	name);		return FALSE;	}		/*	 * mqueue may already be removed from the group, i.e	 * this node is the master name node	 */	if (g_list_find(mqg->list, mq) != NULL)		mqg->list = g_list_remove(mqg->list, mq);	/*	 * remove the mqgroup from the mqueue list as well	 */	if (g_list_find(mq->list, mqg) != NULL)		mq->list = g_list_remove(mq->list, mqg);	/*	 * Notify my clients who care about the group	 * membership change message.	 */	if (mqg->notify_list != NULL) {		strcpy(buf.changeonly_buff.member.queueName.value, name);		buf.changeonly_buff.member.queueName.length = strlen(name) + 1;		buf.changeonly_buff.member.queueStatus = mq->status;		buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_REMOVED;		buf.policy = mqg->policy;		buf.number = 0;		strcpy(buf.name.value, gname);		buf.name.length = strlen(gname) + 1;			g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf);		g_list_foreach(mqg->notify_list, group_mem_dispatch, &buf);	}#if DEBUG_CLUSTER	cl_log_message(msg);#endif	return SA_OK;}intprocess_mqname_ack(struct ha_msg *msg){	const char * qname, * error, * request;	const SaInvocationT * invocation;	size_t invocation_len, seq_len;	const unsigned long * seq; 	gpointer orig_seq, client;	mqueue_request_t ack;	gboolean found;	if ((qname = ha_msg_value(msg, F_MQNAME)) == NULL	     || (request = ha_msg_value(msg, F_MQREQUEST)) == NULL	     || (error = ha_msg_value(msg, F_MQERROR)) == NULL	     || (seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL	     || (invocation = cl_get_binary(msg, F_MQINVOCATION, 			     &invocation_len)) == NULL	     ) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	found = g_hash_table_lookup_extended(mq_ack_pending_hash, 			seq, &orig_seq, &client);	if (found) {		/*		 * we have clients waiting for acks, send out reply		 */		dprintf("%s: found client <%p>\n", __FUNCTION__, client);		ack.qname = ha_strdup(qname);		ack.request_type = cmsrequest_string2type(request);		ack.invocation = *invocation;		client_send_ack_msg((IPC_Channel *) client, 				&ack, -1, saerror_string2type(error));		ha_free(ack.qname);		g_hash_table_remove(mq_open_pending_hash, seq); 		ha_free((unsigned long *) orig_seq);		ha_free((IPC_Channel *) client);	} else {		cl_log(LOG_ERR, "client is not found. "			"nobody to send the ack to. mqname = %s", qname);	}	return TRUE;}intprocess_mqname_update(struct ha_msg *msg, cms_data_t * cmsdata){	const struct mq_info * info;	size_t info_len;	if ((info = cl_get_binary(msg, F_MQUPDATE, &info_len)) == NULL) {		cl_log(LOG_ERR, "received NULL mq info update");		return HA_FAIL;	}	/*	 * turn on the ready bit and start accepting client request	 */	if (mqueue_table_unpack(info, info_len) == HA_OK) {		cmsdata->cms_ready = 1;			}	return HA_OK;}intrequest_mqueue_status(mqueue_t * mqueue, cms_data_t * cmsdata){	struct ha_msg * msg;	ll_cluster_t * hb;	dprintf("in request status, used is [%d]\n"	,	mqueue->status.saMsgQueueUsage[3].queueUsed);	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(msg, F_TYPE,		mqname_type2string(MQNAME_TYPE_STATUS_REQUEST)) == HA_FAIL	||	ha_msg_add(msg, F_MQNAME, mqueue->name) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	}	hb = cmsdata->hb_handle;	hb->llc_ops->sendnodemsg(hb, msg, mqueue->host);	ha_msg_del(msg);	return TRUE;}intreply_mqueue_status(struct ha_msg *msg, cms_data_t * cmsdata){	char usedstring[PACKSTRSIZE], numstring[PACKSTRSIZE];	const char * name;	mqueue_t * mq;	struct ha_msg * m;	ll_cluster_t * hb;	const char * expire = "FALSE";	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__);		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) == NULL) {		cl_log(LOG_ERR, "%s: cannot find mqname [%s]"		,	__FUNCTION__, name);		return FALSE;	}	if (RETENTION_TIME_EXPIRES(mq)) {		cl_log(LOG_WARNING, "%s: retention time expires [%s]"		,	__FUNCTION__, name);		expire = "TRUE";		request_mqname_unlink(name, cmsdata);	}	sa_mqueue_usage_encode(NULL, usedstring, numstring	,	mq->status.saMsgQueueUsage);	dprintf("queueUsed [%s], numberOfMessages [%s]\n"	,	usedstring, numstring);	/*	 * send reply to the request node	 */	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, mqname_type2string(MQNAME_TYPE_STATUS_REPLY))			== HA_FAIL	||	ha_msg_add(m, F_MQNAME, name) == HA_FAIL	||	ha_msg_add(m, F_MQEXPIRE, expire) == HA_FAIL	||	ha_msg_add(m, F_MQUSED, usedstring) == HA_FAIL	||	ha_msg_add(m, F_MQMSGNUM, numstring) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	}	hb = cmsdata->hb_handle;	hb->llc_ops->sendnodemsg(hb, m, mq->host);	ha_msg_del(m);	return TRUE;}intprocess_mqueue_status(struct ha_msg *msg){	const char *name, *expire = NULL, *usedstring = NULL, *numstring = NULL;	SaErrorT ret = SA_OK;	IPC_Channel * client;	mqueue_t * mq;	gpointer orig_key, value;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(expire = ha_msg_value(msg, F_MQEXPIRE)) == NULL	||	(usedstring = ha_msg_value(msg, F_MQUSED)) == NULL	||	(numstring = ha_msg_value(msg, F_MQMSGNUM)) == NULL) {		cl_log(LOG_ERR, "%s: ha_msg_value failed", __FUNCTION__);		ret = SA_ERR_LIBRARY;	}	if (!strncmp(expire, "TRUE", 4))		ret = SA_ERR_NOT_EXIST;	/* TODO: currently there can be only one client for a mqueue	 * name in the hash table. But there might be more clients	 * query the mqueue status at the same time. Need to make a	 * GList for each mqueue name.	 */	if ((g_hash_table_lookup_extended(mq_status_pending_hash,			name, &orig_key, &value)) == FALSE) {		cl_log(LOG_ALERT, "%s: cannot find mqname [%s]"		,	__FUNCTION__, name);		return FALSE;	}	g_hash_table_remove(mq_status_pending_hash, name);	client = value;	/*	 * update local mqueue hash table	 */	if ((mq = mqueue_table_lookup(name, NULL))) {		sa_mqueue_usage_decode(NULL, usedstring, numstring		,	mq->status.saMsgQueueUsage);	}	/*	 * response to my client	 */	dprintf("%s: before respond to my client, ret = [%d]\n"	,	__FUNCTION__, ret);	if (ret != SA_OK)		client_send_error_msg(client, name, CMS_QUEUE_STATUS, ret);	else		ret = client_send_qstatus(client, mq, SA_OK);	ha_free(orig_key);	return ret;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -