⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 message.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
				ckptResp->resp->dataLength = 					ckptMsg->dataLength;				ckptResp->resp->data = 					SaCkptMalloc(ckptMsg->dataLength);				SACKPTASSERT(ckptResp->resp->data != NULL);				memcpy(ckptResp->resp->data,					ckptMsg->data,					ckptMsg->dataLength);			} 			SaCkptResponseSend(&ckptResp);			break;		default:						break;		}	/*	 * other heartbeat messages	 */	} else if(!strcmp(ckptMsg->msgType, T_STATUS)) {		/* 		 * if node is dead, remove it 		 */		if (!strcmp(ckptMsg->hamsgStatus, "dead")) {			cl_log(LOG_INFO, 				"Node %s dead", 				ckptMsg->fromNodeName);							/*			 * for each replica, remove the dead node from its			 * nodeList			 */			g_hash_table_foreach(saCkptService->replicaHash,				SaCkptReplicaNodeFailure,				(gpointer)ckptMsg->fromNodeName);			/*			 *FIXME update saCkptService->nodeStatus			 */			/* 			 * for each sent client request, redo it since all the 			 * operations are reentriable			 */			 g_hash_table_foreach(saCkptService->clientHash,			 	SaCkptClientNodeFailure,			 	(gpointer)ckptMsg->fromNodeName);		}	} else {		cl_log(LOG_INFO, 			"Unrecognized message %s ", ckptMsg->msgType);	}		SaCkptMessageDelete(&ckptMsg);	/*	 * if there are still other messages waiting for processing,	 * continue to process them.	 */	SaCkptClusterMsgProcess();	return TRUE;}/*  * receive message from heartbeat daemon  * it read message from the heartbeat daemon and  * convert it to ckpt message  */SaCkptMessageT* SaCkptMessageReceive(){	ll_cluster_t	*hb		= saCkptService->heartbeat;	struct ha_msg	*haMsg		= NULL;	SaCkptMessageT	*ckptMsg 	= NULL;	if (!hb->llc_ops->msgready(hb)) {		return NULL;	}		haMsg = hb->llc_ops->readmsg(hb, TRUE);	if (haMsg == NULL) {		return NULL;	}#ifdef CKPTDEBUG		if (saCkptService->flagVerbose) {		char *	msgString = NULL;				msgString = msg2string(haMsg);		cl_log(LOG_DEBUG, 			"Receive message\n%s", 			msgString);		SaCkptFree((void**)&msgString);	}#endif		ckptMsg = SaHamsg2CkptMessage(haMsg);	if (saCkptService->flagVerbose) {		char* strSubtype = NULL;		char* strErr = NULL;		strSubtype = SaCkptMsgSubtype2String(ckptMsg->msgSubtype);		strErr = SaCkptErr2String(ckptMsg->retVal);		cl_log(LOG_INFO, 			"Message from %s, type %s, subtype %s, status %s",			ckptMsg->fromNodeName,			ckptMsg->msgType, strSubtype, strErr);		SaCkptFree((void*)&strSubtype);		SaCkptFree((void*)&strErr);	}		ha_msg_del(haMsg);	return ckptMsg;}/* convert ckpt messaage to Linux-HA message format */struct ha_msg* SaCkptMessage2Hamsg(SaCkptMessageT* ckptMsg) {	struct ha_msg	*haMsg = NULL;	char *strVersion = NULL;	char *strTemp = NULL;	int 	rc;	strVersion = (char*)SaCkptMalloc(32);	SACKPTASSERT(strVersion != NULL);	strTemp = (char*)SaCkptMalloc(MAXMSG);	SACKPTASSERT(strTemp != NULL);		haMsg = ha_msg_new(30);	SACKPTASSERT(haMsg != NULL);		rc = ha_msg_mod(haMsg, F_TYPE, T_CKPT);	if (rc != HA_OK) {		cl_log(LOG_ERR, 			"Add field %s to hamsg failed",			F_TYPE);	}		rc = ha_msg_mod(haMsg, F_ORIG, saCkptService->nodeName);	if (rc != HA_OK) {		cl_log(LOG_ERR, 			"Add field %s to hamsg failed",			F_ORIG);	}		strTemp[0] = 0;	sprintf(strTemp, "%d", ckptMsg->msgSubtype);	rc = ha_msg_mod(haMsg, F_CKPT_SUBTYPE, strTemp);	if (rc != HA_OK) {		cl_log(LOG_ERR, 			"Add field %s to hamsg failed",			F_CKPT_SUBTYPE);	}		SaCkptPackVersion(strVersion, &(ckptMsg->msgVersion));	rc = ha_msg_mod(haMsg, F_CKPT_VERSION, strVersion);	if (rc != HA_OK) {		cl_log(LOG_ERR, 			"Add field %s to hamsg failed",			F_CKPT_VERSION);	}		if (ckptMsg->checkpointName[0] != 0) {		rc = ha_msg_mod(haMsg, F_CKPT_CHECKPOINT_NAME, 			ckptMsg->checkpointName);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_CHECKPOINT_NAME);		}	}		if (ckptMsg->clientHostName[0] != 0) {		rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_HOSTNAME, 			ckptMsg->clientHostName);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_CLIENT_HOSTNAME);		}	}		if (ckptMsg->clientHandle > 0) {		strTemp[0] = 0;		sprintf(strTemp, "%d", ckptMsg->clientHandle);		rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_HANDLE, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_CLIENT_HANDLE);		}	}		if (ckptMsg->clientRequest > 0) {		strTemp[0] = 0;		sprintf(strTemp, "%d", ckptMsg->clientRequest);		rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_REQUEST, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_CLIENT_REQUEST);		}	}		if (ckptMsg->clientRequestNO > 0) {		strTemp[0] = 0;		sprintf(strTemp, "%d", ckptMsg->clientRequestNO);		rc = ha_msg_mod(haMsg, F_CKPT_CLIENT_REQUEST_NO, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_CLIENT_REQUEST_NO);		}	}		if (ckptMsg->activeNodeName[0] != 0) {		rc = ha_msg_mod(haMsg, F_CKPT_ACTIVE_NODENAME, 			ckptMsg->activeNodeName);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_ACTIVE_NODENAME);		}	}		if (ckptMsg->operation > 0) {		strTemp[0] = 0;		sprintf(strTemp, "%d", ckptMsg->operation);		rc = ha_msg_mod(haMsg, F_CKPT_OPERATION, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_OPERATION);		}	}		if (ckptMsg->operationNO > 0) {		strTemp[0] = 0;		sprintf(strTemp, "%d", ckptMsg->operationNO);		rc = ha_msg_mod(haMsg, F_CKPT_OPERATION_NO, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_OPERATION_NO);		}	}		if (ckptMsg->paramLength > 0) {		strTemp[0] = 0;		sprintf(strTemp, "%d", (int)ckptMsg->paramLength);		rc = ha_msg_mod(haMsg, F_CKPT_PARAM_LENGTH, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_PARAM_LENGTH);		}			strTemp[0] = 0;		binary_to_base64(ckptMsg->param, ckptMsg->paramLength, 			strTemp, MAXMSG);		rc = ha_msg_mod(haMsg, F_CKPT_PARAM, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_PARAM);		}	}		if (ckptMsg->dataLength > 0) {		strTemp[0] = 0;		sprintf(strTemp, "%d", (int)ckptMsg->dataLength);		rc = ha_msg_mod(haMsg, F_CKPT_DATA_LENGTH, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_DATA_LENGTH);		}				strTemp[0] = 0;		binary_to_base64(ckptMsg->data, ckptMsg->dataLength, 			strTemp, MAXMSG);		rc = ha_msg_mod(haMsg, F_CKPT_DATA, strTemp);		if (rc != HA_OK) {			cl_log(LOG_ERR, 				"Add field %s to hamsg failed",				F_CKPT_DATA);		}	}	strTemp[0] = 0;	sprintf(strTemp, "%d", ckptMsg->retVal);	rc = ha_msg_mod(haMsg, F_CKPT_RETVAL, strTemp);	if (rc != HA_OK) {		cl_log(LOG_ERR, 			"Add field %s to hamsg failed",			F_CKPT_RETVAL);	}	SaCkptFree((void*)&strTemp);	SaCkptFree((void*)&strVersion);	return haMsg;}/* convert Linux-HA message to ckpt message */SaCkptMessageT* SaHamsg2CkptMessage(struct ha_msg* haMsg){	const char	*strType		= NULL;	const char	*strSubtype		= NULL;	const char	*strVersion		= NULL;	const char	*strOrig		= NULL;	const char	*strCheckpointName	= NULL;	const char	*strActiveHostname	= NULL;	const char	*strClientHostname	= NULL;	const char	*strClientHandle	= NULL;	const char	*strClientRequest	= NULL;	const char	*strClientRequestNO	= NULL;	const char	*strOperation		= NULL;	const char	*strOperationNO 	= NULL;	const char	*strParam		= NULL;	const char	*strParamLength 	= NULL;	const char	*strData		= NULL;	const char	*strDataLength		= NULL;	const char	*strRc		= NULL;		const char	*strHostname		= NULL;	const char	*strStatus		= NULL;		SaCkptMessageT	*ckptMsg = NULL;		ckptMsg = SaCkptMalloc(sizeof(SaCkptMessageT));	if (ckptMsg == NULL) {		return NULL;	}		strType = ha_msg_value(haMsg, F_TYPE);	if (strType != NULL) {		strcpy(ckptMsg->msgType, strType);	}		strSubtype = ha_msg_value(haMsg, F_CKPT_SUBTYPE);	if (strSubtype != NULL) {		ckptMsg->msgSubtype = atoi(strSubtype);	}		strVersion = ha_msg_value(haMsg, F_CKPT_VERSION);	if (strVersion != NULL) {		SaCkptUnpackVersion(strVersion, &(ckptMsg->msgVersion));	}	strOrig = ha_msg_value(haMsg, F_ORIG);	if (strOrig != NULL) {		strcpy(ckptMsg->fromNodeName, strOrig);	}		strCheckpointName = ha_msg_value(haMsg, F_CKPT_CHECKPOINT_NAME);	if (strCheckpointName != NULL) {		strcpy(ckptMsg->checkpointName, strCheckpointName);	}		strActiveHostname = ha_msg_value(haMsg, F_CKPT_ACTIVE_NODENAME);	if (strActiveHostname != NULL) {		strcpy(ckptMsg->activeNodeName, strActiveHostname);	}		strClientHostname = ha_msg_value(haMsg, F_CKPT_CLIENT_HOSTNAME);	if (strClientHostname != NULL) {		strcpy(ckptMsg->clientHostName, strClientHostname);	}		strClientHandle = ha_msg_value(haMsg, F_CKPT_CLIENT_HANDLE);	if (strClientHandle != NULL) {		ckptMsg->clientHandle = atoi(strClientHandle);	}		strClientRequest = ha_msg_value(haMsg, F_CKPT_CLIENT_REQUEST);	if (strClientRequest != NULL) {		ckptMsg->clientRequest = atoi(strClientRequest);	}		strClientRequestNO = ha_msg_value(haMsg, F_CKPT_CLIENT_REQUEST_NO);	if (strClientRequestNO != NULL) {		ckptMsg->clientRequestNO = atoi(strClientRequestNO);	}		strOperation = ha_msg_value(haMsg, F_CKPT_OPERATION);	if (strOperation != NULL) {		ckptMsg->operation = atoi(strOperation);	}		strOperationNO = ha_msg_value(haMsg, F_CKPT_OPERATION_NO);	if (strOperationNO != NULL) {		ckptMsg->operationNO = atoi(strOperationNO);	}		strParam = ha_msg_value(haMsg, F_CKPT_PARAM);	strParamLength = ha_msg_value(haMsg, F_CKPT_PARAM_LENGTH);	if (strParamLength != NULL) {		ckptMsg->paramLength = atoi(strParamLength);				ckptMsg->param = SaCkptMalloc(ckptMsg->paramLength);		SACKPTASSERT(ckptMsg->param != NULL);		base64_to_binary(strParam, strlen(strParam), 			ckptMsg->param, ckptMsg->paramLength);	}		strData = ha_msg_value(haMsg, F_CKPT_DATA);	strDataLength = ha_msg_value(haMsg, F_CKPT_DATA_LENGTH);	if (strDataLength != NULL) {		ckptMsg->dataLength = atoi(strDataLength);			ckptMsg->data = SaCkptMalloc(ckptMsg->dataLength);		SACKPTASSERT(ckptMsg->data != NULL);		base64_to_binary(strData, strlen(strData), 			ckptMsg->data, ckptMsg->dataLength);	}		strRc = ha_msg_value(haMsg, F_CKPT_RETVAL);	if (strRc != NULL) {		ckptMsg->retVal = atoi(strRc);	} 		strHostname = ha_msg_value(haMsg, F_ORIG);	if (strHostname != NULL) {		strcpy(ckptMsg->fromNodeName, strHostname);	}		strStatus = ha_msg_value(haMsg, F_STATUS);	if (strStatus != NULL) {		strcpy(ckptMsg->hamsgStatus, strStatus);	}		return ckptMsg;}/* send message to one node */int SaCkptMessageSend(SaCkptMessageT* ckptMsg, char* nodename){	ll_cluster_t	*hb = saCkptService->heartbeat;	struct ha_msg	*haMsg = NULL;	char		*strSubtype = NULL;	char		*strErr = NULL;	int		rc;	strcpy(ckptMsg->fromNodeName, saCkptService->nodeName);	haMsg = SaCkptMessage2Hamsg(ckptMsg);	if (haMsg == NULL) {		cl_log(LOG_ERR, 			"Convert ckptmsg to hamsg failed");		return HA_OK;	} else {		rc =  hb->llc_ops->sendnodemsg(hb, haMsg, nodename);		if (rc == HA_OK) {			if (saCkptService->flagVerbose) {				strSubtype = SaCkptMsgSubtype2String(					ckptMsg->msgSubtype);				strErr = SaCkptErr2String(ckptMsg->retVal);				cl_log(LOG_INFO, 				"Send message to %s, type %s, subtype %s, status %s",				nodename, ckptMsg->msgType, 				strSubtype, strErr);				SaCkptFree((void*)&strSubtype);				SaCkptFree((void*)&strErr);			}#ifdef CKPTDEBUG			{				char *	strHamsg = NULL;								strHamsg = msg2string(haMsg);				cl_log(LOG_DEBUG, 					"Send cluster message\n%s", 					strHamsg);				SaCkptFree((void**)&strHamsg);			}#endif					} else {			cl_log(LOG_ERR, 				"Send message to %s failed", 				nodename);		}				ha_msg_del(haMsg);		haMsg = NULL;	}	return rc;}/* send message to all the cluster nodes */int SaCkptMessageBroadcast(SaCkptMessageT* ckptMsg){	ll_cluster_t	*hb = saCkptService->heartbeat;	struct ha_msg	*haMsg = NULL;	char		*strMsgSubtype = NULL;	int		rc;		strcpy(ckptMsg->fromNodeName, saCkptService->nodeName);	haMsg = SaCkptMessage2Hamsg(ckptMsg);		if (haMsg == NULL) {		cl_log(LOG_ERR, 			"Convert ckptmsg to hamsg failed");		return HA_OK;	} else {		rc = hb->llc_ops->sendclustermsg(hb, haMsg);		if (rc == HA_OK) {			if (saCkptService->flagVerbose) {				strMsgSubtype = SaCkptMsgSubtype2String(					ckptMsg->msgSubtype);				cl_log(LOG_INFO, 				"Broadcast message, type %s, subtype %s",				ckptMsg->msgType, strMsgSubtype);				SaCkptFree((void*)&strMsgSubtype);			}#ifdef CKPTDEBUG						{				char *	strHamsg = NULL;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -