⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 message.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
/* $Id: message.c,v 1.14 2004/11/18 01:56:59 yixiong Exp $ *//*  * message.c * * Copyright (C) 2003 Deng Pan <deng.pan@intel.com> *  * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. *  * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * General Public License for more details. *  * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#ifdef HAVE_CONFIG_H#include "config.h"#endif#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <sys/types.h>#include <sys/stat.h>#include <string.h>#include <glib.h>#include <clplumbing/cl_log.h>#include <clplumbing/cl_signal.h>#include <clplumbing/ipc.h>#include <clplumbing/Gmain_timeout.h>#include <clplumbing/base64.h>#include <hb_api_core.h>#include <hb_api.h>#include <ha_msg.h>#include <heartbeat.h>#include <saf/ais.h>#include <checkpointd/clientrequest.h>#include "checkpointd.h"#include "client.h"#include "replica.h"#include "message.h"#include "request.h"#include "response.h"#include "operation.h"#include "utils.h"#ifdef USE_DMALLOC#include <dmalloc.h>#endif#undef CKPTDEBUG extern SaCkptServiceT* saCkptService;#define SACKPTMESSAGEVALIDATEREQ(ckptMsg) 		\{							\	client = g_hash_table_lookup(			\		saCkptService->clientHash,		\		(gpointer)&(ckptMsg->clientHandle));	\	if (client == NULL) {				\		cl_log(LOG_ERR, 			\			"No client, ignore message");	\		break;					\	}						\							\	ckptReq= g_hash_table_lookup(			\		client->requestHash,			\		(gpointer)&(ckptMsg->clientRequestNO));	\	if (ckptReq == NULL) {				\		cl_log(LOG_ERR, 			\			"No request, ignore message");	\		break;					\	}						\}#define SACKPTMESSAGEVALIDATEOP(ckptMsg)		\{							\	if (replica == NULL) {				\		cl_log(LOG_ERR,			\			"No replica, ignore message");	\	}						\							\	if (!(replica->flagIsActive)) {			\		cl_log(LOG_ERR, 			\			"Replica is not active, ignore message");	\		break;					\	}						\							\	ckptOp = g_hash_table_lookup(			\		replica->operationHash,			\		(gconstpointer)&(ckptMsg->operationNO));	\	if (ckptOp == NULL) {				\		cl_log(LOG_ERR, 			\			"No operation, ignore message");	\		break;					\	}						\}/* checkpoint message process routine */gbooleanSaCkptClusterMsgProcess(){	SaCkptMessageT* ckptMsg = NULL;	SaCkptReplicaT* replica = NULL;	SaCkptClientT* client = NULL;	SaCkptOpenCheckpointT* openCkpt = NULL;	SaCkptRequestT* ckptReq = NULL;	SaCkptResponseT* ckptResp = NULL;	SaCkptOperationT* ckptOp = NULL;	SaCkptReqOpenParamT* openParam = NULL;	SaCkptReqCloseParamT* closeParam = NULL;	SaCkptCheckpointCreationAttributesT*	attr = NULL;	SaCkptReqUlnkParamT* unlinkParam = NULL;	SaNameT*	unlinkName = NULL;	SaCkptStateT*	state = NULL;	saOpenResponseTypeT openRespType ;	int	checkpointHandle;	GList*	list = NULL;	GList* 	nodeList = NULL;	int	finished = TRUE;	saOpenResponseTypeT	updateOpenProcessRes = -1;	SaErrorT	retVal;	ckptMsg = SaCkptMessageReceive();	if (ckptMsg == NULL) {		return FALSE;	}	if (!strcmp(ckptMsg->msgType, T_CKPT)) {		/* FIXME: different version should be work together  */		if ( SaCkptVersionCompare(ckptMsg->msgVersion, 			saCkptService->version) != 0) {			cl_log(LOG_ERR, 				"Bad message version\n");			return FALSE;		}		if (ckptMsg->checkpointName[0] != 0) {			replica = (SaCkptReplicaT*)g_hash_table_lookup(				saCkptService->replicaHash, 				(gconstpointer)ckptMsg->checkpointName); 		}		switch (ckptMsg->msgSubtype) {		/*		 * Sent out when checkpoint service started		 */		case M_CKPT_CREATED:			receiveCkptCreateMsg(ckptMsg);			break;		/*		 * Response for M_CKPT_CREATED		 */		case M_CKPT_CREATED_REPLY:			receiveCkptCreateReplyMsg(ckptMsg);			break;		/*		 * unlink the checkpoint		 * add its name to the unlinkCheckpointHash		 */		case M_CKPT_UNLINK_BCAST:			unlinkParam = ckptMsg->param;			unlinkName = g_hash_table_lookup(				saCkptService->unlinkedCheckpointHash,				unlinkParam->ckptName.value);			if (unlinkName != NULL) {				cl_log(LOG_INFO, 				"Name %s has already been in unlink hashtable",				unlinkParam->ckptName.value);				break;			}			unlinkName = SaCkptMalloc(sizeof(SaNameT));			if (unlinkName == NULL) {				cl_log(LOG_ERR, "No memory in daemon");				break;			} 						unlinkName->length = 				unlinkParam->ckptName.length;			g_hash_table_insert(				saCkptService->unlinkedCheckpointHash,				(gpointer)(unlinkParam->ckptName.value), 				(gpointer)unlinkName);			cl_log(LOG_INFO, 				"Name %s is added into unlink hash table",				unlinkParam->ckptName.value);			break;		/* 		 * the first checkpoint message 		 * before create local checkpoint, it has to broadcast this		 * message. if no reply after timeout, it can create, or else		 * it will copy the data from active checkpoint		 */		case M_CKPT_OPEN_BCAST:					if(isLoopMessage(ckptMsg)){				ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_SELF;				if(saCkptService->flagVerbose){					cl_log(LOG_INFO,					"self M_CKPT_OPEN_BCAST, ignore it\n");				}				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);				break;			}			openParam = (SaCkptReqOpenParamT*)ckptMsg->param;						if (replica == NULL) {				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,					"No replica\n");				}				if(isOnOpenProcess(openParam) == NULL){					if (saCkptService->flagVerbose) {						cl_log(LOG_INFO,						"No replica and no local open request\n");					}					ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA;				}				/*			 	 * This node have already a open request with 				 * the same name, so if it is a conflict open, 				 * the high priority node will do it, 				 * otherwise , the earlier one do it. 				 * Just compare the node name for priority now			 	 */				else {					updateOpenProcessQueue(ckptMsg, &updateOpenProcessRes);										if(updateOpenProcessRes == RES_RACE_HIGH_PRIO){						ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH;					}else if(updateOpenProcessRes == RES_RACE_LOW_PRIO){						ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_RACE_LOW;										}else{						ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_EARLIER;					}				}				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);				break;			}			if (!replica->flagIsActive) {				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,					"Standby replica for open request\n");				}				ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_STANDBY;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);				break;				}			/* 			 * if the createattribute is not null and is 			 * different from the replica, return error			 */			attr = &(openParam->attr);			if ((attr->checkpointSize != 				replica->maxCheckpointSize) ||			    (attr->maxSectionIdSize !=				replica->maxSectionIDSize) || 			    (attr->maxSections != 				replica->maxSectionNumber) ||			    (attr->maxSectionSize !=				replica->maxSectionSize) ||			    (attr->creationFlags !=			    	replica->createFlag)) {				cl_log(LOG_ERR, 					"create attribute is different");								ckptMsg->msgSubtype = 					M_CKPT_OPEN_BCAST_REPLY;				ckptMsg->retVal = 					SA_ERR_EXIST;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);				break;			}						/* if replica is unlinked, open should fail */			if (replica->flagUnlink == TRUE) {				cl_log(LOG_ERR, 					"checkpoint %s has been unlinked",					replica->checkpointName);				ckptMsg->retVal = SA_ERR_INVALID_PARAM;			} else {				strcpy(ckptMsg->activeNodeName,					saCkptService->nodeName);			}						ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY;			SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName);			break;		/* 		 * the reply message to M_CKPT_OPEN_BCAST when active replica		 * on sent node		 */		case M_CKPT_OPEN_BCAST_REPLY:			if (replica != NULL) {				cl_log(LOG_ERR, 					"Replica exists, ignore message");				break;			}			SACKPTMESSAGEVALIDATEREQ(ckptMsg);			openParam = ckptReq->clientRequest->reqParam;			updateOpenParamNodeStatus(openParam, 				ckptMsg->fromNodeName, RES_HAVE_REPLICA);			removeOpenPendingQueue(ckptReq->clientRequest->reqParam);			if (ckptMsg->retVal != SA_OK) {				ckptResp = SaCkptResponseCreate(ckptReq);				ckptResp->resp->retVal = ckptMsg->retVal;				ckptResp->resp->dataLength = 0;				ckptResp->resp->data = NULL;								SaCkptResponseSend(&ckptResp);								break;			}									if (openParam->openFlag & 				SA_CKPT_CHECKPOINT_COLOCATED) {				ckptMsg->msgSubtype = M_RPLC_CRT;								ckptReq->operation = OP_RPLC_CRT;			} else {				ckptMsg->msgSubtype = M_CKPT_OPEN_REMOTE;			}			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);			break;			/*		 * No replica for M_CKPT_OPEN_BCAST on sent node		 * ugly goto , may remove it ...		 */		case M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA:			openRespType = RES_NO_REPLICA;			goto here;		/*		 * Standby replica for M_CKPT_OPEN_BCAST on sent node		 */		case M_CKPT_OPEN_BCAST_REPLY_STANDBY:			openRespType = RES_STANDBY;			goto here;					/*		 * A race condition for M_CKPT_OPEN_BCAST 		 * sent node have high priority		 */		case M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH:			openRespType = RES_RACE_HIGH_PRIO;			goto here;					/*		 * A race condition for M_CKPT_OPEN_BCAST 		 * sent node have low priority		 */		case M_CKPT_OPEN_BCAST_REPLY_RACE_LOW:			openRespType = RES_RACE_LOW_PRIO;			goto here;		/*		 * The sent node have open on progress		 * this node send M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA for 		 * sent node's M_CKPT_OPEN_BCAST		 */		case M_CKPT_OPEN_BCAST_REPLY_EARLIER:			openRespType = RES_EARLIER;			goto here;					case M_CKPT_OPEN_BCAST_REPLY_SELF:			openRespType = RES_SELF;			/* Ugly goto */			here:			SACKPTMESSAGEVALIDATEREQ(ckptMsg);			openParam = ckptReq->clientRequest->reqParam;			if( NULL == isOnOpenProcess(openParam)){			/* The reason not found may because it have receive reply from other side*/				if(saCkptService->flagVerbose){					cl_log(LOG_INFO,					"open reponse miss\n");				}					break;			}else{				if(saCkptService->flagVerbose){					cl_log(LOG_INFO,					"open reponse found\n");				}				updateOpenParamNodeStatus(					openParam, ckptMsg->fromNodeName, 					openRespType);				if( openReqFinishedForLocalCreate(openParam)){					SaCkptRequestStopTimer(ckptReq);					ckptResp = createLocalReplical(ckptReq);					notifyLowPrioNode(openParam);					removeOpenPendingQueue(ckptReq->clientRequest->reqParam);					SaCkptResponseSend(&ckptResp);				}				break;			}			/* 		 * if the client do not want to create a local copy of 		 * the checkpoint, it will open the checkpoint remotely		 */		case M_CKPT_OPEN_REMOTE:			if (replica == NULL) {				cl_log(LOG_ERR,					"No replica, ignore message");				break;			}						if (!replica->flagIsActive) {				cl_log(LOG_ERR,					"Standby replica, ignore message");				break;				}			openParam = (SaCkptReqOpenParamT*)ckptMsg->param;			/* 			 * if the createattribute is not null and is 			 * different from the replica, return error			 */			attr = &(openParam->attr);			if ((attr->checkpointSize != 				replica->maxCheckpointSize) ||			    (attr->maxSectionIdSize !=				replica->maxSectionIDSize) || 			    (attr->maxSections != 				replica->maxSectionNumber) ||			    (attr->maxSectionSize !=				replica->maxSectionSize) ||			    (attr->creationFlags !=			    	replica->createFlag)) {				cl_log(LOG_ERR, 					"create attribute is different");								ckptMsg->msgSubtype = 					M_CKPT_OPEN_REMOTE_REPLY;				ckptMsg->retVal = 					SA_ERR_EXIST;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);				break;			}						openCkpt = SaCkptCheckpointOpen(NULL, replica, 				openParam);			SACKPTASSERT(openCkpt != NULL);			strcpy(openCkpt->clientHostName, 				ckptMsg->clientHostName);			openCkpt->clientHandle = ckptMsg->clientHandle;			ckptMsg->msgSubtype = M_CKPT_OPEN_REMOTE_REPLY;			ckptMsg->dataLength = 				sizeof(openCkpt->checkpointHandle);			ckptMsg->data = SaCkptMalloc(				sizeof(openCkpt->checkpointHandle));			SACKPTASSERT(ckptMsg->data != NULL);			memcpy(ckptMsg->data,				&(openCkpt->checkpointHandle),				sizeof(openCkpt->checkpointHandle));			SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName);						break;		/*		 * the reply message to M_CKPT_OPEN_REMOTE		 */		case M_CKPT_OPEN_REMOTE_REPLY:			if (replica != NULL) {				cl_log(LOG_ERR, 					"Replica exists, ignore message");				break;			}			SACKPTMESSAGEVALIDATEREQ(ckptMsg);			SaCkptRequestStopTimer(ckptReq);						if (ckptMsg->retVal != SA_OK) {				ckptResp = SaCkptResponseCreate(ckptReq);				ckptResp->resp->retVal = ckptMsg->retVal;				SaCkptResponseSend(&ckptResp);				break;			}				openParam = ckptReq->clientRequest->reqParam;			openCkpt = SaCkptCheckpointOpen(client, NULL,				openParam);			strcpy(openCkpt->activeNodeName, 				ckptMsg->activeNodeName);			strcpy(openCkpt->checkpointName, 				ckptMsg->checkpointName);			openCkpt->checkpointRemoteHandle = 				*(int*)(ckptMsg->data);			ckptReq->openCkpt = openCkpt;						ckptResp = SaCkptResponseCreate(ckptReq);			ckptResp->resp->retVal = ckptMsg->retVal;						ckptResp->resp->dataLength = 				sizeof(SaCkptCheckpointHandleT);			ckptResp->resp->data = 				SaCkptMalloc(ckptResp->resp->dataLength);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -