⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_client.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* * cms_client.c: cms daemon client operation * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */#include <sys/types.h>#include <sys/stat.h>#include <string.h>#include <stdlib.h>#include <assert.h>#include <clplumbing/cl_log.h>#include <clplumbing/GSource.h>#include <heartbeat.h>#include "cms_common.h"extern GHashTable * mq_open_pending_hash;extern GHashTable * mq_status_pending_hash;extern GHashTable * mq_ack_pending_hash;extern GHashTable * cms_client_table;static unsigned long gSendSeqNo = 0;static gbooleandelete_cms_client(gpointer key, gpointer value, gpointer user_data){	/* memory leak here? */	return TRUE;}voidcms_client_input_destroy(gpointer user_data){	dprintf("%s: received HUP.\n", __FUNCTION__);	return;}intcms_client_init(cms_data_t * cmsdata){	IPC_WaitConnection * wait_ch = NULL;	mode_t mask;	GHashTable * attrs;	char path[] = IPC_PATH_ATTR;	char domainsocket[] = IPC_DOMAIN_SOCKET;	char cms_socket[] = CMS_DOMAIN_SOCKET;	cl_log(LOG_INFO, "initialize client tables and wait channel.");	attrs = g_hash_table_new(g_str_hash, g_str_equal);	g_hash_table_insert(attrs, path, cms_socket);	mask = umask(0);	wait_ch = ipc_wait_conn_constructor(domainsocket, attrs);	if (!wait_ch) {		cl_perror("Can't create wait channel");		return HA_FAIL;	}	mask = umask(mask);	g_hash_table_destroy(attrs);	cmsdata->wait_ch = wait_ch;	cmsdata->client_table = g_hash_table_new(g_int_hash, g_int_equal);	return HA_OK;}intcms_client_add(GHashTable ** cms_client_table, struct IPC_CHANNEL * newclient) {	cms_client_t * cms_client;	pid_t * key;	if (!cms_client_table) {		cl_log(LOG_ERR, "cms: can't find client table"); 		return HA_FAIL;	}	cms_client = g_hash_table_lookup(*cms_client_table,			&(newclient->farside_pid));	if (cms_client) {		cms_client->channel_count++;		dprintf("farside_pid [%d] already exists, channel_count [%d]\n"		,	newclient->farside_pid, cms_client->channel_count);		return HA_OK;	}	cms_client = (cms_client_t *) ha_malloc(sizeof(cms_client_t));	dprintf("Add farside_pid [%d] to daemon <%p>\n"	,	newclient->farside_pid, *cms_client_table);	cms_client->channel_count = 1;	cms_client->opened_mqueue_list = NULL;	key = (pid_t *)ha_malloc(sizeof(pid_t));	*key = newclient->farside_pid;	g_hash_table_insert(*cms_client_table, key, cms_client);	return HA_OK;}voidcms_client_close_all(GHashTable * cms_client_table){	dprintf("In Func %s ...\n", __FUNCTION__);	if (g_hash_table_size(cms_client_table)) {		g_hash_table_foreach_remove(cms_client_table, 				delete_cms_client, NULL);	}	return;}intclient_process_qstatus(IPC_Channel * client, client_header_t *msg, cms_data_t * cmsdata){	client_mqueue_status_t * qstatus_msg;	mqueue_t * queue;	SaErrorT error;	client_header_t reply;	char * mqname;	qstatus_msg = (client_mqueue_status_t *) msg;	mqname = saname2str(qstatus_msg->header.name);	/* get the queue/host mapping 	 * 	 * this function should be deterministic based 	 * only on the queue name and the list of cluster	 * membership available.  	 */	if ((queue = mqname_lookup(mqname, NULL)) == NULL) {		dprintf("%s, queue %s not found.\n", __FUNCTION__, mqname);		error = SA_ERR_NOT_EXIST;		reply.type = msg->type;		reply.len = sizeof(client_header_t);		reply.flag = error;		reply.name = msg->name;		client_send_msg(client, reply.len, &reply);		return TRUE;	};	dprintf("%s, queue %s found.\n", __FUNCTION__, mqname);	if (!is_cms_online(queue->host)) {		cl_log(LOG_WARNING, "%s, cms daemon offline on [%s], return BAD_HANDLE for queue status.", __FUNCTION__, queue->host);		error = SA_ERR_BAD_HANDLE;		reply.type = msg->type;		reply.len = sizeof(client_header_t);		reply.flag = error;		reply.name = msg->name;		client_send_msg(client, reply.len, &reply);		return TRUE;	}	/* Get queueUsed and numberOfMessages for saMsgQueueUsage[4]	 * from mqueue open node.	 */	g_hash_table_insert(mq_status_pending_hash, mqname, client);	request_mqueue_status(queue, cmsdata);	return TRUE;}intclient_process_mqopen(IPC_Channel * client, client_header_t * msg,		      cms_data_t * cmsdata){	mqueue_request_t request;	IPC_Channel *cli;	client_mqueue_open_t * m = (client_mqueue_open_t *) msg;	mqueue_t * mq;	char * mqname;	int i;	SaErrorT error = SA_OK;	mqname = saname2str(m->header.name);	if (g_hash_table_lookup(mq_open_pending_hash, mqname)) {		cl_log(LOG_WARNING, "%s: mqname [%s] open pending from local"		,	__FUNCTION__, mqname);		request.qname = mqname;		request.gname = NULL;		request.request_type = m->header.type;		request.invocation = 0;		client_send_client_qopen(client, &request, -1, SA_ERR_EXIST);		ha_free(mqname);		return TRUE;	}	/* Search from local database firstly, if local database checking	 * fails, we don't continue.	 *	 * Don't worry about retention time here, the original mqueue	 * owner will deal with it when receiving REOPEN from hb.	 */	if ((mq = mqname_lookup(mqname, NULL)) != NULL) {		if (mq->mqstat != MQ_STATUS_CLOSE)			error = SA_ERR_EXIST;		/* Client provide a creationAttributes, but it is 		 * different from what we already have.		 */		else if ((m->attr.creationFlags != -1)		&&	(mq->status.openFlags != m->openflag		||	mq->status.creationFlags != m->attr.creationFlags		||	mq->status.retentionTime != m->attr.retentionTime))			error = SA_ERR_EXIST;	}	if (!mq && (m->header.type != CMS_QUEUEGROUP_CREATE)	&&	!(m->openflag & SA_MSG_QUEUE_CREATE)) {		error = SA_ERR_NOT_EXIST;		cl_log(LOG_INFO, "%s: SA_MSG_QUEUE_CREATE not provided for %s"		,	__FUNCTION__, mqname);	}	if (error == SA_OK)		goto proceed;	request.qname = mqname;	request.gname = NULL;	request.request_type = m->header.type;	request.invocation = 0;	client_send_client_qopen(client, &request, -1, error);	ha_free(mqname);	return TRUE;proceed:	if ((cli = (IPC_Channel *) ha_malloc(sizeof(IPC_Channel))) == NULL) {		cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);		ha_free(mqname);		return FALSE;	}	*cli = *client;	mq = (mqueue_t *) ha_malloc(sizeof(mqueue_t));	if (!mq) {		cl_log(LOG_ERR, "%s: ha_malloc for mqueue_t failed.\n"		,	__FUNCTION__);		ha_free(cli);		ha_free(mqname);		return FALSE;	}	memset(mq, 0, sizeof(mqueue_t));	mq->name = mqname;	mq->policy = m->policy;	mq->mqstat = MQ_STATUS_OPEN_PENDING;	mq->client = cli;	g_hash_table_insert(mq_open_pending_hash, mq->name, mq);	request.qname = mqname;	request.gname = NULL;	request.request_type = m->header.type;	request.invocation = m->invocation;	request.policy = m->policy;	request.create_flag = m->attr.creationFlags;	request.retention = m->attr.retentionTime;	request.ack = 1; 	for (i = SA_MSG_MESSAGE_HIGHEST_PRIORITY	;	i <= SA_MSG_MESSAGE_LOWEST_PRIORITY; i++)		request.size[i] = m->attr.size[i];	cl_log(LOG_INFO, "%s: invocation = %d, policy = %d, type = %d"	,	__FUNCTION__, request.invocation, request.policy	,	request.request_type);	if (request_mqname_open(&request, cmsdata) == FALSE) {		cl_log(LOG_ERR, "%s: cluster_request_mqname failed"		,	__FUNCTION__);		g_hash_table_remove(mq_open_pending_hash, mqname);		ha_free(cli);		ha_free(mq);		ha_free(mqname);		return FALSE;	}	return TRUE;}intclient_process_mqclose(IPC_Channel * client, client_header_t * msg,		       cms_data_t * cmsdata){	client_mqueue_close_t * m = (client_mqueue_close_t *) msg;	mqueue_t *mq;	client_header_t reply;	cms_client_t * cms_client;	CMS_TRACE();	mq = mqueue_handle_lookup(&(m->handle), NULL);	if (mq == NULL) {		reply.type = msg->type;		reply.len = sizeof(client_header_t);		reply.flag = SA_ERR_NOT_EXIST;		reply.name = msg->name;		client_send_msg(client, reply.len, &reply);		cl_log(LOG_WARNING, "%s: Cannot find mq by handle [%u]"		,	__FUNCTION__, m->handle);		return TRUE;	}	mq->status.closeTime = get_current_satime();	/*	 * close the mqueue in the cluster	 */	if (request_mqname_close(mq->name, cmsdata) == FALSE) {		cl_log(LOG_ERR, "%s: mqname_close failed", __FUNCTION__);		return FALSE;	}	mqueue_handle_remove(&(m->handle));	/*	 * remove this mq from client's opened_mqueue_list	 */	cms_client = g_hash_table_lookup(cmsdata->client_table,				&client->farside_pid);	assert(cms_client != NULL);	cms_client->opened_mqueue_list =		g_list_remove(cms_client->opened_mqueue_list, mq);	#if 0	if (m->silent)		return TRUE;	#endif	return TRUE;}intclient_process_mqunlink(IPC_Channel * client, client_header_t * msg,			cms_data_t * cmsdata){	mqueue_t *mq;	char * mqname;	client_mqueue_unlink_t * m = (client_mqueue_unlink_t *) msg;	client_header_t reply;	cms_client_t * cms_client;	mqname = saname2str(m->header.name);	dprintf("%s: mqname=[%s]\n", __FUNCTION__, mqname);	mq = mqueue_table_lookup(mqname, NULL);	if (mq == NULL) {		reply.type = msg->type;		reply.len = sizeof(client_header_t);		reply.flag = SA_ERR_NOT_EXIST;		reply.name = msg->name;		cl_log(LOG_WARNING, "%s: Cannot find mq by handle [%u]"		,	__FUNCTION__, m->handle);		client_send_msg(client, reply.len, &reply); 		return TRUE;	}	if (mq->list != NULL) {		/*		 * Remove the reference from my message queue		 */	}	if (request_mqname_unlink(mqname, cmsdata) == FALSE) {		cl_log(LOG_ERR, "%s: mqname_unlink failed", __FUNCTION__);		ha_free(mqname);		return FALSE;	}	mqueue_handle_remove(&(m->handle));	/*	 * remove this mq from client's opened_mqueue_list	 */	cms_client = g_hash_table_lookup(cmsdata->client_table,				&client->farside_pid);	assert(cms_client != NULL);	cms_client->opened_mqueue_list =		g_list_remove(cms_client->opened_mqueue_list, mq);	ha_free(mqname);	return TRUE;}intclient_process_mqsend(IPC_Channel * client, client_header_t * msg,		      cms_data_t * cmsdata){	mqueue_t *mq;	char * mqname;	mqueue_request_t request;	IPC_Channel *cli = NULL; 	unsigned long * seq = NULL;	client_message_t * m = (client_message_t *) msg;	client_message_ack_t reply;	SaErrorT error;	mqname = saname2str(m->header.name);	request.qname = mqname;	request.gname = NULL;	request.request_type = m->header.type;	dprintf("request.request_type is %d\n", request.request_type);	request.invocation = m->invocation;	request.ack = m->ack;	request.sendreceive = m->sendreceive;	request.seq = gSendSeqNo++;		dprintf("%s: mqname = %s\n", __FUNCTION__, mqname);	m->data = (void *)((char *)msg + sizeof(client_message_t));	m->msg.data = m->data;	mq = mqname_lookup(mqname, NULL);	if (mq == NULL) {		cl_log(LOG_WARNING, "%s: Cannot find mq by name [%s]"		,	__FUNCTION__, mqname);		error = SA_ERR_NOT_EXIST;		goto error;	}	/*	 * for message queue group	 */ 	if (IS_MQGROUP(mq)) { 		mqueue_t *mqg = mq;		char *mqueue_name = NULL;		request.gname = mqg->name;  		cl_log(LOG_INFO, "[%s] is a [Type %d] message queue group" 		,	mqname, mqg->policy);  		cl_log(LOG_INFO, "MQ Group [%s] current is <%p>" 		,	mqname, mqg->current->data);  		if (!(mqg->current) 		||  !(mqueue_name = ((mqueue_t *)(mqg->current->data))->name) 		||  !(mq = mqueue_table_lookup(mqueue_name, NULL))) { 			cl_log(LOG_ERR, "%s: Cannot find group current [%s]\n" 			,	__FUNCTION__, mqueue_name);			error = SA_ERR_NOT_EXIST;			goto error; 		}		dump_mqueue_list(mqg); 		mqg->current = CIRCLE_LIST_NEXT(mqg->list, mqg->current);		request.qname = mqueue_name;		request.gname = mqname; 	}	if (request.ack) {		dprintf("%s: insert ack packet ", __FUNCTION__);		if ((cli = (IPC_Channel *) ha_malloc(sizeof(IPC_Channel))) 				== NULL 		||  (seq = (unsigned long *) ha_malloc(sizeof(unsigned long))) 				== NULL ) {			cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__);			if (cli)				ha_free(cli);			error = SA_ERR_NO_MEMORY;			goto error;		}		*cli = *client;		*seq = request.seq;		dprintf("seq = %ld, client = %p\n", *seq, cli);		g_hash_table_insert(mq_ack_pending_hash, seq, cli);	} 	if (request_mqname_send(&request, mq->host, NULL, &(m->msg)	,	cmsdata) == FALSE) {		cl_log(LOG_ERR, "%s: mqname_send failed", __FUNCTION__);		error = SA_ERR_LIBRARY;		goto error;	}	ha_free(mqname);	return TRUE;error:	/* This is actually a error respond instead of a ACK. */	memset(&reply, 0, sizeof(client_message_ack_t));	reply.header.type = CMS_MSG_ACK;	reply.header.len = sizeof(client_message_ack_t);	reply.header.flag = error;	reply.header.name = msg->name;	reply.send_type = msg->type;	reply.invocation = m->invocation;	client_send_msg(client, reply.header.len, &reply);	ha_free(mqname);	return TRUE;}intclient_process_message_request(IPC_Channel * client, client_header_t * msg){	char * mqname;	mqueue_t * mq;	message_t * message;	client_message_t * m;	mqname = saname2str(msg->name);	dprintf("%s: mqname is %s\n", __FUNCTION__, mqname);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -