📄 sinvaladt.c
字号:
if (startChain == InvalidOffset) return NULL; eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + startChain); return eP;}/************************************************************************//* SIGetLastDataEntry(segP) returns last data entry in the chain *//************************************************************************/static SISegEntry *SIGetLastDataEntry(SISeg *segP){ SISegEntry *eP; Offset endChain; endChain = SIGetEndEntryChain(segP); if (endChain == InvalidOffset) return NULL; eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + endChain); return eP;}/************************************************************************//* SIGetNextDataEntry(segP, offset) returns next data entry *//************************************************************************/static SISegEntry *SIGetNextDataEntry(SISeg *segP, Offset offset){ SISegEntry *eP; if (offset == InvalidOffset) return NULL; eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + offset); return eP;}/************************************************************************//* SIGetNthDataEntry(segP, n) returns the n-th data entry in chain *//************************************************************************/static SISegEntry *SIGetNthDataEntry(SISeg *segP, int n) /* must range from 1 to MaxMessages */{ SISegEntry *eP; int i; if (n <= 0) return NULL; eP = SIGetFirstDataEntry(segP); for (i = 1; i < n; i++) { /* skip one and get the next */ eP = SIGetNextDataEntry(segP, eP->next); } return eP;}/************************************************************************//* SIEntryOffset(segP, entryP) returns the offset for an pointer *//************************************************************************/static OffsetSIEntryOffset(SISeg *segP, SISegEntry *entryP){ /* relative to B !! */ return ((Offset) ((Pointer) entryP - (Pointer) segP - SIGetStartEntrySection(segP)));}/************************************************************************//* SISetDataEntry(segP, data) - sets a message in the segemnt *//************************************************************************/boolSISetDataEntry(SISeg *segP, SharedInvalidData *data){ Offset offsetToNewData; SISegEntry *eP, *lastP; if (!SIIncNumEntries(segP, 1)) return false; /* no space */ /* get a free entry */ offsetToNewData = SIGetStartFreeSpace(segP); eP = SIGetNextDataEntry(segP, offsetToNewData); /* it's a free one */ SISetStartFreeSpace(segP, eP->next); /* fill it up */ eP->entryData = *data; eP->isfree = false; eP->next = InvalidOffset; /* handle insertion point at the end of the chain !! */ lastP = SIGetLastDataEntry(segP); if (lastP == NULL) { /* there is no chain, insert the first entry */ SISetStartEntryChain(segP, SIEntryOffset(segP, eP)); } else { /* there is a last entry in the chain */ lastP->next = SIEntryOffset(segP, eP); } SISetEndEntryChain(segP, SIEntryOffset(segP, eP)); return true;}/************************************************************************//* SIDecProcLimit(segP, num) decrements all process limits *//************************************************************************/static voidSIDecProcLimit(SISeg *segP, int num){ int i; for (i = 0; i < segP->maxBackends; i++) { /* decrement only, if there is a limit > 0 */ if (segP->procState[i].limit > 0) { segP->procState[i].limit = segP->procState[i].limit - num; if (segP->procState[i].limit < 0) { /* limit was not high enough, reset to zero */ /* negative means it's a dead backend */ segP->procState[i].limit = 0; } } }}/************************************************************************//* SIDelDataEntry(segP) - free the FIRST entry *//************************************************************************/boolSIDelDataEntry(SISeg *segP){ SISegEntry *e1P; if (!SIDecNumEntries(segP, 1)) { /* no entries in buffer */ return false; } e1P = SIGetFirstDataEntry(segP); SISetStartEntryChain(segP, e1P->next); if (SIGetStartEntryChain(segP) == InvalidOffset) { /* it was the last entry */ SISetEndEntryChain(segP, InvalidOffset); } /* free the entry */ e1P->isfree = true; e1P->next = SIGetStartFreeSpace(segP); SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P)); SIDecProcLimit(segP, 1); return true;}/************************************************************************//* SISetProcStateInvalid(segP) checks and marks a backends state as *//* invalid *//************************************************************************/voidSISetProcStateInvalid(SISeg *segP){ int i; for (i = 0; i < segP->maxBackends; i++) { if (segP->procState[i].limit == 0) { /* backend i didn't read any message */ segP->procState[i].resetState = true; /* * XXX signal backend that it has to reset its internal cache * ? */ } }}/************************************************************************//* SIReadEntryData(segP, backendId, function) *//* - marks messages to be read by id *//* and executes function *//************************************************************************/voidSIReadEntryData(SISeg *segP, int backendId, void (*invalFunction) (), void (*resetFunction) ()){ int i = 0; SISegEntry *data; Assert(segP->procState[backendId - 1].tag == MyBackendTag); if (!segP->procState[backendId - 1].resetState) { /* invalidate data, but only those, you have not seen yet !! */ /* therefore skip read messages */ data = SIGetNthDataEntry(segP, SIGetProcStateLimit(segP, backendId - 1) + 1); while (data != NULL) { i++; segP->procState[backendId - 1].limit++; /* one more message read */ invalFunction(data->entryData.cacheId, data->entryData.hashIndex, &data->entryData.pointerData); data = SIGetNextDataEntry(segP, data->next); } /* SIDelExpiredDataEntries(segP); */ } else { /* backend must not read messages, its own state has to be reset */ elog(NOTICE, "SIReadEntryData: cache state reset"); resetFunction(); /* XXXX call it here, parameters? */ /* new valid state--mark all messages "read" */ segP->procState[backendId - 1].resetState = false; segP->procState[backendId - 1].limit = SIGetNumEntries(segP); } /* check whether we can remove dead messages */ if (i > MAXNUMMESSAGES) elog(FATAL, "SIReadEntryData: Invalid segment state");}/************************************************************************//* SIDelExpiredDataEntries (segP) - removes irrelevant messages *//************************************************************************/voidSIDelExpiredDataEntries(SISeg *segP){ int min, i, h; min = 9999999; for (i = 0; i < segP->maxBackends; i++) { h = SIGetProcStateLimit(segP, i); if (h >= 0) { /* backend active */ if (h < min) min = h; } } if (min != 9999999) { /* we can remove min messages */ for (i = 1; i <= min; i++) { /* this adjusts also the state limits! */ if (!SIDelDataEntry(segP)) elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state"); } }}/************************************************************************//* SISegInit(segP) - initializes the segment *//************************************************************************/static voidSISegInit(SISeg *segP, SISegOffsets *oP, int maxBackends){ int i; SISegEntry *eP; /* set semaphore ids in the segment */ /* XXX */ SISetStartEntrySection(segP, oP->offsetToFirstEntry); SISetEndEntrySection(segP, oP->offsetToEndOfSegment); SISetStartFreeSpace(segP, 0); SISetStartEntryChain(segP, InvalidOffset); SISetEndEntryChain(segP, InvalidOffset); SISetNumEntries(segP, 0); SISetMaxNumEntries(segP, MAXNUMMESSAGES); segP->maxBackends = maxBackends; for (i = 0; i < segP->maxBackends; i++) { segP->procState[i].limit = -1; /* no backend active !! */ segP->procState[i].resetState = false; segP->procState[i].tag = InvalidBackendTag; } /* construct a chain of free entries */ for (i = 1; i < MAXNUMMESSAGES; i++) { eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + (i - 1) * sizeof(SISegEntry)); eP->isfree = true; eP->next = i * sizeof(SISegEntry); /* relative to B */ } /* handle the last free entry separate */ eP = (SISegEntry *) ((Pointer) segP + SIGetStartEntrySection(segP) + (MAXNUMMESSAGES - 1) * sizeof(SISegEntry)); eP->isfree = true; eP->next = InvalidOffset; /* it's the end of the chain !! */}/************************************************************************//* SISegmentKill(key) - kill any segment *//************************************************************************/static voidSISegmentKill(int key) /* the corresponding key for the segment */{ IpcMemoryKill(key);}/************************************************************************//* SISegmentGet(key, size) - get a shared segment of size <size> *//* returns a segment id *//************************************************************************/static IpcMemoryIdSISegmentGet(int key, /* the corresponding key for the segment */ int size, /* size of segment in bytes */ bool create){ IpcMemoryId shmid; if (create) shmid = IpcMemoryCreate(key, size, IPCProtection); else shmid = IpcMemoryIdGet(key, size); return shmid;}/************************************************************************//* SISegmentAttach(shmid) - attach a shared segment with id shmid *//************************************************************************/static voidSISegmentAttach(IpcMemoryId shmid){ shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid); if (shmInvalBuffer == IpcMemAttachFailed) { /* XXX use validity function */ elog(NOTICE, "SISegmentAttach: Could not attach segment"); elog(FATAL, "SISegmentAttach: %m"); }}/************************************************************************//* SISegmentInit() initialize SI segment *//* *//* NB: maxBackends param is only valid when killExistingSegment is true *//************************************************************************/intSISegmentInit(bool killExistingSegment, IPCKey key, int maxBackends){ SISegOffsets offsets; IpcMemoryId shmId; bool create; if (killExistingSegment) { /* Kill existing segment */ /* set semaphore */ SISegmentKill(key); /* Get a shared segment */ SIComputeSize(&offsets, maxBackends); create = true; shmId = SISegmentGet(key, offsets.offsetToEndOfSegment, create); if (shmId < 0) { perror("SISegmentGet: failed"); return -1; /* an error */ } /* Attach the shared cache invalidation segment */ /* sets the global variable shmInvalBuffer */ SISegmentAttach(shmId); /* Init shared memory table */ SISegInit(shmInvalBuffer, &offsets, maxBackends); } else { /* use an existing segment */ create = false; shmId = SISegmentGet(key, 0, create); if (shmId < 0) { perror("SISegmentGet: getting an existent segment failed"); return -1; /* an error */ } /* Attach the shared cache invalidation segment */ SISegmentAttach(shmId); } return 1;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -