⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 message.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
			SACKPTASSERT(ckptResp->resp->data != NULL);			memcpy(ckptResp->resp->data, 				&(openCkpt->checkpointHandle), 				ckptResp->resp->dataLength);			SaCkptResponseSend(&ckptResp);			break;		/* 		 * close the remotely opened checkpoint		 */		case M_CKPT_CLOSE_REMOTE:			if (replica == NULL) {				cl_log(LOG_ERR,					"No replica, ignore message");				break;			}						if (!replica->flagIsActive) {				cl_log(LOG_ERR,					"Standby replica, ignore message");				break;				}			checkpointHandle = *(int*)ckptMsg->data;			openCkpt = (SaCkptOpenCheckpointT*)g_hash_table_lookup(				saCkptService->openCheckpointHash,				(gpointer)&checkpointHandle);			if (openCkpt == NULL) {				cl_log(LOG_ERR, 					"No opencheckpoint, ignore message");				break;			}			ckptMsg->retVal = SaCkptCheckpointClose(&openCkpt);			ckptMsg->msgSubtype = M_CKPT_CLOSE_REMOTE_REPLY;			SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName);			break;		/* 		 * the reply message to M_CKPT_CLOSE_REMOTE		 */		case M_CKPT_CLOSE_REMOTE_REPLY:			if (replica != NULL) {				cl_log(LOG_ERR, 					"Replica exists, ignore message");				break;			}			SACKPTMESSAGEVALIDATEREQ(ckptMsg);			SaCkptRequestStopTimer(ckptReq);			closeParam = ckptReq->clientRequest->reqParam;			checkpointHandle = closeParam->checkpointHandle;			openCkpt = g_hash_table_lookup(				saCkptService->openCheckpointHash,				(gpointer)&checkpointHandle);			ckptResp = SaCkptResponseCreate(ckptReq);			if (openCkpt == NULL) {				ckptResp->resp->retVal= SA_ERR_BAD_HANDLE;			} else {				ckptResp->resp->retVal = 					SaCkptCheckpointClose(&openCkpt);			}			SaCkptResponseSend(&ckptResp);			break;		/* 		 * if the client want to create a local copy of the 		 * checkpoint, it will send this message to ask the 		 * active node to send it the data		 */		case M_RPLC_CRT:			/* FIXME: */			/* if the message size is exceed 1400, break */			/* this message into several messages */						if (replica == NULL) {				cl_log(LOG_ERR,					"No replica, ignore message");				break;			}						if (!replica->flagIsActive) {				cl_log(LOG_ERR,					"Standby replica, ignore message");				break;				}			ckptOp = SaCkptOperationCreate(ckptMsg, replica);			ckptOp->operation= OP_RPLC_CRT;			if (replica->flagReplicaLock != TRUE){				/* lock the replica first */				replica->flagReplicaLock = TRUE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s locked",						replica->checkpointName);				}				ckptOp->state = OP_STATE_STARTED;								SaCkptReplicaPack(&(ckptMsg->data),					&(ckptMsg->dataLength), 					replica);				ckptMsg->msgSubtype = M_RPLC_CRT_REPLY;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);				g_hash_table_insert(replica->operationHash,					(gpointer)&(ckptOp->operationNO),					(gpointer)ckptOp);			} else {				replica->pendingOperationList = 					g_list_append(					replica->pendingOperationList,					(gpointer)ckptOp);								if (saCkptService->flagVerbose) {					cl_log(LOG_INFO, 					"Send operation to pending list");				}			}			SaCkptOperationStartTimer(ckptOp);			break;		/* 		 * the reply message to M_RPLC_CRT		 * the active node send its data to the standby node via		 * this message		 */		case M_RPLC_CRT_REPLY:			if (replica != NULL) {				cl_log(LOG_ERR, 					"Replica exists, ignore message");				break;			}						SACKPTMESSAGEVALIDATEREQ(ckptMsg);			replica = SaCkptReplicaUnpack(ckptMsg->data, 				ckptMsg->dataLength);			g_hash_table_insert(saCkptService->replicaHash, 				(gpointer)replica->checkpointName, 				(gpointer)replica);						SaCkptFree((void**)&(ckptMsg->data));			ckptMsg->data = NULL;			ckptMsg->dataLength = 0;			ckptMsg->msgSubtype = M_RPLC_ADD;			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);						ckptReq->operation = OP_RPLC_ADD;						break;		/*		 * after create the local copy of the checkpoint, it will send		 * this message to active node. The active node then tell all 		 * nodes to update their replica list		 */		case M_RPLC_ADD:			SACKPTMESSAGEVALIDATEOP(ckptMsg);			ckptOp->operation = OP_RPLC_ADD;						state = (SaCkptStateT*)SaCkptMalloc( 				sizeof(SaCkptStateT));			SACKPTASSERT(state != NULL);			state->state = OP_STATE_STARTED;			strcpy(state->nodeName, ckptMsg->clientHostName);			ckptOp->stateList = 				g_list_append(ckptOp->stateList,				(gpointer)state);			ckptMsg->msgSubtype = M_RPLC_ADD_PREPARE_BCAST;			SaCkptMessageMulticast(ckptMsg, 				ckptOp->stateList);			 			break;		/*		 * the active node ask other node to prepare the update		 */		case M_RPLC_ADD_PREPARE_BCAST:			if (replica == NULL) {				cl_log(LOG_ERR, "No replica, ignore message");				break;			}			state = (SaCkptStateT*)SaCkptMalloc( 				sizeof(SaCkptStateT));			if (state != NULL) {				state->state = OP_STATE_PREPARED;				strcpy(state->nodeName,					ckptMsg->clientHostName);				replica->nodeList = 					g_list_append(replica->nodeList,					(gpointer)state);				/*should update the request on pending list*/				updateReplicaPendingOption(replica, ckptMsg->clientHostName);				ckptMsg->retVal = SA_OK;			} else {				ckptMsg->retVal = SA_ERR_NO_MEMORY;			}			ckptMsg->msgSubtype = M_RPLC_ADD_PREPARE_BCAST_REPLY;			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);			break;		/*		 * the reply message to M_RPLC_ADD_PREPARE_BCAST		 * the standby nodes tell the active node whether the 		 * preparation is successfull or not		 */		case M_RPLC_ADD_PREPARE_BCAST_REPLY:			SACKPTMESSAGEVALIDATEOP(ckptMsg);			if (ckptOp->state != OP_STATE_STARTED) {				cl_log(LOG_INFO, 					"Op state error, ignore message");				break;			}			if (ckptMsg->retVal != SA_OK) {				finished = TRUE;				ckptOp->state = OP_STATE_ROLLBACKED;			} else {				finished = SaCkptOperationFinished(					ckptMsg->fromNodeName, 					OP_STATE_PREPARED, 					ckptOp->stateList);				if (finished == TRUE) {					ckptOp->state = OP_STATE_PREPARED;				}			}			if (finished == TRUE) {				if (ckptOp->state == OP_STATE_ROLLBACKED) {					ckptMsg->msgSubtype = 						M_RPLC_ADD_ROLLBACK_BCAST;				} 				if (ckptOp->state == OP_STATE_PREPARED) {					ckptMsg->msgSubtype = 						M_RPLC_ADD_COMMIT_BCAST;				} 				SaCkptOperationStopTimer(ckptOp);				SaCkptMessageMulticast(ckptMsg, 					ckptOp->stateList);			}			break;		/*		 * if all the nodes prepared successfully, the active node		 * tell them to commit the update		 */		case M_RPLC_ADD_COMMIT_BCAST:			if (replica == NULL) {				cl_log(LOG_INFO, 					"No replica, ignore message");				break;			}			list = replica->nodeList;			while (list != NULL) {				state = (SaCkptStateT*)list->data;				if (!strcmp(state->nodeName,					ckptMsg->clientHostName)) {					state->state = 	OP_STATE_COMMITTED;					break;				}				list = list->next;			}			ckptMsg->msgSubtype = M_RPLC_ADD_COMMIT_BCAST_REPLY;			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);			break;		/*		 * the reply message to M_RPLC_ADD_COMMIT_BCAST		 * the commit will always success		 */		case M_RPLC_ADD_COMMIT_BCAST_REPLY:			SACKPTMESSAGEVALIDATEOP(ckptMsg);						if (ckptOp->state != OP_STATE_PREPARED) {				cl_log(LOG_ERR, 					"Op state error, ignore message");				break;			}			finished = SaCkptOperationFinished(				ckptMsg->fromNodeName, 				OP_STATE_COMMITTED, 				ckptOp->stateList);			if (finished == TRUE) {				ckptOp->state = OP_STATE_COMMITTED;								ckptMsg->msgSubtype = M_RPLC_ADD_REPLY;				SaCkptMessageSend(ckptMsg, 					ckptOp->clientHostName);				/* unlock the replica */				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}				SaCkptOperationRemove(&ckptOp);			}			break;		/*		 * the reply message to M_RPLC_ADD		 * after got the reply, it will send response the client		 * application		 */		case M_RPLC_ADD_REPLY:			if (replica == NULL) {				cl_log(LOG_ERR, 					"No replica, ignore message");				break;			}			SACKPTMESSAGEVALIDATEREQ(ckptMsg);			SaCkptRequestStopTimer(ckptReq);						ckptResp = SaCkptResponseCreate(ckptReq);			ckptResp->resp->retVal = ckptMsg->retVal;			if (ckptMsg->retVal != SA_OK) {				SaCkptReplicaRemove(&replica);			} else {				replica->replicaState = STATE_CREATE_COMMITTED;				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}				replica->flagPendOperation = FALSE;								openParam = ckptReq->clientRequest->reqParam;				openCkpt = SaCkptCheckpointOpen(client, 					replica, openParam);				ckptReq->openCkpt = openCkpt;								ckptResp->resp->dataLength = 					sizeof(SaCkptCheckpointHandleT);				ckptResp->resp->data = SaCkptMalloc(					ckptResp->resp->dataLength);				SACKPTASSERT(ckptResp->resp->data != NULL);				memcpy(ckptResp->resp->data, 					&(openCkpt->checkpointHandle), 					ckptResp->resp->dataLength);			}			SaCkptResponseSend(&ckptResp);			break;		/*		 * one or more nodes cannot update their replica list, so		 * rollback the operation		 */		case M_RPLC_ADD_ROLLBACK_BCAST:			if (replica == NULL) {				cl_log(LOG_ERR, 					"No replica, ignore message");				break;			}			if (!strcmp(ckptMsg->clientHostName,				saCkptService->nodeName)) {				SaCkptReplicaRemove(&replica);			} else {				list = replica->nodeList;				while (list != NULL) {					state = (SaCkptStateT*)list->data;					if (!strcmp(state->nodeName,						ckptMsg->clientHostName)) {						replica->nodeList = 							g_list_remove(							replica->nodeList,							(gpointer)state);						break;					}					list = list->next;				}			}			ckptMsg->msgSubtype = M_RPLC_ADD_ROLLBACK_BCAST_REPLY;			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);			break;		/* 		 * the reply message to M_RPLC_ADD_ROLLBACK_BCAST		 */		case M_RPLC_ADD_ROLLBACK_BCAST_REPLY:			SACKPTMESSAGEVALIDATEOP(ckptMsg);			if (ckptOp->state != OP_STATE_ROLLBACKED) {				cl_log(LOG_INFO, 					"Op state error, ignore message");				break;			}			finished = SaCkptOperationFinished(				ckptMsg->fromNodeName, 				OP_STATE_ROLLBACKED,				ckptOp->stateList);			if (finished == TRUE) {				ckptOp->state = OP_STATE_ROLLBACKED;								ckptMsg->msgSubtype = M_RPLC_ADD_REPLY;				SaCkptMessageSend(ckptMsg, 					ckptOp->clientHostName);				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}				SaCkptOperationRemove(&ckptOp);			}			break;		/*		 * after the checkpoint was closed and deleted on one node, 		 * the other nodes have to update their replica list		 */		case M_RPLC_DEL_BCAST:			if (replica == NULL) {				cl_log(LOG_ERR, 					"No replica, ignore message");				break;			}			list = replica->nodeList;			while (list != NULL) {				state = (SaCkptStateT*)list->data;				if (!strcmp(state->nodeName,					ckptMsg->fromNodeName)) {					replica->nodeList = 						g_list_remove(						replica->nodeList,						(gpointer)state);					break;				}								list = list->next;			}			if (!strcmp(replica->activeNodeName, 				ckptMsg->fromNodeName)) {				/* FIXME:  */				/* got the active replica by election */				/* the most updated replica will be the active  */				/* replica */				strcpy(replica->activeNodeName, 					saCkptService->nodeName);				replica->flagIsActive = TRUE;				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}#if 0				/* FIXME:  */				/* if there are started requests on this replica */				/* return SA_ERR_TRY_AGAIN */				g_hash_table_foreach(saCkptService->clientHash, 					xxxx,					(gpointer)replica);#endif			}						break;		/*		 * Any update request will need to send this message to active		 * node first		 *		 * if the checkpoint has SA_CKPT_WR_ACTIVE_REPLICA flag, the 		 * active node will update and commit first, then send reply 		 * to the client application, no matter the standby nodes		 * can update or not. This is not two phase commit		 *		 * if the checkpoint has SA_CKPT_WR_ALL_REPLICAS flag, it will		 * update the replicas on all the nodes via two phase commit		 * algorithm		 */		case M_CKPT_UPD:			if (replica == NULL) {				cl_log(LOG_ERR,					"No replica, ignore message");				break;			}						if (!replica->flagIsActive) {				cl_log(LOG_ERR,					"Standby replica, ignore message");				break;				}			ckptOp = SaCkptOperationCreate(ckptMsg, replica);			SACKPTASSERT (ckptOp!= NULL);						if (replica->flagReplicaLock == TRUE) {				replica->pendingOperationList = 					g_list_append(					replica->pendingOperationList,					(gpointer)ckptOp);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -