⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 message.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
								strHamsg = msg2string(haMsg);				cl_log(LOG_DEBUG, 					"Broadcast cluster message\n%s", 					strHamsg);				SaCkptFree((void**)&strHamsg);			}#endif				} else {			cl_log(LOG_ERR, 				"Broadcast message to cluster failed");		}				ha_msg_del(haMsg);	}	return rc;}/* send message to multiple nodes */int SaCkptMessageMulticast(SaCkptMessageT* ckptMsg, GList* list){	ll_cluster_t	*hb = saCkptService->heartbeat;	struct ha_msg	*haMsg = NULL;	SaCkptStateT	*state = NULL;	char		*strMsgSubtype = NULL;	int 	rc;	strcpy(ckptMsg->fromNodeName, saCkptService->nodeName);	haMsg = SaCkptMessage2Hamsg(ckptMsg);	if (haMsg == NULL) {		cl_log(LOG_ERR, "Convert ckptmsg to hamsg failed");		return HA_OK;	}	while (list != NULL) {		state = (SaCkptStateT*)list->data;				rc = hb->llc_ops->sendnodemsg(hb, haMsg, state->nodeName);		if (rc == HA_OK) {			if (saCkptService->flagVerbose) {				strMsgSubtype = SaCkptMsgSubtype2String(					ckptMsg->msgSubtype);				cl_log(LOG_INFO, 				"Send message to %s, type %s, subtype %s",				state->nodeName,				ckptMsg->msgType, strMsgSubtype);				SaCkptFree((void*)&strMsgSubtype);			}		} else {			cl_log(LOG_ERR, 				"Send message to %s failed", 				state->nodeName);		}				list = list->next;	}	#ifdef CKPTDEBUG	{		char *	strHamsg = NULL;				strHamsg = msg2string(haMsg);		cl_log(LOG_DEBUG, 			"Multicast message\n%s", 			strHamsg);		SaCkptFree((void**)&strHamsg);	}#endif	ha_msg_del(haMsg);		return HA_OK;}/*  * convert ckpt message subtype to string * used for debug purpose */char* SaCkptMsgSubtype2String(SaCkptMsgSubtypeT msgSubtype){	char* strMsgSubtype = NULL;	char *strTemp = NULL;	strTemp = (char*)SaCkptMalloc(64);	SACKPTASSERT(strTemp != NULL);		switch (msgSubtype) {			case M_CKPT_CREATED:		strcpy(strTemp, "M_CKPT_CREATED");		break;	case M_CKPT_CREATED_REPLY:			strcpy(strTemp, "M_CKPT_CREATED_REPLY");		break;	case M_CKPT_OPEN_BCAST:		strcpy(strTemp, "M_CKPT_OPEN_BCAST");		break;	case M_CKPT_OPEN_BCAST_REPLY:		strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY");		break;		case M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA:		strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_NO_REPLICA");		break;		case M_CKPT_OPEN_BCAST_REPLY_STANDBY:		strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_STANDBY");		break;		case M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH:		strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_RACE_HIGH");		break;		case M_CKPT_OPEN_BCAST_REPLY_RACE_LOW:		strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_RACE_LOW");		break;			case M_CKPT_OPEN_BCAST_REPLY_SELF:		strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_SELF");		break;	case M_CKPT_OPEN_BCAST_REPLY_EARLIER:		strcpy(strTemp, "M_CKPT_OPEN_BCAST_REPLY_EARLIER");		break;		case M_RPLC_CRT:		strcpy(strTemp, "M_RPLC_CRT");		break;	case M_RPLC_CRT_REPLY:		strcpy(strTemp, "M_RPLC_CRT_REPLY");		break;	case M_RPLC_ADD:		strcpy(strTemp, "M_RPLC_ADD");		break;	case M_RPLC_ADD_REPLY:		strcpy(strTemp, "M_RPLC_ADD_REPLY");		break;	case M_RPLC_ADD_PREPARE_BCAST:		strcpy(strTemp, "M_RPLC_ADD_PREPARE_BCAST");		break;	case M_RPLC_ADD_PREPARE_BCAST_REPLY:		strcpy(strTemp, "M_RPLC_ADD_PREPARE_BCAST_REPLY");		break;	case M_RPLC_ADD_COMMIT_BCAST:		strcpy(strTemp, "M_RPLC_ADD_COMMIT_BCAST");		break;	case M_RPLC_ADD_COMMIT_BCAST_REPLY:		strcpy(strTemp, "M_RPLC_ADD_COMMIT_BCAST_REPLY");		break;	case M_RPLC_ADD_ROLLBACK_BCAST:		strcpy(strTemp, "M_RPLC_ADD_ROLLBACK_BCAST");		break;	case M_RPLC_ADD_ROLLBACK_BCAST_REPLY:		strcpy(strTemp, "M_RPLC_ADD_ROLLBACK_BCAST_REPLY");		break;	case M_CKPT_OPEN_REMOTE:		strcpy(strTemp, "M_CKPT_OPEN_REMOTE");		break;	case M_CKPT_OPEN_REMOTE_REPLY:		strcpy(strTemp, "M_CKPT_OPEN_REMOTE_REPLY");		break;	case M_CKPT_CLOSE_REMOTE:		strcpy(strTemp, "M_CKPT_CLOSE_REMOTE");		break;	case M_CKPT_CLOSE_REMOTE_REPLY:		strcpy(strTemp, "M_CKPT_CLOSE_REMOTE_REPLY");		break;	case M_CKPT_CKPT_CREATE_BCAST:		strcpy(strTemp, "M_CKPT_CKPT_CREATE_BCAST");		break;	case M_CKPT_CKPT_CREATE_BCAST_REPLY:		strcpy(strTemp, "M_CKPT_CKPT_CREATE_BCAST_REPLY");		break;	case M_RPLC_DEL:		strcpy(strTemp, "M_RPLC_DEL");		break;	case M_RPLC_DEL_REPLY:		strcpy(strTemp, "M_RPLC_DEL_REPLY");		break;	case M_RPLC_DEL_BCAST:		strcpy(strTemp, "M_RPLC_DEL_BCAST");		break;	case M_RPLC_DEL_BCAST_REPLY:		strcpy(strTemp, "M_RPLC_DEL_BCAST_REPLY");		break;	case M_CKPT_UPD:		strcpy(strTemp, "M_CKPT_UPD");		break;	case M_CKPT_UPD_REPLY:		strcpy(strTemp, "M_CKPT_UPD_REPLY");		break;	case M_CKPT_UPD_PREPARE_BCAST:		strcpy(strTemp, "M_CKPT_UPD_PREPARE_BCAST");		break;	case M_CKPT_UPD_PREPARE_BCAST_REPLY:		strcpy(strTemp, "M_CKPT_UPD_PREPARE_BCAST_REPLY");		break;	case M_CKPT_UPD_COMMIT_BCAST:		strcpy(strTemp, "M_CKPT_UPD_COMMIT_BCAST");		break;	case M_CKPT_UPD_COMMIT_BCAST_REPLY:		strcpy(strTemp, "M_CKPT_UPD_COMMIT_BCAST_REPLY");		break;	case M_CKPT_UPD_ROLLBACK_BCAST:		strcpy(strTemp, "M_CKPT_UPD_ROLLBACK_BCAST");		break;	case M_CKPT_UPD_ROLLBACK_BCAST_REPLY:		strcpy(strTemp, "M_CKPT_UPD_ROLLBACK_BCAST_REPLY");		break;	case M_CKPT_UPD_BCAST:		strcpy(strTemp, "M_CKPT_UPD_BCAST");		break;	case M_CKPT_UPD_BCAST_REPLY:		strcpy(strTemp, "M_CKPT_UPD_BCAST_REPLY");		break;	case M_CKPT_SYNC:		strcpy(strTemp, "M_CKPT_SYNC");		break;	case M_CKPT_SYNC_REPLY:		strcpy(strTemp, "M_CKPT_SYNC_REPLY");		break;	case M_CKPT_ACT_SET_BCAST:		strcpy(strTemp, "M_CKPT_ACT_SET_BCAST");		break;	case M_CKPT_ACT_SET_BCAST_REPLY:		strcpy(strTemp, "M_CKPT_ACT_SET_BCAST_REPLY");		break;	case M_CKPT_ACT_SET_FINISH_BCAST:		strcpy(strTemp, "M_CKPT_ACT_SET_FINISH_BCAST");		break;	case M_CKPT_READ:		strcpy(strTemp, "M_CKPT_READ");		break;	case M_CKPT_READ_REPLY:		strcpy(strTemp, "M_CKPT_READ_REPLY");		break;	case M_CKPT_UNLINK_BCAST:		strcpy(strTemp, "M_CKPT_UNLINK_BCAST");		break;	default:		strcpy(strTemp, "NULL");	}	strMsgSubtype = SaCkptMalloc(strlen(strTemp)+1);	if (strMsgSubtype == NULL) {		return NULL;	}	memcpy(strMsgSubtype, strTemp, strlen(strTemp)+1);	SaCkptFree((void*)&strTemp);	return strMsgSubtype;}/* create ckpt message according to a request */SaCkptMessageT* SaCkptMessageCreateReq(SaCkptRequestT* ckptReq, SaCkptMsgSubtypeT msgSubtype){	SaCkptMessageT* ckptMsg = NULL;	ckptMsg = (SaCkptMessageT*)SaCkptMalloc(sizeof(SaCkptMessageT));	if (ckptMsg == NULL) {		return NULL;	}	strcpy(ckptMsg->msgType, T_CKPT);	ckptMsg->msgSubtype = msgSubtype;	ckptMsg->msgVersion = saCkptService->version;	ckptMsg->retVal = SA_OK;	if (ckptReq->openCkpt != NULL) {		strcpy(ckptMsg->checkpointName, 			ckptReq->openCkpt->checkpointName);		strcpy(ckptMsg->activeNodeName,			ckptReq->openCkpt->activeNodeName);	}	ckptMsg->clientHandle = ckptReq->clientRequest->clientHandle;	strcpy(ckptMsg->fromNodeName, saCkptService->nodeName);	strcpy(ckptMsg->clientHostName, saCkptService->nodeName);	ckptMsg->clientRequest = ckptReq->clientRequest->req;	ckptMsg->clientRequestNO = ckptReq->clientRequest->requestNO;	ckptMsg->param = ckptReq->clientRequest->reqParam;	ckptMsg->paramLength = ckptReq->clientRequest->reqParamLength;	ckptMsg->data = ckptReq->clientRequest->data;	ckptMsg->dataLength = ckptReq->clientRequest->dataLength;	return ckptMsg;}/* create ckpt message according to an operation */SaCkptMessageT* SaCkptMessageCreateOp(SaCkptOperationT* ckptOp, SaCkptMsgSubtypeT msgSubtype){	SaCkptMessageT* ckptMsg = NULL;	ckptMsg = (SaCkptMessageT*)SaCkptMalloc(sizeof(SaCkptMessageT));	if (ckptMsg == NULL) {		return NULL;	}	strcpy(ckptMsg->msgType, T_CKPT);	ckptMsg->msgSubtype = msgSubtype;	ckptMsg->msgVersion = saCkptService->version;	ckptMsg->retVal = SA_OK;	strcpy(ckptMsg->checkpointName, ckptOp->replica->checkpointName);	strcpy(ckptMsg->activeNodeName, ckptOp->replica->activeNodeName);	ckptMsg->clientHandle = ckptOp->clientHandle;	strcpy(ckptMsg->fromNodeName, saCkptService->nodeName);	strcpy(ckptMsg->clientHostName, ckptOp->clientHostName);	ckptMsg->clientRequest = ckptOp->clientRequest;	ckptMsg->clientRequestNO = ckptOp->clientRequestNO;	ckptMsg->operation = ckptOp->operation;	ckptMsg->operationNO = ckptOp->operationNO;	ckptMsg->param = ckptOp->param;	ckptMsg->paramLength = ckptOp->paramLength;	ckptMsg->data = ckptOp->data;	ckptMsg->dataLength = ckptOp->dataLength;	return ckptMsg;}/* create an operation on the active replica according to the ckpt message */SaCkptOperationT* SaCkptOperationCreate(SaCkptMessageT* ckptMsg, SaCkptReplicaT* replica){	SaCkptOperationT* ckptOp = NULL;	GList* list = NULL;	SaCkptStateT* state = NULL;	ckptOp = (SaCkptOperationT*)SaCkptMalloc(sizeof(SaCkptOperationT));	if (ckptOp == NULL) {		return NULL;	}	ckptOp->replica = replica;	ckptOp->clientHandle = ckptMsg->clientHandle;	strcpy(ckptOp->clientHostName, ckptMsg->clientHostName);	ckptOp->clientRequest = ckptMsg->clientRequest;	ckptOp->clientRequestNO = ckptMsg->clientRequestNO;	ckptOp->stateList = NULL;	switch (ckptMsg->msgSubtype) {	case M_RPLC_CRT:		ckptOp->operation = OP_RPLC_CRT;		break;	case M_RPLC_ADD:		ckptOp->operation = OP_RPLC_ADD;		break;	case M_CKPT_UPD:		ckptOp->operation = OP_CKPT_UPD;		break;	case M_CKPT_READ:		ckptOp->operation = OP_CKPT_READ;		break;	default:		ckptOp->operation = OP_NULL;	}	replica->nextOperationNumber++;	if (replica->nextOperationNumber <= 0) {		replica->nextOperationNumber = 1;	}	ckptOp->operationNO = replica->nextOperationNumber;	if (ckptMsg->paramLength > 0) {		ckptOp->paramLength = ckptMsg->paramLength;		ckptOp->param = SaCkptMalloc(ckptMsg->paramLength);		SACKPTASSERT(ckptOp->param != NULL);		memcpy(ckptOp->param, ckptMsg->param, ckptMsg->paramLength);	}	if (ckptMsg->dataLength > 0) {		ckptOp->dataLength = ckptMsg->dataLength;		ckptOp->data = SaCkptMalloc(ckptMsg->dataLength);		SACKPTASSERT(ckptOp->data != NULL);		memcpy(ckptOp->data, ckptMsg->data, ckptMsg->dataLength);	}	ckptOp->state = OP_STATE_PENDING;	list = replica->nodeList;	while (list != NULL) {		state = (SaCkptStateT*)SaCkptMalloc(sizeof(SaCkptStateT));		SACKPTASSERT(state != NULL);		memcpy(state, list->data, sizeof(SaCkptStateT));		state->state = OP_STATE_STARTED;		ckptOp->stateList = g_list_append(ckptOp->stateList,			(gpointer)state);				list = list->next;	}	ckptOp->timeoutTag = 0;	/* update message */	ckptMsg->operation = ckptOp->operation;	ckptMsg->operationNO = ckptOp->operationNO;	return ckptOp;}/* free the ckpt message */void SaCkptMessageDelete(SaCkptMessageT** pCkptMsg){	SaCkptMessageT* ckptMsg = *pCkptMsg;		if (ckptMsg->paramLength > 0) {		SaCkptFree((void**)&(ckptMsg->param));	}	if (ckptMsg->dataLength > 0) {		SaCkptFree((void**)&(ckptMsg->data));	}	SaCkptFree((void*)&ckptMsg);	*pCkptMsg = NULL;	return;}/** keep this open request */void initOpenReqNodeStatus(SaCkptClientRequestT *clientReq){		SaCkptReqOpenParamT	*openParam	= NULL;		if(clientReq == NULL){		cl_log(LOG_ERR, "NULL clientReq in initOpenReqNodeStatus\n");		return;	}	if((clientReq->req != REQ_CKPT_OPEN) 				&& (clientReq->req != REQ_CKPT_OPEN_ASYNC)){		cl_log(LOG_ERR, "Not Open Request in initOpenReqNodeStatus\n");		return;	}		openParam = (SaCkptReqOpenParamT *)clientReq->reqParam;		g_hash_table_foreach(saCkptService->nodeStatusHash, getNodeCkptStatus,openParam);		g_hash_table_insert(saCkptService->openRequestHash, (gpointer)openParam->ckptName.value,(gpointer)clientReq);}/* set the open request's status according to node status*/voidgetNodeCkptStatus(gpointer key,gpointer value,			gpointer user_data){	saCkptNodeInfo *ckptNodeInfo = value;	const char *	nodeName = key;	SaCkptReqOpenParamT * openParam = user_data;	saOpenNodeStatusT  	*status 	= NULL;	status = (saOpenNodeStatusT *)ha_malloc(		sizeof(saOpenNodeStatusT));	if(status == NULL){		/*FIXME how to report error on hash fucntions*/		cl_log(LOG_INFO,"malloc error in getNodeCkptStatus\n");		return;	}	strncpy(status->nodeName, nodeName, SA_MAX_NAME_LENGTH);		if(ckptNodeInfo->ckptStatus == CKPT_RUNNING){		status->status = RES_NO_RESPONSE ;	}else{		status->status = RES_NOT_RUN;	}	openParam->nodeReponse = g_list_append(openParam->nodeReponse,status);}gboolean isLoopMessage(SaCkptMessageT * ckptMsg){	if( ckptMsg == NULL) return FALSE;		if( strncmp(ckptMsg->fromNodeName, 		saCkptService->nodeName, SA_MAX_NAME_LENGTH)){		return FALSE;		}		return TRUE;}/* Check if a open request exist already*/SaCkptClientRequestT *isOnOpenProcess(SaCkptReqOpenParamT *openParam ){	SaCkptClientRequestT *	clientReq = NULL;	if(openParam == NULL){		if(saCkptService->flagVerbose){			cl_log(LOG_INFO,"NULL openParam on isOnOpenProcess\n");			return 0;		}	}		clientReq = (SaCkptClientRequestT *)g_hash_table_lookup(	\			saCkptService->openRequestHash,	\			(gconstpointer)openParam->ckptName.value);		return clientReq ;}gbooleanisHighPriority(const SaCkptMessageT *ckptMsg ){	if(ckptMsg == NULL){		cl_log(LOG_INFO,"NULL ckptMsg on isHighPriority\n");		return FALSE;	}	if( strncmp(ckptMsg->fromNodeName, 		saCkptService->nodeName, SA_MAX_NAME_LENGTH) < 0){		return FALSE;		}		return TRUE;}gintupdateOpenProcessQueue(const SaCkptMessageT *ckptMsg ,saOpenResponseTypeT *type){		SaCkptClientRequestT 		*clientReq	=	NULL;	SaCkptReqOpenParamT 		*openParam	=	NULL;	saOpenResponseTypeT		nodeStatus	=	RES_NO_REPLICA;	gint 				result		=	1;	if(ckptMsg == NULL){		cl_log(

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -