⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 operation.c

📁 在LINUX下实现HA的源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
			OPERATION_TIMEOUT * 1000000, 			SaCkptOperationTimeout, 			(gpointer)ckptOp);		if (saCkptService->flagVerbose) {			strOp = SaCkptOp2String(ckptOp->operation);			cl_log(LOG_INFO, 				"Start timer %u for op %d (%s)",				ckptOp->timeoutTag, 				ckptOp->operationNO, strOp);			SaCkptFree((void*)&strOp);		}	}	return;}/* stop the timer for the operation */void SaCkptOperationStopTimer(SaCkptOperationT* ckptOp){	char* strOp = NULL;	if (ckptOp->timeoutTag > 0) {		if (saCkptService->flagVerbose) {			strOp = SaCkptOp2String(ckptOp->operation);			cl_log(LOG_INFO, 				"delete timer %u for op %d (%s)",				ckptOp->timeoutTag,				ckptOp->operationNO, strOp);			SaCkptFree((void*)&strOp);		}				g_source_remove(ckptOp->timeoutTag);		ckptOp->timeoutTag = 0;	}	return;}/* whether the operation finished or not */intSaCkptOperationFinished(char* fromNodeName, SaCkptOpStateT opState, GList* list){	int finished = TRUE;	SaCkptStateT* state = NULL;	while (list != NULL) {		state = (SaCkptStateT*)list->data;		if (!strcmp(state->nodeName, fromNodeName)) {			state->state = opState;		} else {			/*			 * Shouldn't state->state be the same type			 * as opState?  FIXME or comment me			 */			if ((SaCkptOpStateT)state->state != opState) {				finished = FALSE;			}		}		list = list->next;	}	return finished;}char* SaCkptOp2String(SaCkptOpT op){	char* strOp = NULL;	char*  strTemp = NULL;	strTemp = (char*)SaCkptMalloc(64);	SACKPTASSERT(strTemp != NULL);	switch (op) {	case OP_NULL:		strcpy(strTemp, "OP_NULL");		break;	case OP_CKPT_OPEN:		strcpy(strTemp, "OP_CKPT_OPEN");		break;	case OP_RPLC_CRT:		strcpy(strTemp, "OP_RPLC_CRT");		break;	case OP_RPLC_ADD:		strcpy(strTemp, "OP_RPLC_ADD");		break;	case OP_CKPT_UPD:		strcpy(strTemp, "OP_CKPT_UPD");		break;	case OP_CKPT_READ:		strcpy(strTemp, "OP_CKPT_READ");		break;	case OP_CKPT_ULNK:		strcpy(strTemp, "OP_CKPT_ULNK");		break;	case OP_CKPT_SYNC:		strcpy(strTemp, "OP_CKPT_SYNC");		break;	case OP_CKPT_ACT_SET:		strcpy(strTemp, "OP_CKPT_ACT_SET");		break;	}	strOp = (char*)SaCkptMalloc(strlen(strTemp)+1);	if (strOp == NULL) {		return NULL;	}	memcpy(strOp, strTemp, strlen(strTemp)+1);	SaCkptFree((void*)&strTemp);	return strOp;}/* Continue the operation after a node failure*/void SaCkptOperationNodeFailure(gpointer key, 	gpointer value, 	gpointer userdata){	SaCkptOperationT* ckptOp = NULL;	SaCkptMessageT* ckptMsg = NULL;	SaCkptReplicaT* replica = NULL;	SaCkptStateT* state = NULL;	char* strNodeName = NULL;	GList* list = NULL;	int finished = TRUE;	int opState = -1;	ckptOp = (SaCkptOperationT*)value;	replica = ckptOp->replica;	strNodeName = (char*)userdata;	/* 	 * if the operation is started by the failed node	 * remove the operation directly	 */	if (!strcmp(ckptOp->clientHostName, strNodeName)) {		SaCkptOperationRemove(&ckptOp);		return;	}	/* only OP_RPLC_ADD and OP_CKPT_UPD need to be processed */	if ((ckptOp->operation == OP_RPLC_ADD) ||		(ckptOp->operation == OP_CKPT_UPD)) {		list = ckptOp->stateList;		finished = TRUE;		while (list != NULL) {			state = (SaCkptStateT*)list->data;			if (state == NULL) {				list = list->next;				continue;			}			if (!strcmp(state->nodeName, strNodeName)) {				ckptOp->stateList = g_list_remove(					ckptOp->stateList,					(gpointer)state);				list = ckptOp->stateList;				continue;			} else {				if (opState == -1) {					opState = state->state;				} else {					if (opState != state->state) {						finished = FALSE;					}				}			}						list = list->next;		}		if (finished) {			switch(opState) {			case OP_STATE_PREPARED:				ckptOp->state = OP_STATE_PREPARED;								/* broadcast commit message */				if (ckptOp->operation == OP_RPLC_ADD) {					ckptMsg = SaCkptMessageCreateOp(ckptOp,						M_RPLC_ADD_COMMIT_BCAST);				} else {					ckptMsg = SaCkptMessageCreateOp(ckptOp,						M_CKPT_UPD_COMMIT_BCAST);				}				SaCkptMessageMulticast(ckptMsg, 					ckptOp->stateList);				SaCkptFree((void*)&ckptMsg);								break;							case OP_STATE_COMMITTED:				/* send back response */				ckptOp->state = OP_STATE_COMMITTED;				if (ckptOp->operation == OP_RPLC_ADD) {					ckptMsg = SaCkptMessageCreateOp(ckptOp,						M_RPLC_ADD_REPLY);				} else {					ckptMsg = SaCkptMessageCreateOp(ckptOp,						M_CKPT_UPD_REPLY);				}				SaCkptMessageSend(ckptMsg, 					ckptOp->clientHostName);				SaCkptFree((void*)&ckptMsg);				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}				SaCkptOperationRemove(&ckptOp);				break;			case OP_STATE_ROLLBACKED:				/* send back response */				ckptOp->state = OP_STATE_ROLLBACKED;				if (ckptOp->operation == OP_RPLC_ADD) {					ckptMsg = SaCkptMessageCreateOp(ckptOp,						M_RPLC_ADD_REPLY);				} else {					ckptMsg = SaCkptMessageCreateOp(ckptOp,						M_CKPT_UPD_REPLY);				}				ckptMsg->retVal = SA_ERR_FAILED_OPERATION;				SaCkptMessageSend(ckptMsg, 					ckptOp->clientHostName);				SaCkptFree((void*)&ckptMsg);				replica->flagReplicaLock = FALSE;				if (saCkptService->flagVerbose) {					cl_log(LOG_INFO,						"Replica %s unlocked",						replica->checkpointName);				}				SaCkptOperationRemove(&ckptOp);				break;							default:				break;			}		}	}	return;}SaCkptResponseT* createLocalReplical(SaCkptRequestT* ckptReq){		SaCkptResponseT* ckptResp = NULL;	SaCkptOpenCheckpointT* openCkpt = ckptReq->openCkpt;	SaCkptClientT* client = ckptReq->client;	SaCkptReplicaT* replica = NULL;	SaCkptReqOpenParamT* openParam = NULL;	char* strReq = NULL;		if(ckptReq == NULL) {		cl_log(LOG_ERR,"NULL ckptReq in createLocalReplica");		return NULL;	}		if(ckptReq->clientRequest->req != REQ_CKPT_OPEN &&			ckptReq->clientRequest->req != REQ_CKPT_OPEN_ASYNC){		cl_log(LOG_ERR,"Not open req in createLocalReplica");		return NULL;	}	strReq = SaCkptReq2String(ckptReq->clientRequest->req);	cl_log(LOG_INFO, 		"client %d, request %lu (%s), create a local replica", 		ckptReq->clientRequest->clientHandle,		ckptReq->clientRequest->requestNO, strReq);	SaCkptFree((void*)&strReq);	ckptResp = SaCkptResponseCreate(ckptReq);		openParam = ckptReq->clientRequest->reqParam;	if (openParam->openFlag & 		SA_CKPT_CHECKPOINT_COLOCATED) {		/* create local replica */		replica = SaCkptReplicaCreate(openParam);		SACKPTASSERT(replica != NULL);		replica->replicaState= STATE_CREATE_COMMITTED;		/* open the local replica */		openCkpt = SaCkptCheckpointOpen(client, 			replica, openParam);		SACKPTASSERT(openCkpt != NULL);		ckptReq->openCkpt = openCkpt;		/* create and send client response */		ckptResp->resp->dataLength = 			sizeof(SaCkptCheckpointHandleT);		ckptResp->resp->data = SaCkptMalloc(			sizeof(SaCkptCheckpointHandleT));		memcpy(ckptResp->resp->data, 			&(openCkpt->checkpointHandle),			sizeof(openCkpt->checkpointHandle));		ckptResp->resp->retVal = SA_OK;	} else {		ckptResp->resp->retVal = SA_ERR_TIMEOUT;	}	return ckptResp;}voidupdateReplicaPendingOption(SaCkptReplicaT *replica, const char *hostName){	GList * list = NULL;	GList * opStateList = NULL;	SaCkptOperationT* ckptOp = NULL;	SaCkptStateT* state = NULL;	gboolean nodeExisted = FALSE;	list = replica->pendingOperationList;		while(list != NULL){		ckptOp = (SaCkptOperationT*)list->data;		opStateList = ckptOp->stateList;		nodeExisted = FALSE;		/* we should check if this node name exist already, maybe just for debug*/		while(opStateList != NULL){			state = (SaCkptStateT* )opStateList->data;			if(!strncmp(state->nodeName, hostName,SA_MAX_NAME_LENGTH )){				nodeExisted = TRUE;				cl_log(LOG_ERR, "hostName %s exist already in updateReplicaPendingOption\n",hostName);				break;			}			opStateList = opStateList->next;		}				if(!nodeExisted){			opStateList = ckptOp->stateList;			state = (SaCkptStateT*)ha_malloc(sizeof(SaCkptStateT));			strncpy(state->nodeName,hostName,SA_MAX_NAME_LENGTH);			state->nodeName[SA_MAX_NAME_LENGTH-1]='\0';			state->state = OP_STATE_STARTED;			ckptOp->stateList = g_list_append(ckptOp->stateList,				(gpointer)state);		}		list = list->next;	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -