📄 message.c
字号:
SACKPTASSERT(ckptResp->resp->data != NULL); memcpy(ckptResp->resp->data, &(openCkpt->checkpointHandle), ckptResp->resp->dataLength); SaCkptResponseSend(&ckptResp); break; /* * close the remotely opened checkpoint */ case M_CKPT_CLOSE_REMOTE: if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } if (!replica->flagIsActive) { cl_log(LOG_ERR, "Standby replica, ignore message"); break; } checkpointHandle = *(int*)ckptMsg->data; openCkpt = (SaCkptOpenCheckpointT*)g_hash_table_lookup( saCkptService->openCheckpointHash, (gpointer)&checkpointHandle); if (openCkpt == NULL) { cl_log(LOG_ERR, "No opencheckpoint, ignore message"); break; } ckptMsg->retVal = SaCkptCheckpointClose(&openCkpt); ckptMsg->msgSubtype = M_CKPT_CLOSE_REMOTE_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); break; /* * the reply message to M_CKPT_CLOSE_REMOTE */ case M_CKPT_CLOSE_REMOTE_REPLY: if (replica != NULL) { cl_log(LOG_ERR, "Replica exists, ignore message"); break; } SACKPTMESSAGEVALIDATEREQ(ckptMsg); SaCkptRequestStopTimer(ckptReq); closeParam = ckptReq->clientRequest->reqParam; checkpointHandle = closeParam->checkpointHandle; openCkpt = g_hash_table_lookup( saCkptService->openCheckpointHash, (gpointer)&checkpointHandle); ckptResp = SaCkptResponseCreate(ckptReq); if (openCkpt == NULL) { ckptResp->resp->retVal= SA_ERR_BAD_HANDLE; } else { ckptResp->resp->retVal = SaCkptCheckpointClose(&openCkpt); } SaCkptResponseSend(&ckptResp); break; /* * if the client want to create a local copy of the * checkpoint, it will send this message to ask the * active node to send it the data */ case M_RPLC_CRT: /* FIXME: */ /* if the message size is exceed 1400, break */ /* this message into several messages */ if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } if (!replica->flagIsActive) { cl_log(LOG_ERR, "Standby replica, ignore message"); break; } ckptOp = SaCkptOperationCreate(ckptMsg, replica); ckptOp->operation= OP_RPLC_CRT; if (replica->flagReplicaLock != TRUE){ /* lock the replica first */ replica->flagReplicaLock = TRUE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s locked", replica->checkpointName); } ckptOp->state = OP_STATE_STARTED; SaCkptReplicaPack(&(ckptMsg->data), &(ckptMsg->dataLength), replica); ckptMsg->msgSubtype = M_RPLC_CRT_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); g_hash_table_insert(replica->operationHash, (gpointer)&(ckptOp->operationNO), (gpointer)ckptOp); } else { replica->pendingOperationList = g_list_append( replica->pendingOperationList, (gpointer)ckptOp); if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Send operation to pending list"); } } SaCkptOperationStartTimer(ckptOp); break; /* * the reply message to M_RPLC_CRT * the active node send its data to the standby node via * this message */ case M_RPLC_CRT_REPLY: if (replica != NULL) { cl_log(LOG_ERR, "Replica exists, ignore message"); break; } SACKPTMESSAGEVALIDATEREQ(ckptMsg); replica = SaCkptReplicaUnpack(ckptMsg->data, ckptMsg->dataLength); g_hash_table_insert(saCkptService->replicaHash, (gpointer)replica->checkpointName, (gpointer)replica); SaCkptFree((void**)&(ckptMsg->data)); ckptMsg->data = NULL; ckptMsg->dataLength = 0; ckptMsg->msgSubtype = M_RPLC_ADD; SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); ckptReq->operation = OP_RPLC_ADD; break; /* * after create the local copy of the checkpoint, it will send * this message to active node. The active node then tell all * nodes to update their replica list */ case M_RPLC_ADD: SACKPTMESSAGEVALIDATEOP(ckptMsg); ckptOp->operation = OP_RPLC_ADD; state = (SaCkptStateT*)SaCkptMalloc( sizeof(SaCkptStateT)); SACKPTASSERT(state != NULL); state->state = OP_STATE_STARTED; strcpy(state->nodeName, ckptMsg->clientHostName); ckptOp->stateList = g_list_append(ckptOp->stateList, (gpointer)state); ckptMsg->msgSubtype = M_RPLC_ADD_PREPARE_BCAST; SaCkptMessageMulticast(ckptMsg, ckptOp->stateList); break; /* * the active node ask other node to prepare the update */ case M_RPLC_ADD_PREPARE_BCAST: if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } state = (SaCkptStateT*)SaCkptMalloc( sizeof(SaCkptStateT)); if (state != NULL) { state->state = OP_STATE_PREPARED; strcpy(state->nodeName, ckptMsg->clientHostName); replica->nodeList = g_list_append(replica->nodeList, (gpointer)state); /*should update the request on pending list*/ updateReplicaPendingOption(replica, ckptMsg->clientHostName); ckptMsg->retVal = SA_OK; } else { ckptMsg->retVal = SA_ERR_NO_MEMORY; } ckptMsg->msgSubtype = M_RPLC_ADD_PREPARE_BCAST_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); break; /* * the reply message to M_RPLC_ADD_PREPARE_BCAST * the standby nodes tell the active node whether the * preparation is successfull or not */ case M_RPLC_ADD_PREPARE_BCAST_REPLY: SACKPTMESSAGEVALIDATEOP(ckptMsg); if (ckptOp->state != OP_STATE_STARTED) { cl_log(LOG_INFO, "Op state error, ignore message"); break; } if (ckptMsg->retVal != SA_OK) { finished = TRUE; ckptOp->state = OP_STATE_ROLLBACKED; } else { finished = SaCkptOperationFinished( ckptMsg->fromNodeName, OP_STATE_PREPARED, ckptOp->stateList); if (finished == TRUE) { ckptOp->state = OP_STATE_PREPARED; } } if (finished == TRUE) { if (ckptOp->state == OP_STATE_ROLLBACKED) { ckptMsg->msgSubtype = M_RPLC_ADD_ROLLBACK_BCAST; } if (ckptOp->state == OP_STATE_PREPARED) { ckptMsg->msgSubtype = M_RPLC_ADD_COMMIT_BCAST; } SaCkptOperationStopTimer(ckptOp); SaCkptMessageMulticast(ckptMsg, ckptOp->stateList); } break; /* * if all the nodes prepared successfully, the active node * tell them to commit the update */ case M_RPLC_ADD_COMMIT_BCAST: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } list = replica->nodeList; while (list != NULL) { state = (SaCkptStateT*)list->data; if (!strcmp(state->nodeName, ckptMsg->clientHostName)) { state->state = OP_STATE_COMMITTED; break; } list = list->next; } ckptMsg->msgSubtype = M_RPLC_ADD_COMMIT_BCAST_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); break; /* * the reply message to M_RPLC_ADD_COMMIT_BCAST * the commit will always success */ case M_RPLC_ADD_COMMIT_BCAST_REPLY: SACKPTMESSAGEVALIDATEOP(ckptMsg); if (ckptOp->state != OP_STATE_PREPARED) { cl_log(LOG_ERR, "Op state error, ignore message"); break; } finished = SaCkptOperationFinished( ckptMsg->fromNodeName, OP_STATE_COMMITTED, ckptOp->stateList); if (finished == TRUE) { ckptOp->state = OP_STATE_COMMITTED; ckptMsg->msgSubtype = M_RPLC_ADD_REPLY; SaCkptMessageSend(ckptMsg, ckptOp->clientHostName); /* unlock the replica */ replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } SaCkptOperationRemove(&ckptOp); } break; /* * the reply message to M_RPLC_ADD * after got the reply, it will send response the client * application */ case M_RPLC_ADD_REPLY: if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } SACKPTMESSAGEVALIDATEREQ(ckptMsg); SaCkptRequestStopTimer(ckptReq); ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = ckptMsg->retVal; if (ckptMsg->retVal != SA_OK) { SaCkptReplicaRemove(&replica); } else { replica->replicaState = STATE_CREATE_COMMITTED; replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } replica->flagPendOperation = FALSE; openParam = ckptReq->clientRequest->reqParam; openCkpt = SaCkptCheckpointOpen(client, replica, openParam); ckptReq->openCkpt = openCkpt; ckptResp->resp->dataLength = sizeof(SaCkptCheckpointHandleT); ckptResp->resp->data = SaCkptMalloc( ckptResp->resp->dataLength); SACKPTASSERT(ckptResp->resp->data != NULL); memcpy(ckptResp->resp->data, &(openCkpt->checkpointHandle), ckptResp->resp->dataLength); } SaCkptResponseSend(&ckptResp); break; /* * one or more nodes cannot update their replica list, so * rollback the operation */ case M_RPLC_ADD_ROLLBACK_BCAST: if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } if (!strcmp(ckptMsg->clientHostName, saCkptService->nodeName)) { SaCkptReplicaRemove(&replica); } else { list = replica->nodeList; while (list != NULL) { state = (SaCkptStateT*)list->data; if (!strcmp(state->nodeName, ckptMsg->clientHostName)) { replica->nodeList = g_list_remove( replica->nodeList, (gpointer)state); break; } list = list->next; } } ckptMsg->msgSubtype = M_RPLC_ADD_ROLLBACK_BCAST_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); break; /* * the reply message to M_RPLC_ADD_ROLLBACK_BCAST */ case M_RPLC_ADD_ROLLBACK_BCAST_REPLY: SACKPTMESSAGEVALIDATEOP(ckptMsg); if (ckptOp->state != OP_STATE_ROLLBACKED) { cl_log(LOG_INFO, "Op state error, ignore message"); break; } finished = SaCkptOperationFinished( ckptMsg->fromNodeName, OP_STATE_ROLLBACKED, ckptOp->stateList); if (finished == TRUE) { ckptOp->state = OP_STATE_ROLLBACKED; ckptMsg->msgSubtype = M_RPLC_ADD_REPLY; SaCkptMessageSend(ckptMsg, ckptOp->clientHostName); replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } SaCkptOperationRemove(&ckptOp); } break; /* * after the checkpoint was closed and deleted on one node, * the other nodes have to update their replica list */ case M_RPLC_DEL_BCAST: if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } list = replica->nodeList; while (list != NULL) { state = (SaCkptStateT*)list->data; if (!strcmp(state->nodeName, ckptMsg->fromNodeName)) { replica->nodeList = g_list_remove( replica->nodeList, (gpointer)state); break; } list = list->next; } if (!strcmp(replica->activeNodeName, ckptMsg->fromNodeName)) { /* FIXME: */ /* got the active replica by election */ /* the most updated replica will be the active */ /* replica */ strcpy(replica->activeNodeName, saCkptService->nodeName); replica->flagIsActive = TRUE; replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); }#if 0 /* FIXME: */ /* if there are started requests on this replica */ /* return SA_ERR_TRY_AGAIN */ g_hash_table_foreach(saCkptService->clientHash, xxxx, (gpointer)replica);#endif } break; /* * Any update request will need to send this message to active * node first * * if the checkpoint has SA_CKPT_WR_ACTIVE_REPLICA flag, the * active node will update and commit first, then send reply * to the client application, no matter the standby nodes * can update or not. This is not two phase commit * * if the checkpoint has SA_CKPT_WR_ALL_REPLICAS flag, it will * update the replicas on all the nodes via two phase commit * algorithm */ case M_CKPT_UPD: if (replica == NULL) { cl_log(LOG_ERR, "No replica, ignore message"); break; } if (!replica->flagIsActive) { cl_log(LOG_ERR, "Standby replica, ignore message"); break; } ckptOp = SaCkptOperationCreate(ckptMsg, replica); SACKPTASSERT (ckptOp!= NULL); if (replica->flagReplicaLock == TRUE) { replica->pendingOperationList = g_list_append( replica->pendingOperationList, (gpointer)ckptOp);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -