dynahash.c

From the PostgreSQL 8.3.4 source code (open-source database) · C code · 1,529 lines total · page 1 of 3
/*
 * hash_seq_init/_search/_term
 *			Sequentially search through hash table and return
 *			all the elements one by one, return NULL when no more.
 *
 * hash_seq_term should be called if and only if the scan is abandoned before
 * completion; if hash_seq_search returns NULL then it has already done the
 * end-of-scan cleanup.
 *
 * NOTE: caller may delete the returned element before continuing the scan.
 * However, deleting any other element while the scan is in progress is
 * UNDEFINED (it might be the one that curIndex is pointing at!).  Also,
 * if elements are added to the table while the scan is in progress, it is
 * unspecified whether they will be visited by the scan or not.
 *
 * NOTE: it is possible to use hash_seq_init/hash_seq_search without any
 * worry about hash_seq_term cleanup, if the hashtable is first locked against
 * further insertions by calling hash_freeze.  This is used by nodeAgg.c,
 * wherein it is inconvenient to track whether a scan is still open, and
 * there's no possibility of further insertions after readout has begun.
 *
 * NOTE: to use this with a partitioned hashtable, caller had better hold
 * at least shared lock on all partitions of the table throughout the scan!
 * We can cope with insertions or deletions by our own backend, but *not*
 * with concurrent insertions or deletions by another.
 */
void
hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
{
	status->hashp = hashp;
	status->curBucket = 0;
	status->curEntry = NULL;
	/* frozen tables can't split buckets, so no need to track the scan */
	if (!hashp->frozen)
		register_seq_scan(hashp);
}

void *
hash_seq_search(HASH_SEQ_STATUS *status)
{
	HTAB	   *hashp;
	HASHHDR    *hctl;
	uint32		max_bucket;
	long		ssize;
	long		segment_num;
	long		segment_ndx;
	HASHSEGMENT segp;
	uint32		curBucket;
	HASHELEMENT *curElem;

	if ((curElem = status->curEntry) != NULL)
	{
		/* Continuing scan of curBucket... */
		status->curEntry = curElem->link;
		if (status->curEntry == NULL)	/* end of this bucket */
			++status->curBucket;
		return (void *) ELEMENTKEY(curElem);
	}

	/*
	 * Search for next nonempty bucket starting at curBucket.
	 */
	curBucket = status->curBucket;
	hashp = status->hashp;
	hctl = hashp->hctl;
	ssize = hashp->ssize;
	max_bucket = hctl->max_bucket;

	if (curBucket > max_bucket)
	{
		hash_seq_term(status);
		return NULL;			/* search is done */
	}

	/*
	 * first find the right segment in the table directory.
	 */
	segment_num = curBucket >> hashp->sshift;
	segment_ndx = MOD(curBucket, ssize);

	segp = hashp->dir[segment_num];

	/*
	 * Pick up the first item in this bucket's chain.  If chain is not empty
	 * we can begin searching it.  Otherwise we have to advance to find the
	 * next nonempty bucket.  We try to optimize that case since searching a
	 * near-empty hashtable has to iterate this loop a lot.
	 */
	while ((curElem = segp[segment_ndx]) == NULL)
	{
		/* empty bucket, advance to next */
		if (++curBucket > max_bucket)
		{
			status->curBucket = curBucket;
			hash_seq_term(status);
			return NULL;		/* search is done */
		}
		if (++segment_ndx >= ssize)
		{
			segment_num++;
			segment_ndx = 0;
			segp = hashp->dir[segment_num];
		}
	}

	/* Begin scan of curBucket... */
	status->curEntry = curElem->link;
	if (status->curEntry == NULL)		/* end of this bucket */
		++curBucket;
	status->curBucket = curBucket;
	return (void *) ELEMENTKEY(curElem);
}

void
hash_seq_term(HASH_SEQ_STATUS *status)
{
	/* only registered (non-frozen-table) scans need deregistration */
	if (!status->hashp->frozen)
		deregister_seq_scan(status->hashp);
}

/*
 * hash_freeze
 *			Freeze a hashtable against future insertions (deletions are
 *			still allowed)
 *
 * The reason for doing this is that by preventing any more bucket splits,
 * we no longer need to worry about registering hash_seq_search scans,
 * and thus caller need not be careful about ensuring hash_seq_term gets
 * called at the right times.
 *
 * Multiple calls to hash_freeze() are allowed, but you can't freeze a table
 * with active scans (since hash_seq_term would then do the wrong thing).
 */
void
hash_freeze(HTAB *hashp)
{
	if (hashp->isshared)
		elog(ERROR, "cannot freeze shared hashtable \"%s\"", hashp->tabname);
	if (!hashp->frozen && has_seq_scans(hashp))
		elog(ERROR, "cannot freeze hashtable \"%s\" because it has active scans",
			 hashp->tabname);
	hashp->frozen = true;
}

/********************************* UTILITIES ************************/

/*
 * Expand the table by adding one more hash bucket.
 */
static bool
expand_table(HTAB *hashp)
{
	HASHHDR    *hctl = hashp->hctl;
	HASHSEGMENT old_seg,
				new_seg;
	long		old_bucket,
				new_bucket;
	long		new_segnum,
				new_segndx;
	long		old_segnum,
				old_segndx;
	HASHBUCKET *oldlink,
			   *newlink;
	HASHBUCKET	currElement,
				nextElement;

	Assert(!IS_PARTITIONED(hctl));

#ifdef HASH_STATISTICS
	hash_expansions++;
#endif

	new_bucket = hctl->max_bucket + 1;
	new_segnum = new_bucket >> hashp->sshift;
	new_segndx = MOD(new_bucket, hashp->ssize);

	if (new_segnum >= hctl->nsegs)
	{
		/* Allocate new segment if necessary -- could fail if dir full */
		if (new_segnum >= hctl->dsize)
			if (!dir_realloc(hashp))
				return false;
		if (!(hashp->dir[new_segnum] = seg_alloc(hashp)))
			return false;
		hctl->nsegs++;
	}

	/* OK, we created a new bucket */
	hctl->max_bucket++;

	/*
	 * *Before* changing masks, find old bucket corresponding to same hash
	 * values; values in that bucket may need to be relocated to new bucket.
	 * Note that new_bucket is certainly larger than low_mask at this point,
	 * so we can skip the first step of the regular hash mask calc.
	 */
	old_bucket = (new_bucket & hctl->low_mask);

	/*
	 * If we crossed a power of 2, readjust masks.
	 */
	if ((uint32) new_bucket > hctl->high_mask)
	{
		hctl->low_mask = hctl->high_mask;
		hctl->high_mask = (uint32) new_bucket | hctl->low_mask;
	}

	/*
	 * Relocate records to the new bucket.	NOTE: because of the way the hash
	 * masking is done in calc_bucket, only one old bucket can need to be
	 * split at this point.  With a different way of reducing the hash value,
	 * that might not be true!
	 */
	old_segnum = old_bucket >> hashp->sshift;
	old_segndx = MOD(old_bucket, hashp->ssize);

	old_seg = hashp->dir[old_segnum];
	new_seg = hashp->dir[new_segnum];

	oldlink = &old_seg[old_segndx];
	newlink = &new_seg[new_segndx];

	for (currElement = *oldlink;
		 currElement != NULL;
		 currElement = nextElement)
	{
		nextElement = currElement->link;
		if ((long) calc_bucket(hctl, currElement->hashvalue) == old_bucket)
		{
			*oldlink = currElement;
			oldlink = &currElement->link;
		}
		else
		{
			*newlink = currElement;
			newlink = &currElement->link;
		}
	}

	/* don't forget to terminate the rebuilt hash chains... */
	*oldlink = NULL;
	*newlink = NULL;

	return true;
}

/*
 * Double the size of the segment directory.  Returns false if the table was
 * created with a bounded directory (max_dsize set) or if the allocation
 * fails; in either case the table is left unchanged.
 */
static bool
dir_realloc(HTAB *hashp)
{
	HASHSEGMENT *p;
	HASHSEGMENT *old_p;
	long		new_dsize;
	long		old_dirsize;
	long		new_dirsize;

	if (hashp->hctl->max_dsize != NO_MAX_DSIZE)
		return false;

	/* Reallocate directory */
	new_dsize = hashp->hctl->dsize << 1;
	old_dirsize = hashp->hctl->dsize * sizeof(HASHSEGMENT);
	new_dirsize = new_dsize * sizeof(HASHSEGMENT);

	old_p = hashp->dir;
	CurrentDynaHashCxt = hashp->hcxt;
	p = (HASHSEGMENT *) hashp->alloc((Size) new_dirsize);

	if (p != NULL)
	{
		memcpy(p, old_p, old_dirsize);
		MemSet(((char *) p) + old_dirsize, 0, new_dirsize - old_dirsize);
		hashp->dir = p;
		hashp->hctl->dsize = new_dsize;

		/* XXX assume the allocator is palloc, so we know how to free */
		Assert(hashp->alloc == DynaHashAlloc);
		pfree(old_p);

		return true;
	}

	return false;
}

/*
 * Allocate and zero one new bucket segment (ssize bucket headers).
 * Returns NULL on allocation failure.
 */
static HASHSEGMENT
seg_alloc(HTAB *hashp)
{
	HASHSEGMENT segp;

	CurrentDynaHashCxt = hashp->hcxt;
	segp = (HASHSEGMENT) hashp->alloc(sizeof(HASHBUCKET) * hashp->ssize);

	if (!segp)
		return NULL;

	MemSet(segp, 0, sizeof(HASHBUCKET) * hashp->ssize);

	return segp;
}

/*
 * allocate some new elements and link them into the free list
 */
static bool
element_alloc(HTAB *hashp, int nelem)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile HASHHDR *hctlv = hashp->hctl;
	Size		elementSize;
	HASHELEMENT *firstElement;
	HASHELEMENT *tmpElement;
	HASHELEMENT *prevElement;
	int			i;

	/* Each element has a HASHELEMENT header plus user data. */
	elementSize = MAXALIGN(sizeof(HASHELEMENT)) + MAXALIGN(hctlv->entrysize);

	CurrentDynaHashCxt = hashp->hcxt;
	firstElement = (HASHELEMENT *) hashp->alloc(nelem * elementSize);

	if (!firstElement)
		return false;

	/* prepare to link all the new entries into the freelist */
	prevElement = NULL;
	tmpElement = firstElement;
	for (i = 0; i < nelem; i++)
	{
		tmpElement->link = prevElement;
		prevElement = tmpElement;
		tmpElement = (HASHELEMENT *) (((char *) tmpElement) + elementSize);
	}

	/* if partitioned, must lock to touch freeList */
	if (IS_PARTITIONED(hctlv))
		SpinLockAcquire(&hctlv->mutex);

	/* freelist could be nonempty if two backends did this concurrently */
	firstElement->link = hctlv->freeList;
	hctlv->freeList = prevElement;

	if (IS_PARTITIONED(hctlv))
		SpinLockRelease(&hctlv->mutex);

	return true;
}

/* complain when we have detected a corrupted hashtable */
static void
hash_corrupted(HTAB *hashp)
{
	/*
	 * If the corruption is in a shared hashtable, we'd better force a
	 * systemwide restart.	Otherwise, just shut down this one backend.
	 */
	if (hashp->isshared)
		elog(PANIC, "hash table \"%s\" corrupted", hashp->tabname);
	else
		elog(FATAL, "hash table \"%s\" corrupted", hashp->tabname);
}

/* calculate ceil(log base 2) of num */
int
my_log2(long num)
{
	int			i;
	long		limit;

	for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
		;
	return i;
}

/************************* SEQ SCAN TRACKING ************************/

/*
 * We track active hash_seq_search scans here.	The need for this mechanism
 * comes from the fact that a scan will get confused if a bucket split occurs
 * while it's in progress: it might visit entries twice, or even miss some
 * entirely (if it's partway through the same bucket that splits).  Hence
 * we want to inhibit bucket splits if there are any active scans on the
 * table being inserted into.  This is a fairly rare case in current usage,
 * so just postponing the split until the next insertion seems sufficient.
 *
 * Given present usages of the function, only a few scans are likely to be
 * open concurrently; so a finite-size stack of open scans seems sufficient,
 * and we don't worry that linear search is too slow.  Note that we do
 * allow multiple scans of the same hashtable to be open concurrently.
 *
 * This mechanism can support concurrent scan and insertion in a shared
 * hashtable if it's the same backend doing both.  It would fail otherwise,
 * but locking reasons seem to preclude any such scenario anyway, so we don't
 * worry.
 *
 * This arrangement is reasonably robust if a transient hashtable is deleted
 * without notifying us.  The absolute worst case is we might inhibit splits
 * in another table created later at exactly the same address.	We will give
 * a warning at transaction end for reference leaks, so any bugs leading to
 * lack of notification should be easy to catch.
 */

#define MAX_SEQ_SCANS 100

static HTAB *seq_scan_tables[MAX_SEQ_SCANS];	/* tables being scanned */
static int	seq_scan_level[MAX_SEQ_SCANS];		/* subtransaction nest level */
static int	num_seq_scans = 0;


/* Register a table as having an active hash_seq_search scan */
static void
register_seq_scan(HTAB *hashp)
{
	if (num_seq_scans >= MAX_SEQ_SCANS)
		elog(ERROR, "too many active hash_seq_search scans, cannot start one on \"%s\"",
			 hashp->tabname);
	seq_scan_tables[num_seq_scans] = hashp;
	seq_scan_level[num_seq_scans] = GetCurrentTransactionNestLevel();
	num_seq_scans++;
}

/* Deregister an active scan */
static void
deregister_seq_scan(HTAB *hashp)
{
	int			i;

	/* Search backward since it's most likely at the stack top */
	for (i = num_seq_scans - 1; i >= 0; i--)
	{
		if (seq_scan_tables[i] == hashp)
		{
			/* swap-with-last delete; array order is not significant */
			seq_scan_tables[i] = seq_scan_tables[num_seq_scans - 1];
			seq_scan_level[i] = seq_scan_level[num_seq_scans - 1];
			num_seq_scans--;
			return;
		}
	}
	elog(ERROR, "no hash_seq_search scan for hash table \"%s\"",
		 hashp->tabname);
}

/* Check if a table has any active scan */
static bool
has_seq_scans(HTAB *hashp)
{
	int			i;

	for (i = 0; i < num_seq_scans; i++)
	{
		if (seq_scan_tables[i] == hashp)
			return true;
	}
	return false;
}

/* Clean up any open scans at end of transaction */
void
AtEOXact_HashTables(bool isCommit)
{
	/*
	 * During abort cleanup, open scans are expected; just silently clean 'em
	 * out.  An open scan at commit means someone forgot a hash_seq_term()
	 * call, so complain.
	 *
	 * Note: it's tempting to try to print the tabname here, but refrain for
	 * fear of touching deallocated memory.  This isn't a user-facing message
	 * anyway, so it needn't be pretty.
	 */
	if (isCommit)
	{
		int			i;

		for (i = 0; i < num_seq_scans; i++)
		{
			elog(WARNING, "leaked hash_seq_search scan for hash table %p",
				 seq_scan_tables[i]);
		}
	}
	num_seq_scans = 0;
}

/* Clean up any open scans at end of subtransaction */
void
AtEOSubXact_HashTables(bool isCommit, int nestDepth)
{
	int			i;

	/*
	 * Search backward to make cleanup easy.  Note we must check all entries,
	 * not only those at the end of the array, because deletion technique
	 * doesn't keep them in order.
	 */
	for (i = num_seq_scans - 1; i >= 0; i--)
	{
		if (seq_scan_level[i] >= nestDepth)
		{
			if (isCommit)
				elog(WARNING, "leaked hash_seq_search scan for hash table %p",
					 seq_scan_tables[i]);
			seq_scan_tables[i] = seq_scan_tables[num_seq_scans - 1];
			seq_scan_level[i] = seq_scan_level[num_seq_scans - 1];
			num_seq_scans--;
		}
	}
}

⌨️ Keyboard shortcuts

Copy code: Ctrl + C
Search code: Ctrl + F
Full-screen mode: F11
Increase font size: Ctrl + =
Decrease font size: Ctrl + -
Show shortcuts: ?