⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 message.c

📁 linux集群服务器软件代码包
💻 C
📖 第 1 页 / 共 5 页
字号:
				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO, 					"Send operation to pending list");				}								SaCkptOperationStartTimer(ckptOp);								break;			}			replica->flagReplicaLock = TRUE;			if (saCkptService->flagVerbose) {				cl_log(LOG_INFO,					"Replica %s locked",					replica->checkpointName);			}						/*			 * if replica is opened with SA_CKPT_WR_ACTIVE_REPLICA			 * update active replica and return			 */			if (replica->createFlag & 				SA_CKPT_WR_ACTIVE_REPLICA) {				/* update active replica */				retVal = SaCkptReplicaUpdate(replica, 					ckptMsg->clientRequest,					ckptMsg->dataLength,					ckptMsg->data,					ckptMsg->paramLength,					ckptMsg->param);				ckptMsg->msgSubtype = M_CKPT_UPD_REPLY;				ckptMsg->retVal = retVal;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);				if (retVal != SA_OK) {					replica->flagReplicaLock = FALSE;					if (saCkptService->flagVerbose) {						cl_log(LOG_INFO,							"Replica %s unlocked",							replica->checkpointName);					}					break;				}				/* send msg to all the nodes except itself */				ckptMsg->msgSubtype = M_CKPT_UPD_BCAST;				nodeList = g_list_copy(replica->nodeList);				list = nodeList;				while (list != NULL) {					state = (SaCkptStateT*)list->data;					if (!strcmp(state->nodeName, 						saCkptService->nodeName)) {						nodeList = g_list_remove(							nodeList,							(gpointer)state);						break;					}										list = list->next;				}				SaCkptMessageMulticast(ckptMsg, nodeList);				g_list_free(nodeList);				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}							}else {				g_hash_table_insert(replica->operationHash,					(gpointer)&(ckptOp->operationNO),					(gpointer)ckptOp);				ckptOp->state = OP_STATE_STARTED;				ckptMsg->msgSubtype = 					M_CKPT_UPD_PREPARE_BCAST;				SaCkptMessageMulticast(ckptMsg, 					ckptOp->stateList);				SaCkptOperationStartTimer(ckptOp);			}			break;		/* 		 * after the active node has updated and committed its 		 * checkpoint, it will broadcast this message to ask 		 * other standby node to update their checkpoint.		 *		 * if the update on the standby nodes is not successful, 		 * the data of this checkpoint will be marked as invalid		 *		 * this is not two phase commit since the active node has		 * already committed		 *		 * this message do not need reply		 */		case M_CKPT_UPD_BCAST:			if (replica == NULL) {				cl_log(LOG_INFO, 					"No replica, ignore message");				break;			}			SaCkptReplicaUpdate(replica,				ckptMsg->clientRequest,				ckptMsg->dataLength,				ckptMsg->data,				ckptMsg->paramLength,				ckptMsg->param);			/*			 * update the standby replica			 * FIXME:			 */			replica->nextOperationNumber = ckptMsg->operationNO + 1;			/* do not send reply */						break;		/*		 * the beginning of two phase commit algorithm		 */		case M_CKPT_UPD_PREPARE_BCAST:			if (replica == NULL) {				cl_log(LOG_INFO, 					"No replica, ignore message");				break;			}						retVal = SaCkptReplicaUpdPrepare(replica, 				ckptMsg->clientRequest,				ckptMsg->dataLength,				ckptMsg->data,				ckptMsg->paramLength,				ckptMsg->param);			ckptMsg->msgSubtype = M_CKPT_UPD_PREPARE_BCAST_REPLY;			ckptMsg->retVal = retVal;			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);			break;		/*		 * the reply message of M_CKPT_UPD_PREPARE_BCAST		 */		case M_CKPT_UPD_PREPARE_BCAST_REPLY:			SACKPTMESSAGEVALIDATEOP(ckptMsg);						if (ckptOp->state != OP_STATE_STARTED) {				cl_log(LOG_INFO, 					"Op state error, ignore message");				break;			}						if (ckptMsg->retVal != SA_OK) {				finished = TRUE;				ckptOp->state = 					OP_STATE_ROLLBACKED;			} else {				finished = SaCkptOperationFinished(					ckptMsg->fromNodeName, 					OP_STATE_PREPARED, 					ckptOp->stateList);				if (finished == TRUE) {					ckptOp->state = OP_STATE_PREPARED;				}			}						if (finished == TRUE) {				if (ckptOp->state == OP_STATE_ROLLBACKED) {					ckptMsg->msgSubtype = 						M_CKPT_UPD_ROLLBACK_BCAST;				} 				if (ckptOp->state == OP_STATE_PREPARED) {					ckptMsg->msgSubtype = 						M_CKPT_UPD_COMMIT_BCAST;				} 				SaCkptOperationStopTimer(ckptOp);				SaCkptMessageMulticast(ckptMsg, 					ckptOp->stateList);			}						break;		/*		 * the update commit broadcast message		 */		case M_CKPT_UPD_COMMIT_BCAST:			if (replica == NULL) {				cl_log(LOG_INFO, 					"No replica, ignore message");				break;			}						retVal = SaCkptReplicaUpdCommit(replica, 				ckptMsg->clientRequest,				ckptMsg->dataLength,				ckptMsg->data,				ckptMsg->paramLength,				ckptMsg->param);			replica->nextOperationNumber = ckptMsg->operationNO + 1;			ckptMsg->msgSubtype = M_CKPT_UPD_COMMIT_BCAST_REPLY;			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);			break;		/*		 * the reply message to M_CKPT_UPD_COMMIT_BCAST		 */		case M_CKPT_UPD_COMMIT_BCAST_REPLY:			SACKPTMESSAGEVALIDATEOP(ckptMsg);						if (ckptOp->state != OP_STATE_PREPARED) {				cl_log(LOG_INFO, 					"Op state error, ignore message");				break;			}			finished = SaCkptOperationFinished(				ckptMsg->fromNodeName,				OP_STATE_COMMITTED,				ckptOp->stateList);			if (finished == TRUE) {				ckptMsg->msgSubtype = M_CKPT_UPD_REPLY;								/* update reply message do not need data */				if (ckptMsg->dataLength > 0) {					SaCkptFree((void**)&(ckptMsg->data));					ckptMsg->data = NULL;					ckptMsg->dataLength = 0;				}								SaCkptMessageSend(ckptMsg,					ckptMsg->clientHostName);								/* unlock replica */				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}								SaCkptOperationRemove(&ckptOp);			}						break;		/*		 * the rollback message		 */		case M_CKPT_UPD_ROLLBACK_BCAST:			if (replica == NULL) {				cl_log(LOG_INFO, 					"No replica, ignore message");				break;			}						retVal = SaCkptReplicaUpdRollback(replica,				ckptMsg->clientRequest,				ckptMsg->dataLength,				ckptMsg->data,				ckptMsg->paramLength,				ckptMsg->param);			ckptMsg->msgSubtype = M_CKPT_UPD_ROLLBACK_BCAST_REPLY;			SaCkptMessageSend(ckptMsg, ckptMsg->activeNodeName);						break;		/*		 * the reply message to M_CKPT_UPD_ROLLBACK_BCAST		 */		case M_CKPT_UPD_ROLLBACK_BCAST_REPLY:			SACKPTMESSAGEVALIDATEOP(ckptMsg);			if (ckptOp->state != OP_STATE_ROLLBACKED) {				cl_log(LOG_INFO, 					"Op state error, ignore message");				break;			}			finished = SaCkptOperationFinished(				ckptMsg->fromNodeName,				OP_STATE_ROLLBACKED, 				ckptOp->stateList);			if (finished == TRUE) {				ckptMsg->msgSubtype = M_CKPT_UPD_REPLY;								/* update reply message do not need data */				if (ckptMsg->dataLength > 0) {					SaCkptFree((void**)&(ckptMsg->data));					ckptMsg->data = NULL;					ckptMsg->dataLength = 0;				}								SaCkptMessageSend(ckptMsg, ckptMsg->clientHostName);				/* unlock replica */				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}				SaCkptOperationRemove(&ckptOp);			}			break;		/*		 * The read (and also udpate) operation has to be sent to active		 * node first, so all these operations can be serialized		 */		case M_CKPT_READ:			if (replica == NULL) {				cl_log(LOG_INFO,					"No replica, ignore message");				break;			}						if (!replica->flagIsActive) {				cl_log(LOG_INFO,					"Standby replica, ignore message");				break;				}			if (replica->flagReplicaLock != TRUE) {				retVal = SaCkptReplicaRead(replica, 					&(ckptMsg->dataLength),					&(ckptMsg->data),					ckptMsg->paramLength,					ckptMsg->param);								ckptMsg->retVal = retVal;				ckptMsg->msgSubtype = M_CKPT_READ_REPLY;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);			} else {				ckptOp = SaCkptOperationCreate(					ckptMsg, replica);				replica->pendingOperationList = 					g_list_append(					replica->pendingOperationList,					(gpointer)ckptOp);				cl_log(LOG_INFO, 					"Send read operation to pending list");								SaCkptOperationStartTimer(ckptOp);			}			break;		/*		 * sync message 		 * on receive this message, if the replica is not lock and no 		 * other pending operations, the active replica reply it with		 * OK. Else, the active replica add it to the the pending list		 */		case M_CKPT_SYNC:			if (replica == NULL) {				cl_log(LOG_INFO,					"No replica, ignore message");				break;			}						if (!replica->flagIsActive) {				cl_log(LOG_INFO,					"Standby replica, ignore message");				break;				}			if ((replica->flagReplicaLock != TRUE)  && 				(g_list_length(				replica->pendingOperationList) == 0)) {								ckptMsg->retVal = SA_OK;				ckptMsg->msgSubtype = M_CKPT_SYNC_REPLY;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);			} else {				ckptOp = SaCkptOperationCreate(					ckptMsg, replica);				replica->pendingOperationList = 					g_list_append(					replica->pendingOperationList,					(gpointer)ckptOp);				cl_log(LOG_INFO, 					"Send sync operation to pending list");								SaCkptOperationStartTimer(ckptOp);			}			break;		/* 		 * set active broadcast message		 * on receive this message, all the replica stop sending 		 * requests to the active replica.		 */		case M_CKPT_ACT_SET_BCAST:			if (replica == NULL) {				cl_log(LOG_INFO,					"No replica, ignore message");				break;			}						replica->flagReplicaPending = TRUE;			cl_log(LOG_INFO,				"Replica %s stop sending requests",				replica->checkpointName);						if (!replica->flagIsActive) {				cl_log(LOG_INFO,					"Standby replica, ignore message");				break;				}			if ((replica->flagReplicaLock != TRUE)  && 				(g_list_length(				replica->pendingOperationList) == 0)) {				ckptMsg->retVal = SA_OK;				ckptMsg->msgSubtype = M_CKPT_ACT_SET_BCAST_REPLY;				SaCkptMessageSend(ckptMsg, 					ckptMsg->clientHostName);			} else {				ckptOp = SaCkptOperationCreate(					ckptMsg, replica);				replica->pendingOperationList = 					g_list_append(					replica->pendingOperationList,					(gpointer)ckptOp);				cl_log(LOG_INFO, 				"Send act_set operation to pending list");								SaCkptOperationStartTimer(ckptOp);			}			break;		case M_CKPT_ACT_SET_BCAST_REPLY:			if (replica == NULL) {				cl_log(LOG_INFO,					"No replica, ignore message");				break;			}			SACKPTMESSAGEVALIDATEREQ(ckptMsg);			replica->flagIsActive = TRUE;			strcpy(replica->activeNodeName,				saCkptService->nodeName);			cl_log(LOG_INFO, 				"checkpoint %s is set as active replica",				replica->checkpointName);						replica->flagReplicaPending = FALSE;			cl_log(LOG_INFO,				"Replica %s resume sending requests",				replica->checkpointName);						ckptMsg->retVal = SA_OK;			ckptMsg->msgSubtype = M_CKPT_ACT_SET_FINISH_BCAST;			SaCkptMessageBroadcast(ckptMsg);			/* stop timer and send back response */			SaCkptRequestStopTimer(ckptReq);						ckptResp = SaCkptResponseCreate(ckptReq);			ckptResp->resp->retVal = ckptMsg->retVal;			SaCkptResponseSend(&ckptResp);			break;					case M_CKPT_ACT_SET_FINISH_BCAST:			if (replica == NULL) {				cl_log(LOG_INFO,					"No replica, ignore message");				break;			}			if (strcmp(saCkptService->nodeName, 				ckptMsg->clientHostName) == 0) {				cl_log(LOG_INFO,					"Send from itself, ignore message");				break;			}			replica->flagIsActive = FALSE;			strcpy(replica->activeNodeName,				ckptMsg->clientHostName);			cl_log(LOG_INFO, 			"Active node of replica %s has been switched to %s",			replica->checkpointName, replica->activeNodeName);			replica->flagReplicaPending = FALSE;			cl_log(LOG_INFO,				"Replica %s resume sending requests",				replica->checkpointName);						break;						case M_CKPT_UPD_REPLY:		case M_CKPT_READ_REPLY:		case M_CKPT_SYNC_REPLY:#if 0						if (replica == NULL) {				cl_log(LOG_INFO, 					"No replica, ignore message");				break;			}#endif			SACKPTMESSAGEVALIDATEREQ(ckptMsg);			SaCkptRequestStopTimer(ckptReq);						ckptResp = SaCkptResponseCreate(ckptReq);			ckptResp->resp->retVal = ckptMsg->retVal;			if ((ckptMsg->msgSubtype == M_CKPT_READ_REPLY) &&				(ckptMsg->dataLength > 0)) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -