📄 gistxlog.c
字号:
case XLOG_GIST_INSERT_COMPLETE: appendStringInfo(buf, "complete_insert: rel %u/%u/%u", ((gistxlogInsertComplete *) rec)->node.spcNode, ((gistxlogInsertComplete *) rec)->node.dbNode, ((gistxlogInsertComplete *) rec)->node.relNode); break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; }}IndexTuplegist_form_invalid_tuple(BlockNumber blkno){ /* * we don't alloc space for null's bitmap, this is invalid tuple, be * carefull in read and write code */ Size size = IndexInfoFindDataOffset(0); IndexTuple tuple = (IndexTuple) palloc0(size); tuple->t_info |= size; ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); GistTupleSetInvalid(tuple); return tuple;}static voidgistxlogFindPath(Relation index, gistIncompleteInsert *insert){ GISTInsertStack *top; insert->pathlen = 0; insert->path = NULL; if ((top = gistFindPath(index, insert->origblkno)) != NULL) { int i; GISTInsertStack *ptr; for (ptr = top; ptr; ptr = ptr->parent) insert->pathlen++; insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); i = 0; for (ptr = top; ptr; ptr = ptr->parent) insert->path[i++] = ptr->blkno; } else elog(ERROR, "lost parent for block %u", insert->origblkno);}static SplitedPageLayout *gistMakePageLayout(Buffer *buffers, int nbuffers){ SplitedPageLayout *res = NULL, *resptr; while (nbuffers-- > 0) { Page page = BufferGetPage(buffers[nbuffers]); IndexTuple *vec; int veclen; resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]); resptr->block.num = PageGetMaxOffsetNumber(page); vec = gistextractpage(page, &veclen); resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist)); resptr->next = res; res = resptr; } return res;}/* * Continue insert after crash. In normal situations, there aren't any * incomplete inserts, but if a crash occurs partway through an insertion * sequence, we'll need to finish making the index valid at the end of WAL * replay. * * Note that we assume the index is now in a valid state, except for the * unfinished insertion. In particular it's safe to invoke gistFindPath(); * there shouldn't be any garbage pages for it to run into. * * To complete insert we can't use basic insertion algorithm because * during insertion we can't call user-defined support functions of opclass. * So, we insert 'invalid' tuples without real key and do it by separate algorithm. * 'invalid' tuple should be updated by vacuum full. */static voidgistContinueInsert(gistIncompleteInsert *insert){ IndexTuple *itup; int i, lenitup; Relation index; index = XLogOpenRelation(insert->node); /* * needed vector itup never will be more than initial lenblkno+2, because * during this processing Indextuple can be only smaller */ lenitup = insert->lenblk; itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ )); for (i = 0; i < insert->lenblk; i++) itup[i] = gist_form_invalid_tuple(insert->blkno[i]); /* * any insertion of itup[] should make LOG message about */ if (insert->origblkno == GIST_ROOT_BLKNO) { /* * it was split root, so we should only make new root. it can't be * simple insert into root, we should replace all content of root. */ Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true); gistnewroot(index, buffer, itup, lenitup, NULL); UnlockReleaseBuffer(buffer); } else { Buffer *buffers; Page *pages; int numbuffer; OffsetNumber *todelete; /* construct path */ gistxlogFindPath(index, insert); Assert(insert->pathlen > 0); buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ )); for (i = 0; i < insert->pathlen; i++) { int j, k, pituplen = 0; XLogRecData *rdata; XLogRecPtr recptr; Buffer tempbuffer = InvalidBuffer; int ntodelete = 0; numbuffer = 1; buffers[0] = ReadBuffer(index, insert->path[i]); LockBuffer(buffers[0], GIST_EXCLUSIVE); /* * we check buffer, because we restored page earlier */ gistcheckpage(index, buffers[0]); pages[0] = BufferGetPage(buffers[0]); Assert(!GistPageIsLeaf(pages[0])); pituplen = PageGetMaxOffsetNumber(pages[0]); /* find remove old IndexTuples to remove */ for (j = 0; j < pituplen && ntodelete < lenitup; j++) { BlockNumber blkno; ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber); IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid); blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); for (k = 0; k < lenitup; k++) if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) { todelete[ntodelete] = j + FirstOffsetNumber - ntodelete; ntodelete++; break; } } if (ntodelete == 0) elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)"); /* * we check space with subtraction only first tuple to delete, * hope, that wiil be enough space.... */ if (gistnospace(pages[0], itup, lenitup, *todelete, 0)) { /* no space left on page, so we must split */ buffers[numbuffer] = ReadBuffer(index, P_NEW); LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); GISTInitBuffer(buffers[numbuffer], 0); pages[numbuffer] = BufferGetPage(buffers[numbuffer]); gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber); numbuffer++; if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) { Buffer tmp; /* * we split root, just copy content from root to new page */ /* sanity check */ if (i + 1 != insert->pathlen) elog(PANIC, "unexpected pathlen in index \"%s\"", RelationGetRelationName(index)); /* fill new page, root will be changed later */ tempbuffer = ReadBuffer(index, P_NEW); LockBuffer(tempbuffer, GIST_EXCLUSIVE); memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer)); /* swap buffers[0] (was root) and temp buffer */ tmp = buffers[0]; buffers[0] = tempbuffer; tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, * it is still unchanged */ pages[0] = BufferGetPage(buffers[0]); } START_CRIT_SECTION(); for (j = 0; j < ntodelete; j++) PageIndexTupleDelete(pages[0], todelete[j]); rdata = formSplitRdata(index->rd_node, insert->path[i], false, &(insert->key), gistMakePageLayout(buffers, numbuffer)); } else { START_CRIT_SECTION(); for (j = 0; j < ntodelete; j++) PageIndexTupleDelete(pages[0], todelete[j]); gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber); rdata = formUpdateRdata(index->rd_node, buffers[0], todelete, ntodelete, itup, lenitup, &(insert->key)); } /* * use insert->key as mark for completion of insert (form*Rdata() * above) for following possible replays */ /* write pages, we should mark it dirty befor XLogInsert() */ for (j = 0; j < numbuffer; j++) { GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; MarkBufferDirty(buffers[j]); } recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); for (j = 0; j < numbuffer; j++) { PageSetLSN(pages[j], recptr); PageSetTLI(pages[j], ThisTimeLineID); } END_CRIT_SECTION(); lenitup = numbuffer; for (j = 0; j < numbuffer; j++) { itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); UnlockReleaseBuffer(buffers[j]); } if (tempbuffer != InvalidBuffer) { /* * it was a root split, so fill it by new values */ gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key)); UnlockReleaseBuffer(tempbuffer); } } } ereport(LOG, (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", insert->node.spcNode, insert->node.dbNode, insert->node.relNode), errdetail("Incomplete insertion detected during crash replay.")));}voidgist_xlog_startup(void){ incomplete_inserts = NIL; insertCtx = AllocSetContextCreate(CurrentMemoryContext, "GiST recovery temporary context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); opCtx = createTempGistContext();}voidgist_xlog_cleanup(void){ ListCell *l; MemoryContext oldCxt; oldCxt = MemoryContextSwitchTo(opCtx); foreach(l, incomplete_inserts) { gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); gistContinueInsert(insert); MemoryContextReset(opCtx); } MemoryContextSwitchTo(oldCxt); MemoryContextDelete(opCtx); MemoryContextDelete(insertCtx);}boolgist_safe_restartpoint(void){ if (incomplete_inserts) return false; return true;}XLogRecData *formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ItemPointer key, SplitedPageLayout *dist){ XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); SplitedPageLayout *ptr; int npage = 0, cur = 1; ptr = dist; while (ptr) { npage++; ptr = ptr->next; } rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); xlrec->node = node; xlrec->origblkno = blkno; xlrec->origleaf = page_is_leaf; xlrec->npage = (uint16) npage; if (key) xlrec->key = *key; else ItemPointerSetInvalid(&(xlrec->key)); rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) xlrec; rdata[0].len = sizeof(gistxlogPageSplit); rdata[0].next = NULL; ptr = dist; while (ptr) { rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) &(ptr->block); rdata[cur].len = sizeof(gistxlogPage); rdata[cur - 1].next = &(rdata[cur]); cur++; rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) (ptr->list); rdata[cur].len = ptr->lenlist; rdata[cur - 1].next = &(rdata[cur]); rdata[cur].next = NULL; cur++; ptr = ptr->next; } return rdata;}/* * Construct the rdata array for an XLOG record describing a page update * (deletion and/or insertion of tuples on a single index page). * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides * to log the whole buffer contents instead. Also, we take care that there's * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, OffsetNumber *todelete, int ntodelete, IndexTuple *itup, int ituplen, ItemPointer key){ XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; if (key) xlrec->key = *key; else ItemPointerSetInvalid(&(xlrec->key)); rdata[0].buffer = buffer; rdata[0].buffer_std = true; rdata[0].data = NULL; rdata[0].len = 0; rdata[0].next = &(rdata[1]); rdata[1].data = (char *) xlrec; rdata[1].len = sizeof(gistxlogPageUpdate); rdata[1].buffer = InvalidBuffer; rdata[1].next = &(rdata[2]); rdata[2].data = (char *) todelete; rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); rdata[2].buffer = buffer; rdata[2].buffer_std = true; rdata[2].next = NULL; /* new tuples */ cur = 3; for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); rdata[cur].data = (char *) (itup[i]); rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; rdata[cur].next = NULL; cur++; } return rdata;}XLogRecPtrgistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len){ gistxlogInsertComplete xlrec; XLogRecData rdata[2]; XLogRecPtr recptr; Assert(len > 0); xlrec.node = node; rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = sizeof(gistxlogInsertComplete); rdata[0].next = &(rdata[1]); rdata[1].buffer = InvalidBuffer; rdata[1].data = (char *) keys; rdata[1].len = sizeof(ItemPointerData) * len; rdata[1].next = NULL; START_CRIT_SECTION(); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); END_CRIT_SECTION(); return recptr;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -