📄 message.c
字号:
/* $Id: message.c,v 1.14 2004/11/18 01:56:59 yixiong Exp $ *//* * message.c * * Copyright (C) 2003 Deng Pan <deng.pan@intel.com> * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#ifdef HAVE_CONFIG_H#include "config.h"#endif#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <sys/types.h>#include <sys/stat.h>#include <string.h>#include <glib.h>#include <clplumbing/cl_log.h>#include <clplumbing/cl_signal.h>#include <clplumbing/ipc.h>#include <clplumbing/Gmain_timeout.h>#include <clplumbing/base64.h>#include <hb_api_core.h>#include <hb_api.h>#include <ha_msg.h>#include <heartbeat.h>#include <saf/ais.h>#include <checkpointd/clientrequest.h>#include "checkpointd.h"#include "client.h"#include "replica.h"#include "message.h"#include "request.h"#include "response.h"#include "operation.h"#include "utils.h"#ifdef USE_DMALLOC#include <dmalloc.h>#endif#undef CKPTDEBUG extern SaCkptServiceT* saCkptService;#define SACKPTMESSAGEVALIDATEREQ(ckptMsg) \{ \ client = g_hash_table_lookup( \ saCkptService->clientHash, \ (gpointer)&(ckptMsg->clientHandle)); \ if (client == NULL) { \ cl_log(LOG_ERR, \ "No client, ignore message"); \ break; \ } \ \ ckptReq= g_hash_table_lookup( \ client->requestHash, \ (gpointer)&(ckptMsg->clientRequestNO)); \ if (ckptReq == NULL) { \ cl_log(LOG_ERR, \ "No request, ignore message"); \ break; \ } \}#define SACKPTMESSAGEVALIDATEOP(ckptMsg) \{ \ if (replica == NULL) { \ cl_log(LOG_ERR, \ "No replica, ignore message"); \ } \ \ if (!(replica->flagIsActive)) { \ cl_log(LOG_ERR, \ "Replica is not active, ignore message"); \ break; \ } \ \ ckptOp = g_hash_table_lookup( \ replica->operationHash, \ (gconstpointer)&(ckptMsg->operationNO)); \ if (ckptOp == NULL) { \ cl_log(LOG_ERR, \ "No operation, ignore message"); \ break; \ } \}/* checkpoint message process routine */gbooleanSaCkptClusterMsgProcess(){ SaCkptMessageT* ckptMsg = NULL; SaCkptReplicaT* replica = NULL; SaCkptClientT* client = NULL; SaCkptOpenCheckpointT* openCkpt = NULL; SaCkptRequestT* ckptReq = NULL; SaCkptResponseT* ckptResp = NULL; SaCkptOperationT* ckptOp = NULL; SaCkptReqOpenParamT* openParam = NULL; SaCkptReqCloseParamT* closeParam = NULL; SaCkptCheckpointCreationAttributesT* attr = NULL; SaCkptReqUlnkParamT* unlinkParam = NULL; SaNameT* unlinkName = NULL; SaCkptStateT* state = NULL; saOpenResponseTypeT openRespType ; int checkpointHandle; GList* list = NULL; GList* nodeList = NULL; int finished = TRUE; saOpenResponseTypeT updateOpenProcessRes = -1; SaErrorT retVal; ckptMsg = SaCkptMessageReceive(); if (ckptMsg == NULL) { return FALSE; } if (!strcmp(ckptMsg->msgType, T_CKPT)) { /* FIXME: different version should be work together */ if ( SaCkptVersionCompare(ckptMsg->msgVersion, saCkptService->version) != 0) { cl_log(LOG_ERR, "Bad message version\n"); return FALSE; } if (ckptMsg->checkpointName[0] != 0) { replica = (SaCkptReplicaT*)g_hash_table_lookup( saCkptService->replicaHash, (gconstpointer)ckptMsg->checkpointName); } switch (ckptMsg->msgSubtype) { /* * Sent out when checkpoint service started */ case M_CKPT_CREATED: receiveCkptCreateMsg(ckptMsg); break; /* * Response for M_CKPT_CREATED */ case M_CKPT_CREATED_REPLY: receiveCkptCreateReplyMsg(ckptMsg); break; /* * unlink the checkpoint * add its name to the unlinkCheckpointHash */ case M_CKPT_UNLINK_BCAST: unlinkParam = ckptMsg->param; unlinkName = g_hash_table_lookup( saCkptService->unlinkedCheckpointHash, unlinkParam->ckptName.value); if (unlinkName != NULL) { cl_log(LOG_INFO, "Name %s has already been in unlink hashtable", unlinkParam->ckptName.value); break; } unlinkName = SaCkptMalloc(sizeof(SaNameT)); if (unlinkName == NULL) { cl_log(LOG_ERR, "No memory in daemon"); break; } unlinkName->length = unlinkParam->ckptName.length; g_hash_table_insert( saCkptService->unlinkedCheckpointHash, (gpointer)(unlinkParam->ckptName.value), (gpointer)unlinkName); cl_log(LOG_INFO, "Name %s is added into unlink hash table", unlinkParam->ckptName.value); break; /* * the first checkpoint message * before create local checkpoint, it has to broadcast this * message. if no reply after timeout, it can create, or else * it will copy the data from active checkpoint */ case M_CKPT_OPEN_BCAST: if(isLoopMessage(ckptMsg)){ ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_SELF; if(saCkptService->flagVerbose){ cl_log(LOG_INFO, "self M_CKPT_OPEN_BCAST, ignore it\n"); } SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; } openParam = (SaCkptReqOpenParamT*)ckptMsg->param; if (replica == NULL) { if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "No replica\n"); } if(isOnOpenProcess(openParam) == NULL){ if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "No replica and no local open request\n"); } ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA; } /* * This node have already a open request with * the same name, so if it is a conflict open, * the high priority node will do it, * otherwise , the earlier one do it. * Just compare the node name for priority now */ else { updateOpenProcessQueue(ckptMsg, &updateOpenProcessRes); if(updateOpenProcessRes == RES_RACE_HIGH_PRIO){ ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH; }else if(updateOpenProcessRes == RES_RACE_LOW_PRIO){ ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_RACE_LOW; }else{ ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_EARLIER; } } SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; } if (!replica->flagIsActive) { if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Standby replica for open request\n"); } ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY_STANDBY; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; } /* * if the createattribute is not null and is * different from the replica, return error */ attr = &(openParam->attr); if ((attr->checkpointSize != replica->maxCheckpointSize) || (attr->maxSectionIdSize != replica->maxSectionIDSize) || (attr->maxSections != replica->maxSectionNumber) || (attr->maxSectionSize != replica->maxSectionSize) || (attr->creationFlags != replica->createFlag)) { cl_log(LOG_ERR, "create attribute is different"); ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY; ckptMsg->retVal = SA_ERR_EXIST; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; } /* if replica is unlinked, open should fail */ if (replica->flagUnlink == TRUE) { cl_log(LOG_ERR, "checkpoint %s has been unlinked", replica->checkpointName); ckptMsg->retVal = SA_ERR_INVALID_PARAM; } else { strcpy(ckptMsg->activeNodeName, saCkptService->nodeName); } ckptMsg->msgSubtype = M_CKPT_OPEN_BCAST_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; /* * the reply message to M_CKPT_OPEN_BCAST when active replica * on sent node */ case M_CKPT_OPEN_BCAST_REPLY: if (replica != NULL) { cl_log(LOG_ERR, "Replica exists, ignore message"); break; } SACKPTMESSAGEVALIDATEREQ(ckptMsg); openParam = ckptReq->clientRequest->reqParam; updateOpenParamNodeStatus(openParam, ckptMsg->fromNodeName, RES_HAVE_REPLICA); removeOpenPendingQueue(ckptReq->clientRequest->reqParam); if (ckptMsg->retVal != SA_OK) { ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = ckptMsg->retVal; ckptResp->resp->dataLength = 0; ckptResp->resp->data = NULL; SaCkptResponseSend(&ckptResp); break; } if (openParam->openFlag & SA_CKPT_CHECKPOINT_COLOCATED) { ckptMsg->msgSubtype = M_RPLC_CRT; ckptReq->operation = OP_RPLC_CRT; } else { ckptMsg->msgSubtype = M_CKPT_OPEN_REMOTE; } SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); break; /* * No replica for M_CKPT_OPEN_BCAST on sent node * ugly goto , may remove it ... */ case M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA: openRespType = RES_NO_REPLICA; goto here; /* * Standby replica for M_CKPT_OPEN_BCAST on sent node */ case M_CKPT_OPEN_BCAST_REPLY_STANDBY: openRespType = RES_STANDBY; goto here; /* * A race condition for M_CKPT_OPEN_BCAST * sent node have high priority */ case M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH: openRespType = RES_RACE_HIGH_PRIO; goto here; /* * A race condition for M_CKPT_OPEN_BCAST * sent node have low priority */ case M_CKPT_OPEN_BCAST_REPLY_RACE_LOW: openRespType = RES_RACE_LOW_PRIO; goto here; /* * The sent node have open on progress * this node send M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA for * sent node's M_CKPT_OPEN_BCAST */ case M_CKPT_OPEN_BCAST_REPLY_EARLIER: openRespType = RES_EARLIER; goto here; case M_CKPT_OPEN_BCAST_REPLY_SELF: openRespType = RES_SELF; /* Ugly goto */ here: SACKPTMESSAGEVALIDATEREQ(ckptMsg); openParam = ckptReq->clientRequest->reqParam; if( NULL == isOnOpenProcess(openParam)){ /* The reason not found may because it have receive reply from other side*/ if(saCkptService->flagVerbose){ cl_log(LOG_INFO, "open reponse miss\n"); } break; }else{ if(saCkptService->flagVerbose){ cl_log(LOG_INFO, "open reponse found\n"); } updateOpenParamNodeStatus( openParam, ckptMsg->fromNodeName, openRespType); if( openReqFinishedForLocalCreate(openParam)){ SaCkptRequestStopTimer(ckptReq); ckptResp = createLocalReplical(ckptReq); notifyLowPrioNode(openParam); removeOpenPendingQueue(ckptReq->clientRequest->reqParam); SaCkptResponseSend(&ckptResp); } break; } /* * if the client do not want to create a local copy of * the checkpoint, it will open the checkpoint remotely */ case M_CKPT_OPEN_REMOTE: if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } if (!replica->flagIsActive) { cl_log(LOG_ERR, "Standby replica, ignore message"); break; } openParam = (SaCkptReqOpenParamT*)ckptMsg->param; /* * if the createattribute is not null and is * different from the replica, return error */ attr = &(openParam->attr); if ((attr->checkpointSize != replica->maxCheckpointSize) || (attr->maxSectionIdSize != replica->maxSectionIDSize) || (attr->maxSections != replica->maxSectionNumber) || (attr->maxSectionSize != replica->maxSectionSize) || (attr->creationFlags != replica->createFlag)) { cl_log(LOG_ERR, "create attribute is different"); ckptMsg->msgSubtype = M_CKPT_OPEN_REMOTE_REPLY; ckptMsg->retVal = SA_ERR_EXIST; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; } openCkpt = SaCkptCheckpointOpen(NULL, replica, openParam); SACKPTASSERT(openCkpt != NULL); strcpy(openCkpt->clientHostName, ckptMsg->clientHostName); openCkpt->clientHandle = ckptMsg->clientHandle; ckptMsg->msgSubtype = M_CKPT_OPEN_REMOTE_REPLY; ckptMsg->dataLength = sizeof(openCkpt->checkpointHandle); ckptMsg->data = SaCkptMalloc( sizeof(openCkpt->checkpointHandle)); SACKPTASSERT(ckptMsg->data != NULL); memcpy(ckptMsg->data, &(openCkpt->checkpointHandle), sizeof(openCkpt->checkpointHandle)); SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; /* * the reply message to M_CKPT_OPEN_REMOTE */ case M_CKPT_OPEN_REMOTE_REPLY: if (replica != NULL) { cl_log(LOG_ERR, "Replica exists, ignore message"); break; } SACKPTMESSAGEVALIDATEREQ(ckptMsg); SaCkptRequestStopTimer(ckptReq); if (ckptMsg->retVal != SA_OK) { ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = ckptMsg->retVal; SaCkptResponseSend(&ckptResp); break; } openParam = ckptReq->clientRequest->reqParam; openCkpt = SaCkptCheckpointOpen(client, NULL, openParam); strcpy(openCkpt->activeNodeName, ckptMsg->activeNodeName); strcpy(openCkpt->checkpointName, ckptMsg->checkpointName); openCkpt->checkpointRemoteHandle = *(int*)(ckptMsg->data); ckptReq->openCkpt = openCkpt; ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = ckptMsg->retVal; ckptResp->resp->dataLength = sizeof(SaCkptCheckpointHandleT); ckptResp->resp->data = SaCkptMalloc(ckptResp->resp->dataLength);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -