📄 replica.c
字号:
p += sizeof(n); for(i=0; i<n; i++) { state = (SaCkptStateT*)SaCkptMalloc(sizeof(SaCkptStateT)); SACKPTASSERT(state != NULL); memcpy(&m, p, sizeof(m)); p += sizeof(m); memcpy(state->nodeName, p, m); state->nodeName[m] = 0; p += m; memcpy(&(state->state), p, sizeof(state->state)); p += sizeof(state->state); replica->nodeList = g_list_append( replica->nodeList, (gpointer)state); } memcpy(&n, p, sizeof(n)); p += sizeof(n); memcpy(replica->activeNodeName, p, n); replica->activeNodeName[n] = 0; p += n; memcpy(&(replica->nextOperationNumber), p, sizeof(replica->nextOperationNumber)); p += sizeof(replica->nextOperationNumber); list = replica->sectionList; for(i=0; i<replica->sectionNumber; i++) { memcpy(§ionLength ,p,sizeof(sectionLength)); p += sizeof(sectionLength); sec = (SaCkptSectionT*)SaCkptMalloc(sectionLength); SACKPTASSERT (sec != NULL); replica->sectionList = g_list_append(replica->sectionList, (gpointer)sec); memcpy(&(sec->sectionID.idLen), p, sizeof(sec->sectionID.idLen)); p += sizeof(sec->sectionID.idLen); if (sec->sectionID.idLen > 0) { memcpy(sec->sectionID.id, p, sec->sectionID.idLen); p += sec->sectionID.idLen; } memcpy(&(sec->dataState), p, sizeof(sec->dataState)); p += sizeof(sec->dataState); if (sec->dataState == SA_CKPT_SECTION_CORRUPTED) { continue; } memcpy(&(sec->expirationTime), p, sizeof(sec->expirationTime)); p += sizeof(sec->expirationTime); memcpy(&(sec->lastUpdateTime), p, sizeof(sec->lastUpdateTime)); p += sizeof(sec->lastUpdateTime); /* by default , 0 will be the active copy */ memcpy(&(sec->dataLength[0]), p, sizeof(sec->dataLength[0])); p += sizeof(sec->dataLength[0]); if (sec->dataLength[0] > 0) { sec->data[0] = (void*)SaCkptMalloc( sec->dataLength[0]); SACKPTASSERT(sec->data[0] != NULL); memcpy(sec->data[0], p, sec->dataLength[0]); p += sec->dataLength[0]; } sec->dataIndex = 0; sec->dataUpdateState = OP_STATE_COMMITTED; sec->sectionState = STATE_CREATE_COMMITTED; sec->replica = replica; sec->dataLength[1] = 0; sec->data[1] = NULL; } /* default value for new created replica */ replica->saCkptService = saCkptService; replica->referenceCount = 0; replica->replicaState = STATE_CREATE_PREPARED; replica->pendingOperationList = NULL; replica->flagIsActive = FALSE; replica->flagPendOperation = TRUE; replica->flagReplicaLock = TRUE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s locked", replica->checkpointName); } replica->retentionTimeoutTag = 0; replica->operationHash = g_hash_table_new(g_int_hash, g_int_equal); replica->openCheckpointList= NULL; SaCkptFree((void*)&q); if(saCkptService->flagVerbose){ SaCkptDumpReplica(replica); } cl_log(LOG_INFO, "Replica %s was copied from node %s", replica->checkpointName, replica->activeNodeName); return replica;}int SaCkptReplicaRead(SaCkptReplicaT* replica, SaSizeT* dataLength, void** data, SaSizeT paramLength, void* param){ SaCkptSectionT* sec = NULL; SaCkptReqSecReadParamT* secReadParam = NULL; secReadParam = (SaCkptReqSecReadParamT*)param; sec = SaCkptSectionFind(replica, &(secReadParam->sectionID)); if (sec == NULL) { return SA_ERR_NOT_EXIST; } *dataLength = secReadParam->dataSize; return SaCkptSectionRead(replica, sec, secReadParam->offset, dataLength, data);}int SaCkptReplicaUpdate(SaCkptReplicaT* replica, SaCkptReqT req, SaSizeT dataLength, void* data, int paramLength, void* param){ SaCkptSectionT* sec = NULL; SaCkptReqSecCrtParamT* secCrtParam = NULL; SaCkptReqSecDelParamT* secDelParam = NULL; SaCkptReqSecWrtParamT* secWrtParam = NULL; SaCkptReqSecOwrtParamT* secOwrtParam = NULL; int retVal = SA_OK; char* strReq = NULL; char* strErr = NULL; int index = 0; switch (req) { case REQ_SEC_CRT: secCrtParam = (SaCkptReqSecCrtParamT*)param; retVal = SaCkptSectionCreate(replica, secCrtParam, dataLength, data, &sec); if (retVal != SA_OK) { break; } sec->sectionState = STATE_CREATE_COMMITTED; /* update replica */ replica->checkpointSize += sec->dataLength[sec->dataIndex]; replica->sectionNumber++; break; case REQ_SEC_DEL: secDelParam = (SaCkptReqSecDelParamT*)param; sec = SaCkptSectionFind(replica, &(secDelParam->sectionID)); if (sec == NULL) { cl_log(LOG_INFO, "Can not find section"); retVal = SA_ERR_NOT_EXIST; break; } /* update replica */ replica->sectionList = g_list_remove( replica->sectionList, (gpointer)sec); replica->sectionNumber--; replica->checkpointSize -= sec->dataLength[sec->dataIndex]; if (saCkptService->flagVerbose) { char* strSectionID = NULL; strSectionID = SaCkptSectionId2String(sec->sectionID); cl_log(LOG_INFO, "Update: section %s deleted from replica %s", sec->sectionID.id, replica->checkpointName); SaCkptFree((void*)&strSectionID); } /* free section */ if (sec->data[0] != NULL) { SaCkptFree((void**)&(sec->data[0])); } if (sec->data[1] != NULL) { SaCkptFree((void**)&(sec->data[1])); } SaCkptFree((void*)&sec); break; case REQ_SEC_WRT: secWrtParam = (SaCkptReqSecWrtParamT*)param; sec = SaCkptSectionFind(replica, &(secWrtParam->sectionID)); if (sec == NULL) { cl_log(LOG_INFO, "Can not find section"); retVal = SA_ERR_INVALID_PARAM; break; } retVal = SaCkptSectionWrite(replica, sec, secWrtParam->offset, dataLength, data); if (retVal == SA_OK) { index = sec->dataIndex; /* update replica */ if ((secWrtParam->offset + dataLength) > sec->dataLength[index]){ replica->checkpointSize += secWrtParam->offset + dataLength - sec->dataLength[index]; sec->dataLength[(index+1)%2] = secWrtParam->offset + dataLength; } else { sec->dataLength[(index+1)%2] = sec->dataLength[sec->dataIndex]; } /* commit the update */ SaCkptFree((void**)&(sec->data[index])); sec->dataLength[index] = 0; sec->dataIndex = (index + 1) % 2; sec->dataUpdateState = OP_STATE_COMMITTED; sec->lastUpdateTime = time_longclock(); } break; case REQ_SEC_OWRT: secOwrtParam = (SaCkptReqSecOwrtParamT*)param; sec = SaCkptSectionFind(replica, &(secOwrtParam->sectionID)); if (sec == NULL) { cl_log(LOG_INFO, "Can not find section"); retVal = SA_ERR_INVALID_PARAM; break; } retVal = SaCkptSectionOverwrite(replica, sec, dataLength, data); if (retVal == SA_OK) { index = sec->dataIndex; replica->checkpointSize += dataLength - sec->dataLength[index]; /* commit the update */ SaCkptFree((void**)&(sec->data[index])); sec->dataLength[index]= 0; sec->dataIndex = (index + 1) % 2; sec->dataLength[sec->dataIndex] = dataLength; sec->dataUpdateState = OP_STATE_COMMITTED; sec->lastUpdateTime = time_longclock(); } break; default: break; } if (saCkptService->flagVerbose) { strReq = SaCkptReq2String(req); strErr = SaCkptErr2String(retVal); cl_log(LOG_INFO, "Replica %s update, request %s, status %s", replica->checkpointName, strReq, strErr); SaCkptFree((void*)&strReq); SaCkptFree((void*)&strErr); } return retVal;}int SaCkptReplicaUpdPrepare(SaCkptReplicaT* replica, SaCkptReqT req, int dataLength, void* data, int paramLength, void* param){ SaCkptSectionT* sec = NULL; SaCkptReqSecCrtParamT* secCrtParam = NULL; SaCkptReqSecDelParamT* secDelParam = NULL; SaCkptReqSecWrtParamT* secWrtParam = NULL; SaCkptReqSecOwrtParamT* secOwrtParam = NULL; int retVal = SA_OK; char* strReq = NULL; char* strErr = NULL; switch (req) { case REQ_SEC_CRT: secCrtParam = (SaCkptReqSecCrtParamT*)param; retVal = SaCkptSectionCreate(replica, secCrtParam, dataLength, data, &sec); if (retVal != SA_OK) { break; } sec->sectionState = STATE_CREATE_PREPARED; break; case REQ_SEC_DEL: secDelParam = (SaCkptReqSecDelParamT*)param; sec = SaCkptSectionFind(replica, &(secDelParam->sectionID)); if (sec == NULL) { if(saCkptService->flagVerbose){ cl_log(LOG_INFO, "Can not find section %s \n", secDelParam->sectionID.id); } retVal = SA_ERR_NOT_EXIST; break; } sec->sectionState = STATE_DELETE_PREPARED; break; case REQ_SEC_WRT: secWrtParam = (SaCkptReqSecWrtParamT*)param; sec = SaCkptSectionFind(replica, &(secWrtParam->sectionID)); if (sec == NULL) { if(saCkptService->flagVerbose){ cl_log(LOG_INFO, "Can not find section %s \n", secWrtParam->sectionID.id); } retVal = SA_ERR_INVALID_PARAM; break; } retVal = SaCkptSectionWrite(replica, sec, secWrtParam->offset, dataLength, data); if (retVal == SA_OK) { sec->dataUpdateState = OP_STATE_PREPARED; } break; case REQ_SEC_OWRT: secOwrtParam = (SaCkptReqSecOwrtParamT*)param; sec = SaCkptSectionFind(replica, &(secOwrtParam->sectionID)); if (sec == NULL) { if(saCkptService->flagVerbose){ cl_log(LOG_INFO, "Can not find section %s \n", secOwrtParam->sectionID.id); } retVal = SA_ERR_INVALID_PARAM; break; } retVal = SaCkptSectionOverwrite(replica, sec, dataLength, data); if (retVal == SA_OK) { sec->dataUpdateState = OP_STATE_PREPARED; } break; default: break; } if (saCkptService->flagVerbose) { strReq = SaCkptReq2String(req); strErr = SaCkptErr2String(retVal); cl_log(LOG_INFO, "Replica %s update prepared, request %s, status %s", replica->checkpointName, strReq, strErr); SaCkptFree((void*)&strReq); SaCkptFree((void*)&strErr); } return retVal;}int SaCkptReplicaUpdCommit(SaCkptReplicaT* replica, SaCkptReqT req, int dataLength, void* data, int paramLength, void* param){ SaCkptSectionT* sec = NULL; SaCkptReqSecCrtParamT* secCrtParam = NULL; SaCkptReqSecDelParamT* secDelParam = NULL;/* SaCkptReqSecReadParamT* secReadParam = NULL; */ SaCkptReqSecWrtParamT* secWrtParam = NULL; SaCkptReqSecOwrtParamT* secOwrtParam = NULL; char* strReq = NULL; char* strErr = NULL; int retVal = SA_OK; int index = 0; switch (req) { case REQ_SEC_CRT: secCrtParam = (SaCkptReqSecCrtParamT*)param; sec = SaCkptSectionFind(replica, &(secCrtParam->sectionID)); if (sec == NULL) { cl_log(LOG_INFO, "Can not find section"); retVal = SA_ERR_NOT_EXIST; break; } if (sec->sectionState != STATE_CREATE_PREPARED) { cl_log(LOG_ERR, "Section create commit: not prepared"); retVal = SA_ERR_FAILED_OPERATION; break; } sec->sectionState = OP_STATE_COMMITTED; /* update replica */ replica->checkpointSize += sec->dataLength[sec->dataIndex]; replica->sectionNumber++; break; case REQ_SEC_DEL: secDelParam = (SaCkptReqSecDelParamT*)param; sec = SaCkptSectionFind(replica, &(secDelParam->sectionID)); if (sec == NULL) { cl_log(LOG_INFO, "Can not find section"); retVal = SA_ERR_NOT_EXIST; break; } if (sec->sectionState != STATE_DELETE_PREPARED) { cl_log(LOG_ERR, "Section delete commit: not prepared"); retVal = SA_ERR_FAILED_OPERATION; break; } /* update replica */ replica->sectionList = g_list_remove( replica->sectionList, (gpointer)sec); replica->sectionNumber--; replica->checkpointSize -= sec->dataLength[sec->dataIndex]; if (saCkptService->flagVerbose) { char* strSectionID = NULL; strSectionID = SaCkptSectionId2String(sec->sectionID); cl_log(LOG_INFO, "Commit: section %s deleted from replica %s", sec->sectionID.id, replica->checkpointName); SaCkptFree((void*)&strSectionID); } /* free section */ if (sec->data[0] != NULL) { SaCkptFree((void**)&(sec->data[0])); } if (sec->data[1] != NULL) { SaCkptFree((void**)&(sec->data[1])); } SaCkptFree((void*)&sec); break; case REQ_SEC_WRT: secWrtParam = (SaCkptReqSecWrtParamT*)param; sec = SaCkptSectionFind(replica, &(secWrtParam->sectionID)); if (sec == NULL) { cl_log(LOG_INFO, "Can not find section"); retVal = SA_ERR_NOT_EXIST; break; } if (sec->dataUpdateState != OP_STATE_PREPARED) { cl_log(LOG_ERR, "Section write commit: not prepared"); retVal = SA_ERR_FAILED_OPERATION; break; } index = sec->dataIndex; /* update replica */ if ((secWrtParam->offset + dataLength) > sec->dataLength[index]){ replica->checkpointSize += secWrtParam->offset + dataLength - sec->dataLength[index]; sec->dataLength[(index+1)%2] = secWrtParam->offset + dataLength; } else { sec->dataLength[(index+1)%2] = sec->dataLength[sec->dataIndex]; } /* commit the update */ SaCkptFree((void**)&(sec->data[index])); sec->dataLength[index] = 0; sec->dataIndex = (index + 1) % 2; sec->dataUpdateState = OP_STATE_COMMITTED; sec->lastUpdateTime = time_longclock(); break; case REQ_SEC_OWRT: secOwrtParam = (SaCkptReqSecOwrtParamT*)param; sec = SaCkptSectionFind(replica, &(secOwrtParam->sectionID)); if (sec == NULL) { cl_log(LOG_INFO, "Can not find section"); retVal = SA_ERR_NOT_EXIST; break; } if (sec->dataUpdateState != OP_STATE_PREPARED) { cl_log(LOG_ERR, "Section overwrite commit: not prepared"); retVal = SA_ERR_FAILED_OPERATION; break; } index = sec->dataIndex; /* update replica */ replica->checkpointSize += dataLength - sec->dataLength[index]; /* commit the update */ SaCkptFree((void**)&(sec->data[index])); sec->dataLength[index]= 0; sec->dataIndex = (index + 1) % 2; sec->dataLength[sec->dataIndex] = dataLength;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -