⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmslib_client.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd,			sizeof(client_message_t) + message->size, cmg);	ha_free(cmg);	return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}static intrequest_for_message(__cms_handle_t * hd, const SaNameT * name){	client_header_t request_msg;	request_msg.type = CMS_MSG_REQUEST;	request_msg.name = *name;	return cmsclient_message_send(hd, sizeof(request_msg), &request_msg);}SaErrorTsaMsgMessageGet(const SaMsgQueueHandleT *queueHandle,                SaMsgMessageT *message,                SaMsgMessageInfoT *messageInfo,                SaTimeT timeout){	int ret;	SaErrorT error = SA_OK;	client_message_t * cmg;	client_header_t *rcmg;	client_header_t request_msg;	__cms_handle_t *hd = GET_MQ_HANDLE(queueHandle);	__cms_queue_handle_t *qhd;	SaNameT * qname;	int freecmg = 0;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by queueHandle [%d]"		,	__FUNCTION__, queueHandle ? *queueHandle : -1);		return SA_ERR_BAD_HANDLE;	}	qhd = g_hash_table_lookup(hd->queue_handle_hash, queueHandle);	assert(qhd != NULL);	qname = &(qhd->queue_name);	/*	 * request a message from daemon	 */	request_msg.type = CMS_MSG_REQUEST;	request_msg.name = *qname;	cmsclient_message_send(hd, sizeof(request_msg), &request_msg);wait_again:	ret = wait_for_msg(hd, CMS_MSG_NOTIFY |CMS_MSG_GET, qname, &rcmg,			   timeout);	if (rcmg->type == CMS_MSG_NOTIFY) {		dprintf("Received CMS_MSG_NOTIFY\n");		request_for_message(hd, qname);		goto wait_again;	}	if (ret != SA_OK) {		cl_log(LOG_ERR, "wait_for_msg error [%d]", ret);		return ret;	}	cmg = (client_message_t *)rcmg;	cmg->data = (void *)((char *)cmg + sizeof(client_message_t));	dprintf("message.data is [%s]\n", (char *)cmg->data);	if (message->size < cmg->msg.size) 		error = SA_ERR_NO_SPACE;	message->size = (cmg->msg).size;	if (message->data) {		memcpy(message->data, cmg->data, 			(cmg->msg.size > message->size ? 			 message->size : cmg->msg.size));		freecmg = 1;	} else {		message->data = cmg->data;	}	message->type = cmg->msg.type;	message->version = cmg->msg.version;	message->priority = cmg->msg.priority;	if (freecmg) 		ha_free(cmg);	/* TODO: message info */	return error;}SaErrorT saMsgMessageReceivedGet(const SaMsgQueueHandleT *queueHandle,                        const SaMsgMessageHandleT *messageHandle,                        SaMsgMessageT *message,                        SaMsgMessageInfoT *messageInfo){	return SA_ERR_NOT_SUPPORTED;}SaErrorT saMsgMessageCancel(const SaMsgQueueHandleT *queueHandle){	return SA_ERR_NOT_SUPPORTED;}SaErrorTsaMsgSelectionObjectGet(const SaMsgHandleT *msgHandle, 		SaSelectionObjectT *selectionObject){	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	dprintf("hd->backup_fd is [%d]\n", hd->backup_fd);	ret = hd->backup_fd >= 0 ? hd->active_fd : 		hd->ch->ops->get_recv_select_fd(hd->ch);	if (ret < 0)		return SA_ERR_LIBRARY;	*selectionObject = ret;	return SA_OK;}SaErrorTsaMsgDispatch(const SaMsgHandleT *msgHandle, SaDispatchFlagsT dispatchFlags){	int ret;	client_header_t *msg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	switch (dispatchFlags) {	case SA_DISPATCH_ONE:		read_and_queue_ipc_msg(hd);		if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) == NULL) {			cl_log(LOG_ERR, "%s: dequeue_dispatch_msg got NULL"			,	__FUNCTION__);			return SA_OK;		}		ret = dispatch_msg(hd, msg);		if (g_list_length(hd->dispatch_queue))			active_poll(hd);		if (ret != HA_OK) 			return SA_ERR_LIBRARY;		break;	case SA_DISPATCH_ALL:		read_and_queue_ipc_msg(hd);		do {			if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) 					!= NULL) {				dispatch_msg(hd, msg);			}		} while (g_list_length(hd->dispatch_queue));		break;	case SA_DISPATCH_BLOCKING:		break;	default:		cl_log(LOG_ERR, "%s: wrong dispatchFlags [%d]", 				__FUNCTION__, dispatchFlags);		return SA_ERR_INVALID_PARAM;	}	return SA_OK;}SaErrorTsaMsgQueueOpenAsync(const SaMsgHandleT *msgHandle,		    SaInvocationT invocation,		    const SaNameT *queueName,		    const SaMsgQueueCreationAttributesT *creationAttributes,		    SaMsgQueueOpenFlagsT openFlags){	client_mqueue_open_t cmg;	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (!openFlags)		return SA_ERR_BAD_FLAGS;	if (bad_saname(queueName) || !msgHandle	||	(!creationAttributes && !(openFlags & SA_MSG_QUEUE_OPEN_ONLY))	||	(creationAttributes && (openFlags & SA_MSG_QUEUE_OPEN_ONLY)))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUE_OPEN_ASYNC;	cmg.header.name = *queueName;	if (creationAttributes) {		if ((creationAttributes->creationFlags != 0)		&&	creationAttributes->creationFlags !=				SA_MSG_QUEUE_PERSISTENT)			return SA_ERR_BAD_FLAGS;		cmg.attr = *creationAttributes;	}	cmg.openflag = openFlags;	cmg.invocation = invocation;	cmg.policy = 0;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (openFlags & SA_MSG_QUEUE_RECEIVE_CALLBACK	&&	!(hd->callbacks).saMsgQueueOpenCallback	&&	!(hd->callbacks).saMsgMessageReceivedCallback)		return SA_ERR_INIT;	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK)		cl_log(LOG_ERR, "%s: cmsclient_message_send failed, rc = %d"		,	__FUNCTION__, ret);	return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}SaErrorTsaMsgQueueGroupCreate(SaMsgHandleT *msgHandle,                      const SaNameT *queueGroupName,                      SaMsgQueueGroupPolicyT queueGroupPolicy){	int ret;	client_mqueue_open_t cmg;	client_mqueue_open_t *rcmg;	client_header_t * reply;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	cmg.header.type = CMS_QUEUEGROUP_CREATE;	cmg.header.name = *queueGroupName;	cmg.invocation = 0;	if (queueGroupPolicy != SA_MSG_QUEUE_GROUP_ROUND_ROBIN)		return SA_ERR_INVALID_PARAM;	cmg.policy = queueGroupPolicy;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {		cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__,ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_CREATE, queueGroupName, &reply,			   SA_TIME_END);	if (ret != SA_OK)		return ret;	rcmg = (client_mqueue_open_t *) reply;	if ((rcmg->header).flag == SA_OK) {		SaMsgQueueHandleT *key;		key = (SaMsgQueueHandleT *)ha_malloc(sizeof(SaMsgQueueHandleT));		*key = rcmg->handle;		g_hash_table_insert(__mqhandle_hash, key, hd);	}	ret = (rcmg->header).flag;	ha_free(rcmg);	return ret;}SaErrorT saMsgQueueGroupDelete(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName){	/* TODO: we should remove the key that's in the __mqhandle_hash	 * as well	 */	return saMsgQueueUnlink(msgHandle, queueGroupName);}SaErrorT saMsgQueueGroupInsert(SaMsgHandleT *msgHandle,                      const SaNameT *queueGroupName,                      const SaNameT *queueName){	int ret;	client_header_t * rcmg;	client_mqgroup_ops_t cmg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (bad_saname(queueName))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUEGROUP_INSERT;	cmg.header.name = *queueName;	cmg.qgname = *queueGroupName;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {	        cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__, ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_INSERT, queueName, &rcmg,			   SA_TIME_END);	if (ret != SA_OK)		return ret;	ret = rcmg->flag;	ha_free(rcmg);	return ret;}SaErrorT saMsgQueueGroupRemove(SaMsgHandleT *msgHandle,                      const SaNameT *queueGroupName,                      const SaNameT *queueName){	int ret;	client_header_t * rcmg;	client_mqgroup_ops_t cmg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (bad_saname(queueName))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUEGROUP_REMOVE;	cmg.header.name = *queueName;	cmg.qgname = *queueGroupName;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {	        cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__, ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_REMOVE, queueName, &rcmg, 			SA_TIME_END);	if (ret != SA_OK) 		return ret;		ret = rcmg->flag;	ha_free((client_mqueue_unlink_t *) rcmg);	return ret;}SaErrorT saMsgQueueGroupTrackStart(const SaMsgHandleT *msgHandle,                     const SaNameT *queueGroupName,                     SaUint8T trackFlags,                     SaMsgQueueGroupNotificationT *notificationBuffer,                     SaUint32T numberOfItems){	int ret;	client_mqgroup_mem_t cmg;	client_header_t * rcmg;	client_mqgroup_notify_t * rmsg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (!(hd->callbacks).saMsgQueueGroupTrackCallback)		return SA_ERR_INIT;	if ((trackFlags & SA_TRACK_CHANGES)	&&	(trackFlags & SA_TRACK_CHANGES_ONLY))		return SA_ERR_BAD_FLAGS;	/* tell server we care about the membership information */	cmg.header.type = CMS_QUEUEGROUP_TRACK_START;	cmg.header.name = *queueGroupName;	cmg.group_name = *queueGroupName;	cmg.flag = trackFlags;	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {	        cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__, ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_TRACK_START, queueGroupName,			&rcmg, SA_TIME_END);	if (ret != SA_OK) 		return ret;	if ((ret = rcmg->flag) != SA_OK)		return ret;	rmsg = (client_mqgroup_notify_t *)rcmg;	if ((trackFlags & SA_TRACK_CHANGES)	||	(trackFlags & SA_TRACK_CHANGES_ONLY)) {		/*		 * Track membership changes with callbacks.		 */		__mqgroup_track_t * track;		char * name;		name = (char *) ha_malloc(queueGroupName->length + 1);		strncpy(name, queueGroupName->value, queueGroupName->length);		name[queueGroupName->length] = '\0';				track = (__mqgroup_track_t *)				ha_malloc(sizeof(__mqgroup_track_t));		track->name = queueGroupName;		track->flag = trackFlags & ~SA_TRACK_CURRENT;		g_hash_table_insert(__group_tracking_hash, name, track);	}	if (trackFlags & SA_TRACK_CURRENT) {		rmsg->data = (char *)rmsg + sizeof(client_mqgroup_notify_t);		if (!notificationBuffer) {			/*			 * Client wants saMsgQueueGroupTrackCallback.			 */			goto exit;		}	}exit:	ha_free(rcmg);	return ret;}SaErrorT saMsgQueueGroupTrackStop(const SaMsgHandleT *msgHandle,                         const SaNameT *queueGroupName){	char * name;	gpointer key, track;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (bad_saname(queueGroupName))		return SA_ERR_INVALID_PARAM;	name = (char *) ha_malloc(queueGroupName->length + 1);	strncpy(name, queueGroupName->value, queueGroupName->length);	name[queueGroupName->length] = '\0';	if (g_hash_table_lookup_extended(__group_tracking_hash, name, &key	,	&track) == TRUE) {		g_hash_table_remove(__group_tracking_hash, key);		ha_free(key);	} else		return SA_ERR_NOT_EXIST;	ha_free(name);	return SA_OK;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -