⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_client.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 2 页
字号:
	if ((mq = mqueue_table_lookup(mqname, NULL)) == NULL) {		cl_log(LOG_ERR, "%s: cannot find mq [%s]", __FUNCTION__,mqname);		ha_free(mqname);		return FALSE;	}	if ((message = dequeue_message(mq)) == NULL) {		cl_log(LOG_ERR, "%s: No message found for mq [%s], block"		,	__FUNCTION__, mqname);		ha_free(mqname);		return TRUE;	}	dprintf("%s: dequeue_message [%s]\n", __FUNCTION__	,	(char *)message->msg.data);	m = (client_message_t *)			ha_malloc(sizeof(client_message_t) + message->msg.size);	m->header.type = CMS_MSG_GET;        m->header.len = sizeof(client_message_t) + message->msg.size;        m->header.flag = SA_OK;	m->header.name = msg->name;        m->handle = mq->handle;	m->senderId = message->msgInfo.senderId;        m->msg = message->msg;        m->msg.data = NULL;        m->data = m + 1;        memcpy(m->data, message->msg.data, message->msg.size);	client_send_msg(client, m->header.len, m);	/* TODO: needs fix.  	   Can only be called after the msg_done */	ha_free(message);	ha_free(mqname);	ha_free(m);	return TRUE;}intclient_process_mqgroup_insert(IPC_Channel * client, client_header_t * msg,			      cms_data_t * cmsdata){	mqueue_t *mq, *mqg;	char *mqname, *mqgname;	client_mqgroup_ops_t * m = (client_mqgroup_ops_t *)msg;	client_header_t reply;	mqname = saname2str(m->header.name);	mqgname = saname2str(m->qgname);	reply.type = msg->type;	reply.len = sizeof(client_header_t);	reply.flag = SA_OK;	reply.name = msg->name;	mqg = mqueue_table_lookup(mqgname, NULL);	if (mqg == NULL)		goto bad_queue;	if (mqg->policy == 0)	/* not a queue group */		goto bad_queue;	mq = mqueue_table_lookup(mqname, NULL);	if (mq == NULL)		goto bad_queue;	/*	 * check if the queue is already in the queue group	 */	if (g_list_find(mqg->list, mq) == NULL) {		mqg->list = g_list_append(mqg->list, mq);		cl_log(LOG_INFO, "Adding mq <%p> to [%s] list", mq, mqgname); 		if (mqg->current == NULL)			mqg->current = g_list_first(mqg->list);		/*		 * Notify other nodes the change		 */		request_mqgroup_insert(mqgname, mqname, cmsdata);	} else {		reply.flag =  SA_ERR_EXIST;		client_send_msg(client, reply.len, &reply);		goto exit;	}	client_send_msg(client, reply.len, &reply);	goto exit;bad_queue:	reply.flag = SA_ERR_NOT_EXIST;	client_send_msg(client, reply.len, &reply);exit:	ha_free(mqname);	ha_free(mqgname);	return TRUE;}intclient_process_mqgroup_remove(IPC_Channel * client, client_header_t * msg,			      cms_data_t * cmsdata){	mqueue_t *mq, *mqg;	char *mqname, *mqgname;	client_mqgroup_ops_t * m = (client_mqgroup_ops_t *)msg;	client_header_t reply;	mqname = saname2str(m->header.name);	mqgname = saname2str(m->qgname);	reply.type = msg->type;	reply.len = sizeof(client_header_t);	reply.flag = SA_OK;	reply.name = msg->name;	mqg = mqueue_table_lookup(mqgname, NULL);	if (mqg == NULL)		goto error;	if (!IS_MQGROUP(mqg))	/* not a queue group */		goto error;	mq = mqname_lookup(mqname, NULL);	if (mq == NULL)		goto error;	/*	 * check if the queue is already removed from the queue group	 */	if (g_list_find(mqg->list, mq) != NULL) {		mqgroup_unref_mqueue(mqg, mq);		/*		 * Notify other nodes the change		 */		request_mqgroup_remove(mqgname, mqname, cmsdata);	} else		goto error;	client_send_msg(client, reply.len, &reply);	goto exit;error:	reply.flag = SA_ERR_NOT_EXIST;	client_send_msg(client, reply.len, &reply);exit:	ha_free(mqname);	ha_free(mqgname);	return TRUE;}intclient_process_mqgroup_track(IPC_Channel * client, client_header_t * msg){	mqueue_t *mqg;	char * gname;	int group;	client_mqgroup_mem_t * m = (client_mqgroup_mem_t *)msg;	client_mqgroup_notify_t * rmsg;	rmsg = (client_mqgroup_notify_t *)			ha_malloc(sizeof(client_mqgroup_notify_t));	rmsg->header.type = msg->type;	rmsg->header.len = sizeof(client_mqgroup_notify_t);	rmsg->header.flag = SA_OK;	rmsg->header.name = msg->name;	gname = saname2str(m->group_name);	mqg = mqueue_table_lookup(gname, &group);	if (mqg == NULL || group == FALSE)		goto noexist;	if ((m->flag & SA_TRACK_CHANGES)	||      (m->flag & SA_TRACK_CHANGES_ONLY)) {		client_mqgroup_track_t * track;		track = (client_mqgroup_track_t *)				ha_malloc(sizeof(client_mqgroup_track_t));		if (track == NULL) {			cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);			rmsg->header.flag = SA_ERR_NO_MEMORY;			client_send_msg(client, rmsg->header.len			,	(client_header_t *)rmsg);					ha_free(rmsg);			return TRUE;		}		track->ch = client;		track->flag = m->flag;				cl_log(LOG_INFO, "%s: append ch to [%s] notify_list,flag = <%d>"		,	__FUNCTION__, gname, track->flag);		mqg->notify_list = g_list_append(mqg->notify_list, track);	}	if (m->flag & SA_TRACK_CURRENT) {		notify_buffer_t buf;		size_t length;		buf.number = 0;		buf.change_buff = NULL;		g_list_foreach(mqg->list, mqueue_copy_notify_data, &buf);		length = buf.number * sizeof(SaMsgQueueGroupNotificationT);		rmsg = realloc(rmsg, sizeof(client_mqgroup_notify_t) + length);		rmsg->data = (char *)rmsg + sizeof(client_mqgroup_notify_t);		memcpy(rmsg->data, buf.change_buff, length);		ha_free(buf.change_buff);		rmsg->number = buf.number;		rmsg->policy = buf.policy;		rmsg->group_name = buf.name;		rmsg->header.len += length;	}			client_send_msg(client, rmsg->header.len, (client_header_t *)rmsg);	goto exit;noexist:	rmsg->header.flag = SA_ERR_NOT_EXIST;	client_send_msg(client, rmsg->header.len, (client_header_t *)rmsg);exit:	ha_free(gname);	ha_free(rmsg);	return TRUE;}static gintcompare_client(gconstpointer a, gconstpointer b){	const client_mqgroup_track_t * c;	const IPC_Channel * d;	c = (const client_mqgroup_track_t *)a;	d = (const IPC_Channel *)b;	if (c->ch == d)		return TRUE;	else		return FALSE;}intclient_process_mqgroup_track_stop(IPC_Channel * client, client_header_t * msg){	mqueue_t *mqg;	char * gname;	int group;	client_mqgroup_track_t * track;	client_mqgroup_mem_t * m = (client_mqgroup_mem_t *)msg;	client_header_t reply;	reply.type = msg->type;	reply.len = sizeof(client_header_t);	reply.flag = SA_OK;	reply.name = msg->name;	gname = saname2str(m->group_name);	mqg = mqueue_table_lookup(gname, &group);	if (mqg == NULL || group == FALSE)		goto noexist;	track = (client_mqgroup_track_t *)g_list_find_custom(mqg->notify_list,				client, compare_client);	if (track != NULL) {		mqg->notify_list = g_list_remove(mqg->notify_list, track);		ha_free(track);	}	client_send_msg(client, reply.len, &reply);	goto exit;noexist:	reply.flag = SA_ERR_NOT_EXIST;	client_send_msg(client, reply.len, &reply);exit:	ha_free(gname);	return TRUE;}static voidcms_client_msg_done(IPC_Message * msg){	client_header_t * message;	size_t msg_type;	//mqueue_t * mq;	//client_message_t * m = (client_message_t *) message;	message = msg->msg_body;	msg_type = message->type;	dprintf("cms_client_msg_done, type = %d\n", (int)msg_type);#if 0	/* update the buffer size */	if (msg_type == CMS_MSG_SEND || msg_type == CMS_MSG_SEND_ASYNC)		if ((mq = mqname_lookup(mqname, NULL)))			mqueue_update_usage(mq, m->msg.priority, -m->msg.size);#endif	ha_free(msg->msg_private);	return;}static voidcms_client_msg_done_freeclient(IPC_Message * msg){	client_header_t * message;	size_t msg_type;	message = msg->msg_body;	msg_type = message->type;	dprintf("cms_client_msg_done, type = %d\n", (int)msg_type);	ha_free(msg->msg_private);	ha_free(msg->msg_ch);	return;}intclient_send_msg(IPC_Channel * client, size_t len, gpointer data){	int ret;	IPC_Message * msg;	CMS_TRACE();	if ((msg = ha_malloc(sizeof(IPC_Message) + len)) == NULL) {		cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);		return FALSE;	}#if DEBUG_MEMORY	dprintf("%s (%p) ha_malloc %p, size 0x%x\n", __FUNCTION__	,	&client_send_msg, msg, sizeof(IPC_Message) + len);#endif	msg->msg_body = msg + 1;	memcpy(msg->msg_body, data, len);	msg->msg_len = len;	msg->msg_done = cms_client_msg_done;	msg->msg_private = msg;	msg->msg_ch = client;	msg->msg_buf = NULL;	ret = client->ops->send(client, msg);	if (ret == IPC_OK) 		return TRUE;	else 		return FALSE;}/* This function send the message thru the client and free the client * memory afterward. This has to be done here since it is async. --YZ */intclient_send_msg_freeclient(IPC_Channel * client, size_t len, gpointer data){	int ret;	IPC_Message * msg;	CMS_TRACE();	if ((msg = ha_malloc(sizeof(IPC_Message) + len)) == NULL) {		cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);		return FALSE;	}#if DEBUG_MEMORY	dprintf("%s (%p) ha_malloc %p, size 0x%x\n", __FUNCTION__	,	&client_send_msg, msg, sizeof(IPC_Message) + len);#endif	msg->msg_body = msg + 1;	memcpy(msg->msg_body, data, len);	msg->msg_len = len;	msg->msg_done = cms_client_msg_done_freeclient;	msg->msg_private = msg;	msg->msg_ch = client;	msg->msg_buf = NULL;	ret = client->ops->send(client, msg);	if (ret == IPC_OK) 		return TRUE;	else 		return FALSE;}intclient_send_error_msg(IPC_Channel * client, const char * name,		      size_t type, SaErrorT error){	client_header_t msg;	msg.type = type;	msg.len = sizeof(client_header_t);	msg.flag = error;	str2saname(&msg.name, name);	return client_send_msg(client, msg.len, &msg);}intclient_send_qstatus(IPC_Channel * client, mqueue_t * queue, int flag){	client_mqueue_status_t qstatus;	memset(&qstatus, 0, sizeof(client_mqueue_status_t));	qstatus.header.type = CMS_QUEUE_STATUS;	qstatus.header.len = sizeof(client_mqueue_status_t);	qstatus.header.flag = flag;	str2saname(&qstatus.header.name, queue->name);	qstatus.qstatus = queue->status;	return client_send_msg(client, qstatus.header.len, &qstatus);}intclient_send_client_qopen(IPC_Channel * client, mqueue_request_t * request, 			 guint handle, int flag){	client_mqueue_open_t qopen;	CMS_TRACE();	if (!request)		return HA_FAIL;	memset(&qopen, 0, sizeof(client_mqueue_open_t));	qopen.header.type = request->request_type;	qopen.header.len = sizeof(client_mqueue_open_t);	qopen.header.flag = flag;	str2saname(&qopen.header.name, request->qname);	qopen.handle = handle;	qopen.invocation = request->invocation;	return client_send_msg(client, qopen.header.len, &qopen);}intclient_send_ack_msg(IPC_Channel * client, mqueue_request_t * request, 		    guint handle, int flag){	client_message_ack_t ack;	if (!request)		return HA_FAIL;	memset(&ack, 0, sizeof(client_message_ack_t));	ack.header.type = CMS_MSG_ACK;	ack.header.len = sizeof(client_message_ack_t);	ack.header.flag = flag;	str2saname(&ack.header.name, request->qname);	ack.handle = handle;	ack.invocation = request->invocation;	ack.send_type = request->request_type;	return client_send_msg(client, ack.header.len, &ack);}int client_send_notready_msg(IPC_Channel * client, client_header_t * msg){	client_header_t reply;	reply.type = msg->type;	reply.len = sizeof(client_header_t);	reply.name = msg->name;	reply.flag = SA_ERR_TRY_AGAIN;	cl_log(LOG_ERR, "CMS is still waiting to receive message queue "			"updates.  Please try again later.");	return client_send_msg(client, reply.len, &reply);}int client_process_mqsend_reply(IPC_Channel * client, client_header_t * msg, cms_data_t * cmsdata){	SaErrorT error;	mqueue_request_t request;	IPC_Channel *cli = NULL; 	unsigned long * seq = NULL;	client_message_t * m = (client_message_t *) msg;	client_message_ack_t reply;	/* no queue name for this msg */	request.qname = NULL;	request.gname = NULL;	request.request_type = m->header.type;	dprintf("request.request_type is %d\n", request.request_type);	request.invocation = m->invocation;	request.ack = m->ack;	request.sendreceive = 0;	request.seq = gSendSeqNo++; 	m->data = (void *)((char *)msg + sizeof(client_message_t));	m->msg.data = m->data;	if (request.ack) {		dprintf("%s: insert ack packet: ", __FUNCTION__);		if ((cli = (IPC_Channel *) ha_malloc(sizeof(IPC_Channel))) 				== NULL 		||  (seq = (unsigned long *) ha_malloc(sizeof(unsigned long))) 				== NULL ) {			cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);			if (cli)				ha_free(cli);			error = SA_ERR_NO_MEMORY;			goto error;		}		*cli = *client;		*seq = request.seq;		dprintf("seq = %ld, client = %p\n", *seq, cli);		g_hash_table_insert(mq_ack_pending_hash, seq, cli);	} 	if (send_mq_reply(&request, m->senderId, &(m->msg), cmsdata) != TRUE) {		cl_log(LOG_ERR, "%s: mqname_send failed", __FUNCTION__);		error = SA_ERR_LIBRARY;		goto error;	}	return TRUE;error:	/* This is actually a error respond instead of a ACK. */	memset(&reply, 0, sizeof(client_message_ack_t));	reply.header.type = CMS_MSG_ACK;	reply.header.len = sizeof(client_message_ack_t);	reply.header.flag = error;	reply.header.name = msg->name;	reply.send_type = msg->type;	reply.invocation = m->invocation;	client_send_msg(client, reply.header.len, &reply);	return TRUE;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -