⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 4 页
字号:
static intsend_undelivered_message(ll_cluster_t *hb, mqueue_t *mq, const char *node){	message_t * message;	struct ha_msg *m;	const char *type = mqname_type2string(MQNAME_TYPE_REOPEN_MSGFEED);	int invalid = FALSE;	const char * request_type;	char size_string[PACKSTRSIZE]; 	sa_mqueue_usage_encode(size_string, NULL, NULL	,	mq->status.saMsgQueueUsage);	dprintf("Timer: current - close %s retention\n"	,	get_current_satime() - mq->status.closeTime	<	mq->status.retentionTime ? "<" : ">=");	if (RETENTION_TIME_EXPIRES(mq)) {		/* retention timer expires */		cl_log(LOG_INFO, "Original node: open a expired queue [%s]"		,	mq->name);		invalid = TRUE;		goto end;	}	while ((message = dequeue_message(mq))) {	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL	||	ha_msg_addbin(m, F_SENDRECEIVE, (char *) &(message->msgInfo.senderId),			sizeof(int)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGTYPE, (char *) &message->msg.type, 			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGVER, (char *) &message->msg.version,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSIZE, (char *) &message->msg.size,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGPRI, (char *) &message->msg.priority,			sizeof(SaUint8T)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGDATA, (char *) message->msg.data, 			message->msg.size) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(m);		return FALSE;	} 	hb->llc_ops->sendnodemsg(hb, m, node);	dprintf("Send 1 msgfeed %d size msg to %s\n"	,	message->msg.size, node);	ha_msg_del(m);	}end:	/*	 * send the MSGFEED_END message	 */	type = mqname_type2string(MQNAME_TYPE_MSGFEED_END);	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (invalid == TRUE)		request_type = "invalid";	else		request_type = "valid";	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL	||	ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL	||      ha_msg_add(m, F_MQREQUEST, request_type) == HA_FAIL	||      ha_msg_addbin(m, F_MQPOLICY, &mq->policy, sizeof(int))			== HA_FAIL	||      ha_msg_addbin(m, F_MQCREATEFLAG, &mq->status.creationFlags,			sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL	||      ha_msg_addbin(m, F_MQOPENFLAG, &mq->status.openFlags,			sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL	||      ha_msg_addbin(m, F_MQRETENTION, &mq->status.retentionTime,			sizeof(SaTimeT)) == HA_FAIL	||      ha_msg_add(m, F_MQSIZE, size_string) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(m);		return FALSE;	}	hb->llc_ops->sendnodemsg(hb, m, node);	dprintf("Send msgfeed_end msg to %s\n", node);	ha_msg_del(m);	return TRUE;}intprocess_mqname_reopen(struct ha_msg *msg, enum mqname_type type,		      cms_data_t * cmsdata){	static struct ha_msg * saved_msg = NULL;	struct ha_msg * new_msg;	const char *name, *node;	ll_cluster_t *hb = cmsdata->hb_handle;	mqueue_t * mq;	message_t * message;	const SaSizeT * msg_type, * msg_ver, * msg_size, * msg_pri, * data, * sendreceive;	size_t type_len, ver_len, pri_len, size_len, data_len, sendreceive_len;	const char *valid;	const char *size_string = NULL;	const int *s_invocation, *policy;	size_t s_invocation_size, cflag_size, oflag_size, retention_size;	const SaTimeT * retention = NULL;	char *s_request, *s_error;	const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL;	SaMsgQueueSendingStateT sending_state = SA_MSG_QUEUE_AVAILABLE;	dprintf("%s: type is %d\n", __FUNCTION__, type);	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__);		return FALSE;	}	switch (type) {	case MQNAME_TYPE_REOPEN:		if ((node = ha_msg_value(msg, F_MQHOST)) == NULL) {			cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__);			return FALSE;		}		if (((mq = mqname_lookup(name, NULL)) != NULL)		&&	mq->mqstat == MQ_STATUS_CLOSE) {			/* Do I have the original mq? */ 			cl_log(LOG_INFO, "Original mq host is %s", mq->host);			if (is_host_local(mq->host, cmsdata)) {				/* Send undelivered message to new mq */				send_undelivered_message(hb, mq, node);			}		}		dprintf("mq->mqstat = <%d>\n", mq->mqstat);		if (!is_host_local(node, cmsdata))			return TRUE;		saved_msg = ha_msg_copy(msg);		cl_log(LOG_INFO, "%s: waiting for msgfeed...", __FUNCTION__);		break;	case MQNAME_TYPE_REOPEN_MSGFEED:		if ((sendreceive = cl_get_binary(msg, F_SENDRECEIVE, &sendreceive_len)) 				== NULL		||			(msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len))				== NULL		||	(msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) 				== NULL		||	(msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len))				== NULL		||	(msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len))				== NULL		||	(data = cl_get_binary(msg, F_MQMSGDATA, &data_len))				== NULL) {			cl_log(LOG_ERR, "received bad msgfeed msg.");			return FALSE;		}		if ((mq = mqname_lookup(name, NULL)) == NULL		&&	mq->mqstat != MQ_STATUS_CLOSE) {			cl_log(LOG_ALERT, "State machine BUG");			return FALSE;		}		message = (message_t *)			ha_malloc(sizeof(SaMsgMessageT) + data_len);		if (!message) {			cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);			return FALSE;		}		memset(message, 0, sizeof(message_t));		message->msgInfo.senderId = *sendreceive;		message->msg.type = *msg_type;		message->msg.version = *msg_ver;		message->msg.size = *msg_size;		message->msg.priority = *(const SaUint8T *) msg_pri;		message->msg.data = (char *)message + sizeof(message_t);		memcpy(message->msg.data, data, data_len);		enqueue_message(mq, message->msg.priority, message);		break;	case MQNAME_TYPE_MSGFEED_END:		if ((mq = mqname_lookup(name, NULL)) == NULL		&&	mq->mqstat != MQ_STATUS_CLOSE) {			cl_log(LOG_ALERT, "State machine BUG");			return FALSE;		}		mq->mqstat = MQ_STATUS_OPEN;		dprintf("in feedend, used is [%d]\n"		,	mq->status.saMsgQueueUsage[3].queueUsed);		/*		 * read saved_msg		 */		if (!saved_msg) {			/* somehow we received another msg_feedend			   out of order */			return FALSE;		}		if ((s_request = ha_strdup(ha_msg_value(saved_msg, F_MQREQUEST))) == NULL		||	(s_invocation = cl_get_binary(saved_msg, F_MQINVOCATION,				&s_invocation_size)) == NULL		||	(s_error = ha_strdup(ha_msg_value(saved_msg,F_MQERROR)))				== NULL) {			cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__);			return FALSE;		}		/*		 * If mq retention time not expired, we will receive		 * a valid mq status information, so that we need to		 * use this original mq status.		 */		if ((valid = ha_msg_value(msg, F_MQREQUEST)) == NULL) {			cl_log(LOG_ERR, "%s: cannot read invalid bit"			,	__FUNCTION__);			return FALSE;		}		if (strncmp(valid, "invalid", 7) == 0)			goto invalid;		/*		 * read msgfeedend		 */		if ((policy = cl_get_binary(msg, F_MQPOLICY, NULL))			== NULL		||	(cflag = cl_get_binary(msg, F_MQCREATEFLAG			,	&cflag_size)) == NULL		||	(oflag = cl_get_binary(msg, F_MQOPENFLAG			,	&oflag_size)) == NULL		||	(retention = cl_get_binary(msg, F_MQRETENTION			,	&retention_size)) == NULL		||	(size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) {			cl_log(LOG_ERR, "%s: cl_get_binary failed"			,	__FUNCTION__);		}		/*		 * create a new msg according to msgfeedend		 */		if ((new_msg = ha_msg_new(0)) == NULL) {			cl_log(LOG_ERR, "%s: no memory", __FUNCTION__);			return FALSE;		}		if (ha_msg_add(new_msg, F_TYPE		,	mqname_type2string(MQNAME_TYPE_GRANTED)) == HA_FAIL		||	ha_msg_add(new_msg, F_MQNAME, name) == HA_FAIL		||	ha_msg_add(new_msg, F_MQREQUEST, s_request)				== HA_FAIL		||	ha_msg_addbin(new_msg, F_MQINVOCATION, s_invocation,				s_invocation_size) == HA_FAIL		||	ha_msg_add(new_msg, F_MQHOST, mq->host) == HA_FAIL		||	ha_msg_addbin(new_msg, F_MQSTATUS, &sending_state,				sizeof(SaMsgQueueSendingStateT)) == HA_FAIL		||	ha_msg_addbin(new_msg, F_MQPOLICY, policy, sizeof(int))				== HA_FAIL		||	ha_msg_addbin(new_msg, F_MQCREATEFLAG, cflag,				cflag_size) == HA_FAIL		||	ha_msg_addbin(new_msg, F_MQOPENFLAG, oflag, oflag_size)				== HA_FAIL		||	ha_msg_addbin(new_msg, F_MQRETENTION, retention,				retention_size) == HA_FAIL		||	ha_msg_add(new_msg, F_MQSIZE, size_string) == HA_FAIL		||	ha_msg_add(new_msg, F_MQERROR, s_error) == HA_FAIL) {			cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);			ha_msg_del(new_msg);			return FALSE;		}		ha_msg_del(saved_msg);		saved_msg = new_msg;		goto sendmsg;invalid:		/*		 * If the open request is not set SA_MSG_QUEUE_CREATE,		 * deny this request, since retention timer expired.		 */		mq = g_hash_table_lookup(mq_open_pending_hash, name);		if (!mq) {			cl_log(LOG_ERR, "BUG: cannot find mq in pending hash");			return TRUE;		}		if (!(mq->status.creationFlags & SA_MSG_QUEUE_CREATE)) {			mqueue_request_t reply;			cl_log(LOG_INFO, "retention timer expired and "				"SA_MSG_QUEUE_CREATE is not set, reject!");			reply.qname = ha_strdup(name);			reply.gname = NULL;			reply.request_type = cmsrequest_string2type(s_request);			reply.invocation = *s_invocation;			client_send_client_qopen(mq->client, &reply, -1			,	SA_ERR_NOT_EXIST);			g_hash_table_remove(mq_open_pending_hash, name);			/*			 * We get the change to unlink the mqueue here.			 */			request_mqname_unlink(name, cmsdata);			ha_free(mq);			ha_free(reply.qname);			return TRUE;		}sendmsg:		/*		 * Send granted message to all.		 */		ha_msg_mod(saved_msg, F_TYPE		,	mqname_type2string(MQNAME_TYPE_GRANTED));		hb->llc_ops->sendclustermsg(hb, saved_msg);		ha_msg_del(saved_msg);		saved_msg = NULL;		return TRUE;	default:		break;	}	return TRUE;}/** * process_mqname_denied - process the denied message from the master node *			   for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_denied(struct ha_msg *msg){	const char * name, * error, * request;	const int * invocation;	size_t invocation_size;	IPC_Channel *client = NULL;	mqueue_request_t reply;	mqueue_t * mq_pending;	int flag;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL ||		(error = ha_msg_value(msg, F_MQERROR)) == NULL ||		(request = ha_msg_value(msg, F_MQREQUEST)) == NULL ||		(invocation = cl_get_binary(msg, F_MQINVOCATION,				&invocation_size)) == NULL ) {		cl_log(LOG_ERR, "received NULL mq name or mq error reply");		return FALSE;	}	flag = saerror_string2type(error);	mq_pending = g_hash_table_lookup(mq_open_pending_hash, name);	if (mq_pending != NULL) {		/*		 * we have clients open pending, send out reply		 */		client = mq_pending->client;		cl_log(LOG_INFO, "%s: found client <%p>", __FUNCTION__, client);		reply.qname = ha_strdup(name);		reply.gname = NULL;		reply.request_type = cmsrequest_string2type(request);		reply.invocation = *invocation;		client_send_client_qopen(client, &reply, -1, flag);		ha_free(reply.qname);		g_hash_table_remove(mq_open_pending_hash, name);		ha_free(mq_pending->name);		ha_free(mq_pending);	}	return TRUE;}static voidgroup_mem_dispatch(gpointer data, gpointer user_data){	IPC_Message msg;	client_mqgroup_notify_t * cmg = NULL;	int size;	client_mqgroup_track_t * track = (client_mqgroup_track_t *)data;	notify_buffer_t * notify = (notify_buffer_t *)user_data;	cmg = (client_mqgroup_notify_t *)			malloc(sizeof(client_mqgroup_notify_t));	if (cmg == NULL) {		cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);		return;	}	cmg->header.type = CMS_QUEUEGROUP_NOTIFY;	cmg->header.name = notify->name;	cmg->policy = notify->policy;	cmg->group_name = notify->name;	switch (track->flag) {	case SA_TRACK_CHANGES:		dprintf("group_mem_dispatch: SA_TRACK_CHANGES\n");		size = notify->number * sizeof(SaMsgQueueGroupNotificationT);		msg.msg_len = sizeof(client_mqgroup_notify_t) + size;		cmg->number = notify->number;		cmg = realloc(cmg, msg.msg_len);		cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t);		memcpy(cmg->data, notify->change_buff, size);		break;	case SA_TRACK_CHANGES_ONLY:		dprintf("group_mem_dispatch: SA_TRACK_CHANGES_ONLY\n");		size = sizeof(SaMsgQueueGroupNotificationT);		msg.msg_len = sizeof(client_mqgroup_notify_t) + size;		cmg->number = 1;		cmg = realloc(cmg, msg.msg_len);		cmg->data = (char *)cmg + sizeof(client_mqgroup_notify_t);		*(SaMsgQueueGroupNotificationT *)(cmg->data)				= notify->changeonly_buff;		break;	default:		cl_log(LOG_ERR, "Unknown track flag [%d]", track->flag);		return;	}	msg.msg_body = cmg;	msg.msg_private = &msg;	msg.msg_done = NULL;	msg.msg_buf = NULL;	/* TODO: msg.msg_done to free memory here */	dprintf("%s: Send Track information to my clients...\n", __FUNCTION__);	track->ch->ops->send(track->ch, &msg);}intprocess_mqgroup_insert(struct ha_msg *msg){	const char *gname, *name;	mqueue_t *mqg, *mq;	notify_buffer_t buf;	if ((gname = ha_msg_value(msg, F_MQGROUPNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq group name request");		return FALSE;	}	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	dprintf("%s: gname = %s, qname = %s\n", __FUNCTION__, gname, name);	/*	 * Check carefully again here in case there are mess	 * message in cluster.	 */	if ((mqg = mqname_lookup(gname, NULL)) == NULL) {		cl_log(LOG_ERR		,	"group name [%s] doesn't exist in local database!"		,	gname);		return FALSE;	}	if (mqg->policy == 0) {		cl_log(LOG_ERR, "[%s] is a mq group name instead of a mq name"		,	gname);		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) == NULL) {		cl_log(LOG_ERR		,	"mq name [%s] doesn't exist in local database!"		,	name);		return FALSE;	}		/*	 * The mqueue may already in the group, i.e this	 * node is the master name node.	 */	if (g_list_find(mqg->list, mq) == NULL) {		mqg->list = g_list_append(mqg->list, mq);		cl_log(LOG_INFO, "Adding mq <%p> to [%s] list", mq, gname);	}	/*	 * Update the mqueue list to point to append the group.	 */	if (g_list_find(mq->list, mqg) == NULL) {		mq->list = g_list_append(mq->list, mqg);		cl_log(LOG_INFO, "Adding mqg <%p> to [%s] list", mqg, name);	}	/*	 * Current Round Robin counter set to the first list,	 * we may want to set it as a random index to gain	 * more load balance.	 */	if (mqg->current == NULL)		mqg->current = g_list_first(mqg->list);	/*	 * Notify my clients who care about the group	 * membership change message.	 */	if (mqg->notify_list != NULL) {		strcpy(buf.changeonly_buff.member.queueName.value, name);		buf.changeonly_buff.member.queueName.length = strlen(name) + 1;		buf.changeonly_buff.member.queueStatus = mq->status;		buf.changeonly_buff.change = SA_MSG_QUEUE_GROUP_ADDED;		buf.policy = mqg->policy;		buf.number = 0;		strcpy(buf.name.value, gname);		buf.name.length = strlen(gname) + 1;		buf.change_buff = NULL;		g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -