⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmslib_client.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 3 页
字号:
	if (ret < 0)		return SA_ERR_LIBRARY;	*selectionObject = ret;	return SA_OK;}SaErrorTsaMsgDispatch(const SaMsgHandleT *msgHandle, SaDispatchFlagsT dispatchFlags){	int ret;	client_header_t *msg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	switch (dispatchFlags) {	case SA_DISPATCH_ONE:		read_and_queue_ipc_msg(hd);		if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) == NULL) {			cl_log(LOG_ERR, "%s: dequeue_dispatch_msg got NULL"			,	__FUNCTION__);			return SA_OK;		}		ret = dispatch_msg(hd, msg);		if (g_list_length(hd->dispatch_queue))			active_poll(hd);		if (ret != HA_OK) 			return SA_ERR_LIBRARY;		break;	case SA_DISPATCH_ALL:		read_and_queue_ipc_msg(hd);		do {			if ((msg = dequeue_dispatch_msg(&hd->dispatch_queue)) 					!= NULL) {				dispatch_msg(hd, msg);			}		} while (g_list_length(hd->dispatch_queue));		break;	case SA_DISPATCH_BLOCKING:		break;	default:		cl_log(LOG_ERR, "%s: wrong dispatchFlags [%d]", 				__FUNCTION__, dispatchFlags);		return SA_ERR_INVALID_PARAM;	}	return SA_OK;}SaErrorTsaMsgQueueOpenAsync(const SaMsgHandleT *msgHandle,		    SaInvocationT invocation,		    const SaNameT *queueName,		    const SaMsgQueueCreationAttributesT *creationAttributes,		    SaMsgQueueOpenFlagsT openFlags){	client_mqueue_open_t cmg;	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (!openFlags)		return SA_ERR_BAD_FLAGS;	if (bad_saname(queueName) || !msgHandle	||	(!creationAttributes && openFlags & SA_MSG_QUEUE_CREATE)	||	(creationAttributes && !(openFlags & SA_MSG_QUEUE_CREATE)))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUE_OPEN_ASYNC;	cmg.header.name = *queueName;	if (creationAttributes) {		if ((creationAttributes->creationFlags != 0)		&&	creationAttributes->creationFlags !=				SA_MSG_QUEUE_PERSISTENT)			return SA_ERR_BAD_FLAGS;		cmg.attr = *creationAttributes;	}	cmg.openflag = openFlags;	cmg.invocation = invocation;	cmg.policy = 0;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (openFlags & SA_MSG_QUEUE_RECEIVE_CALLBACK	&&	!(hd->callbacks).saMsgQueueOpenCallback	&&	!(hd->callbacks).saMsgMessageReceivedCallback)		return SA_ERR_INIT;	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK)		cl_log(LOG_ERR, "%s: cmsclient_message_send failed, rc = %d"		,	__FUNCTION__, ret);	return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}SaErrorTsaMsgQueueGroupCreate(SaMsgHandleT *msgHandle,                      const SaNameT *queueGroupName,                      SaMsgQueueGroupPolicyT queueGroupPolicy){	int ret;	client_mqueue_open_t cmg;	client_mqueue_open_t *rcmg;	client_header_t * reply;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	cmg.header.type = CMS_QUEUEGROUP_CREATE;	cmg.header.name = *queueGroupName;	cmg.invocation = 0;	if (queueGroupPolicy != SA_MSG_QUEUE_GROUP_ROUND_ROBIN)		return SA_ERR_INVALID_PARAM;	cmg.policy = queueGroupPolicy;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {		cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__,ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_CREATE, queueGroupName, &reply,			   SA_TIME_END);	if (ret != SA_OK)		return ret;	rcmg = (client_mqueue_open_t *) reply;	if ((rcmg->header).flag == SA_OK) {		SaMsgQueueHandleT *key;		key = (SaMsgQueueHandleT *)ha_malloc(sizeof(SaMsgQueueHandleT));		*key = rcmg->handle;		g_hash_table_insert(__mqhandle_hash, key, hd);	}	ret = (rcmg->header).flag;	ha_free(rcmg);	return ret;}SaErrorT saMsgQueueGroupDelete(SaMsgHandleT *msgHandle, const SaNameT *queueGroupName){	/* TODO: we should remove the key that's in the __mqhandle_hash	 * as well	 */	return saMsgQueueUnlink(msgHandle, queueGroupName);}SaErrorT saMsgQueueGroupInsert(SaMsgHandleT *msgHandle,                      const SaNameT *queueGroupName,                      const SaNameT *queueName){	int ret;	client_header_t * rcmg;	client_mqgroup_ops_t cmg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (bad_saname(queueName))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUEGROUP_INSERT;	cmg.header.name = *queueName;	cmg.qgname = *queueGroupName;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {	        cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__, ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_INSERT, queueName, &rcmg,			   SA_TIME_END);	if (ret != SA_OK)		return ret;	ret = rcmg->flag;	ha_free(rcmg);	return ret;}SaErrorT saMsgQueueGroupRemove(SaMsgHandleT *msgHandle,                      const SaNameT *queueGroupName,                      const SaNameT *queueName){	int ret;	client_header_t * rcmg;	client_mqgroup_ops_t cmg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (bad_saname(queueName))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUEGROUP_REMOVE;	cmg.header.name = *queueName;	cmg.qgname = *queueGroupName;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {	        cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__, ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_REMOVE, queueName, &rcmg, 			SA_TIME_END);	if (ret != SA_OK) 		return ret;		ret = rcmg->flag;	ha_free((client_mqueue_unlink_t *) rcmg);	return ret;}SaErrorT saMsgQueueGroupTrack(const SaMsgHandleT *msgHandle,                     const SaNameT *queueGroupName,                     SaUint8T trackFlags,                     SaMsgQueueGroupNotificationBufferT *notificationBuffer){	int ret;	client_mqgroup_mem_t cmg;	client_header_t * rcmg;	client_mqgroup_notify_t * rmsg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (!(hd->callbacks).saMsgQueueGroupTrackCallback)		return SA_ERR_INIT;	if ((trackFlags & SA_TRACK_CHANGES)	&&	(trackFlags & SA_TRACK_CHANGES_ONLY))		return SA_ERR_BAD_FLAGS;	/* tell server we care about the membership information */	cmg.header.type = CMS_QUEUEGROUP_TRACK_START;	cmg.header.name = *queueGroupName;	cmg.group_name = *queueGroupName;	cmg.flag = trackFlags;	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	if (ret != IPC_OK) {	        cl_log(LOG_ERR, "%s: cmsclient_message_send returns %d"		,	__FUNCTION__, ret);		return SA_ERR_LIBRARY;	}	ret = wait_for_msg(hd, CMS_QUEUEGROUP_TRACK_START, queueGroupName,			&rcmg, SA_TIME_END);	if (ret != SA_OK) 		return ret;	if ((ret = rcmg->flag) != SA_OK)		return ret;	rmsg = (client_mqgroup_notify_t *)rcmg;	if ((trackFlags & SA_TRACK_CHANGES)	||	(trackFlags & SA_TRACK_CHANGES_ONLY)) {		/*		 * Track membership changes with callbacks.		 */		__mqgroup_track_t * track;		char * name;		name = (char *) ha_malloc(queueGroupName->length + 1);		strncpy(name, queueGroupName->value, queueGroupName->length);		name[queueGroupName->length] = '\0';				track = (__mqgroup_track_t *)				ha_malloc(sizeof(__mqgroup_track_t));		track->name = queueGroupName;		track->flag = trackFlags & ~SA_TRACK_CURRENT;		g_hash_table_insert(__group_tracking_hash, name, track);	}	if (trackFlags & SA_TRACK_CURRENT) {		rmsg->data = (char *)rmsg + sizeof(client_mqgroup_notify_t);		if (!notificationBuffer) {			/*			 * Client wants saMsgQueueGroupTrackCallback.			 */			goto exit;		}		dprintf("numberOfItems %lu, real number %lu\n"		,	notificationBuffer->numberOfItems, rmsg->number);		if (!notificationBuffer->notification) {			notificationBuffer->notification =				(SaMsgQueueGroupNotificationT *)				ha_malloc(rmsg->number *					sizeof(SaMsgQueueGroupNotificationT));			if (!notificationBuffer->notification) {				ret = SA_ERR_NO_MEMORY;				goto exit;			}		} else if (notificationBuffer->numberOfItems < rmsg->number) {			ret = SA_ERR_NO_SPACE;			goto exit;		}		notificationBuffer->numberOfItems = rmsg->number;		memcpy(notificationBuffer->notification, rmsg->data		,	rmsg->number * sizeof(SaMsgQueueGroupNotificationT));	}exit:	ha_free(rcmg);	return ret;}SaErrorT saMsgQueueGroupTrackStop(const SaMsgHandleT *msgHandle,                         const SaNameT *queueGroupName){	char * name;	gpointer key, track;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (bad_saname(queueGroupName))		return SA_ERR_INVALID_PARAM;	name = (char *) ha_malloc(queueGroupName->length + 1);	strncpy(name, queueGroupName->value, queueGroupName->length);	name[queueGroupName->length] = '\0';	if (g_hash_table_lookup_extended(__group_tracking_hash, name, &key	,	&track) == TRUE) {		g_hash_table_remove(__group_tracking_hash, key);		ha_free(key);	} else		return SA_ERR_NOT_EXIST;	ha_free(name);	return SA_OK;}SaErrorT saMsgMessageSendReceive(SaMsgHandleT msgHandle,			const SaNameT *destination,                        const SaMsgMessageT *sendMessage,                        SaMsgMessageT *receiveMessage,			SaTimeT *replySendTime,                        SaTimeT timeout){	SaErrorT error = SA_OK;	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(&msgHandle);	const SaMsgMessageT * message;	client_message_t *cmg;	client_header_t *rcmg;	client_message_t * ack;	int freeack = 0;	message = sendMessage;	if (message->priority > SA_MSG_MESSAGE_LOWEST_PRIORITY)		return SA_ERR_INVALID_PARAM;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle);		return SA_ERR_BAD_HANDLE;	}	cmg = (client_message_t *)		ha_malloc(sizeof(client_message_t) + message->size);	cmg->header.type = CMS_MSG_SEND_RECEIVE;	cmg->header.name = *destination;	cmg->msg = *message;	cmg->invocation = 0;	cmg->data = cmg + 1;	cmg->sendreceive = 1;	memcpy(cmg->data, message->data, message->size);	cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */	ret = cmsclient_message_send(hd,			sizeof(client_message_t) + message->size, cmg);	/* TODO: fix needed.  this can only be called after the msg_done */	ha_free(cmg);	while (1) {		ret = wait_for_msg(hd, CMS_MSG_RECEIVE, 				NULL, &rcmg, timeout);		if (ret != SA_OK) 			return ret;		else 			break;	}	ret = rcmg->flag;	ack = (client_message_t *) rcmg;	ack->data = (void *)((char *)cmg + sizeof(client_message_t));	if (ack->msg.size > receiveMessage->size) {		error = SA_ERR_NO_SPACE;	}	receiveMessage->size = ack->msg.size;	if (receiveMessage->data) {		memcpy(receiveMessage->data, ack->data,			(ack->msg.size > receiveMessage->size ? 			 receiveMessage->size : ack->msg.size));		freeack = 1;	} else {		receiveMessage->data = ack->data;	}	receiveMessage->type = ack->msg.type;	receiveMessage->version = ack->msg.version;	receiveMessage->priority = 0;	if (freeack) 		ha_free(ack);	dprintf("type is %d\n", ack->send_type);	return error;}SaErrorT saMsgMessageReply(SaMsgHandleT msgHandle,		  const SaMsgMessageT *replyMessage,                  const SaMsgSenderIdT *senderId,                  SaTimeT timeout){	client_message_t *cmg;	client_header_t *rcmg;	client_message_ack_t * ack;	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(&msgHandle);	cmg = (client_message_t *)		ha_malloc(sizeof(client_message_t) + replyMessage->size);	cmg->header.type = CMS_MSG_REPLY;	cmg->header.name.length = 0;	cmg->msg = *replyMessage;	cmg->invocation = 0;	cmg->data = cmg + 1;	memcpy(cmg->data, replyMessage->data, replyMessage->size);	cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd,		sizeof(client_message_t) + replyMessage->size, cmg);	ha_free(cmg);	while (1) {		ret = wait_for_msg(hd, CMS_MSG_ACK,				NULL, &rcmg, timeout);		if (ret != SA_OK) 			return ret;		ret = rcmg->flag;		ack = (client_message_ack_t *) rcmg;		/*		 * CMS_MSG_SEND is a blocking call, so we can only		 * have one client waiting for it. Thus when we get		 * an ACK that is for the request type CMS_MSG_SEND,		 * we know this is the ACK we are waiting for.		 */		dprintf("type is %d\n", ack->send_type);		if (ack->send_type == CMS_MSG_REPLY) {			ha_free((client_message_t *) rcmg);			return ret;		} else {			enqueue_dispatch_msg(hd, rcmg);		}	}	return SA_ERR_NOT_SUPPORTED;}SaErrorT saMsgMessageReplyAsync(SaMsgHandleT msgHandle,                       SaInvocationT invocation,                       const SaMsgMessageT *replyMessage,		       const SaMsgSenderIdT *senderId,                       SaMsgAckFlagsT ackFlags){	client_message_t *cmg;	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(&msgHandle);	cmg = (client_message_t *)			ha_malloc(sizeof(client_message_t) + replyMessage->size);	cmg->header.type = CMS_MSG_REPLY_ASYNC;	cmg->header.name.length = 0;	cmg->msg = *replyMessage;	cmg->invocation = 0;	cmg->data = cmg + 1;	memcpy(cmg->data, replyMessage->data, replyMessage->size);	cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd,		sizeof(client_message_t) + replyMessage->size, cmg);	ha_free(cmg);	return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -