📄 message.c
字号:
ckptResp->resp->dataLength = ckptMsg->dataLength; ckptResp->resp->data = SaCkptMalloc(ckptMsg->dataLength); SACKPTASSERT(ckptResp->resp->data != NULL); memcpy(ckptResp->resp->data, ckptMsg->data, ckptMsg->dataLength); } SaCkptResponseSend(&ckptResp); break; default: break; } /* * other heartbeat messages */ } else if(!strcmp(ckptMsg->msgType, T_STATUS)) { /* * if node is dead, remove it */ if (!strcmp(ckptMsg->hamsgStatus, "dead")) { cl_log(LOG_INFO, "Node %s dead", ckptMsg->fromNodeName); /* * for each replica, remove the dead node from its * nodeList */ g_hash_table_foreach(saCkptService->replicaHash, SaCkptReplicaNodeFailure, (gpointer)ckptMsg->fromNodeName); /* *FIXME update saCkptService->nodeStatus */ /* * for each sent client request, redo it since all the * operations are reentriable */ g_hash_table_foreach(saCkptService->clientHash, SaCkptClientNodeFailure, (gpointer)ckptMsg->fromNodeName); } } else { cl_log(LOG_INFO, "Unrecognized message %s ", ckptMsg->msgType); } SaCkptMessageDelete(&ckptMsg); /* * if there are still other messages waiting for processing, * continue to process them. */ SaCkptClusterMsgProcess(); return TRUE;}/* * receive message from heartbeat daemon * it read message from the heartbeat daemon and * convert it to ckpt message */SaCkptMessageT* SaCkptMessageReceive(){ ll_cluster_t *hb = saCkptService->heartbeat; struct ha_msg *haMsg = NULL; SaCkptMessageT *ckptMsg = NULL; if (!hb->llc_ops->msgready(hb)) { return NULL; } haMsg = hb->llc_ops->readmsg(hb, TRUE); if (haMsg == NULL) { return NULL; }#ifdef CKPTDEBUG if (saCkptService->flagVerbose) { char * msgString = NULL; msgString = msg2string(haMsg); cl_log(LOG_DEBUG, "Receive message\n%s", msgString); SaCkptFree((void**)&msgString); }#endif ckptMsg = SaHamsg2CkptMessage(haMsg); if (saCkptService->flagVerbose) { char* strSubtype = NULL; char* strErr = NULL; strSubtype = SaCkptMsgSubtype2String(ckptMsg->msgSubtype); strErr = SaCkptErr2String(ckptMsg->retVal); cl_log(LOG_INFO, "Message from %s, type %s, subtype %s, status %s", ckptMsg->fromNodeName, ckptMsg->msgType, strSubtype, strErr); SaCkptFree((void*)&strSubtype); SaCkptFree((void*)&strErr); } ha_msg_del(haMsg); return ckptMsg;}/* convert ckpt messaage to Linux-HA message format */struct ha_msg* SaCkptMessage2Hamsg(SaCkptMessageT* ckptMsg) { struct ha_msg *haMsg = NULL; char *strVersion = NULL; char *strTemp = NULL; int rc; strVersion = (char*)SaCkptMalloc(32); SACKPTASSERT(strVersion != NULL); strTemp = (char*)SaCkptMalloc(MAXMSG); SACKPTASSERT(strTemp != NULL); haMsg = ha_msg_new(30); SACKPTASSERT(haMsg != NULL); rc = ha_msg_mod(haMsg, F_TYPE, T_CKPT); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_TYPE); } rc = ha_msg_mod(haMsg, F_ORIG, saCkptService->nodeName); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_ORIG); } strTemp[0] = 0; sprintf(strTemp, "%d", ckptMsg->msgSubtype); rc = ha_msg_mod(haMsg, F_CKPT_SUBTYPE, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_SUBTYPE); } SaCkptPackVersion(strVersion, &(ckptMsg->msgVersion)); rc = ha_msg_mod(haMsg, F_CKPT_VERSION, strVersion); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_VERSION); } if (ckptMsg->checkpointName[0] != 0) { rc = ha_msg_mod(haMsg, F_CKPT_CHECKPOINT_NAME, ckptMsg->checkpointName); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_CHECKPOINT_NAME); } } if (ckptMsg->clientHostName[0] != 0) { rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_HOSTNAME, ckptMsg->clientHostName); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_CLIENT_HOSTNAME); } } if (ckptMsg->clientHandle > 0) { strTemp[0] = 0; sprintf(strTemp, "%d", ckptMsg->clientHandle); rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_HANDLE, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_CLIENT_HANDLE); } } if (ckptMsg->clientRequest > 0) { strTemp[0] = 0; sprintf(strTemp, "%d", ckptMsg->clientRequest); rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_REQUEST, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_CLIENT_REQUEST); } } if (ckptMsg->clientRequestNO > 0) { strTemp[0] = 0; sprintf(strTemp, "%d", ckptMsg->clientRequestNO); rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_REQUEST_NO, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_CLIENT_REQUEST_NO); } } if (ckptMsg->activeNodeName[0] != 0) { rc = ha_msg_mod(haMsg, F_CKPT_ACTIVE_NODENAME, ckptMsg->activeNodeName); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_ACTIVE_NODENAME); } } if (ckptMsg->operation > 0) { strTemp[0] = 0; sprintf(strTemp, "%d", ckptMsg->operation); rc = ha_msg_mod(haMsg, F_CKPT_OPERATION, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_OPERATION); } } if (ckptMsg->operationNO > 0) { strTemp[0] = 0; sprintf(strTemp, "%d", ckptMsg->operationNO); rc = ha_msg_mod(haMsg, F_CKPT_OPERATION_NO, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_OPERATION_NO); } } if (ckptMsg->paramLength > 0) { strTemp[0] = 0; sprintf(strTemp, "%d", (int)ckptMsg->paramLength); rc = ha_msg_mod(haMsg, F_CKPT_PARAM_LENGTH, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_PARAM_LENGTH); } strTemp[0] = 0; binary_to_base64(ckptMsg->param, ckptMsg->paramLength, strTemp, MAXMSG); rc = ha_msg_mod(haMsg, F_CKPT_PARAM, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_PARAM); } } if (ckptMsg->dataLength > 0) { strTemp[0] = 0; sprintf(strTemp, "%d", (int)ckptMsg->dataLength); rc = ha_msg_mod(haMsg, F_CKPT_DATA_LENGTH, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_DATA_LENGTH); } strTemp[0] = 0; binary_to_base64(ckptMsg->data, ckptMsg->dataLength, strTemp, MAXMSG); rc = ha_msg_mod(haMsg, F_CKPT_DATA, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_DATA); } } strTemp[0] = 0; sprintf(strTemp, "%d", ckptMsg->retVal); rc = ha_msg_mod(haMsg, F_CKPT_RETVAL, strTemp); if (rc != HA_OK) { cl_log(LOG_ERR, "Add field %s to hamsg failed", F_CKPT_RETVAL); } SaCkptFree((void*)&strTemp); SaCkptFree((void*)&strVersion); return haMsg;}/* convert Linux-HA message to ckpt message */SaCkptMessageT* SaHamsg2CkptMessage(struct ha_msg* haMsg){ const char *strType = NULL; const char *strSubtype = NULL; const char *strVersion = NULL; const char *strOrig = NULL; const char *strCheckpointName = NULL; const char *strActiveHostname = NULL; const char *strClientHostname = NULL; const char *strClientHandle = NULL; const char *strClientRequest = NULL; const char *strClientRequestNO = NULL; const char *strOperation = NULL; const char *strOperationNO = NULL; const char *strParam = NULL; const char *strParamLength = NULL; const char *strData = NULL; const char *strDataLength = NULL; const char *strRc = NULL; const char *strHostname = NULL; const char *strStatus = NULL; SaCkptMessageT *ckptMsg = NULL; ckptMsg = SaCkptMalloc(sizeof(SaCkptMessageT)); if (ckptMsg == NULL) { return NULL; } strType = ha_msg_value(haMsg, F_TYPE); if (strType != NULL) { strcpy(ckptMsg->msgType, strType); } strSubtype = ha_msg_value(haMsg, F_CKPT_SUBTYPE); if (strSubtype != NULL) { ckptMsg->msgSubtype = atoi(strSubtype); } strVersion = ha_msg_value(haMsg, F_CKPT_VERSION); if (strVersion != NULL) { SaCkptUnpackVersion(strVersion, &(ckptMsg->msgVersion)); } strOrig = ha_msg_value(haMsg, F_ORIG); if (strOrig != NULL) { strcpy(ckptMsg->fromNodeName, strOrig); } strCheckpointName = ha_msg_value(haMsg, F_CKPT_CHECKPOINT_NAME); if (strCheckpointName != NULL) { strcpy(ckptMsg->checkpointName, strCheckpointName); } strActiveHostname = ha_msg_value(haMsg, F_CKPT_ACTIVE_NODENAME); if (strActiveHostname != NULL) { strcpy(ckptMsg->activeNodeName, strActiveHostname); } strClientHostname = ha_msg_value(haMsg, F_CKPT_CLIENT_HOSTNAME); if (strClientHostname != NULL) { strcpy(ckptMsg->clientHostName, strClientHostname); } strClientHandle = ha_msg_value(haMsg, F_CKPT_CLIENT_HANDLE); if (strClientHandle != NULL) { ckptMsg->clientHandle = atoi(strClientHandle); } strClientRequest = ha_msg_value(haMsg, F_CKPT_CLIENT_REQUEST); if (strClientRequest != NULL) { ckptMsg->clientRequest = atoi(strClientRequest); } strClientRequestNO = ha_msg_value(haMsg, F_CKPT_CLIENT_REQUEST_NO); if (strClientRequestNO != NULL) { ckptMsg->clientRequestNO = atoi(strClientRequestNO); } strOperation = ha_msg_value(haMsg, F_CKPT_OPERATION); if (strOperation != NULL) { ckptMsg->operation = atoi(strOperation); } strOperationNO = ha_msg_value(haMsg, F_CKPT_OPERATION_NO); if (strOperationNO != NULL) { ckptMsg->operationNO = atoi(strOperationNO); } strParam = ha_msg_value(haMsg, F_CKPT_PARAM); strParamLength = ha_msg_value(haMsg, F_CKPT_PARAM_LENGTH); if (strParamLength != NULL) { ckptMsg->paramLength = atoi(strParamLength); ckptMsg->param = SaCkptMalloc(ckptMsg->paramLength); SACKPTASSERT(ckptMsg->param != NULL); base64_to_binary(strParam, strlen(strParam), ckptMsg->param, ckptMsg->paramLength); } strData = ha_msg_value(haMsg, F_CKPT_DATA); strDataLength = ha_msg_value(haMsg, F_CKPT_DATA_LENGTH); if (strDataLength != NULL) { ckptMsg->dataLength = atoi(strDataLength); ckptMsg->data = SaCkptMalloc(ckptMsg->dataLength); SACKPTASSERT(ckptMsg->data != NULL); base64_to_binary(strData, strlen(strData), ckptMsg->data, ckptMsg->dataLength); } strRc = ha_msg_value(haMsg, F_CKPT_RETVAL); if (strRc != NULL) { ckptMsg->retVal = atoi(strRc); } strHostname = ha_msg_value(haMsg, F_ORIG); if (strHostname != NULL) { strcpy(ckptMsg->fromNodeName, strHostname); } strStatus = ha_msg_value(haMsg, F_STATUS); if (strStatus != NULL) { strcpy(ckptMsg->hamsgStatus, strStatus); } return ckptMsg;}/* send message to one node */int SaCkptMessageSend(SaCkptMessageT* ckptMsg, char* nodename){ ll_cluster_t *hb = saCkptService->heartbeat; struct ha_msg *haMsg = NULL; char *strSubtype = NULL; char *strErr = NULL; int rc; strcpy(ckptMsg->fromNodeName, saCkptService->nodeName); haMsg = SaCkptMessage2Hamsg(ckptMsg); if (haMsg == NULL) { cl_log(LOG_ERR, "Convert ckptmsg to hamsg failed"); return HA_OK; } else { rc = hb->llc_ops->sendnodemsg(hb, haMsg, nodename); if (rc == HA_OK) { if (saCkptService->flagVerbose) { strSubtype = SaCkptMsgSubtype2String( ckptMsg->msgSubtype); strErr = SaCkptErr2String(ckptMsg->retVal); cl_log(LOG_INFO, "Send message to %s, type %s, subtype %s, status %s", nodename, ckptMsg->msgType, strSubtype, strErr); SaCkptFree((void*)&strSubtype); SaCkptFree((void*)&strErr); }#ifdef CKPTDEBUG { char * strHamsg = NULL; strHamsg = msg2string(haMsg); cl_log(LOG_DEBUG, "Send cluster message\n%s", strHamsg); SaCkptFree((void**)&strHamsg); }#endif } else { cl_log(LOG_ERR, "Send message to %s failed", nodename); } ha_msg_del(haMsg); haMsg = NULL; } return rc;}/* send message to all the cluster nodes */int SaCkptMessageBroadcast(SaCkptMessageT* ckptMsg){ ll_cluster_t *hb = saCkptService->heartbeat; struct ha_msg *haMsg = NULL; char *strMsgSubtype = NULL; int rc; strcpy(ckptMsg->fromNodeName, saCkptService->nodeName); haMsg = SaCkptMessage2Hamsg(ckptMsg); if (haMsg == NULL) { cl_log(LOG_ERR, "Convert ckptmsg to hamsg failed"); return HA_OK; } else { rc = hb->llc_ops->sendclustermsg(hb, haMsg); if (rc == HA_OK) { if (saCkptService->flagVerbose) { strMsgSubtype = SaCkptMsgSubtype2String( ckptMsg->msgSubtype); cl_log(LOG_INFO, "Broadcast message, type %s, subtype %s", ckptMsg->msgType, strMsgSubtype); SaCkptFree((void*)&strMsgSubtype); }#ifdef CKPTDEBUG { char * strHamsg = NULL;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -