⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 4 页
字号:
	mqueue_t *mq;	struct ha_msg *reply;	SaErrorT error = SA_OK;	request = NULL;	CMS_TRACE();	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(request = ha_msg_value(msg, F_MQREQUEST)) == NULL	||	(invocation = cl_get_binary(msg, F_MQINVOCATION, 			&invocation_size)) == NULL	||	(policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL	||	(cflag = cl_get_binary(msg, F_MQCREATEFLAG, &cflag_size))			== NULL	||	(oflag = cl_get_binary(msg, F_MQOPENFLAG, &oflag_size))			== NULL	||	(retention = cl_get_binary(msg, F_MQRETENTION, &retention_size))			== NULL	||	(size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) {		cl_log(LOG_ERR, "received bad mq request: name = %s, request"		 		" = %s, invo = %d, policy = %d", name, request		 		,	*invocation, *policy);		return FALSE;	}	dprintf("queue (group) name =  %s\n", name);	if ((reply = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (((mq = mqname_lookup(name, NULL)) != NULL)	&&	((mq->mqstat != MQ_STATUS_CLOSE)	||	(mq->mqstat == MQ_STATUS_CLOSE && mq->policy != *policy))) {		cl_log(LOG_INFO, "mq name [%s] already exists", name);		error = SA_ERR_EXIST;		type = mqname_type2string(MQNAME_TYPE_DENIED);		if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL		|| 	ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL		||	ha_msg_add(reply, F_MQNAME, name) == HA_FAIL		||	ha_msg_addbin(reply, F_MQINVOCATION, invocation, 				invocation_size) == HA_FAIL		||	ha_msg_add(reply, F_MQERROR, saerror_type2string(error))				== HA_FAIL) {			cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);			return FALSE;		}		dprintf("error is %d\n", error);		goto send_msg;	} else if ((mq != NULL) && mq->mqstat == MQ_STATUS_CLOSE) {		/*		 * The mq is closed, here need to be reopened.		 */		type = mqname_type2string(MQNAME_TYPE_REOPEN);		error = SA_OK;		/* we must not set mq->mqstat to MQ_STATUS_OPEN here		 * because on reopen case, the original master name		 * server need to check this bit before msgfeed.		 */		//mq->mqstat = MQ_STATUS_OPEN;		mqhost = ha_msg_value(msg, F_ORIG);		mq->list = NULL;		mq->current = NULL;		mq->notify_list = NULL; 	} else {		type = mqname_type2string(MQNAME_TYPE_GRANTED);		sending_state = SA_MSG_QUEUE_AVAILABLE;		mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t));		if (!mq) {			cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__);			return FALSE;		}		memset(mq, 0, sizeof(mqueue_t));		mq->name = ha_strdup(name);		mq->host = ha_strdup(ha_msg_value(msg, F_ORIG));		mq->mqstat = MQ_STATUS_OPEN;		mq->policy = *policy; 				error = mqueue_table_insert(mq);	}	/*	 * master node broadcast the result in the cluster	 */	if (ha_msg_add(reply, F_TYPE, type) == HA_FAIL	||	ha_msg_add(reply, F_MQNAME, name) == HA_FAIL	||	ha_msg_add(reply, F_MQREQUEST, request) == HA_FAIL	||	ha_msg_addbin(reply, F_MQINVOCATION, invocation, 			invocation_size) == HA_FAIL	||	ha_msg_add(reply, F_MQHOST, (mqhost == NULL ? mq->host : mqhost)) == HA_FAIL	||	ha_msg_addbin(reply, F_MQSTATUS, &sending_state,			sizeof(SaMsgQueueSendingStateT)) == HA_FAIL	||	ha_msg_addbin(reply, F_MQPOLICY, policy, sizeof(int)) == HA_FAIL	||	ha_msg_addbin(reply, F_MQCREATEFLAG, cflag, cflag_size)			== HA_FAIL	||	ha_msg_addbin(reply, F_MQOPENFLAG, oflag, oflag_size)			== HA_FAIL	||	ha_msg_addbin(reply, F_MQRETENTION, retention, retention_size)			== HA_FAIL	||	ha_msg_add(reply, F_MQSIZE, size_string) == HA_FAIL	||	ha_msg_add(reply, F_MQERROR, saerror_type2string(error))			== HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		return FALSE;	}send_msg:	hb->llc_ops->sendclustermsg(hb, reply);	ha_msg_del(reply);	return TRUE;}intprocess_mqname_close(struct ha_msg *msg){	const char *name;	mqueue_t *mq;	client_header_t reply;	CMS_TRACE();	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	if (((mq = mqname_lookup(name, NULL)) != NULL)	&&	(mq->mqstat != MQ_STATUS_CLOSE)) {		mq->mqstat = MQ_STATUS_CLOSE;		cl_log(LOG_INFO, "%s: Set mq [%s] status to [%d]"		,	__FUNCTION__, name, mq->mqstat);	}	/* this is the node where the queue is opened */	if (mq->client) {		cl_log(LOG_INFO, "%s, sending close reply to the client. ", __FUNCTION__);		reply.type = CMS_QUEUE_CLOSE;		reply.len = sizeof(client_header_t);		reply.flag = SA_OK;		reply.name.length = strlen(name);		strncpy(reply.name.value, name, SA_MAX_NAME_LENGTH);		reply.name.value[reply.name.length] = '\0';		client_send_msg(mq->client, reply.len, &reply);	}#if DEBUG_CLUSTER	cl_log_message(msg);#endif	return SA_OK;}intprocess_mqname_unlink(struct ha_msg *msg){	const char *name;	mqueue_t *mq;	IPC_Channel * client = NULL;	client_header_t reply;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "received NULL mq name request");		return FALSE;	}	if ((mq = mqname_lookup(name, NULL)) != NULL) {		/* this is the node where the queue is opened */		if (mq->client) {			client = (IPC_Channel *)ha_malloc(sizeof(IPC_Channel));			*client = *mq->client;		}		mqueue_table_remove(name);		/*		 * TODO: remove handle hash also		 */	}	if (client) {		cl_log(LOG_INFO, "%s, sending unlink reply to the client. ", __FUNCTION__);		reply.type = CMS_QUEUE_UNLINK;		reply.len = sizeof(client_header_t);		reply.flag = SA_OK;		reply.name.length = strlen(name);		strncpy(reply.name.value, name, SA_MAX_NAME_LENGTH);		reply.name.value[reply.name.length] = '\0';		client_send_msg(client, reply.len, &reply);		ha_free(client);	}#if DEBUG_CLUSTER	cl_log_message(msg);#endif	return SA_OK;}intprocess_mqname_send(struct ha_msg *msg, cms_data_t * cmsdata){	const char *name, * gname, * request_type;	const void * data, * ack, *invocation, *msg_pri;	const SaSizeT * msg_type, * msg_ver, * msg_size, * sendreceive;	const char *node;	const unsigned long * seq;	mqueue_request_t request;	message_t * message;	SaErrorT ret = SA_OK;	size_t data_len, ack_len, invocation_len, seq_len, sendreceive_len;	size_t type_len, ver_len, pri_len, size_len;	mqueue_t *mq;	client_mqueue_notify_t m;	const SaUint8T * priority;	enum cms_client_msg_type req_type;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(request_type = ha_msg_value(msg, F_MQREQUEST)) == NULL	||	(msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len)) 			== NULL	||	(msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) 			== NULL	||	(msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len))			== NULL	||	(msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len))			== NULL	||	(data = cl_get_binary(msg, F_MQMSGDATA, &data_len)) == NULL	||	(invocation = cl_get_binary(msg, F_MQINVOCATION, 			&invocation_len)) == NULL	||	(sendreceive = cl_get_binary(msg, F_SENDRECEIVE, 			&sendreceive_len)) == NULL	||	(seq = cl_get_binary(msg, F_MQMSGSEQ, &seq_len)) == NULL	||	(ack = cl_get_binary(msg, F_MQMSGACK, &ack_len)) == NULL	||	(node = ha_msg_value(msg, F_ORIG)) == NULL	||	*msg_size != data_len ) {		cl_log(LOG_ERR, "received bad mqname_send request.");		return FALSE;	}	gname = ha_msg_value(msg, F_MQGROUPNAME);	priority = (const SaUint8T *) msg_pri;	if ((*priority) > SA_MSG_MESSAGE_LOWEST_PRIORITY) {		cl_log(LOG_ALERT, "Wrong priorty [%u]\n", *priority);		return FALSE;	}		dprintf("%s: going to send to %s\n", __FUNCTION__, name);	if ((mq = mqname_lookup(name, NULL)) != NULL) {		dprintf("%s: data_len = %d\n", __FUNCTION__, (int)data_len);		dprintf("buff_avai[%d] = %lu\n", *priority, 			BUFFER_AVAILABLE(mq, *priority));		/*		 * don't deliver the msg if no buffer		 */		if ((BUFFER_AVAILABLE(mq, *priority) - data_len		-	sizeof(SaMsgMessageT)) < 0) {			cl_log(LOG_DEBUG, "%s: buffer over flow, msg not "				"delivered. ", __FUNCTION__);			ret = SA_ERR_QUEUE_FULL;		} else {			/*			 * save message to mq->message_buffer			 */			message = (message_t *)				ha_malloc(sizeof(message_t) + data_len);			memset(message, 0, sizeof(message_t) + data_len);			if (*sendreceive) {				message->msgInfo.senderId = get_senderId_by_name(node, *seq);			}			message->msg.type = *msg_type;			message->msg.version = *msg_ver;			message->msg.size = *msg_size;			message->msg.priority = * priority;			message->msg.data = (char *)message + sizeof(message_t);			memcpy(message->msg.data, data, data_len);			enqueue_message(mq, *priority, message);			/*			 * send only limited info to client			 */			m.header.type = CMS_MSG_NOTIFY;			m.header.len = sizeof(client_header_t);			m.header.flag = SA_OK;			m.header.name.length = strlen(name) + 1;			m.handle = mq->handle;			strncpy(m.header.name.value, name, SA_MAX_NAME_LENGTH);			/*			 * send the info to the client			 */			ret = client_send_msg(mq->client,					sizeof(client_mqueue_notify_t), &m);			mq->notified = TRUE;			ret = SA_OK;		}		/*		 * send the ack back, but not for sendreceive		 */		req_type = cmsrequest_string2type(request_type);		if (req_type != CMS_MSG_SEND_RECEIVE 			&& ret == SA_OK && ack) {			if (gname) {				request.qname = ha_strdup(gname);			} else {				request.qname = ha_strdup(name);			}			request.gname = NULL;			request.request_type =				cmsrequest_string2type(request_type);			request.invocation = *(const int *)invocation;			request.ack = *(const int *)ack;			request.seq = *(const unsigned long *) seq;			mqname_send_ack(&request, node, NULL, ret, cmsdata);			ha_free(request.qname);			dprintf("send the ack, ret = %d\n", ret);		}	} else {		node = ha_msg_value(msg, F_ORIG);		cl_log(LOG_ERR, "%s: msg queue not found. the name server"			" database on node %s is bad.", __FUNCTION__, node);	}	return SA_OK;}static voidsend_msg_notify(gpointer data, gpointer user_data){	client_mqueue_notify_t m;	mqueue_t * mq = user_data;	m.header.type = CMS_MSG_NOTIFY;	m.header.len = sizeof(client_header_t);	m.header.flag = SA_OK;	m.header.name.length = strlen(mq->name) + 1;	m.handle = mq->handle;	strncpy(m.header.name.value, mq->name, SA_MAX_NAME_LENGTH);	client_send_msg(mq->client, sizeof(client_mqueue_notify_t), &m);}static voidsend_migrate_message_notify(mqueue_t * mq){	SaUint8T i;	for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY	;	i <= SA_MSG_MESSAGE_LOWEST_PRIORITY	;	i++)		g_list_foreach(mq->message_buffer[i], send_msg_notify, mq);}/** * process_mqname_granted - process the granted message from the master node *			    for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_granted(struct ha_msg *msg, cms_data_t * cmsdata){	const char *name, *host, *error, *request;	const int * invocation, *policy;	size_t invocation_size; 	const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL;	const SaMsgQueueSendingStateT * sending_state;	const SaTimeT * retention = NULL;	const char *size_string;		IPC_Channel *client;	mqueue_t *mq, *mq_pending;	mqueue_request_t mq_request;	cms_client_t * cms_client;	guint handle;	int flag;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL	||	(request = ha_msg_value(msg, F_MQREQUEST)) == NULL	||	(invocation = cl_get_binary(msg, F_MQINVOCATION, 			&invocation_size)) == NULL	||	(host = ha_msg_value(msg, F_MQHOST)) == NULL	||	(policy = cl_get_binary(msg, F_MQPOLICY, NULL)) == NULL	||	(cflag = cl_get_binary(msg, F_MQCREATEFLAG, NULL)) == NULL	||	(oflag = cl_get_binary(msg, F_MQOPENFLAG, NULL)) == NULL	||	(retention = cl_get_binary(msg, F_MQRETENTION, NULL)) == NULL	||	(sending_state = cl_get_binary(msg, F_MQSTATUS, NULL)) == NULL	||	(size_string = ha_msg_value(msg, F_MQSIZE)) == NULL	||	(error = ha_msg_value(msg, F_MQERROR)) == NULL) {			cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__);			return FALSE;	}	flag = saerror_string2type(error);	/*	 * This node might be the mqname master node, so make sure don't	 * duplicate insertion.	 */	if ((mq = mqueue_table_lookup(name, NULL)) == NULL) {		/*		 * this is not the master node		 */		mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t));		if (!mq) {			cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__);			return FALSE;		}		memset(mq, 0, sizeof(mqueue_t));		mq->name = ha_strdup(name);		mq->host = ha_strdup(host);		mq->mqstat = MQ_STATUS_OPEN;		mq->client = NULL;		mq->policy = *policy;				mqueue_table_insert(mq);	}	sa_mqueue_usage_decode(size_string, NULL, NULL	,	mq->status.saMsgQueueUsage);	/*	 * set SaMsgQueueStatus in local database	 */	mq->status.sendingState = *sending_state;	mq->status.creationFlags = *cflag;	mq->status.openFlags = *oflag;	mq->status.retentionTime = *retention;	mq->status.headerLength = 0;	mq_pending = g_hash_table_lookup(mq_open_pending_hash, name);	if (mq_pending != NULL) {		/*		 * This is the local node for this msg queue, 		 * we have clients open pending, send out reply.		 */		client = mq_pending->client;		handle = mqueue_handle_insert(mq);		/*		 * insert this mq to client's opened_mqueue_list		 */		dprintf("lookup farside_pid [%d] in <%p>\n"		,	client->farside_pid, cmsdata->client_table);		cms_client = g_hash_table_lookup(cmsdata->client_table,					&(client->farside_pid));		if (cms_client == NULL){			/* this happens when the impatient client quit			   before it the response is received. */			cl_log(LOG_WARNING, "the client who requested queue [%s] to be opened does not exist any more.", name);			g_hash_table_remove(mq_open_pending_hash, name);			ha_free(mq_pending->name);			ha_free(mq_pending->client);			ha_free(mq_pending);			request_mqname_close(name, cmsdata);			/* todo: need better error handling here. 			   we should be able to unlink this queue from the client side.			request_mqname_unlink(name, cmsdata);			*/			return FALSE;		}		cms_client->opened_mqueue_list =			g_list_append(cms_client->opened_mqueue_list, mq);		mq->client = (IPC_Channel *)ha_malloc(sizeof(IPC_Channel));		if (mq->client == NULL) {			cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__);			return FALSE;		}		*mq->client = *mq_pending->client;		mq_request.qname = ha_strdup(name);		mq_request.invocation = *invocation;		mq_request.request_type = cmsrequest_string2type(request);		client_send_client_qopen(client, &mq_request, handle, flag);		ha_free(mq_request.qname);		g_hash_table_remove(mq_open_pending_hash, name);		dprintf("%p %p %p\n", mq_pending->name, mq_pending->client		,	mq_pending);		ha_free(mq_pending->name);		ha_free(mq_pending->client);		ha_free(mq_pending);		/*		 * send out notify msg for migratable mq if any		 */		send_migrate_message_notify(mq);	}	return TRUE;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -