📄 replica.c
字号:
/* $Id: replica.c,v 1.17 2004/11/18 01:56:59 yixiong Exp $ *//* * replica.c: * * Copyright (C) 2003 Deng Pan <deng.pan@intel.com> * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#ifdef HAVE_CONFIG_H#include "config.h"#endif#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <sys/types.h>#include <sys/stat.h>#include <glib.h>#include <clplumbing/cl_log.h>#include <clplumbing/cl_signal.h>#include <clplumbing/ipc.h>#include <clplumbing/Gmain_timeout.h>#include <hb_api_core.h>#include <hb_api.h>#include <ha_msg.h>#include <heartbeat.h>#include <saf/ais.h>#include <checkpointd/clientrequest.h>#include "checkpointd.h"#include "client.h"#include "replica.h"#include "message.h"#include "request.h"#include "response.h"#include "operation.h"#include "utils.h"#ifdef USE_DMALLOC#include <dmalloc.h>#endifextern SaCkptServiceT* saCkptService;/* * replica retention timeout process * delte local replica and sent message to other nodes to update replica list */gbooleanSaCkptRetentionTimeout(gpointer timeout_data){ SaCkptReplicaT* replica = (SaCkptReplicaT*)timeout_data; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Checkpoint %s retention timeout", replica->checkpointName); } /* * if there are still not finished operations * do not delete the replica */ if (g_hash_table_size(replica->operationHash) > 0) { replica->flagRetentionTimeout = TRUE; return FALSE; } if (replica != NULL) { SaCkptReplicaRemove(&replica); } return FALSE;}/* remove the replica and free its memory */int SaCkptReplicaRemove(SaCkptReplicaT** pReplica){ SaCkptReplicaT* replica = *pReplica; SaCkptMessageT* ckptMsg = NULL; ckptMsg = SaCkptMalloc(sizeof(SaCkptMessageT)); ckptMsg->msgVersion = saCkptService->version; strcpy(ckptMsg->msgType, T_CKPT); ckptMsg->msgSubtype = M_RPLC_DEL_BCAST; strcpy(ckptMsg->checkpointName, replica->checkpointName); ckptMsg->retVal = SA_OK; SaCkptMessageMulticast(ckptMsg, replica->nodeList); g_hash_table_remove(saCkptService->replicaHash, (gpointer)replica->checkpointName); cl_log(LOG_INFO, "Replica %s deleted", replica->checkpointName); SaCkptReplicaFree(pReplica); return HA_OK;}/* create a replica from scratch */SaCkptReplicaT* SaCkptReplicaCreate(SaCkptReqOpenParamT* openParam) { SaCkptReplicaT* replica = NULL; SaCkptSectionT* sec = NULL; SaCkptStateT* state = NULL; replica = (SaCkptReplicaT*)SaCkptMalloc(sizeof(SaCkptReplicaT)); SACKPTASSERT(replica != NULL); replica->saCkptService = saCkptService; strcpy(replica->checkpointName, openParam->ckptName.value); strcpy(replica->activeNodeName, saCkptService->nodeName); replica->flagIsActive= TRUE; replica->createFlag = openParam->attr.creationFlags; replica->maxSectionIDSize = openParam->attr.maxSectionIdSize; replica->maxSectionNumber = openParam->attr.maxSections; replica->maxSectionSize = openParam->attr.maxSectionSize; replica->maxCheckpointSize= openParam->attr.checkpointSize; replica->checkpointSize = 0; replica->retentionDuration = openParam->attr.retentionDuration; replica->flagRetentionTimeout = FALSE; replica->operationHash = g_hash_table_new(g_int_hash, g_int_equal); replica->nextOperationNumber = 0; replica->referenceCount = 0; replica->flagUnlink = FALSE; replica->flagPendOperation = FALSE; replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } replica->replicaState= STATE_CREATE_PREPARED; replica->openCheckpointList= NULL; replica->pendingOperationList = NULL; replica->sectionList = NULL; replica->nodeList = NULL; replica->retentionTimeoutTag = 0; /* create default section */ sec = (SaCkptSectionT*)SaCkptMalloc(sizeof(SaCkptSectionT) + sizeof(SaUint8T)); SACKPTASSERT(sec != NULL); sec->replica = replica; sec->sectionID.id[0] = 0; sec->sectionID.idLen = 0; sec->expirationTime = SA_TIME_END; sec->lastUpdateTime = time_longclock(); sec->dataIndex = 0; sec->dataUpdateState = OP_STATE_COMMITTED; sec->sectionState = STATE_CREATE_COMMITTED; sec->dataState = SA_CKPT_SECTION_VALID; sec->dataLength[0] = 0; sec->dataLength[1] = 0; sec->data[0] = NULL; sec->data[1] = NULL; replica->sectionList = g_list_append(replica->sectionList, (gpointer)sec); replica->sectionNumber = 1; state = (SaCkptStateT*)SaCkptMalloc(sizeof(SaCkptStateT)); strcpy(state->nodeName, saCkptService->nodeName); state->state = OP_STATE_COMMITTED; replica->nodeList = g_list_append(replica->nodeList, (gpointer)state); g_hash_table_insert(saCkptService->replicaHash, (gpointer)replica->checkpointName, (gpointer)replica); cl_log(LOG_INFO, "Replica %s created with default section", openParam->ckptName.value); return replica;}int SaCkptReplicaFree(SaCkptReplicaT** pReplica){ SaCkptReplicaT* replica = *pReplica; GList* list = NULL; SaCkptSectionT* sec = NULL; list = replica->nodeList; while (list != NULL) { if (list->data != NULL) { SaCkptFree((void**)&(list->data)); } list = list->next; } g_list_free(replica->nodeList); list = replica->sectionList; while (list != NULL) { sec = (SaCkptSectionT*)list->data; if (sec != NULL) { if (sec->data[0] != NULL) { SaCkptFree((void**)&(sec->data[0])); } if (sec->data[1] != NULL) { SaCkptFree((void**)&(sec->data[1])); } SaCkptFree((void*)&sec); } list = list->next; } g_list_free(replica->sectionList); g_hash_table_destroy(replica->operationHash); SaCkptFree((void*)&replica); *pReplica = NULL; return HA_OK;}/* open a checkpoint */SaCkptOpenCheckpointT* SaCkptCheckpointOpen(SaCkptClientT* client, SaCkptReplicaT* replica, SaCkptReqOpenParamT* openParam){ SaCkptOpenCheckpointT* openCkpt = NULL; /* create opencheckpoint */ openCkpt = (SaCkptOpenCheckpointT*)SaCkptMalloc( sizeof(SaCkptOpenCheckpointT)); SACKPTASSERT(openCkpt != NULL); openCkpt->client = client; openCkpt->replica = replica; saCkptService->nextCheckpointHandle++; if (saCkptService->nextCheckpointHandle <= 0) { saCkptService->nextCheckpointHandle = 1; } openCkpt->checkpointHandle = saCkptService->nextCheckpointHandle; openCkpt->checkpointRemoteHandle = -1; openCkpt->checkpointOpenFlags = openParam->openFlag; openCkpt->flagLocalClient = FALSE; openCkpt->flagLocalReplica = FALSE; if (replica != NULL) { /* update the replica reference count */ replica->referenceCount++; strcpy(openCkpt->checkpointName, replica->checkpointName); strcpy(openCkpt->activeNodeName, replica->activeNodeName); openCkpt->flagLocalReplica = TRUE; replica->openCheckpointList= g_list_append(replica->openCheckpointList, (gpointer)openCkpt); } if (client != NULL) { openCkpt->clientHandle = client->clientHandle; strcpy(openCkpt->clientHostName, saCkptService->nodeName); client->openCheckpointList= g_list_append(client->openCheckpointList, (gpointer)openCkpt); } g_hash_table_insert(saCkptService->openCheckpointHash, (gpointer)&(openCkpt->checkpointHandle), (gpointer)openCkpt); cl_log(LOG_INFO, "client %d opened checkpoint %s, handle %d", client!=NULL?client->clientHandle:0, replica!=NULL?replica->checkpointName:"REMOTE", openCkpt->checkpointHandle); return openCkpt;}/* close a checkpoint */intSaCkptCheckpointClose(SaCkptOpenCheckpointT** pOpenCkpt) { SaCkptOpenCheckpointT* openCkpt = *pOpenCkpt; SaCkptClientT* client = openCkpt->client; SaCkptReplicaT* replica = openCkpt->replica; int checkpointHandle = openCkpt->checkpointHandle; cl_log(LOG_INFO, "Client %d closed checkpoint %s", client!=NULL?client->clientHandle:0, replica!=NULL?replica->checkpointName:"REMOTE"); g_hash_table_remove(saCkptService->openCheckpointHash, (gpointer)&checkpointHandle); if (client != NULL) { client->openCheckpointList = g_list_remove(client->openCheckpointList, (gpointer)openCkpt); } if (replica != NULL) { replica->openCheckpointList= g_list_remove(replica->openCheckpointList, (gpointer)openCkpt); replica->referenceCount--; if (replica->referenceCount == 0) { if (replica->flagUnlink == TRUE) { SaCkptReplicaRemove(&replica); } else { /* * if the retention time is SA_TIME_END, * it will exist forever */ if (replica->retentionDuration != SA_TIME_END) { SaCkptReplicaStartTimer(replica); } } } } SaCkptFree((void*)&openCkpt); *pOpenCkpt = openCkpt; return HA_OK;}/* pack the replica so that it can be sent in a ckpt message */int SaCkptReplicaPack(void** data, SaSizeT* dataLength, SaCkptReplicaT* replica){ SaCkptSectionT* sec = NULL; SaCkptStateT* state = NULL; int sectionLength = 0; GList* list = NULL; char *p = NULL; char *q = NULL; int n = 0; int index = 0; q = (char*)SaCkptMalloc(MAXMSG); SACKPTASSERT(q != NULL); p = q; n = strlen(replica->checkpointName); memcpy(p, &n, sizeof(n)); p += sizeof(n); memcpy(p, replica->checkpointName, n); p += n; memcpy(p, &replica->maxSectionNumber, sizeof(replica->maxSectionNumber)); p += sizeof(replica->maxSectionNumber); memcpy(p, &replica->maxSectionSize, sizeof(replica->maxSectionSize)); p += sizeof(replica->maxSectionSize); memcpy(p, &replica->maxSectionIDSize, sizeof(replica->maxSectionIDSize)); p += sizeof(replica->maxSectionIDSize); memcpy(p, &replica->retentionDuration, sizeof(replica->retentionDuration)); p += sizeof(replica->retentionDuration); memcpy(p, &replica->maxCheckpointSize, sizeof(replica->maxCheckpointSize)); p += sizeof(replica->maxCheckpointSize); memcpy(p, &replica->checkpointSize, sizeof(replica->checkpointSize)); p += sizeof(replica->checkpointSize); memcpy(p, &replica->sectionNumber, sizeof(replica->sectionNumber)); p += sizeof(replica->sectionNumber); memcpy(p, &replica->createFlag, sizeof(replica->createFlag)); p += sizeof(replica->createFlag); memcpy(p, &replica->ownerPID, sizeof(replica->ownerPID)); p += sizeof(replica->ownerPID); list = replica->nodeList; n = g_list_length(list); memcpy(p, &n, sizeof(n)); p += sizeof(n); while (list != NULL) { state = (SaCkptStateT*)list->data; n = strlen(state->nodeName); memcpy(p, &n, sizeof(n)); p += sizeof(n); memcpy(p, state->nodeName, n); p += n; memcpy(p, &state->state, sizeof(state->state)); p += sizeof(state->state); list = list->next; } n = strlen(replica->activeNodeName); memcpy(p, &n, sizeof(n)); p += sizeof(n); memcpy(p, replica->activeNodeName, n); p += n; memcpy(p, &replica->nextOperationNumber, sizeof(replica->nextOperationNumber)); p += sizeof(replica->nextOperationNumber); list = replica->sectionList; while (list != NULL) { sec = (SaCkptSectionT* )list->data; sectionLength = sizeof(SaCkptSectionT) + sec->sectionID.idLen; index = sec->dataIndex; memcpy(p,§ionLength,sizeof(sectionLength)); p += sizeof(sizeof(sectionLength)); memcpy(p, &sec->sectionID.idLen, sizeof(sec->sectionID.idLen)); p += sizeof(sec->sectionID.idLen); if (sec->sectionID.idLen > 0) { memcpy(p, sec->sectionID.id, sec->sectionID.idLen); p += sec->sectionID.idLen; } memcpy(p, &sec->dataState, sizeof(sec->dataState)); p += sizeof(sec->dataState); if (sec->dataState == SA_CKPT_SECTION_CORRUPTED) { list = list->next; continue; } memcpy(p, &sec->expirationTime, sizeof(sec->expirationTime)); p += sizeof(sec->expirationTime); memcpy(p, &sec->lastUpdateTime, sizeof(sec->lastUpdateTime)); p += sizeof(sec->lastUpdateTime); memcpy(p, &sec->dataLength[index], sizeof(sec->dataLength[index])); p += sizeof(sec->dataLength[index]); if (sec->dataLength[index] > 0) { memcpy(p, sec->data[index], sec->dataLength[index]); p += sec->dataLength[index]; } list = list->next; } *dataLength = p - q; *data = SaCkptMalloc(*dataLength); SACKPTASSERT(*data != NULL); memcpy(*data, q, *dataLength); SaCkptFree((void*)&q); return HA_OK;}/* create local replica from the received message */SaCkptReplicaT* SaCkptReplicaUnpack(void* data, int dataLength){ char* p = NULL; char* q = NULL; SaUint32T i = 0; SaUint32T n = 0; int m = 0; int sectionLength = 0; GList* list = NULL; SaCkptSectionT* sec = NULL; SaCkptStateT* state = NULL; SaCkptReplicaT* replica = NULL; replica = (SaCkptReplicaT*)SaCkptMalloc(sizeof(SaCkptReplicaT)); SACKPTASSERT(replica != NULL); p = data; memcpy(&n, p, sizeof(n)); p += sizeof(n); memcpy(replica->checkpointName, p, n); replica->checkpointName[n] = 0; p += n; memcpy(&(replica->maxSectionNumber), p, sizeof(replica->maxSectionNumber)); p += sizeof(replica->maxSectionNumber); memcpy(&(replica->maxSectionSize), p, sizeof(replica->maxSectionSize)); p += sizeof(replica->maxSectionSize); memcpy(&(replica->maxSectionIDSize), p, sizeof(replica->maxSectionIDSize)); p += sizeof(replica->maxSectionIDSize); memcpy(&(replica->retentionDuration), p, sizeof(replica->retentionDuration)); p += sizeof(replica->retentionDuration); memcpy(&(replica->maxCheckpointSize), p, sizeof(replica->maxCheckpointSize)); p += sizeof(replica->maxCheckpointSize); memcpy(&(replica->checkpointSize), p, sizeof(replica->checkpointSize)); p += sizeof(replica->checkpointSize); memcpy(&(replica->sectionNumber), p, sizeof(replica->sectionNumber)); p += sizeof(replica->sectionNumber); memcpy(&(replica->createFlag), p, sizeof(replica->createFlag)); p += sizeof(replica->createFlag); memcpy(&(replica->ownerPID), p, sizeof(replica->ownerPID)); p += sizeof(replica->ownerPID); list = replica->nodeList = NULL; memcpy(&n, p, sizeof(n));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -