⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_cluster.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
		mq->name = ha_strdup(name);		mq->host = ha_strdup(host);		mq->mqstat = MQ_STATUS_OPEN;		mq->client = NULL;		mq->policy = *policy;				mqueue_table_insert(mq);	}	sa_mqueue_usage_decode(size_string, NULL, NULL	,	mq->status.saMsgQueueUsage);	/*	 * set SaMsgQueueStatus in local database	 */	mq->status.sendingState = *sending_state;	mq->status.creationFlags = *cflag;	mq->status.openFlags = *oflag;	mq->status.retentionTime = *retention;	mq->status.headerLength = 0;	mq_pending = g_hash_table_lookup(mq_open_pending_hash, name);	if (mq_pending != NULL) {		/*		 * This is the local node for this msg queue, 		 * we have clients open pending, send out reply.		 */		client = mq_pending->client;		handle = mqueue_handle_insert(mq);		/*		 * insert this mq to client's opened_mqueue_list		 */		dprintf("lookup farside_pid [%d] in <%p>\n"		,	client->farside_pid, cmsdata->client_table);		cms_client = g_hash_table_lookup(cmsdata->client_table,					&(client->farside_pid));		assert(cms_client != NULL);		cms_client->opened_mqueue_list =			g_list_append(cms_client->opened_mqueue_list, mq);		mq->client = (IPC_Channel *)ha_malloc(sizeof(IPC_Channel));		if (mq->client == NULL) {			cl_log(LOG_ERR, "%s: ha_malloc failed\n", __FUNCTION__);			return FALSE;		}		*mq->client = *mq_pending->client;		mq_request.qname = ha_strdup(name);		mq_request.invocation = *invocation;		mq_request.request_type = cmsrequest_string2type(request);		client_send_client_qopen(client, &mq_request, handle, flag);		ha_free(mq_request.qname);		g_hash_table_remove(mq_open_pending_hash, name);		dprintf("%p %p %p\n", mq_pending->name, mq_pending->client		,	mq_pending);		ha_free(mq_pending->name);		ha_free(mq_pending->client);		ha_free(mq_pending);		/*		 * send out notify msg for migratable mq if any		 */		send_migrate_message_notify(mq);	}	return TRUE;}static intsend_undelivered_message(ll_cluster_t *hb, mqueue_t *mq, const char *node){	SaMsgMessageT * message;	struct ha_msg *m;	const char *type = mqname_type2string(MQNAME_TYPE_REOPEN_MSGFEED);	int invalid = FALSE;	const char * request_type;	char size_string[PACKSTRSIZE]; 	sa_mqueue_usage_encode(size_string, NULL, NULL	,	mq->status.saMsgQueueUsage);	dprintf("Timer: current - close %s retention\n"	,	get_current_satime() - mq->status.closeTime	<	mq->status.retentionTime ? "<" : ">=");	if (RETENTION_TIME_EXPIRES(mq)) {		/* retention timer expires */		cl_log(LOG_INFO, "Original node: open a expired queue [%s]"		,	mq->name);		invalid = TRUE;		goto end;	}	while ((message = dequeue_message(mq))) {	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL 	||	ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGTYPE, (char *) &message->type, 			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGVER, (char *) &message->version,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGSIZE, (char *) &message->size,			sizeof(SaSizeT)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGPRI, (char *) &message->priority,			sizeof(SaUint8T)) == HA_FAIL	||	ha_msg_addbin(m, F_MQMSGDATA, (char *) message->data, 			message->size) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(m);		return FALSE;	} 	hb->llc_ops->sendnodemsg(hb, m, node);	dprintf("Send 1 msgfeed %d size msg to %s\n"	,	message->size, node);	ha_msg_del(m);	}end:	/*	 * send the MSGFEED_END message	 */	type = mqname_type2string(MQNAME_TYPE_MSGFEED_END);	if ((m = ha_msg_new(0)) == NULL) {		cl_log(LOG_ERR, "%s: out of memory", __FUNCTION__);		return FALSE;	}	if (invalid == TRUE)		request_type = "invalid";	else		request_type = "valid";	if (ha_msg_add(m, F_TYPE, type) == HA_FAIL	||	ha_msg_add(m, F_MQNAME, mq->name) == HA_FAIL	||      ha_msg_add(m, F_MQREQUEST, request_type) == HA_FAIL	||      ha_msg_addbin(m, F_MQPOLICY, &mq->policy, sizeof(int))			== HA_FAIL	||      ha_msg_addbin(m, F_MQCREATEFLAG, &mq->status.creationFlags,			sizeof(SaMsgQueueCreationFlagsT)) == HA_FAIL	||      ha_msg_addbin(m, F_MQOPENFLAG, &mq->status.openFlags,			sizeof(SaMsgQueueOpenFlagsT)) == HA_FAIL	||      ha_msg_addbin(m, F_MQRETENTION, &mq->status.retentionTime,			sizeof(SaTimeT)) == HA_FAIL	||      ha_msg_add(m, F_MQSIZE, size_string) == HA_FAIL) {		cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);		ha_msg_del(m);		return FALSE;	}	hb->llc_ops->sendnodemsg(hb, m, node);	dprintf("Send msgfeed_end msg to %s\n", node);	ha_msg_del(m);	return TRUE;}intprocess_mqname_reopen(struct ha_msg *msg, enum mqname_type type,		      cms_data_t * cmsdata){	static struct ha_msg * saved_msg = NULL;	struct ha_msg * new_msg;	const char *name, *node;	ll_cluster_t *hb = cmsdata->hb_handle;	mqueue_t * mq;	SaMsgMessageT * message;	const SaSizeT * msg_type, * msg_ver, * msg_size, * msg_pri, * data;	size_t type_len, ver_len, pri_len, size_len, data_len;	const char *valid;	const char *size_string = NULL;	const int *s_invocation, *policy;	size_t s_invocation_size, cflag_size, oflag_size, retention_size;	const SaTimeT * retention = NULL;	char *s_request, *s_error;	const SaMsgQueueCreationFlagsT *cflag = NULL, *oflag = NULL;	SaMsgQueueSendingStateT sending_state = SA_MSG_QUEUE_AVAILABLE;	dprintf("%s: type is %d\n", __FUNCTION__, type);	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL) {		cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__);		return FALSE;	}	switch (type) {	case MQNAME_TYPE_REOPEN:		if ((node = ha_msg_value(msg, F_MQHOST)) == NULL) {			cl_log(LOG_ERR, "%s: ha_msg_value failed",__FUNCTION__);			return FALSE;		}		if (((mq = mqname_lookup(name, NULL)) != NULL)		&&	mq->mqstat == MQ_STATUS_CLOSE) {			/* Do I have the original mq? */ 			cl_log(LOG_INFO, "Original mq host is %s", mq->host);			if (is_host_local(mq->host, cmsdata)) {				/* Send undelivered message to new mq */				send_undelivered_message(hb, mq, node);			}		}		dprintf("mq->mqstat = <%d>\n", mq->mqstat);		if (!is_host_local(node, cmsdata))			return TRUE;		saved_msg = ha_msg_copy(msg);		cl_log(LOG_INFO, "%s: waiting for msgfeed...", __FUNCTION__);		break;	case MQNAME_TYPE_REOPEN_MSGFEED:		if ((msg_type = cl_get_binary(msg, F_MQMSGTYPE, &type_len))				== NULL		||	(msg_ver = cl_get_binary(msg, F_MQMSGVER, &ver_len)) 				== NULL		||	(msg_pri = cl_get_binary(msg, F_MQMSGPRI, &pri_len))				== NULL		||	(msg_size = cl_get_binary(msg, F_MQMSGSIZE, &size_len))				== NULL		||	(data = cl_get_binary(msg, F_MQMSGDATA, &data_len))				== NULL) {			cl_log(LOG_ERR, "received bad msgfeed msg.");			return FALSE;		}		if ((mq = mqname_lookup(name, NULL)) == NULL		||	mq->mqstat != MQ_STATUS_CLOSE) {			cl_log(LOG_ALERT, "State machine BUG");			return FALSE;		}		message = (SaMsgMessageT *)			ha_malloc(sizeof(SaMsgMessageT) + data_len);		if (!message) {			cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);			return FALSE;		}		message->type = *msg_type;		message->version = *msg_ver;		message->size = *msg_size;		message->priority = *(const SaUint8T *) msg_pri;		message->data = (char *)message + sizeof(SaMsgMessageT);		memcpy(message->data, data, data_len);		enqueue_message(mq, message->priority, message);		break;	case MQNAME_TYPE_MSGFEED_END:		if ((mq = mqname_lookup(name, NULL)) == NULL		||	mq->mqstat != MQ_STATUS_CLOSE) {			cl_log(LOG_ALERT, "State machine BUG");			return FALSE;		}		mq->mqstat = MQ_STATUS_OPEN;		dprintf("in feedend, used is [%d]\n"		,	mq->status.saMsgQueueUsage[3].queueUsed);		/*		 * read saved_msg		 */		if ((s_request = ha_strdup(ha_msg_value(saved_msg,				F_MQREQUEST))) == NULL		||	(s_invocation = cl_get_binary(saved_msg, F_MQINVOCATION,				&s_invocation_size)) == NULL		||	(s_error = ha_strdup(ha_msg_value(saved_msg,F_MQERROR)))				== NULL) {			cl_log(LOG_ERR, "%s: ha_msg_value error", __FUNCTION__);			return FALSE;		}		/*		 * If mq retention time not expired, we will receive		 * a valid mq status information, so that we need to		 * use this original mq status.		 */		if ((valid = ha_msg_value(msg, F_MQREQUEST)) == NULL) {			cl_log(LOG_ERR, "%s: cannot read invalid bit"			,	__FUNCTION__);			return FALSE;		}		if (strncmp(valid, "invalid", 7) == 0)			goto invalid;		/*		 * read msgfeedend		 */		if ((policy = cl_get_binary(msg, F_MQPOLICY, NULL))			== NULL		||	(cflag = cl_get_binary(msg, F_MQCREATEFLAG			,	&cflag_size)) == NULL		||	(oflag = cl_get_binary(msg, F_MQOPENFLAG			,	&oflag_size)) == NULL		||	(retention = cl_get_binary(msg, F_MQRETENTION			,	&retention_size)) == NULL		||	(size_string = ha_msg_value(msg, F_MQSIZE)) == NULL) {			cl_log(LOG_ERR, "%s: cl_get_binary failed"			,	__FUNCTION__);		}		/*		 * create a new msg according to msgfeedend		 */		if ((new_msg = ha_msg_new(0)) == NULL) {			cl_log(LOG_ERR, "%s: no memory", __FUNCTION__);			return FALSE;		}		if (ha_msg_add(new_msg, F_TYPE		,	mqname_type2string(MQNAME_TYPE_GRANTED)) == HA_FAIL		||	ha_msg_add(new_msg, F_MQNAME, name) == HA_FAIL		||	ha_msg_add(new_msg, F_MQREQUEST, s_request)				== HA_FAIL		||	ha_msg_addbin(new_msg, F_MQINVOCATION, s_invocation,				s_invocation_size) == HA_FAIL		||	ha_msg_add(new_msg, F_MQHOST, mq->host) == HA_FAIL		||	ha_msg_addbin(new_msg, F_MQSTATUS, &sending_state,				sizeof(SaMsgQueueSendingStateT)) == HA_FAIL		||	ha_msg_addbin(new_msg, F_MQPOLICY, policy, sizeof(int))				== HA_FAIL		||	ha_msg_addbin(new_msg, F_MQCREATEFLAG, cflag,				cflag_size) == HA_FAIL		||	ha_msg_addbin(new_msg, F_MQOPENFLAG, oflag, oflag_size)				== HA_FAIL		||	ha_msg_addbin(new_msg, F_MQRETENTION, retention,				retention_size) == HA_FAIL		||	ha_msg_add(new_msg, F_MQSIZE, size_string) == HA_FAIL		||	ha_msg_add(new_msg, F_MQERROR, s_error) == HA_FAIL) {			cl_log(LOG_ERR, "%s: ha_msg_add failed", __FUNCTION__);			ha_msg_del(new_msg);			return FALSE;		}		ha_msg_del(saved_msg);		saved_msg = new_msg;		goto sendmsg;invalid:		/*		 * If the open request set SA_MSG_QUEUE_OPEN_ONLY,		 * deny this request, since retention timer expired.		 */		mq = g_hash_table_lookup(mq_open_pending_hash, name);		if (!mq) {			cl_log(LOG_ERR, "BUG: cannot find mq in pending hash");			return TRUE;		}		if ((mq->status.creationFlags & SA_MSG_QUEUE_OPEN_ONLY)) {			mqueue_request_t reply;			cl_log(LOG_INFO, "retention timer expired and "				"SA_MSG_QUEUE_OPEN_ONLY is set, reject!");			reply.qname = ha_strdup(name);			reply.gname = NULL;			reply.request_type = cmsrequest_string2type(s_request);			reply.invocation = *s_invocation;			client_send_client_qopen(mq->client, &reply, -1			,	SA_ERR_NOT_EXIST);			g_hash_table_remove(mq_open_pending_hash, name);			/*			 * We get the change to unlink the mqueue here.			 */			request_mqname_unlink(name, cmsdata);			ha_free(mq);			ha_free(reply.qname);			return TRUE;		}sendmsg:		/*		 * Send granted message to all.		 */		ha_msg_mod(saved_msg, F_TYPE		,	mqname_type2string(MQNAME_TYPE_GRANTED));		hb->llc_ops->sendclustermsg(hb, saved_msg);		ha_msg_del(saved_msg);		saved_msg = NULL;		return TRUE;	default:		break;	}	return TRUE;}/** * process_mqname_denied - process the denied message from the master node *			   for this message queue name * @msg: received message from heartbeat IPC Channel */intprocess_mqname_denied(struct ha_msg *msg){	const char * name, * error, * request;	const int * invocation;	size_t invocation_size;	IPC_Channel *client = NULL;	mqueue_request_t reply;	mqueue_t * mq_pending;	int flag;	if ((name = ha_msg_value(msg, F_MQNAME)) == NULL ||		(error = ha_msg_value(msg, F_MQERROR)) == NULL ||		(request = ha_msg_value(msg, F_MQREQUEST)) == NULL ||		(invocation = cl_get_binary(msg, F_MQINVOCATION,				&invocation_size)) == NULL ) {		cl_log(LOG_ERR, "received NULL mq name or mq error reply");		return FALSE;	}	flag = saerror_string2type(error);	mq_pending = g_hash_table_lookup(mq_open_pending_hash, name);	if (mq_pending != NULL) {		/*		 * we have clients open pending, send out reply		 */		client = mq_pending->client;		cl_log(LOG_INFO, "%s: found client <%p>", __FUNCTION__, client);		reply.qname = ha_strdup(name);		reply.gname = NULL;		reply.request_type = cmsrequest_string2type(request);		reply.invocation = *invocation;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -