📄 operation.c
字号:
OPERATION_TIMEOUT * 1000000, SaCkptOperationTimeout, (gpointer)ckptOp); if (saCkptService->flagVerbose) { strOp = SaCkptOp2String(ckptOp->operation); cl_log(LOG_INFO, "Start timer %u for op %d (%s)", ckptOp->timeoutTag, ckptOp->operationNO, strOp); SaCkptFree((void*)&strOp); } } return;}/* stop the timer for the operation */void SaCkptOperationStopTimer(SaCkptOperationT* ckptOp){ char* strOp = NULL; if (ckptOp->timeoutTag > 0) { if (saCkptService->flagVerbose) { strOp = SaCkptOp2String(ckptOp->operation); cl_log(LOG_INFO, "delete timer %u for op %d (%s)", ckptOp->timeoutTag, ckptOp->operationNO, strOp); SaCkptFree((void*)&strOp); } g_source_remove(ckptOp->timeoutTag); ckptOp->timeoutTag = 0; } return;}/* whether the operation finished or not */intSaCkptOperationFinished(char* fromNodeName, SaCkptOpStateT opState, GList* list){ int finished = TRUE; SaCkptStateT* state = NULL; while (list != NULL) { state = (SaCkptStateT*)list->data; if (!strcmp(state->nodeName, fromNodeName)) { state->state = opState; } else { /* * Shouldn't state->state be the same type * as opState? FIXME or comment me */ if ((SaCkptOpStateT)state->state != opState) { finished = FALSE; } } list = list->next; } return finished;}char* SaCkptOp2String(SaCkptOpT op){ char* strOp = NULL; char* strTemp = NULL; strTemp = (char*)SaCkptMalloc(64); SACKPTASSERT(strTemp != NULL); switch (op) { case OP_NULL: strcpy(strTemp, "OP_NULL"); break; case OP_CKPT_OPEN: strcpy(strTemp, "OP_CKPT_OPEN"); break; case OP_RPLC_CRT: strcpy(strTemp, "OP_RPLC_CRT"); break; case OP_RPLC_ADD: strcpy(strTemp, "OP_RPLC_ADD"); break; case OP_CKPT_UPD: strcpy(strTemp, "OP_CKPT_UPD"); break; case OP_CKPT_READ: strcpy(strTemp, "OP_CKPT_READ"); break; case OP_CKPT_ULNK: strcpy(strTemp, "OP_CKPT_ULNK"); break; case OP_CKPT_SYNC: strcpy(strTemp, "OP_CKPT_SYNC"); break; case OP_CKPT_ACT_SET: strcpy(strTemp, "OP_CKPT_ACT_SET"); break; } strOp = (char*)SaCkptMalloc(strlen(strTemp)+1); if (strOp == NULL) { return NULL; } memcpy(strOp, strTemp, strlen(strTemp)+1); SaCkptFree((void*)&strTemp); return strOp;}/* Continue the operation after a node failure*/void SaCkptOperationNodeFailure(gpointer key, gpointer value, gpointer userdata){ SaCkptOperationT* ckptOp = NULL; SaCkptMessageT* ckptMsg = NULL; SaCkptReplicaT* replica = NULL; SaCkptStateT* state = NULL; char* strNodeName = NULL; GList* list = NULL; int finished = TRUE; int opState = -1; ckptOp = (SaCkptOperationT*)value; replica = ckptOp->replica; strNodeName = (char*)userdata; /* * if the operation is started by the failed node * remove the operation directly */ if (!strcmp(ckptOp->clientHostName, strNodeName)) { SaCkptOperationRemove(&ckptOp); return; } /* only OP_RPLC_ADD and OP_CKPT_UPD need to be processed */ if ((ckptOp->operation == OP_RPLC_ADD) || (ckptOp->operation == OP_CKPT_UPD)) { list = ckptOp->stateList; finished = TRUE; while (list != NULL) { state = (SaCkptStateT*)list->data; if (state == NULL) { list = list->next; continue; } if (!strcmp(state->nodeName, strNodeName)) { ckptOp->stateList = g_list_remove( ckptOp->stateList, (gpointer)state); list = ckptOp->stateList; continue; } else { if (opState == -1) { opState = state->state; } else { if (opState != state->state) { finished = FALSE; } } } list = list->next; } if (finished) { switch(opState) { case OP_STATE_PREPARED: ckptOp->state = OP_STATE_PREPARED; /* broadcast commit message */ if (ckptOp->operation == OP_RPLC_ADD) { ckptMsg = SaCkptMessageCreateOp(ckptOp, M_RPLC_ADD_COMMIT_BCAST); } else { ckptMsg = SaCkptMessageCreateOp(ckptOp, M_CKPT_UPD_COMMIT_BCAST); } SaCkptMessageMulticast(ckptMsg, ckptOp->stateList); SaCkptFree((void*)&ckptMsg); break; case OP_STATE_COMMITTED: /* send back response */ ckptOp->state = OP_STATE_COMMITTED; if (ckptOp->operation == OP_RPLC_ADD) { ckptMsg = SaCkptMessageCreateOp(ckptOp, M_RPLC_ADD_REPLY); } else { ckptMsg = SaCkptMessageCreateOp(ckptOp, M_CKPT_UPD_REPLY); } SaCkptMessageSend(ckptMsg, ckptOp->clientHostName); SaCkptFree((void*)&ckptMsg); replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } SaCkptOperationRemove(&ckptOp); break; case OP_STATE_ROLLBACKED: /* send back response */ ckptOp->state = OP_STATE_ROLLBACKED; if (ckptOp->operation == OP_RPLC_ADD) { ckptMsg = SaCkptMessageCreateOp(ckptOp, M_RPLC_ADD_REPLY); } else { ckptMsg = SaCkptMessageCreateOp(ckptOp, M_CKPT_UPD_REPLY); } ckptMsg->retVal = SA_ERR_FAILED_OPERATION; SaCkptMessageSend(ckptMsg, ckptOp->clientHostName); SaCkptFree((void*)&ckptMsg); replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } SaCkptOperationRemove(&ckptOp); break; default: break; } } } return;}SaCkptResponseT* createLocalReplical(SaCkptRequestT* ckptReq){ SaCkptResponseT* ckptResp = NULL; SaCkptOpenCheckpointT* openCkpt = ckptReq->openCkpt; SaCkptClientT* client = ckptReq->client; SaCkptReplicaT* replica = NULL; SaCkptReqOpenParamT* openParam = NULL; char* strReq = NULL; if(ckptReq == NULL) { cl_log(LOG_ERR,"NULL ckptReq in createLocalReplica"); return NULL; } if(ckptReq->clientRequest->req != REQ_CKPT_OPEN && ckptReq->clientRequest->req != REQ_CKPT_OPEN_ASYNC){ cl_log(LOG_ERR,"Not open req in createLocalReplica"); return NULL; } strReq = SaCkptReq2String(ckptReq->clientRequest->req); cl_log(LOG_INFO, "client %d, request %lu (%s), create a local replica", ckptReq->clientRequest->clientHandle, ckptReq->clientRequest->requestNO, strReq); SaCkptFree((void*)&strReq); ckptResp = SaCkptResponseCreate(ckptReq); openParam = ckptReq->clientRequest->reqParam; if (openParam->openFlag & SA_CKPT_CHECKPOINT_COLOCATED) { /* create local replica */ replica = SaCkptReplicaCreate(openParam); SACKPTASSERT(replica != NULL); replica->replicaState= STATE_CREATE_COMMITTED; /* open the local replica */ openCkpt = SaCkptCheckpointOpen(client, replica, openParam); SACKPTASSERT(openCkpt != NULL); ckptReq->openCkpt = openCkpt; /* create and send client response */ ckptResp->resp->dataLength = sizeof(SaCkptCheckpointHandleT); ckptResp->resp->data = SaCkptMalloc( sizeof(SaCkptCheckpointHandleT)); memcpy(ckptResp->resp->data, &(openCkpt->checkpointHandle), sizeof(openCkpt->checkpointHandle)); ckptResp->resp->retVal = SA_OK; } else { ckptResp->resp->retVal = SA_ERR_TIMEOUT; } return ckptResp;}voidupdateReplicaPendingOption(SaCkptReplicaT *replica, const char *hostName){ GList * list = NULL; GList * opStateList = NULL; SaCkptOperationT* ckptOp = NULL; SaCkptStateT* state = NULL; gboolean nodeExisted = FALSE; list = replica->pendingOperationList; while(list != NULL){ ckptOp = (SaCkptOperationT*)list->data; opStateList = ckptOp->stateList; nodeExisted = FALSE; /* we should check if this node name exist already, maybe just for debug*/ while(opStateList != NULL){ state = (SaCkptStateT* )opStateList->data; if(!strncmp(state->nodeName, hostName,SA_MAX_NAME_LENGTH )){ nodeExisted = TRUE; cl_log(LOG_ERR, "hostName %s exist already in updateReplicaPendingOption\n",hostName); break; } opStateList = opStateList->next; } if(!nodeExisted){ opStateList = ckptOp->stateList; state = (SaCkptStateT*)ha_malloc(sizeof(SaCkptStateT)); strncpy(state->nodeName,hostName,SA_MAX_NAME_LENGTH); state->nodeName[SA_MAX_NAME_LENGTH-1]='\0'; state->state = OP_STATE_STARTED; ckptOp->stateList = g_list_append(ckptOp->stateList, (gpointer)state); } list = list->next; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -