📄 message.c
字号:
if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Send operation to pending list"); } SaCkptOperationStartTimer(ckptOp); break; } replica->flagReplicaLock = TRUE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s locked", replica->checkpointName); } /* * if replica is opened with SA_CKPT_WR_ACTIVE_REPLICA * update active replica and return */ if (replica->createFlag & SA_CKPT_WR_ACTIVE_REPLICA) { /* update active replica */ retVal = SaCkptReplicaUpdate(replica, ckptMsg->clientRequest, ckptMsg->dataLength, ckptMsg->data, ckptMsg->paramLength, ckptMsg->param); ckptMsg->msgSubtype = M_CKPT_UPD_REPLY; ckptMsg->retVal = retVal; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); if (retVal != SA_OK) { replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } break; } /* send msg to all the nodes except itself */ ckptMsg->msgSubtype = M_CKPT_UPD_BCAST; nodeList = g_list_copy(replica->nodeList); list = nodeList; while (list != NULL) { state = (SaCkptStateT*)list->data; if (!strcmp(state->nodeName, saCkptService->nodeName)) { nodeList = g_list_remove( nodeList, (gpointer)state); break; } list = list->next; } SaCkptMessageMulticast(ckptMsg, nodeList); g_list_free(nodeList); replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } }else { g_hash_table_insert(replica->operationHash, (gpointer)&(ckptOp->operationNO), (gpointer)ckptOp); ckptOp->state = OP_STATE_STARTED; ckptMsg->msgSubtype = M_CKPT_UPD_PREPARE_BCAST; SaCkptMessageMulticast(ckptMsg, ckptOp->stateList); SaCkptOperationStartTimer(ckptOp); } break; /* * after the active node has updated and committed its * checkpoint, it will broadcast this message to ask * other standby node to update their checkpoint. * * if the update on the standby nodes is not successful, * the data of this checkpoint will be marked as invalid * * this is not two phase commit since the active node has * already committed * * this message do not need reply */ case M_CKPT_UPD_BCAST: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } SaCkptReplicaUpdate(replica, ckptMsg->clientRequest, ckptMsg->dataLength, ckptMsg->data, ckptMsg->paramLength, ckptMsg->param); /* * update the standby replica * FIXME: */ replica->nextOperationNumber = ckptMsg->operationNO + 1; /* do not send reply */ break; /* * the beginning of two phase commit algorithm */ case M_CKPT_UPD_PREPARE_BCAST: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } retVal = SaCkptReplicaUpdPrepare(replica, ckptMsg->clientRequest, ckptMsg->dataLength, ckptMsg->data, ckptMsg->paramLength, ckptMsg->param); ckptMsg->msgSubtype = M_CKPT_UPD_PREPARE_BCAST_REPLY; ckptMsg->retVal = retVal; SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); break; /* * the reply message of M_CKPT_UPD_PREPARE_BCAST */ case M_CKPT_UPD_PREPARE_BCAST_REPLY: SACKPTMESSAGEVALIDATEOP(ckptMsg); if (ckptOp->state != OP_STATE_STARTED) { cl_log(LOG_INFO, "Op state error, ignore message"); break; } if (ckptMsg->retVal != SA_OK) { finished = TRUE; ckptOp->state = OP_STATE_ROLLBACKED; } else { finished = SaCkptOperationFinished( ckptMsg->fromNodeName, OP_STATE_PREPARED, ckptOp->stateList); if (finished == TRUE) { ckptOp->state = OP_STATE_PREPARED; } } if (finished == TRUE) { if (ckptOp->state == OP_STATE_ROLLBACKED) { ckptMsg->msgSubtype = M_CKPT_UPD_ROLLBACK_BCAST; } if (ckptOp->state == OP_STATE_PREPARED) { ckptMsg->msgSubtype = M_CKPT_UPD_COMMIT_BCAST; } SaCkptOperationStopTimer(ckptOp); SaCkptMessageMulticast(ckptMsg, ckptOp->stateList); } break; /* * the update commit broadcast message */ case M_CKPT_UPD_COMMIT_BCAST: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } retVal = SaCkptReplicaUpdCommit(replica, ckptMsg->clientRequest, ckptMsg->dataLength, ckptMsg->data, ckptMsg->paramLength, ckptMsg->param); replica->nextOperationNumber = ckptMsg->operationNO + 1; ckptMsg->msgSubtype = M_CKPT_UPD_COMMIT_BCAST_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); break; /* * the reply message to M_CKPT_UPD_COMMIT_BCAST */ case M_CKPT_UPD_COMMIT_BCAST_REPLY: SACKPTMESSAGEVALIDATEOP(ckptMsg); if (ckptOp->state != OP_STATE_PREPARED) { cl_log(LOG_INFO, "Op state error, ignore message"); break; } finished = SaCkptOperationFinished( ckptMsg->fromNodeName, OP_STATE_COMMITTED, ckptOp->stateList); if (finished == TRUE) { ckptMsg->msgSubtype = M_CKPT_UPD_REPLY; /* update reply message do not need data */ if (ckptMsg->dataLength > 0) { SaCkptFree((void**)&(ckptMsg->data)); ckptMsg->data = NULL; ckptMsg->dataLength = 0; } SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); /* unlock replica */ replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } SaCkptOperationRemove(&ckptOp); } break; /* * the rollback message */ case M_CKPT_UPD_ROLLBACK_BCAST: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } retVal = SaCkptReplicaUpdRollback(replica, ckptMsg->clientRequest, ckptMsg->dataLength, ckptMsg->data, ckptMsg->paramLength, ckptMsg->param); ckptMsg->msgSubtype = M_CKPT_UPD_ROLLBACK_BCAST_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName); break; /* * the reply message to M_CKPT_UPD_ROLLBACK_BCAST */ case M_CKPT_UPD_ROLLBACK_BCAST_REPLY: SACKPTMESSAGEVALIDATEOP(ckptMsg); if (ckptOp->state != OP_STATE_ROLLBACKED) { cl_log(LOG_INFO, "Op state error, ignore message"); break; } finished = SaCkptOperationFinished( ckptMsg->fromNodeName, OP_STATE_ROLLBACKED, ckptOp->stateList); if (finished == TRUE) { ckptMsg->msgSubtype = M_CKPT_UPD_REPLY; /* update reply message do not need data */ if (ckptMsg->dataLength > 0) { SaCkptFree((void**)&(ckptMsg->data)); ckptMsg->data = NULL; ckptMsg->dataLength = 0; } SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); /* unlock replica */ replica->flagReplicaLock = FALSE; if (saCkptService->flagVerbose) { cl_log(LOG_INFO, "Replica %s unlocked", replica->checkpointName); } SaCkptOperationRemove(&ckptOp); } break; /* * The read (and also udpate) operation has to be sent to active * node first, so all these operations can be serialized */ case M_CKPT_READ: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } if (!replica->flagIsActive) { cl_log(LOG_INFO, "Standby replica, ignore message"); break; } if (replica->flagReplicaLock != TRUE) { retVal = SaCkptReplicaRead(replica, &(ckptMsg->dataLength), &(ckptMsg->data), ckptMsg->paramLength, ckptMsg->param); ckptMsg->retVal = retVal; ckptMsg->msgSubtype = M_CKPT_READ_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); } else { ckptOp = SaCkptOperationCreate( ckptMsg, replica); replica->pendingOperationList = g_list_append( replica->pendingOperationList, (gpointer)ckptOp); cl_log(LOG_INFO, "Send read operation to pending list"); SaCkptOperationStartTimer(ckptOp); } break; /* * sync message * on receive this message, if the replica is not lock and no * other pending operations, the active replica reply it with * OK. Else, the active replica add it to the the pending list */ case M_CKPT_SYNC: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } if (!replica->flagIsActive) { cl_log(LOG_INFO, "Standby replica, ignore message"); break; } if ((replica->flagReplicaLock != TRUE) && (g_list_length( replica->pendingOperationList) == 0)) { ckptMsg->retVal = SA_OK; ckptMsg->msgSubtype = M_CKPT_SYNC_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); } else { ckptOp = SaCkptOperationCreate( ckptMsg, replica); replica->pendingOperationList = g_list_append( replica->pendingOperationList, (gpointer)ckptOp); cl_log(LOG_INFO, "Send sync operation to pending list"); SaCkptOperationStartTimer(ckptOp); } break; /* * set active broadcast message * on receive this message, all the replica stop sending * requests to the active replica. */ case M_CKPT_ACT_SET_BCAST: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } replica->flagReplicaPending = TRUE; cl_log(LOG_INFO, "Replica %s stop sending requests", replica->checkpointName); if (!replica->flagIsActive) { cl_log(LOG_INFO, "Standby replica, ignore message"); break; } if ((replica->flagReplicaLock != TRUE) && (g_list_length( replica->pendingOperationList) == 0)) { ckptMsg->retVal = SA_OK; ckptMsg->msgSubtype = M_CKPT_ACT_SET_BCAST_REPLY; SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName); } else { ckptOp = SaCkptOperationCreate( ckptMsg, replica); replica->pendingOperationList = g_list_append( replica->pendingOperationList, (gpointer)ckptOp); cl_log(LOG_INFO, "Send act_set operation to pending list"); SaCkptOperationStartTimer(ckptOp); } break; case M_CKPT_ACT_SET_BCAST_REPLY: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } SACKPTMESSAGEVALIDATEREQ(ckptMsg); replica->flagIsActive = TRUE; strcpy(replica->activeNodeName, saCkptService->nodeName); cl_log(LOG_INFO, "checkpoint %s is set as active replica", replica->checkpointName); replica->flagReplicaPending = FALSE; cl_log(LOG_INFO, "Replica %s resume sending requests", replica->checkpointName); ckptMsg->retVal = SA_OK; ckptMsg->msgSubtype = M_CKPT_ACT_SET_FINISH_BCAST; SaCkptMessageBroadcast(ckptMsg); /* stop timer and send back response */ SaCkptRequestStopTimer(ckptReq); ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = ckptMsg->retVal; SaCkptResponseSend(&ckptResp); break; case M_CKPT_ACT_SET_FINISH_BCAST: if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; } if (strcmp(saCkptService->nodeName, ckptMsg->clientHostName) == 0) { cl_log(LOG_INFO, "Send from itself, ignore message"); break; } replica->flagIsActive = FALSE; strcpy(replica->activeNodeName, ckptMsg->clientHostName); cl_log(LOG_INFO, "Active node of replica %s has been switched to %s", replica->checkpointName, replica->activeNodeName); replica->flagReplicaPending = FALSE; cl_log(LOG_INFO, "Replica %s resume sending requests", replica->checkpointName); break; case M_CKPT_UPD_REPLY: case M_CKPT_READ_REPLY: case M_CKPT_SYNC_REPLY:#if 0 if (replica == NULL) { cl_log(LOG_INFO, "No replica, ignore message"); break; }#endif SACKPTMESSAGEVALIDATEREQ(ckptMsg); SaCkptRequestStopTimer(ckptReq); ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = ckptMsg->retVal; if ((ckptMsg->msgSubtype == M_CKPT_READ_REPLY) && (ckptMsg->dataLength > 0)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -