⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nbtxlog.c

📁 postgresql8.3.4源码,开源数据库
💻 C
📖 第 1 页 / 共 2 页
字号:
/*-------------------------------------------------------------------------
 *
 * nbtxlog.c
 *	  WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.50 2008/01/01 19:45:46 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/nbtree.h"
#include "access/transam.h"

/*
 * We must keep track of expected insertions due to page splits, and apply
 * them manually if they are not seen in the WAL log during replay.  This
 * makes it safe for page insertion to be a multiple-WAL-action process.
 *
 * Similarly, deletion of an only child page and deletion of its parent page
 * form multiple WAL log entries, and we have to be prepared to follow through
 * with the deletion if the log ends between.
 *
 * The data structure is a simple linked list --- this should be good enough,
 * since we don't expect a page split or multi deletion to remain incomplete
 * for long.  In any case we need to respect the order of operations.
 */
typedef struct bt_incomplete_action
{
	RelFileNode node;			/* the index */
	bool		is_split;		/* T = pending split, F = pending delete */
	/* these fields are for a split: */
	bool		is_root;		/* we split the root */
	BlockNumber leftblk;		/* left half of split */
	BlockNumber rightblk;		/* right half of split */
	/* these fields are for a delete: */
	BlockNumber delblk;			/* parent block to be deleted */
} bt_incomplete_action;

static List *incomplete_actions;


/*
 * log_incomplete_split -- remember a replayed split awaiting its downlink
 *
 * Appends a "pending split" entry for the given index and left/right block
 * pair to incomplete_actions.  The entry stays there until
 * forget_matching_split() finds and removes it.  Only the split-related
 * fields of the entry are filled in; delblk is left unset.
 */
static void
log_incomplete_split(RelFileNode node, BlockNumber leftblk,
					 BlockNumber rightblk, bool is_root)
{
	bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));

	action->node = node;
	action->is_split = true;
	action->is_root = is_root;
	action->leftblk = leftblk;
	action->rightblk = rightblk;
	incomplete_actions = lappend(incomplete_actions, action);
}

/*
 * forget_matching_split -- drop the pending split completed by a downlink
 *
 * Searches incomplete_actions for the first split entry belonging to the
 * same index whose right-hand block equals "downlink", then unlinks and
 * frees it.  A mismatch between the caller's is_root and the recorded value
 * is only reported at LOG level; the entry is removed regardless.  Nothing
 * happens if no entry matches.
 */
static void
forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
{
	ListCell   *l;

	foreach(l, incomplete_actions)
	{
		bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);

		if (RelFileNodeEquals(node, action->node) &&
			action->is_split &&
			downlink == action->rightblk)
		{
			if (is_root != action->is_root)
				elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
					 action->is_root, is_root);
			incomplete_actions = list_delete_ptr(incomplete_actions, action);
			pfree(action);
			break;				/* need not look further */
		}
	}
}

/*
 * log_incomplete_deletion -- remember a pending parent-page deletion
 *
 * Appends a "pending delete" entry (is_split = false) for block delblk of
 * the given index to incomplete_actions.  The split-related fields of the
 * entry are left unset.
 */
static void
log_incomplete_deletion(RelFileNode node, BlockNumber delblk)
{
	bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));

	action->node = node;
	action->is_split = false;
	action->delblk = delblk;
	incomplete_actions = lappend(incomplete_actions, action);
}

/*
 * forget_matching_deletion -- drop the pending deletion of a given block
 *
 * Searches incomplete_actions for the first delete entry belonging to the
 * same index and the same block, then unlinks and frees it.  Nothing
 * happens if no entry matches.
 */
static void
forget_matching_deletion(RelFileNode node, BlockNumber delblk)
{
	ListCell   *l;

	foreach(l, incomplete_actions)
	{
		bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);

		if (RelFileNodeEquals(node, action->node) &&
			!action->is_split &&
			delblk == action->delblk)
		{
			incomplete_actions = list_delete_ptr(incomplete_actions, action);
			pfree(action);
			break;				/* need not look further */
		}
	}
}

/*
 * _bt_restore_page -- re-enter all the index tuples on a page
 *
 * The page is freshly init'd, and *from (length len) is a copy of what
 * had been its upper part (pd_upper to pd_special).  We assume that the
 * tuples had been added to the page in item-number order, and therefore
 * the one with highest item number appears first (lowest on the page).
 *
 * NOTE: the way this routine is coded, the rebuilt page will have the items
 * in correct itemno sequence, but physically the opposite order from the
 * original, because we insert them in the opposite of itemno order.  This
 * does not matter in any current btree code, but it's something to keep an
 * eye on.  Is it worth changing just on general principles?  See also the
 * notes in btree_xlog_split().
 */
static void
_bt_restore_page(Page page, char *from, int len)
{
	IndexTupleData itupdata;
	Size		itemsz;
	char	   *end = from + len;

	for (; from < end;)
	{
		/* Need to copy tuple header due to alignment considerations */
		memcpy(&itupdata, from, sizeof(IndexTupleData));
		itemsz = IndexTupleDSize(itupdata);
		itemsz = MAXALIGN(itemsz);
		/* Each tuple goes in at the first slot; a failure here is fatal */
		if (PageAddItem(page, (Item) from, itemsz, FirstOffsetNumber,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "_bt_restore_page: cannot add item to page");
		from += itemsz;
	}
}

/*
 * _bt_restore_meta -- rebuild the btree metapage during replay
 *
 * Reads the BTREE_METAPAGE buffer, re-initializes the page, and fills in
 * the magic number, version and the supplied root/fastroot block numbers
 * and levels.  The page is then stamped with lsn and the current timeline,
 * marked dirty, and released.
 */
static void
_bt_restore_meta(Relation reln, XLogRecPtr lsn,
				 BlockNumber root, uint32 level,
				 BlockNumber fastroot, uint32 fastlevel)
{
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *md;
	BTPageOpaque pageop;

	metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true);
	Assert(BufferIsValid(metabuf));
	metapg = BufferGetPage(metabuf);

	_bt_pageinit(metapg, BufferGetPageSize(metabuf));

	md = BTPageGetMeta(metapg);
	md->btm_magic = BTREE_MAGIC;
	md->btm_version = BTREE_VERSION;
	md->btm_root = root;
	md->btm_level = level;
	md->btm_fastroot = fastroot;
	md->btm_fastlevel = fastlevel;

	pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
	pageop->btpo_flags = BTP_META;

	/*
	 * Set pd_lower just past the end of the metadata.  This is not essential
	 * but it makes the page look compressible to xlog.c.
	 */
	((PageHeader) metapg)->pd_lower =
		((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;

	PageSetLSN(metapg, lsn);
	PageSetTLI(metapg, ThisTimeLineID);
	MarkBufferDirty(metabuf);
	UnlockReleaseBuffer(metabuf);
}

/*
 * btree_xlog_insert -- replay insertion of a single index tuple
 *
 * The record payload after the fixed xl_btree_insert header is, in order:
 * a downlink BlockNumber (only when !isleaf), an xl_btree_metadata struct
 * (only when ismeta), and then the tuple itself.
 *
 * The tuple is added to the target page unless XLR_BKP_BLOCK_1 is set in
 * the record or the page's LSN shows it already reflects this record.
 * When ismeta is set the metapage is rebuilt as well, and for a non-leaf
 * insertion the pending split matching the downlink is forgotten.
 */
static void
btree_xlog_insert(bool isleaf, bool ismeta,
				  XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
	Relation	reln;
	Buffer		buffer;
	Page		page;
	char	   *datapos;
	int			datalen;
	xl_btree_metadata md;
	BlockNumber downlink = 0;

	datapos = (char *) xlrec + SizeOfBtreeInsert;
	datalen = record->xl_len - SizeOfBtreeInsert;
	if (!isleaf)
	{
		memcpy(&downlink, datapos, sizeof(BlockNumber));
		datapos += sizeof(BlockNumber);
		datalen -= sizeof(BlockNumber);
	}
	if (ismeta)
	{
		memcpy(&md, datapos, sizeof(xl_btree_metadata));
		datapos += sizeof(xl_btree_metadata);
		datalen -= sizeof(xl_btree_metadata);
	}

	if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
		return;					/* nothing to do */

	reln = XLogOpenRelation(xlrec->target.node);

	if (!(record->xl_info & XLR_BKP_BLOCK_1))
	{
		buffer = XLogReadBuffer(reln,
							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
								false);
		if (BufferIsValid(buffer))
		{
			page = (Page) BufferGetPage(buffer);

			if (XLByteLE(lsn, PageGetLSN(page)))
			{
				/* page LSN is at or past this record: leave it untouched */
				UnlockReleaseBuffer(buffer);
			}
			else
			{
				if (PageAddItem(page, (Item) datapos, datalen,
							ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
								false, false) == InvalidOffsetNumber)
					elog(PANIC, "btree_insert_redo: failed to add item");

				PageSetLSN(page, lsn);
				PageSetTLI(page, ThisTimeLineID);
				MarkBufferDirty(buffer);
				UnlockReleaseBuffer(buffer);
			}
		}
	}

	if (ismeta)
		_bt_restore_meta(reln, lsn,
						 md.root, md.level,
						 md.fastroot, md.fastlevel);

	/* Forget any split this insertion completes */
	if (!isleaf)
		forget_matching_split(xlrec->target.node, downlink, false);
}

/*
 * btree_xlog_split -- replay a page split
 *
 * The record payload after the fixed xl_btree_split header is, in order:
 * a downlink block id (only when level > 0), the left page's high key
 * (only when level > 0 and XLR_BKP_BLOCK_1 is not set), the new item's
 * offset (only when onleft), the new item itself (only when onleft and
 * XLR_BKP_BLOCK_1 is not set), and finally the right page's tuples.
 *
 * The right sibling is always rebuilt from scratch.  The left sibling is
 * fixed up in place, and the left-link of the page after the new right
 * sibling is updated, unless the corresponding backup-block flag is set in
 * the record or the page's LSN shows it already reflects this record.
 * Finally the split is recorded as incomplete until its parent downlink
 * insertion is replayed.
 */
static void
btree_xlog_split(bool onleft, bool isroot,
				 XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
	Relation	reln;
	Buffer		rbuf;
	Page		rpage;
	BTPageOpaque ropaque;
	char	   *datapos;
	int			datalen;
	OffsetNumber newitemoff = 0;
	Item		newitem = NULL;
	Size		newitemsz = 0;
	Item		left_hikey = NULL;
	Size		left_hikeysz = 0;

	reln = XLogOpenRelation(xlrec->node);

	datapos = (char *) xlrec + SizeOfBtreeSplit;
	datalen = record->xl_len - SizeOfBtreeSplit;

	/* Forget any split this insertion completes */
	if (xlrec->level > 0)
	{
		/* we assume SizeOfBtreeSplit is at least 16-bit aligned */
		BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);

		datapos += sizeof(BlockIdData);
		datalen -= sizeof(BlockIdData);

		forget_matching_split(xlrec->node, downlink, false);

		/* Extract left hikey and its size (still assuming 16-bit alignment) */
		if (!(record->xl_info & XLR_BKP_BLOCK_1))
		{
			/* We assume 16-bit alignment is enough for IndexTupleSize */
			left_hikey = (Item) datapos;
			left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));

			datapos += left_hikeysz;
			datalen -= left_hikeysz;
		}
	}

	/* Extract newitem and newitemoff, if present */
	if (onleft)
	{
		/* Extract the offset (still assuming 16-bit alignment) */
		memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
		datapos += sizeof(OffsetNumber);
		datalen -= sizeof(OffsetNumber);
	}

	if (onleft && !(record->xl_info & XLR_BKP_BLOCK_1))
	{
		/*
		 * We assume that 16-bit alignment is enough to apply IndexTupleSize
		 * (since it's fetching from a uint16 field) and also enough for
		 * PageAddItem to insert the tuple.
		 */
		newitem = (Item) datapos;
		newitemsz = MAXALIGN(IndexTupleSize(newitem));
		datapos += newitemsz;
		datalen -= newitemsz;
	}

	/* Reconstruct right (new) sibling from scratch */
	rbuf = XLogReadBuffer(reln, xlrec->rightsib, true);
	Assert(BufferIsValid(rbuf));
	rpage = (Page) BufferGetPage(rbuf);

	_bt_pageinit(rpage, BufferGetPageSize(rbuf));
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

	ropaque->btpo_prev = xlrec->leftsib;
	ropaque->btpo_next = xlrec->rnext;
	ropaque->btpo.level = xlrec->level;
	ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
	ropaque->btpo_cycleid = 0;

	_bt_restore_page(rpage, datapos, datalen);

	/*
	 * On leaf level, the high key of the left page is equal to the
	 * first key on the right page.
	 */
	if (xlrec->level == 0)
	{
		ItemId		hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));

		left_hikey = PageGetItem(rpage, hiItemId);
		left_hikeysz = ItemIdGetLength(hiItemId);
	}

	PageSetLSN(rpage, lsn);
	PageSetTLI(rpage, ThisTimeLineID);
	MarkBufferDirty(rbuf);

	/* don't release the buffer yet; we touch right page's first item below */

	/*
	 * Reconstruct left (original) sibling if needed.  Note that this code
	 * ensures that the items remaining on the left page are in the correct
	 * item number order, but it does not reproduce the physical order they
	 * would have had.  Is this worth changing?  See also _bt_restore_page().
	 */
	if (!(record->xl_info & XLR_BKP_BLOCK_1))
	{
		Buffer		lbuf = XLogReadBuffer(reln, xlrec->leftsib, false);

		if (BufferIsValid(lbuf))
		{
			Page		lpage = (Page) BufferGetPage(lbuf);
			BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);

			if (!XLByteLE(lsn, PageGetLSN(lpage)))
			{
				OffsetNumber off;
				OffsetNumber maxoff = PageGetMaxOffsetNumber(lpage);
				OffsetNumber deletable[MaxOffsetNumber];
				int			ndeletable = 0;

				/*
				 * Remove the items from the left page that were copied to the
				 * right page.  Also remove the old high key, if any. (We must
				 * remove everything before trying to insert any items, else
				 * we risk not having enough space.)
				 */
				if (!P_RIGHTMOST(lopaque))
				{
					deletable[ndeletable++] = P_HIKEY;

					/*
					 * newitemoff is given to us relative to the original
					 * page's item numbering, so adjust it for this deletion.
					 */
					newitemoff--;
				}
				for (off = xlrec->firstright; off <= maxoff; off++)
					deletable[ndeletable++] = off;
				if (ndeletable > 0)
					PageIndexMultiDelete(lpage, deletable, ndeletable);

				/*
				 * Add the new item if it was inserted on left page.
				 */
				if (onleft)
				{
					if (PageAddItem(lpage, newitem, newitemsz, newitemoff,
									false, false) == InvalidOffsetNumber)
						elog(PANIC, "failed to add new item to left page after split");
				}

				/* Set high key */
				if (PageAddItem(lpage, left_hikey, left_hikeysz,
								P_HIKEY, false, false) == InvalidOffsetNumber)
					elog(PANIC, "failed to add high key to left page after split");

				/* Fix opaque fields */
				lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
				lopaque->btpo_next = xlrec->rightsib;
				lopaque->btpo_cycleid = 0;

				PageSetLSN(lpage, lsn);
				PageSetTLI(lpage, ThisTimeLineID);
				MarkBufferDirty(lbuf);
			}

			UnlockReleaseBuffer(lbuf);
		}
	}

	/* We no longer need the right buffer */
	UnlockReleaseBuffer(rbuf);

	/* Fix left-link of the page to the right of the new right sibling */
	if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
	{
		Buffer		buffer = XLogReadBuffer(reln, xlrec->rnext, false);

		if (BufferIsValid(buffer))
		{
			Page		page = (Page) BufferGetPage(buffer);

			if (!XLByteLE(lsn, PageGetLSN(page)))
			{
				BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

				pageop->btpo_prev = xlrec->rightsib;

				PageSetLSN(page, lsn);
				PageSetTLI(page, ThisTimeLineID);
				MarkBufferDirty(buffer);
			}
			UnlockReleaseBuffer(buffer);
		}
	}

	/* The job ain't done till the parent link is inserted... */
	log_incomplete_split(xlrec->node,
						 xlrec->leftsib, xlrec->rightsib, isroot);
}

static void
btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_delete *xlrec;
	Relation	reln;
	Buffer		buffer;
	Page		page;
	BTPageOpaque opaque;

	if (record->xl_info & XLR_BKP_BLOCK_1)
		return;

	xlrec = (xl_btree_delete *) XLogRecGetData(record);
	reln = XLogOpenRelation(xlrec->node);
	buffer = XLogReadBuffer(reln, xlrec->block, false);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -