⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nbtxlog.c

📁 PostgreSQL7.4.6 for Linux
💻 C
📖 第 1 页 / 共 2 页
字号:
/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.7 2003/09/29 23:40:26 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/nbtree.h"
#include "access/xlogutils.h"


/*
 * We must keep track of expected insertions due to page splits, and apply
 * them manually if they are not seen in the WAL log during replay.  This
 * makes it safe for page insertion to be a multiple-WAL-action process.
 *
 * The data structure is a simple linked list --- this should be good enough,
 * since we don't expect a page split to remain incomplete for long.
 */
typedef struct bt_incomplete_split
{
	RelFileNode node;			/* the index */
	BlockNumber leftblk;		/* left half of split */
	BlockNumber rightblk;		/* right half of split */
	bool		is_root;		/* we split the root */
} bt_incomplete_split;

static List *incomplete_splits;


/*
 * log_incomplete_split
 *		Remember a replayed split whose parent-link insertion has not been
 *		seen yet.
 *
 * Allocates a bt_incomplete_split describing the split and appends it to
 * incomplete_splits.  The entry stays on the list until
 * forget_matching_split() finds a matching downlink insertion.
 */
static void
log_incomplete_split(RelFileNode node, BlockNumber leftblk,
					 BlockNumber rightblk, bool is_root)
{
	bt_incomplete_split *split = palloc(sizeof(bt_incomplete_split));

	split->node = node;
	split->leftblk = leftblk;
	split->rightblk = rightblk;
	split->is_root = is_root;
	incomplete_splits = lappend(incomplete_splits, split);
}

/*
 * forget_matching_split
 *		Drop the incomplete-split entry that a just-replayed insertion
 *		completes, if there is one.
 *
 * reln/node identify the index; insertblk/offnum locate the item that was
 * inserted.  The block number stored in that item's TID is compared against
 * the rightblk of each remembered split for the same index; the first match
 * is removed from incomplete_splits and freed.  A mismatch between is_root
 * and the remembered is_root is only reported at LOG level.
 *
 * PANICs if the block holding the inserted item cannot be read.
 */
static void
forget_matching_split(Relation reln, RelFileNode node,
					  BlockNumber insertblk, OffsetNumber offnum,
					  bool is_root)
{
	Buffer		buffer;
	Page		page;
	BTItem		btitem;
	BlockNumber rightblk;
	List	   *l;

	/* Get downlink TID from page */
	buffer = XLogReadBuffer(false, reln, insertblk);
	if (!BufferIsValid(buffer))
		elog(PANIC, "forget_matching_split: block unfound");
	page = (Page) BufferGetPage(buffer);
	btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
	rightblk = ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid));
	Assert(ItemPointerGetOffsetNumber(&(btitem->bti_itup.t_tid)) == P_HIKEY);
	UnlockAndReleaseBuffer(buffer);

	foreach(l, incomplete_splits)
	{
		bt_incomplete_split *split = (bt_incomplete_split *) lfirst(l);

		if (RelFileNodeEquals(node, split->node) &&
			rightblk == split->rightblk)
		{
			if (is_root != split->is_root)
				elog(LOG, "forget_matching_split: fishy is_root data");
			incomplete_splits = lremove(split, incomplete_splits);

			/*
			 * The entry was palloc'd by log_incomplete_split() and nothing
			 * references it once it is off the list, so release it here
			 * rather than leaking one struct per completed split.
			 */
			pfree(split);
			break;				/* need not look further */
		}
	}
}

/*
 * _bt_restore_page
 *		Re-add to "page" the btree items packed in the len bytes at "from".
 *
 * Items are stored back to back, each occupying its MAXALIGN'd size.  Every
 * item is added at FirstOffsetNumber.
 * NOTE(review): presumably the buffer holds the items in the reverse of
 * their final page order -- confirm against the code that logs splits.
 *
 * PANICs if an item cannot be added.
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
	BTItemData	btdata;
	Size		itemsz;
	char	   *end = from + len;

	for (; from < end;)
	{
		/*
		 * Copy the item header into a local before inspecting it
		 * (presumably because "from" need not be suitably aligned).
		 */
		memcpy(&btdata, from, sizeof(BTItemData));
		itemsz = IndexTupleDSize(btdata.bti_itup) +
			(sizeof(BTItemData) - sizeof(IndexTupleData));
		itemsz = MAXALIGN(itemsz);
		if (PageAddItem(page, (Item) from, itemsz,
					  FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
			elog(PANIC, "_bt_restore_page: can't add item to page");
		from += itemsz;
	}
}

/*
 * _bt_restore_meta
 *		Rebuild the btree metapage from the given root/fastroot data.
 *
 * The metapage is reinitialized from scratch and then filled in.  When
 * markvalid is false the magic number is stored as 0 instead of
 * BTREE_MAGIC.  The page is stamped with lsn and written out.
 *
 * PANICs if the metapage buffer cannot be obtained.
 */
static void
_bt_restore_meta(Relation reln, XLogRecPtr lsn,
				 BlockNumber root, uint32 level,
				 BlockNumber fastroot, uint32 fastlevel,
				 bool markvalid)
{
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *md;
	BTPageOpaque pageop;

	metabuf = XLogReadBuffer(true, reln, BTREE_METAPAGE);
	if (!BufferIsValid(metabuf))
		elog(PANIC, "_bt_restore_meta: no metapage");

	metapg = BufferGetPage(metabuf);
	_bt_pageinit(metapg, BufferGetPageSize(metabuf));

	md = BTPageGetMeta(metapg);
	md->btm_magic = markvalid ? BTREE_MAGIC : 0;
	md->btm_version = BTREE_VERSION;
	md->btm_root = root;
	md->btm_level = level;
	md->btm_fastroot = fastroot;
	md->btm_fastlevel = fastlevel;

	pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
	pageop->btpo_flags = BTP_META;

	PageSetLSN(metapg, lsn);
	PageSetSUI(metapg, ThisStartUpID);
	UnlockAndWriteBuffer(metabuf);
}

/*
 * btree_xlog_insert
 *		Replay (or attempt to undo) a single-item btree insertion record.
 *
 * redo:	true for redo, false for undo.
 * isleaf:	the insertion was on a leaf page; non-leaf insertions may
 *			complete a remembered split.
 * ismeta:	the record also carries xl_btree_metadata for the metapage,
 *			placed ahead of the item data.
 *
 * On redo the item is added only if the page's LSN is older than the
 * record's and the record carries no backup block for it.  Undo of a leaf
 * insertion is not implemented and PANICs.
 */
static void
btree_xlog_insert(bool redo, bool isleaf, bool ismeta,
				  XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	char	   *datapos;
	int			datalen;
	xl_btree_metadata md;

	datapos = (char *) xlrec + SizeOfBtreeInsert;
	datalen = record->xl_len - SizeOfBtreeInsert;
	if (ismeta)
	{
		/* metadata precedes the item; step over it after copying */
		memcpy(&md, datapos, sizeof(xl_btree_metadata));
		datapos += sizeof(xl_btree_metadata);
		datalen -= sizeof(xl_btree_metadata);
	}

	if (redo && (record->xl_info & XLR_BKP_BLOCK_1) && !ismeta &&
		incomplete_splits == NIL)
		return;					/* nothing to do */

	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
	if (!RelationIsValid(reln))
		return;

	if (!redo || !(record->xl_info & XLR_BKP_BLOCK_1))
	{
		buffer = XLogReadBuffer(false, reln,
						ItemPointerGetBlockNumber(&(xlrec->target.tid)));
		if (!BufferIsValid(buffer))
			elog(PANIC, "btree_insert_%sdo: block unfound", (redo) ? "re" : "un");
		page = (Page) BufferGetPage(buffer);
		if (PageIsNew((PageHeader) page))
			elog(PANIC, "btree_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		if (redo)
		{
			/* page already reflects this record?  then leave it alone */
			if (XLByteLE(lsn, PageGetLSN(page)))
				UnlockAndReleaseBuffer(buffer);
			else
			{
				if (PageAddItem(page, (Item) datapos, datalen,
						ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
								LP_USED) == InvalidOffsetNumber)
					elog(PANIC, "btree_insert_redo: failed to add item");

				PageSetLSN(page, lsn);
				PageSetSUI(page, ThisStartUpID);
				UnlockAndWriteBuffer(buffer);
			}
		}
		else
		{
			if (XLByteLT(PageGetLSN(page), lsn))
				elog(PANIC, "btree_insert_undo: bad page LSN");

			if (!P_ISLEAF(pageop))
				UnlockAndReleaseBuffer(buffer);
			else
				elog(PANIC, "btree_insert_undo: unimplemented");
		}
	}

	if (redo)					/* metapage changes not undoable */
	{
		if (ismeta)
			_bt_restore_meta(reln, lsn,
							 md.root, md.level,
							 md.fastroot, md.fastlevel,
							 true);
	}

	/* Forget any split this insertion completes */
	if (redo && !isleaf && incomplete_splits != NIL)
	{
		forget_matching_split(reln, xlrec->target.node,
						 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
						ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
							  false);
	}
}

/*
 * btree_xlog_split
 *		Replay a btree page split record.
 *
 * redo:	true for redo, false for undo (undo is unimplemented and PANICs).
 * onleft:	the record's target block is the left half of the split;
 *			otherwise it is the right half and xlrec->otherblk is the left.
 * isroot:	recorded in the incomplete-split entry logged at the end.
 *
 * On redo both halves are reinitialized and refilled from the record's
 * data, the following page's left-link is repointed (unless a backup block
 * covers it), any split this record completes is forgotten, and the split
 * itself is remembered as incomplete until its parent link shows up.
 */
static void
btree_xlog_split(bool redo, bool onleft, bool isroot,
				 XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
	Relation	reln;
	BlockNumber targetblk;
	BlockNumber leftsib;
	BlockNumber rightsib;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	char	   *op = (redo) ? "redo" : "undo";

	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
	if (!RelationIsValid(reln))
		return;

	targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid));
	leftsib = (onleft) ? targetblk : xlrec->otherblk;
	rightsib = (onleft) ? xlrec->otherblk : targetblk;

	/* Left (original) sibling */
	buffer = XLogReadBuffer(false, reln, leftsib);
	if (!BufferIsValid(buffer))
		elog(PANIC, "btree_split_%s: lost left sibling", op);

	page = (Page) BufferGetPage(buffer);
	if (redo)
		_bt_pageinit(page, BufferGetPageSize(buffer));
	else if (PageIsNew((PageHeader) page))
		elog(PANIC, "btree_split_undo: uninitialized left sibling");
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	if (redo)
	{
		pageop->btpo_prev = xlrec->leftblk;
		pageop->btpo_next = rightsib;
		pageop->btpo.level = xlrec->level;
		pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;

		/* left page's items are the first leftlen bytes after the header */
		_bt_restore_page(page,
						 (char *) xlrec + SizeOfBtreeSplit,
						 xlrec->leftlen);

		PageSetLSN(page, lsn);
		PageSetSUI(page, ThisStartUpID);
		UnlockAndWriteBuffer(buffer);
	}
	else
	{
		/* undo */
		if (XLByteLT(PageGetLSN(page), lsn))
			elog(PANIC, "btree_split_undo: bad left sibling LSN");
		elog(PANIC, "btree_split_undo: unimplemented");
	}

	/* Right (new) sibling */
	buffer = XLogReadBuffer((redo) ? true : false, reln, rightsib);
	if (!BufferIsValid(buffer))
		elog(PANIC, "btree_split_%s: lost right sibling", op);

	page = (Page) BufferGetPage(buffer);
	if (redo)
		_bt_pageinit(page, BufferGetPageSize(buffer));
	else if (PageIsNew((PageHeader) page))
		elog(PANIC, "btree_split_undo: uninitialized right sibling");
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	if (redo)
	{
		pageop->btpo_prev = leftsib;
		pageop->btpo_next = xlrec->rightblk;
		pageop->btpo.level = xlrec->level;
		pageop->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;

		/* right page's items are whatever follows the left page's */
		_bt_restore_page(page,
					  (char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
					 record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);

		PageSetLSN(page, lsn);
		PageSetSUI(page, ThisStartUpID);
		UnlockAndWriteBuffer(buffer);
	}
	else
	{
		/* undo */
		if (XLByteLT(PageGetLSN(page), lsn))
			elog(PANIC, "btree_split_undo: bad right sibling LSN");
		elog(PANIC, "btree_split_undo: unimplemented");
	}

	/* Fix left-link of right (next) page */
	if (redo && !(record->xl_info & XLR_BKP_BLOCK_1))
	{
		if (xlrec->rightblk != P_NONE)
		{
			buffer = XLogReadBuffer(false, reln, xlrec->rightblk);
			if (!BufferIsValid(buffer))
				elog(PANIC, "btree_split_redo: lost next right page");

			page = (Page) BufferGetPage(buffer);
			if (PageIsNew((PageHeader) page))
				elog(PANIC, "btree_split_redo: uninitialized next right page");

			if (XLByteLE(lsn, PageGetLSN(page)))
				UnlockAndReleaseBuffer(buffer);
			else
			{
				pageop = (BTPageOpaque) PageGetSpecialPointer(page);
				pageop->btpo_prev = rightsib;

				PageSetLSN(page, lsn);
				PageSetSUI(page, ThisStartUpID);
				UnlockAndWriteBuffer(buffer);
			}
		}
	}

	/* Forget any split this insertion completes */
	if (redo && xlrec->level > 0 && incomplete_splits != NIL)
	{
		forget_matching_split(reln, xlrec->target.node,
						 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
						ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
							  false);
	}

	/* The job ain't done till the parent link is inserted... */
	log_incomplete_split(xlrec->target.node,
						 leftsib, rightsib, isroot);
}

/*
 * btree_xlog_delete
 *		Replay deletion of items from a single btree page.
 *
 * Does nothing on undo, or when the record carries a backup block for the
 * page.  Otherwise, if the page's LSN is older than the record's, the
 * offset numbers stored after the fixed-size record header are deleted from
 * the page, last to first, and the page is stamped with lsn.
 *
 * PANICs if the page cannot be read or is uninitialized.
 */
static void
btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_delete *xlrec;
	Relation	reln;
	Buffer		buffer;
	Page		page;

	if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
		return;

	xlrec = (xl_btree_delete *) XLogRecGetData(record);
	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
	if (!RelationIsValid(reln))
		return;
	buffer = XLogReadBuffer(false, reln, xlrec->block);
	if (!BufferIsValid(buffer))
		elog(PANIC, "btree_delete_redo: block unfound");
	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(PANIC, "btree_delete_redo: uninitialized page");

	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		UnlockAndReleaseBuffer(buffer);
		return;
	}

	if (record->xl_len > SizeOfBtreeDelete)
	{
		OffsetNumber *unused;
		OffsetNumber *unend;

		unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
		unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);

		/* be careful to delete from back to front */
		while (unused < unend)
		{
			unend--;
			PageIndexTupleDelete(page, *unend);
		}
	}

	PageSetLSN(page, lsn);
	PageSetSUI(page, ThisStartUpID);
	UnlockAndWriteBuffer(buffer);
}

static void
btree_xlog_delete_page(bool redo, bool ismeta,
					   XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
	Relation	reln;
	BlockNumber parent;
	BlockNumber target;
	BlockNumber leftsib;
	BlockNumber rightsib;
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;
	char	   *op = (redo) ? "redo" : "undo";

	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
	if (!RelationIsValid(reln))
		return;

	parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
	target = xlrec->deadblk;
	leftsib = xlrec->leftblk;
	rightsib = xlrec->rightblk;

	/* parent page */
	if (redo && !(record->xl_info & XLR_BKP_BLOCK_1))
	{
		buffer = XLogReadBuffer(false, reln, parent);
		if (!BufferIsValid(buffer))
			elog(PANIC, "btree_delete_page_redo: parent block unfound");
		page = (Page) BufferGetPage(buffer);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		if (PageIsNew((PageHeader) page))
			elog(PANIC, "btree_delete_page_redo: uninitialized parent page");
		if (XLByteLE(lsn, PageGetLSN(page)))
			UnlockAndReleaseBuffer(buffer);
		else
		{
			OffsetNumber poffset;

			poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
			if (poffset >= PageGetMaxOffsetNumber(page))
			{
				Assert(poffset == P_FIRSTDATAKEY(pageop));
				PageIndexTupleDelete(page, poffset);
				pageop->btpo_flags |= BTP_HALF_DEAD;
			}
			else
			{
				ItemId		itemid;
				BTItem		btitem;
				OffsetNumber nextoffset;

				itemid = PageGetItemId(page, poffset);
				btitem = (BTItem) PageGetItem(page, itemid);
				ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY);
				nextoffset = OffsetNumberNext(poffset);
				PageIndexTupleDelete(page, nextoffset);
			}

			PageSetLSN(page, lsn);
			PageSetSUI(page, ThisStartUpID);
			UnlockAndWriteBuffer(buffer);
		}
	}

	/* Fix left-link of right sibling */
	if (redo && !(record->xl_info & XLR_BKP_BLOCK_2))
	{
		buffer = XLogReadBuffer(false, reln, rightsib);
		if (!BufferIsValid(buffer))
			elog(PANIC, "btree_delete_page_redo: lost right sibling");
		page = (Page) BufferGetPage(buffer);
		if (PageIsNew((PageHeader) page))

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -