📄 request.c
字号:
break; case REQ_SEC_QUERY: secQueryParam = reqParam; checkpointHandle= secQueryParam->checkpointHandle; openCkpt = g_hash_table_lookup( saCkptService->openCheckpointHash, (gpointer)&(checkpointHandle)); if ((openCkpt == NULL) || (openCkpt->replica == NULL)) { ckptResp->resp->retVal = SA_ERR_LIBRARY; SaCkptResponseSend(&ckptResp); break; } sectionDescriptor =(SaCkptSectionDescriptorT *)SaCkptMalloc(sizeof(SaCkptSectionDescriptorT)); if(sectionDescriptor == NULL){ ckptResp->resp->retVal = SA_ERR_NO_MEMORY; SaCkptResponseSend(&ckptResp); } replica = openCkpt->replica; descNumber = 0; /* * here a two-pass search used * in first search, we get how much to alloc * in second search, we copy all data we needed * maybe some improvement here needed */ for(secListPass = 0; secListPass <2; secListPass ++){ list = replica->sectionList; if(secListPass ==1){ /* * the response begin with section number * then followed by the sections, with ID at the end of each one */ ckptResp->resp->data = SaCkptMalloc( secListTotalSize + sizeof(int)); if (ckptResp->resp->data == NULL) { ckptResp->resp->retVal = SA_ERR_NO_MEMORY; SaCkptFree((void**)§ionDescriptor); SaCkptResponseSend(&ckptResp); break; } memset(ckptResp->resp->data, 0, sectNumber * sizeof(SaCkptSectionDescriptorT)); p = ckptResp->resp->data; *(int *)p = descNumber; p += sizeof(int); }else{ secListTotalSize = 0; } while (list != NULL) { sectSelected = 0; section = (SaCkptSectionT*)list->data; switch (secQueryParam->chosenFlag) { case SA_CKPT_SECTIONS_FOREVER: if (section->expirationTime == SA_TIME_END) { sectSelected = 1; } break; case SA_CKPT_SECTIONS_LEQ_EXPIRATION_TIME: if ((section->expirationTime <= secQueryParam->expireTime) && (section->expirationTime != SA_TIME_END)){ sectSelected = 1; } break; case SA_CKPT_SECTIONS_GEQ_EXPIRATION_TIME: if ((section->expirationTime >= secQueryParam->expireTime) && (section->expirationTime != SA_TIME_END)){ sectSelected = 1; } break; case SA_CKPT_SECTIONS_CORRUPTED: if (section->dataState == SA_CKPT_SECTION_CORRUPTED) { sectSelected = 1; } break; case SA_CKPT_SECTIONS_ANY: sectSelected = 1; break; default: cl_log(LOG_ERR, "Unknown section chosenFlag"); SaCkptFree((void**)§ionDescriptor); ckptResp->resp->retVal = SA_ERR_LIBRARY; SaCkptResponseSend(&ckptResp); break; } if (sectSelected == 1) { if(secListPass == 0){ descNumber ++; secListTotalSize += (section->sectionID.idLen + sizeof(SaCkptSectionDescriptorT)); }else{ sectionDescriptor->sectionId.idLen= section->sectionID.idLen; sectionDescriptor->expirationTime = section->expirationTime; sectionDescriptor->lastUpdate = section->lastUpdateTime; sectionDescriptor->sectionSize = section->dataLength[section->dataIndex]; sectionDescriptor->sectionState = section->dataState; memcpy(p,sectionDescriptor,sizeof(SaCkptSectionDescriptorT)); p += sizeof(SaCkptSectionDescriptorT); if(section->sectionID.idLen>0){ memcpy(p,section->sectionID.id,section->sectionID.idLen); p+= section->sectionID.idLen; } }/*if(secListPass == 0) */ } list = list->next; }/* while (list != NULL) */ }/* for(secListPass */ ckptResp->resp->dataLength = secListTotalSize + sizeof(int); ckptResp->resp->retVal = SA_OK; SaCkptResponseSend(&ckptResp); break; case REQ_CKPT_ULNK: ckptReq->operation = OP_CKPT_ULNK; ckptMsg = SaCkptMessageCreateReq(ckptReq, M_CKPT_UNLINK_BCAST); SaCkptMessageBroadcast(ckptMsg); SaCkptFree((void**)&ckptMsg); ckptResp->resp->retVal = SA_OK; SaCkptResponseSend(&ckptResp); break; default: cl_log(LOG_INFO, "Not implemented request"); ckptResp->resp->retVal = SA_ERR_FAILED_OPERATION; SaCkptResponseSend(&ckptResp); break; } if (ckptResp != NULL) { if (ckptResp->resp != NULL) { if (ckptResp->resp->data != NULL) { SaCkptFree((void**)&(ckptResp->resp->data)); } SaCkptFree((void**)&(ckptResp->resp)); } SaCkptFree((void*)&ckptResp); } return HA_OK;}/* request timeout process, send back timeout response to client */gbooleanSaCkptRequestTimeout(gpointer timeout_data){ SaCkptRequestT* ckptReq = (SaCkptRequestT*)timeout_data; SaCkptResponseT* ckptResp = NULL; SaCkptOpenCheckpointT* openCkpt = ckptReq->openCkpt; SaCkptClientT* client = ckptReq->client; SaCkptReplicaT* replica = NULL; SaCkptReqOpenParamT* openParam = NULL; char* strReq = NULL; strReq = SaCkptReq2String(ckptReq->clientRequest->req); cl_log(LOG_INFO, "Request timeout, client %d, request %lu (%s)", ckptReq->clientRequest->clientHandle, ckptReq->clientRequest->requestNO, strReq); SaCkptFree((void*)&strReq); ckptResp = SaCkptResponseCreate(ckptReq); switch (ckptReq->clientRequest->req) { case REQ_CKPT_OPEN: case REQ_CKPT_OPEN_ASYNC: openParam = ckptReq->clientRequest->reqParam; if (openParam->openFlag & SA_CKPT_CHECKPOINT_COLOCATED) { /* create local replica */ replica = SaCkptReplicaCreate(openParam); SACKPTASSERT(replica != NULL); replica->replicaState= STATE_CREATE_COMMITTED; /* open the local replica */ openCkpt = SaCkptCheckpointOpen(client, replica, openParam); SACKPTASSERT(openCkpt != NULL); ckptReq->openCkpt = openCkpt; /* create and send client response */ ckptResp->resp->dataLength = sizeof(SaCkptCheckpointHandleT); ckptResp->resp->data = SaCkptMalloc( sizeof(SaCkptCheckpointHandleT)); memcpy(ckptResp->resp->data, &(openCkpt->checkpointHandle), sizeof(openCkpt->checkpointHandle)); ckptResp->resp->retVal = SA_OK; notifyLowPrioNode(openParam); removeOpenPendingQueue(ckptReq->clientRequest->reqParam); } else { ckptResp->resp->retVal = SA_ERR_TIMEOUT; } break; case REQ_CKPT_CLOSE: case REQ_SEC_CRT: case REQ_SEC_DEL: case REQ_SEC_RD: case REQ_SEC_WRT: case REQ_SEC_OWRT: ckptResp->resp->retVal = SA_ERR_TIMEOUT; break; default: break; } SaCkptResponseSend(&ckptResp); /* alway return FALSE since it was one-time timer*/ return FALSE;}SaCkptRequestT*SaCkptRequestReceive(IPC_Channel* clientChannel){ IPC_Message *ipcMsg = NULL; SaCkptRequestT *ckptReq = NULL; SaCkptResponseT *ckptResp = NULL; SaCkptClientRequestT *clientRequest = NULL; char *p = NULL; int rc = IPC_OK; char *strReq = NULL; while (clientChannel->ops->is_message_pending(clientChannel) != TRUE) { cl_shortsleep(); } /* receive ipc message */ rc = clientChannel->ops->recv(clientChannel, &ipcMsg); if (rc != IPC_OK) {/* cl_log(LOG_ERR, "Receive error request"); */ return NULL; } if (ipcMsg->msg_len < sizeof(SaCkptClientRequestT) - 2*sizeof(void*)) { cl_log(LOG_ERR, "Error request"); return NULL; } p = ipcMsg->msg_body; clientRequest = SaCkptMalloc(sizeof(SaCkptClientRequestT)); SACKPTASSERT(clientRequest != NULL); memcpy(clientRequest, p, sizeof(SaCkptClientRequestT) - 2*sizeof(void*)); p += (sizeof(SaCkptClientRequestT) - 2*sizeof(void*)); /* the param and data length should not be negative */ if (clientRequest->reqParamLength > 0) { clientRequest->reqParam = SaCkptMalloc( clientRequest->reqParamLength); SACKPTASSERT(clientRequest->reqParam != NULL); memcpy(clientRequest->reqParam, p, clientRequest->reqParamLength); p += clientRequest->reqParamLength; } else { clientRequest->reqParam = NULL; } if (clientRequest->dataLength > 0) { clientRequest->data = SaCkptMalloc( clientRequest->dataLength); SACKPTASSERT(clientRequest->data != NULL); memcpy(clientRequest->data, p, clientRequest->dataLength); p += clientRequest->dataLength; } else { clientRequest->data = NULL; } /* free ipc message */ if (ipcMsg->msg_body != NULL) { free(ipcMsg->msg_body); } free(ipcMsg); if (saCkptService->flagVerbose) { strReq = SaCkptReq2String(clientRequest->req); cl_log(LOG_INFO, "<<<---"); cl_log(LOG_INFO, "Receive request %lu (%s), client %d", clientRequest->requestNO, strReq, clientRequest->clientHandle); SaCkptFree((void*)&strReq); } ckptReq = SaCkptMalloc(sizeof(SaCkptRequestT)); SACKPTASSERT(ckptReq != NULL); ckptReq->clientRequest= clientRequest; ckptReq->clientChannel = clientChannel; ckptReq->client = NULL; ckptReq->openCkpt = NULL; ckptReq->operation = 0; ckptReq->timeoutTag = 0; /* by default, the request send to itself */ strcpy(ckptReq->toNodeName, saCkptService->nodeName); /* check the clienthanle */ if (clientRequest->clientHandle < 0) { ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = SA_ERR_BAD_HANDLE; SaCkptResponseSend(&ckptResp); return NULL; } else { ckptReq->client = (SaCkptClientT*)g_hash_table_lookup( saCkptService->clientHash, (gpointer)&(clientRequest->clientHandle)); if ((ckptReq->client == NULL) && (ckptReq->clientRequest->req != REQ_SERVICE_INIT)) { ckptResp = SaCkptResponseCreate(ckptReq); ckptResp->resp->retVal = SA_ERR_BAD_HANDLE; SaCkptResponseSend(&ckptResp); return NULL; } } return ckptReq;}/* remove the request and free its memory */int SaCkptRequestRemove(SaCkptRequestT** pCkptReq){ SaCkptRequestT* ckptReq = *pCkptReq; SaCkptClientT* client = ckptReq->client; unsigned int requestNO = ckptReq->clientRequest->requestNO; GList* list = NULL; if (client != NULL) { /* remove ckptReq from request queue */ g_hash_table_remove(client->requestHash, (gconstpointer)&(requestNO)); } SaCkptRequestStopTimer(ckptReq); /* free ckptReq*/ if (ckptReq->clientRequest->data != NULL) { SaCkptFree((void**)&(ckptReq->clientRequest->data)); } if (ckptReq->clientRequest->reqParam != NULL) { SaCkptFree((void**)&(ckptReq->clientRequest->reqParam)); } SaCkptFree((void**)&(ckptReq->clientRequest)); SaCkptFree((void**)&ckptReq); if (client == NULL) { *pCkptReq = NULL; return HA_OK; } /* start pending request */ list = client->pendingRequestList; if (list != NULL) { ckptReq = (SaCkptRequestT*)list->data; if (ckptReq != NULL) { SaCkptReplicaT* replica = NULL; replica = ckptReq->openCkpt->replica; if ((replica != NULL) && (replica->flagReplicaPending == FALSE)) { client->pendingRequestList = g_list_remove( client->pendingRequestList, (gpointer)ckptReq); SaCkptRequestStart(ckptReq); } /* FIXME */ /* How about the replica is NULL? */ } } *pCkptReq = NULL; return HA_OK;}void SaCkptRequestStartTimer(SaCkptRequestT* ckptReq, SaTimeT timeout){ char* strReq = NULL; /* to avoid start more than one timer */ if (ckptReq->timeoutTag <= 0) { ckptReq->timeoutTag = Gmain_timeout_add( timeout / 1000000, SaCkptRequestTimeout, (gpointer)ckptReq); if (saCkptService->flagVerbose) { strReq = SaCkptReq2String( ckptReq->clientRequest->req); cl_log(LOG_INFO, "Start timer %u for request %lu (%s), client %d", ckptReq->timeoutTag, ckptReq->clientRequest->requestNO, strReq, ckptReq->client->clientHandle); SaCkptFree((void*)&strReq); } } return;}voidSaCkptRequestStopTimer(SaCkptRequestT* ckptReq){ char* strReq = NULL; if (ckptReq->timeoutTag > 0) { if (saCkptService->flagVerbose) { strReq = SaCkptReq2String( ckptReq->clientRequest->req); cl_log(LOG_INFO, "delete timer %u for request %lu (%s), client %d", ckptReq->timeoutTag, ckptReq->clientRequest->requestNO, strReq, ckptReq->client->clientHandle); SaCkptFree((void*)&strReq); } g_source_remove(ckptReq->timeoutTag); ckptReq->timeoutTag = 0; } return;}char* SaCkptReq2String(SaCkptReqT req){ char* strTemp = NULL; char* strReq = NULL; strTemp = (char*)SaCkptMalloc(64); SACKPTASSERT(strTemp != NULL); switch (req) { case REQ_SERVICE_INIT: strcpy(strTemp, "REQ_SERVICE_INIT"); break; case REQ_SERVICE_FINL: strcpy(strTemp, "REQ_SERVICE_FINL"); break; case REQ_CKPT_OPEN: strcpy(strTemp, "REQ_CKPT_OPEN"); break; case REQ_CKPT_OPEN_ASYNC: strcpy(strTemp, "REQ_CKPT_OPEN_ASYNC"); break; case REQ_CKPT_CLOSE: strcpy(strTemp, "REQ_CKPT_CLOSE"); break; case REQ_CKPT_ULNK: strcpy(strTemp, "REQ_CKPT_ULNK"); break; case REQ_CKPT_RTN_SET: strcpy(strTemp, "REQ_CKPT_RTN_SET"); break; case REQ_CKPT_ACT_SET: strcpy(strTemp, "REQ_CKPT_ACT_SET"); break; case REQ_CKPT_STAT_GET: strcpy(strTemp, "REQ_CKPT_STAT_GET"); break; case REQ_SEC_CRT: strcpy(strTemp, "REQ_SEC_CRT"); break; case REQ_SEC_DEL: strcpy(strTemp, "REQ_SEC_DEL"); break; case REQ_SEC_EXP_SET: strcpy(strTemp, "REQ_SEC_EXP_SET"); break; case REQ_SEC_QUERY: strcpy(strTemp, "REQ_SEC_QUERY"); break; case REQ_SEC_WRT: strcpy(strTemp, "REQ_SEC_WRT"); break; case REQ_SEC_OWRT: strcpy(strTemp, "REQ_SEC_OWRT"); break; case REQ_SEC_RD: strcpy(strTemp, "REQ_SEC_RD"); break; case REQ_CKPT_SYNC: strcpy(strTemp, "REQ_CKPT_SYNC"); break; case REQ_CKPT_SYNC_ASYNC: strcpy(strTemp, "REQ_CKPT_SYNC_ASYNC"); break; } strReq = (char*)SaCkptMalloc(strlen(strTemp)+1); if (strReq == NULL) { return NULL; } memcpy(strReq, strTemp, strlen(strTemp)+1); SaCkptFree((void*)&strTemp); return strReq;}/* * after the node failure, * the request will be resent to the new active replica */void SaCkptRequestNodeFailure(gpointer key, gpointer value, gpointer userdata){ SaCkptRequestT* ckptReq = value; char* strNodeName = userdata; /* restart the sent but not finished requests */ if (!strcmp(ckptReq->toNodeName, strNodeName)) { SaCkptRequestStart(ckptReq); } return;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -