⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
}intrequest_mqname_update(const char * node, cms_data_t * cmsdata){	struct mq_info * mqinfo;	size_t mqinfo_len;	const char * type;	struct ha_msg *msg;	ll_cluster_t *hb;	if (mqueue_table_pack(&mqinfo, &mqinfo_len) != HA_OK) {		return HA_FAIL;	}	if ((msg = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return HA_FAIL;	}	type = mqname_type2string(MQNAME_TYPE_UPDATE);		if (ha_msg_add(msg, F_TYPE, type) == HA_FAIL	||	ha_msg_addbin(msg, F_MQUPDATE, mqinfo, mqinfo_len) == HA_FAIL)	{		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return HA_FAIL;	} 	hb = cmsdata->hb_handle;#if DEBUG_CLUSTER	cl_log_message(msg);#endif	hb->llc_ops->sendnodemsg(hb, msg, node);	ha_free(mqinfo);	ha_msg_del(msg);	return HA_OK;}/** * reply_mqname_open - process the request message as the master node *		       for this message queue name * * @hb:		heartbeat IPC Channel handle * @msg:	received message from heartbeat IPC Channel */intreply_mqname_open(ll_cluster_t *hb, struct ha_msg *msg){	const char *name, *type, *request;	size_t  invocation_size, cflag_size, oflag_size, retention_size;	const SaInvocationT * invocation = NULL;	const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL;	const SaTimeT * retention = NULL;	const int * policy = NULL;	const char * size_string;	SaMsgQueueSendingStateT sending_state;	mqueue_t *mq;	struct ha_msg *reply;	SaErrorT error = SA_OK;	request = NULL;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(request = ha_msg_value(msg, F_MQREQUEST)) == NULL	||	(invocation = cl_get_binary(msg, F_MQINVOCATION, 			&invocation_size)) == NULL	||	(policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL	||	(cflag = cl_get_binary(msg, F_MQCREATEFLAG, &cflag_size))			== NULL	||	(oflag = cl_get_binary(msg, F_MQOPENFLAG, &oflag_size))			== NULL	||	(retention = cl_get_binary(msg, F_MQRETENTION, &retention_size))			== NULL	||	(size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) {		cl_log(LOG_ERR, "received bad mq request: name = %s, request"		 		" = %s, invo = %d, policy = %d", name, request		 		,	*invocation, *policy);		return FALSE;	}	if ((reply = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (((mq = mqname_lookup(name, NULL)) != NULL)	&&	((mq->mqstat != MQ_STATUS_CLOSE)	||	(mq->mqstat == MQ_STATUS_CLOSE && mq->policy != *policy))) {		cl_log(LOG_INFO, "mq name [%s] already exists", name);		error = SA_ERR_EXIST;		type = mqname_type2string(MQNAME_TYPE_DENIED);		if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL		|| 	ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL		||	ha_msg_add(reply, F_MQNAME, name) == HA_FAIL		||	ha_msg_addbin(reply, F_MQINVOCATION, invocation, 				invocation_size) == HA_FAIL		||	ha_msg_add(reply, F_MQERROR, saerror_type2string(error))				== HA_FAIL) {			cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);			return FALSE;		}		dprintf("error is %d\n", error);		goto send_msg;	} else if ((mq != NULL) && mq->mqstat == MQ_STATUS_CLOSE) {		/*		 * The mq is closed, here need to be reopened.		 */		type = mqname_type2string(MQNAME_TYPE_REOPEN);		error = SA_OK;		ha_free(mq->host);		mq->host = ha_strdup(ha_msg_value(msg, F_ORIG));		/* we must not set mq->mqstat to MQ_STATUS_OPEN here		 * because on reopen case, the original master name		 * server need to check this bit before msgfeed.		 */		/* mq->mqstat = MQ_STATUS_OPEN; */		mq->list = NULL;		mq->current = NULL;		mq->notify_list = NULL; 	} else {		type = mqname_type2string(MQNAME_TYPE_GRANTED);		sending_state = SA_MSG_QUEUE_AVAILABLE;		mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t));		if (!mq) {			cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__);			return FALSE;		}		memset(mq, 0, sizeof(mqueue_t));		mq->name = ha_strdup(name);		mq->host = ha_strdup(ha_msg_value(msg, F_ORIG));		mq->mqstat = MQ_STATUS_OPEN;		mq->policy = *policy; 				error = mqueue_table_insert(mq);	}	/*	 * master node broadcast the result in the cluster	 */	if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL	||	ha_msg_add(reply, F_MQNAME, name) == HA_FAIL	||	ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL	||	ha_msg_addbin(reply, F_MQINVOCATION, invocation, 			invocation_size) == HA_FAIL	||	ha_msg_add(reply, F_MQHOST, mq->host) == HA_FAIL	||	ha_msg_addbin(reply, F_MQSTATUS, &sending_state,			sizeof(SaMsgQueueSendingStateT)) == HA_FAIL	||	ha_msg_addbin(reply, F_MQPOLICY, policy, sizeof(int)) == HA_FAIL	||	ha_msg_addbin(reply, F_MQCREATEFLAG, cflag, cflag_size)			== HA_FAIL	||	ha_msg_addbin(reply, F_MQOPENFLAG, oflag, oflag_size)			== HA_FAIL	||	ha_msg_addbin(reply, F_MQRETENTION, retention, retention_size)			== HA_FAIL	||	ha_msg_add(reply, F_MQSIZE, size_string) == HA_FAIL	||	ha_msg_add(reply, F_MQERROR, saerror_type2string(error))			== HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	}send_msg:	hb->llc_ops->sendclustermsg(hb, reply);	ha_msg_del(reply);	return TRUE;}intprocess_mqname_close(struct ha_msg *msg){	const char *name;	mqueue_t *mq;	CMS_TRACE();	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	if (((mq = mqname_lookup(name, NULL)) != NULL)	&&	(mq->mqstat != MQ_STATUS_CLOSE)) {		mq->mqstat = MQ_STATUS_CLOSE;		cl_log(LOG_INFO, "%s: Set mq [%s] status to [%d]"		,	__FUNCTION__, name, mq->mqstat);	}#if DEBUG_CLUSTER	cl_log_message(msg);#endif	return SA_OK;}intprocess_mqname_unlink(struct ha_msg *msg){	const char *name;	mqueue_t *mq;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) != NULL) {		mqueue_table_remove(name);		/*		 * TODO: remove handle hash also		 */	}#if DEBUG_CLUSTER	cl_log_message(msg);#endif	return SA_OK;}intprocess_mqname_send(struct ha_msg *msg, cms_data_t * cmsdata){	const char *name, * gname, * request_type;	const void * data, * ack, *invocation, *seq, *msg_pri;	const SaSizeT * msg_type, * msg_ver, * msg_size;	const char *node;	mqueue_request_t request;	SaMsgMessageT * message;	SaErrorT ret = SA_OK;	size_t data_len, ack_len, invocation_len, seq_len;	size_t type_len, ver_len, pri_len, size_len;	mqueue_t *mq;	client_mqueue_notify_t m;	const SaUint8T * priority;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(request_type = ha_msg_value(msg, F_MQREQUEST)) == NULL	||	(msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) 			== NULL	||	(msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) 			== NULL	||	(msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len))			== NULL	||	(msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len))			== NULL	||	(data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL	||	(invocation = cl_get_binary(msg, F_MQINVOCATION, 			&invocation_len)) == NULL	||	(seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL	||	(ack = cl_get_binary(msg, F_MQMSGACK, &ack_len)) == NULL	||	*msg_size != data_len ) {		cl_log(LOG_ERR, "received bad mqname_send request.");		return FALSE;	}	gname = ha_msg_value(msg, F_MQGROUPNAME);	priority = (const SaUint8T *) msg_pri;	if ((*priority) > SA_MSG_MESSAGE_LOWEST_PRIORITY) {		cl_log(LOG_ALERT, "Wrong priorty [%u]\n", *priority);		return FALSE;	}		dprintf("%s: going to send to %s\n", __FUNCTION__, name);	if ((mq = mqname_lookup(name, NULL)) != NULL) {		dprintf("%s: data_len = %d\n", __FUNCTION__, (int)data_len);		dprintf("buff_avai[%d] = %lu\n", *priority, 			BUFFER_AVAILABLE(mq, *priority));		/*		 * don't deliver the msg if no buffer		 */		if ((BUFFER_AVAILABLE(mq, *priority) - data_len		-	sizeof(SaMsgMessageT)) < 0) {			cl_log(LOG_DEBUG, "%s: buffer over flow, msg not "				"delivered. ", __FUNCTION__);			ret = SA_ERR_QUEUE_FULL;		} else {			/*			 * save message to mq->message_buffer			 */			message = (SaMsgMessageT*)				ha_malloc(sizeof(SaMsgMessageT) + data_len);			if (!message)				return SA_ERR_NO_MEMORY;			message->type = *msg_type;			message->version = *msg_ver;			message->size = *msg_size;			message->priority = * priority;			message->data = (char *)message + sizeof(SaMsgMessageT);			memcpy(message->data, data, data_len);			enqueue_message(mq, *priority, message);			/*			 * send only limited info to client			 */			m.header.type = CMS_MSG_NOTIFY;			m.header.len = sizeof(client_header_t);			m.header.flag = SA_OK;			m.header.name.length = strlen(name) + 1;			m.handle = mq->handle;			strncpy(m.header.name.value, name, SA_MAX_NAME_LENGTH);			/*			 * send the info to the client			 */			ret = client_send_msg(mq->client,					sizeof(client_mqueue_notify_t), &m);			mq->notified = TRUE;			ret = SA_OK;		}		/*		 * send the ack back		 */		if (ret == SA_OK && ack) {			node = ha_msg_value(msg, F_ORIG);			if (gname) {				request.qname = ha_strdup(gname);			} else {				request.qname = ha_strdup(name);			}			request.gname = NULL;			request.request_type =				cmsrequest_string2type(request_type);			request.invocation = *(const int *)invocation;			request.ack = *(const int *)ack;			request.seq = *(const unsigned long *) seq;			mqname_send_ack(&request, node, NULL, ret, cmsdata);			ha_free(request.qname);			dprintf("send the ack, ret = %d\n", ret);		}	} else {		node = ha_msg_value(msg, F_ORIG);		cl_log(LOG_ERR, "%s: msg queue not found. the name server"			" database on node %s is bad.", __FUNCTION__, node);	}	return SA_OK;}static voidsend_msg_notify(gpointer data, gpointer user_data){	client_mqueue_notify_t m;	mqueue_t * mq = user_data;	m.header.type = CMS_MSG_NOTIFY;	m.header.len = sizeof(client_header_t);	m.header.flag = SA_OK;	m.header.name.length = strlen(mq->name) + 1;	m.handle = mq->handle;	strncpy(m.header.name.value, mq->name, SA_MAX_NAME_LENGTH);	client_send_msg(mq->client, sizeof(client_mqueue_notify_t), &m);}static voidsend_migrate_message_notify(mqueue_t * mq){	SaUint8T i;	for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY	;	i <= SA_MSG_MESSAGE_LOWEST_PRIORITY	;	i++)		g_list_foreach(mq->message_buffer[i], send_msg_notify, mq);}/** * process_mqname_granted - process the granted message from the master node *			    for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_granted(struct ha_msg *msg, cms_data_t * cmsdata){	const char *name, *host, *error, *request;	const int * invocation, *policy;	size_t invocation_size; 	const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL;	const SaMsgQueueSendingStateT * sending_state;	const SaTimeT * retention = NULL;	const char *size_string;		IPC_Channel *client;	mqueue_t *mq, *mq_pending;	mqueue_request_t mq_request;	cms_client_t * cms_client;	guint handle;	int flag;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(request = ha_msg_value(msg, F_MQREQUEST)) == NULL	||	(invocation = cl_get_binary(msg, F_MQINVOCATION, 			&invocation_size)) == NULL	||	(host = ha_msg_value(msg, F_MQHOST)) == NULL	||	(policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL	||	(cflag = cl_get_binary(msg, F_MQCREATEFLAG, NULL)) == NULL	||	(oflag = cl_get_binary(msg, F_MQOPENFLAG, NULL)) == NULL	||	(retention = cl_get_binary(msg, F_MQRETENTION, NULL)) == NULL	||	(sending_state = cl_get_binary(msg, F_MQSTATUS, NULL)) == NULL	||	(size_string = ha_msg_value(msg, F_MQSIZE)) == NULL	||	(error = ha_msg_value(msg, F_MQERROR)) == NULL) {			cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__);			return FALSE;	}	flag = saerror_string2type(error);	/*	 * This node might be the mqname master node, so make sure don't	 * duplicate insertion.	 */	if ((mq = mqueue_table_lookup(name, NULL)) == NULL) {		/*		 * this is not the master node		 */		mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t));		if (!mq) {			cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__);			return FALSE;		}		memset(mq, 0, sizeof(mqueue_t));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -