⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cmslib_client.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 3 页
字号:
 					ha_free(name);		ha_free(nmsg); 		break;	default:		return HA_FAIL;	}	return HA_OK;}SaErrorTsaMsgInitialize(SaMsgHandleT *msgHandle, const SaMsgCallbacksT *msgCallbacks,                const SaVersionT *version){	IPC_Channel *ch;	__cms_handle_t *hd; 	SaMsgHandleT * key;	int pipefd[2];	cl_log_set_entity("libcms");	cl_log_set_facility(LOG_USER);#ifdef DEBUG_LIBRARY	cl_log_enable_stderr(TRUE);#endif	if ((!version)	||	version->releaseCode < 'A' || version->releaseCode > 'Z'	||	(version->releaseCode << 8) + (version->major << 4) +		version->minor > (AIS_VERSION_RELEASE_CODE << 8) +		(AIS_VERSION_MAJOR << 4) + AIS_VERSION_MINOR) {		cl_log(LOG_ERR, "AIS library version is lower then required");		return SA_ERR_VERSION;	}	if (!msgHandle)		return SA_ERR_INVALID_PARAM;	if (!(ch = cms_channel_conn())) {		cl_log(LOG_ERR, "cms_channel_conn failed.");		return SA_ERR_LIBRARY;	}	if (pipe(pipefd) == -1) {		cl_log(LOG_ERR, "create pipe failed");		return SA_ERR_LIBRARY;        }	/*	 * Write something to the pipe but we never read so that	 * select to this fd will always return immediately.	 */	if (write(pipefd[1], "ACTIVE", 6) < 0) {		cl_log(LOG_ERR, "write pipe failed\n");		return SA_ERR_LIBRARY;	}	cmsclient_hash_init();	dprintf("ch_status = %d\n", ch->ch_status);	dprintf("farside_pid = %d\n", ch->farside_pid);	hd = (__cms_handle_t *)ha_malloc(sizeof(__cms_handle_t));	memset(hd, 0, sizeof(__cms_handle_t));	hd->queue_handle_hash = g_hash_table_new(g_int_hash, g_int_equal);	hd->ch = ch;	if (msgCallbacks) {		memcpy(&(hd->callbacks), msgCallbacks, sizeof(SaMsgCallbacksT));	} else {		memset(&(hd->callbacks), 0, sizeof(SaMsgCallbacksT));	}	*msgHandle = __cmshandle_counter++;	hd->service_handle = *msgHandle;	hd->active_fd = pipefd[0];	hd->backup_fd = -1;	key = (SaMsgHandleT *) ha_malloc(sizeof(SaMsgHandleT));	key = msgHandle;	g_hash_table_insert(__cmshandle_hash, key, hd);	return SA_OK;}SaErrorTsaMsgFinalize(SaMsgHandleT *msgHandle){	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	g_hash_table_foreach_remove(hd->queue_handle_hash, msgqueue_remove, hd);	g_hash_table_remove(__cmshandle_hash, msgHandle);	if (hd->backup_fd >= 0)		restore_poll(hd);	hd->ch->ops->destroy(hd->ch);	close(hd->active_fd);	/* TODO: need to free the glist on the dispatch queue */	ha_free(hd);	return SA_OK;}SaErrorTsaMsgQueueOpen(const SaMsgHandleT *msgHandle,               const SaNameT *queueName,               const SaMsgQueueCreationAttributesT *creationAttributes,               SaMsgQueueOpenFlagsT openFlags,               SaTimeT timeout,               SaMsgQueueHandleT *queueHandle){	int ret;	client_mqueue_open_t cmg;	client_mqueue_open_t *rcmg;	client_header_t * reply;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (!openFlags)		return SA_ERR_BAD_FLAGS;	if (bad_saname(queueName) || !queueHandle	||	(!creationAttributes && (openFlags & SA_MSG_QUEUE_CREATE))	||	(creationAttributes && !(openFlags & SA_MSG_QUEUE_CREATE)))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUE_OPEN;	cmg.header.name = *queueName;	if (creationAttributes) {		if ((creationAttributes->creationFlags != 0)		&&	creationAttributes->creationFlags !=				SA_MSG_QUEUE_PERSISTENT)			return SA_ERR_BAD_FLAGS;		cmg.attr = *creationAttributes;	} else {		/*		 * else set to -1, so that daemon knows client didn't		 * provide a creationAttributes		 */		cmg.attr.creationFlags = -1;	}	cmg.openflag = openFlags;	cmg.invocation = 0;	cmg.policy = 0;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (openFlags & SA_MSG_QUEUE_RECEIVE_CALLBACK	&&	!(hd->callbacks).saMsgMessageReceivedCallback)		return SA_ERR_INIT;	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);        dprintf("%s: cmsclient_message_send returns %d\n", __FUNCTION__, ret);	/*	 * We should only have one client blocking for it.	 */	ret = wait_for_msg(hd, CMS_QUEUE_OPEN, queueName, &reply, timeout);	if (ret != SA_OK) {		return ret;	}	rcmg = (client_mqueue_open_t *) reply;	if ((ret = (rcmg->header).flag) == SA_OK) {		SaMsgQueueHandleT *key;		__cms_queue_handle_t *qhd;		key = (SaMsgQueueHandleT *)				ha_malloc(sizeof(SaMsgQueueHandleT));		qhd = (__cms_queue_handle_t *)				ha_malloc(sizeof(__cms_queue_handle_t));		memset(qhd, 0, sizeof(__cms_queue_handle_t));		qhd->queue_handle = rcmg->handle;		qhd->queue_name = *queueName;		qhd->cms_handle = hd;		*key = qhd->queue_handle;		g_hash_table_insert(hd->queue_handle_hash, key, qhd);		g_hash_table_insert(__mqhandle_hash, key, hd);		*queueHandle = *key;	}	ha_free(rcmg);	return ret;}SaErrorTsaMsgQueueClose(SaMsgQueueHandleT *queueHandle){	int ret;	client_mqueue_close_t cmg;	client_header_t *rcmg;	__cms_handle_t *hd = NULL; 	__cms_queue_handle_t *qhd;	SaNameT * qname;	gpointer origkey, orighd;	if (g_hash_table_lookup_extended(__mqhandle_hash, queueHandle,			&origkey, &orighd)) {		hd = (__cms_handle_t *) orighd;	};	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, queueHandle ? *queueHandle : -1);		return SA_ERR_BAD_HANDLE;	}	qhd = g_hash_table_lookup(hd->queue_handle_hash, queueHandle); 	if (!qhd) {		cl_log(LOG_ERR, "%s: Cannot find handlle [%d]"		,	__FUNCTION__, queueHandle ? *queueHandle : -1);		return SA_ERR_BAD_HANDLE;	}	qname = &(qhd->queue_name);	cmg.header.type = CMS_QUEUE_CLOSE;	cmg.header.name = *qname;	cmg.handle = *queueHandle;	cmg.silent = FALSE;	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	ret = wait_for_msg(hd, CMS_QUEUE_CLOSE, qname, &rcmg, SA_TIME_END);	if (ret != SA_OK) 		return ret;	if ((ret = rcmg->flag) == SA_OK) {		g_hash_table_remove(hd->queue_handle_hash, queueHandle);		g_hash_table_remove(__mqhandle_hash, queueHandle);		ha_free(origkey);		ha_free(qhd);		/* TODO: free the queue msgs. */	}	ha_free((client_mqueue_close_t *) rcmg);	return ret;}static voidlookup_queuehandle(gpointer key, gpointer value, gpointer user_data){	char * qname;	__cms_queue_handle_t *qhd = (__cms_queue_handle_t *)value;	char * name = (char *)user_data;	SaMsgQueueHandleT *queueHandle = (SaMsgQueueHandleT *)key;	qname = saname2str(qhd->queue_name);	if (!strcmp(qname, name)) {		g_hash_table_remove(qhd->cms_handle->queue_handle_hash, key);	}	g_hash_table_remove(__mqhandle_hash, queueHandle);}SaErrorT saMsgQueueUnlink(SaMsgHandleT *msgHandle, const SaNameT *queueName){	int ret;	char * name;	client_mqueue_unlink_t cmg;	client_header_t *rcmg;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (bad_saname(queueName))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUE_UNLINK;	cmg.header.name = *queueName;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	ret = wait_for_msg(hd, CMS_QUEUE_UNLINK, queueName, &rcmg, 			SA_TIME_END);	if (ret != SA_OK) 		return ret;	/*	 * remove from the mq from queue_handle_hash if possible	 */	name = saname2str(*queueName);	g_hash_table_foreach(hd->queue_handle_hash, lookup_queuehandle, name);	ret = rcmg->flag;	ha_free((client_mqueue_unlink_t *) rcmg);	ha_free(name);	return ret;}SaErrorT saMsgQueueStatusGet(SaMsgHandleT *msgHandle, const SaNameT *queueName,	SaMsgQueueStatusT *queueStatus){	int ret;	client_mqueue_status_t cmg;	client_mqueue_status_t *rcmg;	client_header_t * reply;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (bad_saname(queueName))		return SA_ERR_INVALID_PARAM;	cmg.header.type = CMS_QUEUE_STATUS;	cmg.header.name = *queueName;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd, sizeof(cmg), &cmg);	ret = wait_for_msg(hd, CMS_QUEUE_STATUS, queueName, &reply,			SA_TIME_END);	if (ret != SA_OK) 		return ret;	rcmg = (client_mqueue_status_t *) reply;	ret = reply->flag;	if (ret == SA_OK) 		*queueStatus = rcmg->qstatus;	ha_free((client_mqueue_status_t *) reply);	return ret;}SaErrorTsaMsgMessageSend(const SaMsgHandleT *msgHandle,                 const SaNameT *destination,                 const SaMsgMessageT *message,                 SaMsgAckFlagsT ackFlags,                 SaTimeT timeout){	client_message_t *cmg;	client_header_t *rcmg;	client_message_ack_t * ack;	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (message->priority > SA_MSG_MESSAGE_LOWEST_PRIORITY 	||	!(ackFlags & SA_MSG_MESSAGE_DELIVERED_ACK))		return SA_ERR_INVALID_PARAM;	if (ackFlags & ~SA_MSG_MESSAGE_DELIVERED_ACK)		return SA_ERR_BAD_FLAGS;	cmg = (client_message_t *)		ha_malloc(sizeof(client_message_t) + message->size);	cmg->header.type = CMS_MSG_SEND;	cmg->header.name = *destination;	cmg->msg = *message;	cmg->invocation = 0;	cmg->data = cmg + 1;	memcpy(cmg->data, message->data, message->size);	cmg->ack = SA_MSG_MESSAGE_DELIVERED_ACK; /* according to the spec */	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (ackFlags & SA_MSG_MESSAGE_DELIVERED_ACK	&&	!(hd->callbacks).saMsgMessageDeliveredCallback)		return SA_ERR_INIT;	ret = cmsclient_message_send(hd,			sizeof(client_message_t) + message->size, cmg);	ha_free(cmg);	while (1) {		ret = wait_for_msg(hd, CMS_MSG_ACK,				destination, &rcmg, timeout);		if (ret != SA_OK) 			return ret;		ret = rcmg->flag;		ack = (client_message_ack_t *) rcmg;		/*		 * CMS_MSG_SEND is a blocking call, so we can only		 * have one client waiting for it. Thus when we get		 * an ACK that is for the request type CMS_MSG_SEND,		 * we know this is the ACK we are waiting for.		 */		dprintf("type is %d\n", ack->send_type);		if (ack->send_type == CMS_MSG_SEND) {			ha_free((client_message_t *) rcmg);			return ret;		} else {			enqueue_dispatch_msg(hd, rcmg);		}	}}SaErrorTsaMsgMessageSendAsync(const SaMsgHandleT *msgHandle,                      SaInvocationT invocation,                      const SaNameT *destination,                      const SaMsgMessageT *message,                      SaMsgAckFlagsT ackFlags){	client_message_t *cmg;	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	cmg = (client_message_t *)			ha_malloc(sizeof(client_message_t) + message->size);	cmg->header.type = CMS_MSG_SEND_ASYNC;	cmg->header.name = *destination;	cmg->msg = *message;	cmg->invocation = invocation;	cmg->data = (char *)cmg + sizeof(client_message_t);	memcpy(cmg->data, message->data, message->size);	cmg->ack = ackFlags;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	ret = cmsclient_message_send(hd,			sizeof(client_message_t) + message->size, cmg);	ha_free(cmg);	return ret == IPC_OK ? SA_OK : SA_ERR_LIBRARY;}static intrequest_for_message(__cms_handle_t * hd, const SaNameT * name){	client_header_t request_msg;	request_msg.type = CMS_MSG_REQUEST;	request_msg.name = *name;	return cmsclient_message_send(hd, sizeof(request_msg), &request_msg);}SaErrorTsaMsgMessageGet(const SaMsgQueueHandleT *queueHandle,                SaMsgMessageT *message,                SaMsgMessageInfoT *messageInfo,                SaTimeT timeout){	int ret;	SaErrorT error = SA_OK;	client_message_t * cmg;	client_header_t *rcmg;	__cms_handle_t *hd = GET_MQ_HANDLE(queueHandle);	__cms_queue_handle_t *qhd;	SaNameT * qname;	int freecmg = 0;	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by queueHandle [%d]"		,	__FUNCTION__, queueHandle ? *queueHandle : -1);		return SA_ERR_BAD_HANDLE;	}	if (!messageInfo || !message) 		return SA_ERR_INVALID_PARAM;	memset(messageInfo, 0, sizeof(SaMsgMessageInfoT));	qhd = g_hash_table_lookup(hd->queue_handle_hash, queueHandle);	assert(qhd != NULL);	qname = &(qhd->queue_name);	/*	 * request a message from daemon	 */	while (1) {		request_for_message(hd, qname);		ret = wait_for_msg(hd, CMS_MSG_GET | CMS_MSG_NOTIFY, qname, &rcmg,				   timeout);		if (ret != SA_OK) {			cl_log(LOG_ERR, "wait_for_msg error [%d]", ret);			return ret;		}		if (rcmg->type == CMS_MSG_NOTIFY) {			dprintf("Received CMS_MSG_NOTIFY\n");			continue;		} else 			break;	} 	cmg = (client_message_t *)rcmg;	cmg->data = (void *)((char *)cmg + sizeof(client_message_t));	dprintf("message.data is [%s]\n", (char *)cmg->data);	if (cmg->senderId) {		messageInfo->senderId = cmg->senderId;	}	if (message->size < cmg->msg.size) 		error = SA_ERR_NO_SPACE;	message->size = (cmg->msg).size;	if (message->data) {		memcpy(message->data, cmg->data, 			(cmg->msg.size > message->size ? 			 message->size : cmg->msg.size));		freecmg = 1;	} else {		message->data = cmg->data;	}	message->type = cmg->msg.type;	message->version = cmg->msg.version;	message->priority = cmg->msg.priority;	if (freecmg) 		ha_free(cmg);	/* TODO: message info */	return error;}SaErrorT saMsgMessageReceivedGet(const SaMsgQueueHandleT *queueHandle,                        const SaMsgMessageHandleT *messageHandle,                        SaMsgMessageT *message,                        SaMsgMessageInfoT *messageInfo){	return SA_ERR_NOT_SUPPORTED;}SaErrorT saMsgMessageCancel(const SaMsgQueueHandleT *queueHandle){	return SA_ERR_NOT_SUPPORTED;}SaErrorTsaMsgSelectionObjectGet(const SaMsgHandleT *msgHandle, 		SaSelectionObjectT *selectionObject){	int ret;	__cms_handle_t *hd = GET_CMS_HANDLE(msgHandle);	if (hd == NULL) {		cl_log(LOG_ERR, "%s: Cannot find hd by handlle [%d]"		,	__FUNCTION__, msgHandle ? *msgHandle : -1);		return SA_ERR_BAD_HANDLE;	}	dprintf("hd->backup_fd is [%d]\n", hd->backup_fd);	ret = hd->backup_fd >= 0 ? hd->active_fd : 		hd->ch->ops->get_recv_select_fd(hd->ch);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -