📄 message.c
字号:
strHamsg = msg2string(haMsg); cl_log(LOG_DEBUG, "Broadcast cluster message\n%s", strHamsg); SaCkptFree((void**)&strHamsg); }#endif } else { cl_log(LOG_ERR, "Broadcast message to cluster failed"); } ha_msg_del(haMsg); } return rc;}/* send message to multiple nodes */int SaCkptMessageMulticast(SaCkptMessageT* ckptMsg, GList* list){ ll_cluster_t *hb = saCkptService->heartbeat; struct ha_msg *haMsg = NULL; SaCkptStateT *state = NULL; char *strMsgSubtype = NULL; int rc; strcpy(ckptMsg->fromNodeName, saCkptService->nodeName); haMsg = SaCkptMessage2Hamsg(ckptMsg); if (haMsg == NULL) { cl_log(LOG_ERR, "Convert ckptmsg to hamsg failed"); return HA_OK; } while (list != NULL) { state = (SaCkptStateT*)list->data; rc = hb->llc_ops->sendnodemsg(hb, haMsg, state->nodeName); if (rc == HA_OK) { if (saCkptService->flagVerbose) { strMsgSubtype = SaCkptMsgSubtype2String( ckptMsg->msgSubtype); cl_log(LOG_INFO, "Send message to %s, type %s, subtype %s", state->nodeName, ckptMsg->msgType, strMsgSubtype); SaCkptFree((void*)&strMsgSubtype); } } else { cl_log(LOG_ERR, "Send message to %s failed", state->nodeName); } list = list->next; } #ifdef CKPTDEBUG { char * strHamsg = NULL; strHamsg = msg2string(haMsg); cl_log(LOG_DEBUG, "Multicast message\n%s", strHamsg); SaCkptFree((void**)&strHamsg); }#endif ha_msg_del(haMsg); return HA_OK;}/* * convert ckpt message subtype to string * used for debug purpose */char* SaCkptMsgSubtype2String(SaCkptMsgSubtypeT msgSubtype){ char* strMsgSubtype = NULL; char *strTemp = NULL; strTemp = (char*)SaCkptMalloc(64); SACKPTASSERT(strTemp != NULL); switch (msgSubtype) { case M_CKPT_CREATED: strcpy(strTemp, "M_CKPT_CREATED"); break; case M_CKPT_CREATED_REPLY: strcpy(strTemp, "M_CKPT_CREATED_REPLY"); break; case M_CKPT_OPEN_BCAST: strcpy(strTemp, "M_CKPT_OPEN_BCAST"); break; case M_CKPT_OPEN_BCAST_REPLY: strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY"); break; case M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA: strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA"); break; case M_CKPT_OPEN_BCAST_REPLY_STANDBY: strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_STANDBY"); break; case M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH: strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH"); break; case M_CKPT_OPEN_BCAST_REPLY_RACE_LOW: strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_RACE_LOW"); break; case M_CKPT_OPEN_BCAST_REPLY_SELF: strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_SELF"); break; case M_CKPT_OPEN_BCAST_REPLY_EARLIER: strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_EARLIER"); break; case M_RPLC_CRT: strcpy(strTemp, "M_RPLC_CRT"); break; case M_RPLC_CRT_REPLY: strcpy(strTemp, "M_RPLC_CRT_REPLY"); break; case M_RPLC_ADD: strcpy(strTemp, "M_RPLC_ADD"); break; case M_RPLC_ADD_REPLY: strcpy(strTemp, "M_RPLC_ADD_REPLY"); break; case M_RPLC_ADD_PREPARE_BCAST: strcpy(strTemp, "M_RPLC_ADD_PREPARE_BCAST"); break; case M_RPLC_ADD_PREPARE_BCAST_REPLY: strcpy(strTemp, "M_RPLC_ADD_PREPARE_BCAST_REPLY"); break; case M_RPLC_ADD_COMMIT_BCAST: strcpy(strTemp, "M_RPLC_ADD_COMMIT_BCAST"); break; case M_RPLC_ADD_COMMIT_BCAST_REPLY: strcpy(strTemp, "M_RPLC_ADD_COMMIT_BCAST_REPLY"); break; case M_RPLC_ADD_ROLLBACK_BCAST: strcpy(strTemp, "M_RPLC_ADD_ROLLBACK_BCAST"); break; case M_RPLC_ADD_ROLLBACK_BCAST_REPLY: strcpy(strTemp, "M_RPLC_ADD_ROLLBACK_BCAST_REPLY"); break; case M_CKPT_OPEN_REMOTE: strcpy(strTemp, "M_CKPT_OPEN_REMOTE"); break; case M_CKPT_OPEN_REMOTE_REPLY: strcpy(strTemp, "M_CKPT_OPEN_REMOTE_REPLY"); break; case M_CKPT_CLOSE_REMOTE: strcpy(strTemp, "M_CKPT_CLOSE_REMOTE"); break; case M_CKPT_CLOSE_REMOTE_REPLY: strcpy(strTemp, "M_CKPT_CLOSE_REMOTE_REPLY"); break; case M_CKPT_CKPT_CREATE_BCAST: strcpy(strTemp, "M_CKPT_CKPT_CREATE_BCAST"); break; case M_CKPT_CKPT_CREATE_BCAST_REPLY: strcpy(strTemp, "M_CKPT_CKPT_CREATE_BCAST_REPLY"); break; case M_RPLC_DEL: strcpy(strTemp, "M_RPLC_DEL"); break; case M_RPLC_DEL_REPLY: strcpy(strTemp, "M_RPLC_DEL_REPLY"); break; case M_RPLC_DEL_BCAST: strcpy(strTemp, "M_RPLC_DEL_BCAST"); break; case M_RPLC_DEL_BCAST_REPLY: strcpy(strTemp, "M_RPLC_DEL_BCAST_REPLY"); break; case M_CKPT_UPD: strcpy(strTemp, "M_CKPT_UPD"); break; case M_CKPT_UPD_REPLY: strcpy(strTemp, "M_CKPT_UPD_REPLY"); break; case M_CKPT_UPD_PREPARE_BCAST: strcpy(strTemp, "M_CKPT_UPD_PREPARE_BCAST"); break; case M_CKPT_UPD_PREPARE_BCAST_REPLY: strcpy(strTemp, "M_CKPT_UPD_PREPARE_BCAST_REPLY"); break; case M_CKPT_UPD_COMMIT_BCAST: strcpy(strTemp, "M_CKPT_UPD_COMMIT_BCAST"); break; case M_CKPT_UPD_COMMIT_BCAST_REPLY: strcpy(strTemp, "M_CKPT_UPD_COMMIT_BCAST_REPLY"); break; case M_CKPT_UPD_ROLLBACK_BCAST: strcpy(strTemp, "M_CKPT_UPD_ROLLBACK_BCAST"); break; case M_CKPT_UPD_ROLLBACK_BCAST_REPLY: strcpy(strTemp, "M_CKPT_UPD_ROLLBACK_BCAST_REPLY"); break; case M_CKPT_UPD_BCAST: strcpy(strTemp, "M_CKPT_UPD_BCAST"); break; case M_CKPT_UPD_BCAST_REPLY: strcpy(strTemp, "M_CKPT_UPD_BCAST_REPLY"); break; case M_CKPT_SYNC: strcpy(strTemp, "M_CKPT_SYNC"); break; case M_CKPT_SYNC_REPLY: strcpy(strTemp, "M_CKPT_SYNC_REPLY"); break; case M_CKPT_ACT_SET_BCAST: strcpy(strTemp, "M_CKPT_ACT_SET_BCAST"); break; case M_CKPT_ACT_SET_BCAST_REPLY: strcpy(strTemp, "M_CKPT_ACT_SET_BCAST_REPLY"); break; case M_CKPT_ACT_SET_FINISH_BCAST: strcpy(strTemp, "M_CKPT_ACT_SET_FINISH_BCAST"); break; case M_CKPT_READ: strcpy(strTemp, "M_CKPT_READ"); break; case M_CKPT_READ_REPLY: strcpy(strTemp, "M_CKPT_READ_REPLY"); break; case M_CKPT_UNLINK_BCAST: strcpy(strTemp, "M_CKPT_UNLINK_BCAST"); break; default: strcpy(strTemp, "NULL"); } strMsgSubtype = SaCkptMalloc(strlen(strTemp)+1); if (strMsgSubtype == NULL) { return NULL; } memcpy(strMsgSubtype, strTemp, strlen(strTemp)+1); SaCkptFree((void*)&strTemp); return strMsgSubtype;}/* create ckpt message according to a request */SaCkptMessageT* SaCkptMessageCreateReq(SaCkptRequestT* ckptReq, SaCkptMsgSubtypeT msgSubtype){ SaCkptMessageT* ckptMsg = NULL; ckptMsg = (SaCkptMessageT*)SaCkptMalloc(sizeof(SaCkptMessageT)); if (ckptMsg == NULL) { return NULL; } strcpy(ckptMsg->msgType, T_CKPT); ckptMsg->msgSubtype = msgSubtype; ckptMsg->msgVersion = saCkptService->version; ckptMsg->retVal = SA_OK; if (ckptReq->openCkpt != NULL) { strcpy(ckptMsg->checkpointName, ckptReq->openCkpt->checkpointName); strcpy(ckptMsg->activeNodeName, ckptReq->openCkpt->activeNodeName); } ckptMsg->clientHandle = ckptReq->clientRequest->clientHandle; strcpy(ckptMsg->fromNodeName, saCkptService->nodeName); strcpy(ckptMsg->clientHostName, saCkptService->nodeName); ckptMsg->clientRequest = ckptReq->clientRequest->req; ckptMsg->clientRequestNO = ckptReq->clientRequest->requestNO; ckptMsg->param = ckptReq->clientRequest->reqParam; ckptMsg->paramLength = ckptReq->clientRequest->reqParamLength; ckptMsg->data = ckptReq->clientRequest->data; ckptMsg->dataLength = ckptReq->clientRequest->dataLength; return ckptMsg;}/* create ckpt message according to an operation */SaCkptMessageT* SaCkptMessageCreateOp(SaCkptOperationT* ckptOp, SaCkptMsgSubtypeT msgSubtype){ SaCkptMessageT* ckptMsg = NULL; ckptMsg = (SaCkptMessageT*)SaCkptMalloc(sizeof(SaCkptMessageT)); if (ckptMsg == NULL) { return NULL; } strcpy(ckptMsg->msgType, T_CKPT); ckptMsg->msgSubtype = msgSubtype; ckptMsg->msgVersion = saCkptService->version; ckptMsg->retVal = SA_OK; strcpy(ckptMsg->checkpointName, ckptOp->replica->checkpointName); strcpy(ckptMsg->activeNodeName, ckptOp->replica->activeNodeName); ckptMsg->clientHandle = ckptOp->clientHandle; strcpy(ckptMsg->fromNodeName, saCkptService->nodeName); strcpy(ckptMsg->clientHostName, ckptOp->clientHostName); ckptMsg->clientRequest = ckptOp->clientRequest; ckptMsg->clientRequestNO = ckptOp->clientRequestNO; ckptMsg->operation = ckptOp->operation; ckptMsg->operationNO = ckptOp->operationNO; ckptMsg->param = ckptOp->param; ckptMsg->paramLength = ckptOp->paramLength; ckptMsg->data = ckptOp->data; ckptMsg->dataLength = ckptOp->dataLength; return ckptMsg;}/* create an operation on the active replica according to the ckpt message */SaCkptOperationT* SaCkptOperationCreate(SaCkptMessageT* ckptMsg, SaCkptReplicaT* replica){ SaCkptOperationT* ckptOp = NULL; GList* list = NULL; SaCkptStateT* state = NULL; ckptOp = (SaCkptOperationT*)SaCkptMalloc(sizeof(SaCkptOperationT)); if (ckptOp == NULL) { return NULL; } ckptOp->replica = replica; ckptOp->clientHandle = ckptMsg->clientHandle; strcpy(ckptOp->clientHostName, ckptMsg->clientHostName); ckptOp->clientRequest = ckptMsg->clientRequest; ckptOp->clientRequestNO = ckptMsg->clientRequestNO; ckptOp->stateList = NULL; switch (ckptMsg->msgSubtype) { case M_RPLC_CRT: ckptOp->operation = OP_RPLC_CRT; break; case M_RPLC_ADD: ckptOp->operation = OP_RPLC_ADD; break; case M_CKPT_UPD: ckptOp->operation = OP_CKPT_UPD; break; case M_CKPT_READ: ckptOp->operation = OP_CKPT_READ; break; default: ckptOp->operation = OP_NULL; } replica->nextOperationNumber++; if (replica->nextOperationNumber <= 0) { replica->nextOperationNumber = 1; } ckptOp->operationNO = replica->nextOperationNumber; if (ckptMsg->paramLength > 0) { ckptOp->paramLength = ckptMsg->paramLength; ckptOp->param = SaCkptMalloc(ckptMsg->paramLength); SACKPTASSERT(ckptOp->param != NULL); memcpy(ckptOp->param, ckptMsg->param, ckptMsg->paramLength); } if (ckptMsg->dataLength > 0) { ckptOp->dataLength = ckptMsg->dataLength; ckptOp->data = SaCkptMalloc(ckptMsg->dataLength); SACKPTASSERT(ckptOp->data != NULL); memcpy(ckptOp->data, ckptMsg->data, ckptMsg->dataLength); } ckptOp->state = OP_STATE_PENDING; list = replica->nodeList; while (list != NULL) { state = (SaCkptStateT*)SaCkptMalloc(sizeof(SaCkptStateT)); SACKPTASSERT(state != NULL); memcpy(state, list->data, sizeof(SaCkptStateT)); state->state = OP_STATE_STARTED; ckptOp->stateList = g_list_append(ckptOp->stateList, (gpointer)state); list = list->next; } ckptOp->timeoutTag = 0; /* update message */ ckptMsg->operation = ckptOp->operation; ckptMsg->operationNO = ckptOp->operationNO; return ckptOp;}/* free the ckpt message */void SaCkptMessageDelete(SaCkptMessageT** pCkptMsg){ SaCkptMessageT* ckptMsg = *pCkptMsg; if (ckptMsg->paramLength > 0) { SaCkptFree((void**)&(ckptMsg->param)); } if (ckptMsg->dataLength > 0) { SaCkptFree((void**)&(ckptMsg->data)); } SaCkptFree((void*)&ckptMsg); *pCkptMsg = NULL; return;}/** keep this open request */void initOpenReqNodeStatus(SaCkptClientRequestT *clientReq){ SaCkptReqOpenParamT *openParam = NULL; if(clientReq == NULL){ cl_log(LOG_ERR, "NULL clientReq in initOpenReqNodeStatus\n"); return; } if((clientReq->req != REQ_CKPT_OPEN) && (clientReq->req != REQ_CKPT_OPEN_ASYNC)){ cl_log(LOG_ERR, "Not Open Request in initOpenReqNodeStatus\n"); return; } openParam = (SaCkptReqOpenParamT *)clientReq->reqParam; g_hash_table_foreach(saCkptService->nodeStatusHash, getNodeCkptStatus,openParam); g_hash_table_insert(saCkptService->openRequestHash, (gpointer)openParam->ckptName.value,(gpointer)clientReq);}/* set the open request's status according to node status*/voidgetNodeCkptStatus(gpointer key,gpointer value, gpointer user_data){ saCkptNodeInfo *ckptNodeInfo = value; const char * nodeName = key; SaCkptReqOpenParamT * openParam = user_data; saOpenNodeStatusT *status = NULL; status = (saOpenNodeStatusT *)ha_malloc( sizeof(saOpenNodeStatusT)); if(status == NULL){ /*FIXME how to report error on hash fucntions*/ cl_log(LOG_INFO,"malloc error in getNodeCkptStatus\n"); return; } strncpy(status->nodeName, nodeName, SA_MAX_NAME_LENGTH); if(ckptNodeInfo->ckptStatus == CKPT_RUNNING){ status->status = RES_NO_RESPONSE ; }else{ status->status = RES_NOT_RUN; } openParam->nodeReponse = g_list_append(openParam->nodeReponse,status);}gboolean isLoopMessage(SaCkptMessageT * ckptMsg){ if( ckptMsg == NULL) return FALSE; if( strncmp(ckptMsg->fromNodeName, saCkptService->nodeName, SA_MAX_NAME_LENGTH)){ return FALSE; } return TRUE;}/* Check if a open request exist already*/SaCkptClientRequestT *isOnOpenProcess(SaCkptReqOpenParamT *openParam ){ SaCkptClientRequestT * clientReq = NULL; if(openParam == NULL){ if(saCkptService->flagVerbose){ cl_log(LOG_INFO,"NULL openParam on isOnOpenProcess\n"); return 0; } } clientReq = (SaCkptClientRequestT *)g_hash_table_lookup( \ saCkptService->openRequestHash, \ (gconstpointer)openParam->ckptName.value); return clientReq ;}gbooleanisHighPriority(const SaCkptMessageT *ckptMsg ){ if(ckptMsg == NULL){ cl_log(LOG_INFO,"NULL ckptMsg on isHighPriority\n"); return FALSE; } if( strncmp(ckptMsg->fromNodeName, saCkptService->nodeName, SA_MAX_NAME_LENGTH) < 0){ return FALSE; } return TRUE;}gintupdateOpenProcessQueue(const SaCkptMessageT *ckptMsg ,saOpenResponseTypeT *type){ SaCkptClientRequestT *clientReq = NULL; SaCkptReqOpenParamT *openParam = NULL; saOpenResponseTypeT nodeStatus = RES_NO_REPLICA; gint result = 1; if(ckptMsg == NULL){ cl_log(
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -