⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cms_entry.c

📁 linux集群服务器软件代码包
💻 C
字号:
/*
 * cms_entry.c: cms daemon state machine entry
 *
 * Copyright (c) 2004 Intel Corp.
 *
 * Author: Zou Yixiong (yixiong.zou@intel.com)
 * Author: Zhu Yi (yi.zhu@intel.com)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 */
#include <portability.h>
#include <assert.h>
#include "cms_data.h"
#include "cms_common.h"
#include "cms_cluster.h"
#include "cms_client.h"
#include "cms_membership.h"

/*
 * cluster_input_dispatch: drain and dispatch pending heartbeat messages.
 *
 * channel   - unused here.
 * user_data - a cms_data_t *; only its hb_handle is read to reach the
 *             heartbeat API.  Handlers are given the global cms_data.
 *
 * Reads messages with a non-blocking readmsg() until none is left,
 * routes each one by its F_TYPE field and frees it afterwards.
 * Always returns TRUE.
 */
gboolean
cluster_input_dispatch(IPC_Channel * channel, gpointer user_data)
{
	ll_cluster_t	*hb = ((cms_data_t *)user_data)->hb_handle;
	struct ha_msg	*reply;
	const char	*type;

	/*
	 * heartbeat message received
	 */
	while ((reply = hb->llc_ops->readmsg(hb, 0)) != NULL) {
		type = ha_msg_value(reply, F_TYPE);
		assert(type);

		dprintf("%s: Received msg from hb, type = %s\n",
			__FUNCTION__, type);

#if DEBUG_CLUSTER
		cl_log_message(reply);
#endif

		switch (mqname_string2type(type)) {
		case MQNAME_TYPE_REQUEST:
			reply_mqname_open(hb, reply);
			break;

		case MQNAME_TYPE_GRANTED:
			process_mqname_granted(reply, &cms_data);
			break;

		/* The three reopen flavours share one handler, which is
		 * told the exact type. */
		case MQNAME_TYPE_REOPEN:
		case MQNAME_TYPE_REOPEN_MSGFEED:
		case MQNAME_TYPE_MSGFEED_END:
			process_mqname_reopen(reply,
					      mqname_string2type(type),
					      &cms_data);
			break;

		case MQNAME_TYPE_DENIED:
			process_mqname_denied(reply);
			break;

		case MQNAME_TYPE_CLOSE:
			process_mqname_close(reply);
			break;

		case MQNAME_TYPE_UNLINK:
			process_mqname_unlink(reply);
			break;

		case MQNAME_TYPE_SEND:
			process_mqname_send(reply, &cms_data);
			break;

		case MQNAME_TYPE_INSERT:
			process_mqgroup_insert(reply);
			break;

		case MQNAME_TYPE_REMOVE:
			process_mqgroup_remove(reply);
			break;

		case MQNAME_TYPE_ACK:
			process_mqname_ack(reply);
			break;

		case MQNAME_TYPE_UPDATE:
			process_mqname_update(reply, &cms_data);
			break;

		case MQNAME_TYPE_STATUS_REQUEST:
			reply_mqueue_status(reply, &cms_data);
			break;

		case MQNAME_TYPE_STATUS_REPLY:
			process_mqueue_status(reply);
			break;

		case MQNAME_TYPE_UPDATE_REQUEST:
			process_mqinfo_update_request(reply, &cms_data);
			break;

		case MQNAME_TYPE_REPLY:
			process_mqsend_reply(reply, &cms_data);
			break;

		default:
			cl_log(LOG_ERR, "%s: Unknow type [%s]",
			       __FUNCTION__, type);
			break;
		}

		ha_msg_del(reply);
	}

	return TRUE;
}

/*
 * process_client_message: route one client request to its handler.
 *
 * client - channel the request arrived on.
 * msg    - request header; msg->type selects the handler.
 *
 * Returns the handler's result, or HA_FAIL for an unknown type.
 * NOTE: although declared gboolean, the value is an HA_* status; the
 * caller in this file compares it against HA_OK.
 */
static gboolean
process_client_message(IPC_Channel * client, client_header_t * msg)
{
	int ret;

	/*
	 * client message received
	 */
	dprintf("%s: Received msg from client <%p>, type = %d\n",
		__FUNCTION__, client, (int)msg->type);

	switch (msg->type) {
	case CMS_QUEUE_STATUS:
		ret = client_process_qstatus(client, msg, &cms_data);
		break;

	case CMS_QUEUE_OPEN_ASYNC:
	case CMS_QUEUE_OPEN:
	case CMS_QUEUEGROUP_CREATE:
		ret = client_process_mqopen(client, msg, &cms_data);
		break;

	case CMS_QUEUE_CLOSE:
		ret = client_process_mqclose(client, msg, &cms_data);
		break;

	case CMS_QUEUE_UNLINK:
		ret = client_process_mqunlink(client, msg, &cms_data);
		break;

	case CMS_MSG_SEND:
	case CMS_MSG_SEND_ASYNC:
	case CMS_MSG_SEND_RECEIVE:
		ret = client_process_mqsend(client, msg, &cms_data);
		break;

	case CMS_MSG_REQUEST:
		ret = client_process_message_request(client, msg);
		break;

	case CMS_QUEUEGROUP_INSERT:
		ret = client_process_mqgroup_insert(client, msg, &cms_data);
		break;

	case CMS_QUEUEGROUP_REMOVE:
		ret = client_process_mqgroup_remove(client, msg, &cms_data);
		break;

	case CMS_QUEUEGROUP_TRACK_START:
		ret = client_process_mqgroup_track(client, msg);
		break;

	case CMS_QUEUEGROUP_TRACK_STOP:
		ret = client_process_mqgroup_track_stop(client, msg);
		break;

	case CMS_MSG_REPLY:
	case CMS_MSG_REPLY_ASYNC:
		ret = client_process_mqsend_reply(client, msg, &cms_data);
		break;

	default:
		cl_log(LOG_ERR, "Unknow message type [%d]", (int)msg->type);
		return HA_FAIL;
	}

	return ret;
}

/*
 * cms_close_mqueue: GFunc callback that closes one queue of a client.
 *
 * data      - the mqueue_t * list element.
 * user_data - MUST be the owning cms_client_t * itself (not its
 *             address): the queue is unlinked from that client's
 *             opened_mqueue_list.
 *
 * Requests a close for the queue name, then removes the queue from the
 * client's list.
 */
static void
cms_close_mqueue(gpointer data, gpointer user_data)
{
	mqueue_t * mq = (mqueue_t *)data;
	cms_client_t * cms_client = (cms_client_t *)user_data;

	dprintf("%s: mqname_close %s\n", __FUNCTION__, mq->name);
	request_mqname_close(mq->name, &cms_data);

	cms_client->opened_mqueue_list =
		g_list_remove(cms_client->opened_mqueue_list, mq);
}

#if DEBUG_EXIT
/*
 * dump_client_table: GHFunc callback printing one client_table entry.
 * The key is read as an int; value is printed as a pointer.
 */
static void
dump_client_table(gpointer key, gpointer value, gpointer user_data)
{
	dprintf("Begin dumping hash table ...\n");
	dprintf("key = %d, value = <%p>\n", *(int *)key, value);
	dprintf("End dumping hash table\n");
}
#endif

/*
 * client_input_dispatch: handle input on one client IPC channel.
 *
 * client    - the channel with (possibly) pending input.
 * user_data - the daemon's cms_data_t *.
 *
 * If a message is pending it is received and then either
 *   - answered with a "not ready" message while cms_ready is unset, or
 *   - handed to process_client_message().
 * The received message is released through its msg_done hook in both
 * cases.
 *
 * When recv reports IPC_BROKEN the client has disconnected: its
 * channel_count is decremented and, once it reaches zero, every queue
 * it still has open is closed and its client_table entry is removed
 * and freed.
 *
 * Returns TRUE when nothing was pending or the message was handled
 * (including the not-ready reply); FALSE on receive failure,
 * disconnect, or when the handler did not return HA_OK.
 */
gboolean
client_input_dispatch(IPC_Channel * client, gpointer user_data)
{
	cms_data_t	*cmsdata = (cms_data_t *)user_data;
	IPC_Message	*msg;
	client_header_t	*cms_msg;
	cms_client_t	*cms_client;
	gpointer	orig_key = NULL;
	gpointer	value = NULL;
	int		ret;

	if (!client->ops->is_message_pending(client)) {
		return TRUE;
	}

	ret = client->ops->recv(client, &msg);

	if (ret == IPC_FAIL) {
		cl_log(LOG_ERR, "%s: client->ops->recv failed. "
				"ret = %d\n", __FUNCTION__, ret);
		return FALSE;
	}

	if (ret == IPC_BROKEN) {
		cl_log(LOG_ERR, "%s: client->ops->recv failed. "
				"ret = %d\n", __FUNCTION__, ret);

		/*
		 * Client disconnected. According to the spec,
		 * we should close all message queues opened by
		 * this client.
		 */
#if DEBUG_EXIT
		cl_log(LOG_INFO,
		       "Close all message queues for farside_pid "
		       "[%d], client_table <%p>\n",
		       client->farside_pid,
		       cmsdata->client_table);

		dprintf("client_table <%p>\n", cmsdata->client_table);
		g_hash_table_foreach(cmsdata->client_table,
				     dump_client_table, NULL);
#endif

		/*
		 * An unknown pid means there is nothing to clean up.
		 * Report it instead of dereferencing an unset pointer
		 * (an assert alone vanishes under NDEBUG).
		 */
		if (!g_hash_table_lookup_extended(cmsdata->client_table,
						  &client->farside_pid,
						  &orig_key, &value)) {
			cl_log(LOG_ERR, "%s: no client_table entry for "
					"farside_pid [%d]\n", __FUNCTION__,
					(int)client->farside_pid);
			return FALSE;
		}

		cms_client = (cms_client_t *)value;

#if DEBUG_EXIT
		dprintf("%s: channel_count now is %d\n", __FUNCTION__,
			cms_client->channel_count);
#endif

		/*
		 * If we are not the last one, skip.
		 */
		if (--cms_client->channel_count == 0) {
			/*
			 * Pass the client itself, not the address of the
			 * local pointer: cms_close_mqueue() casts
			 * user_data straight to cms_client_t *.  The
			 * callback removes only the element being
			 * visited, which g_list_foreach() permits.
			 */
			g_list_foreach(cms_client->opened_mqueue_list,
				       cms_close_mqueue, cms_client);

#if DEBUG_EXIT
			dprintf("%s: remove key [%d] from "
				"client_table\n", __FUNCTION__,
				client->farside_pid);
#endif

			g_hash_table_remove(cmsdata->client_table,
					    &client->farside_pid);
			ha_free(orig_key);
			ha_free(cms_client);
		}

		return FALSE;
	}

	cms_msg = (client_header_t *)msg->msg_body;

	if (!cmsdata->cms_ready) {
		cl_log(LOG_INFO, "cms_ready = %d\n", cmsdata->cms_ready);
		client_send_notready_msg(client, cms_msg);
		/* Not an error for the channel: keep it alive. */
		ret = HA_OK;
	} else {
		/*
		 * process this client messages
		 */
		ret = process_client_message(client, cms_msg);
	}

	/* Release the received message on every path that consumed it. */
	if (msg->msg_done) {
		msg->msg_done(msg);
	}

	if (ret != HA_OK) {
		return FALSE;
	}

	return TRUE;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -