dynahash.c
来自「postgresql8.3.4源码,开源数据库」· C语言 代码 · 共 1,529 行 · 第 1/3 页
C
1,529 行
/* * hash_seq_init/_search/_term * Sequentially search through hash table and return * all the elements one by one, return NULL when no more. * * hash_seq_term should be called if and only if the scan is abandoned before * completion; if hash_seq_search returns NULL then it has already done the * end-of-scan cleanup. * * NOTE: caller may delete the returned element before continuing the scan. * However, deleting any other element while the scan is in progress is * UNDEFINED (it might be the one that curIndex is pointing at!). Also, * if elements are added to the table while the scan is in progress, it is * unspecified whether they will be visited by the scan or not. * * NOTE: it is possible to use hash_seq_init/hash_seq_search without any * worry about hash_seq_term cleanup, if the hashtable is first locked against * further insertions by calling hash_freeze. This is used by nodeAgg.c, * wherein it is inconvenient to track whether a scan is still open, and * there's no possibility of further insertions after readout has begun. * * NOTE: to use this with a partitioned hashtable, caller had better hold * at least shared lock on all partitions of the table throughout the scan! * We can cope with insertions or deletions by our own backend, but *not* * with concurrent insertions or deletions by another. */voidhash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp){ status->hashp = hashp; status->curBucket = 0; status->curEntry = NULL; if (!hashp->frozen) register_seq_scan(hashp);}void *hash_seq_search(HASH_SEQ_STATUS *status){ HTAB *hashp; HASHHDR *hctl; uint32 max_bucket; long ssize; long segment_num; long segment_ndx; HASHSEGMENT segp; uint32 curBucket; HASHELEMENT *curElem; if ((curElem = status->curEntry) != NULL) { /* Continuing scan of curBucket... */ status->curEntry = curElem->link; if (status->curEntry == NULL) /* end of this bucket */ ++status->curBucket; return (void *) ELEMENTKEY(curElem); } /* * Search for next nonempty bucket starting at curBucket. */ curBucket = status->curBucket; hashp = status->hashp; hctl = hashp->hctl; ssize = hashp->ssize; max_bucket = hctl->max_bucket; if (curBucket > max_bucket) { hash_seq_term(status); return NULL; /* search is done */ } /* * first find the right segment in the table directory. */ segment_num = curBucket >> hashp->sshift; segment_ndx = MOD(curBucket, ssize); segp = hashp->dir[segment_num]; /* * Pick up the first item in this bucket's chain. If chain is not empty * we can begin searching it. Otherwise we have to advance to find the * next nonempty bucket. We try to optimize that case since searching a * near-empty hashtable has to iterate this loop a lot. */ while ((curElem = segp[segment_ndx]) == NULL) { /* empty bucket, advance to next */ if (++curBucket > max_bucket) { status->curBucket = curBucket; hash_seq_term(status); return NULL; /* search is done */ } if (++segment_ndx >= ssize) { segment_num++; segment_ndx = 0; segp = hashp->dir[segment_num]; } } /* Begin scan of curBucket... */ status->curEntry = curElem->link; if (status->curEntry == NULL) /* end of this bucket */ ++curBucket; status->curBucket = curBucket; return (void *) ELEMENTKEY(curElem);}voidhash_seq_term(HASH_SEQ_STATUS *status){ if (!status->hashp->frozen) deregister_seq_scan(status->hashp);}/* * hash_freeze * Freeze a hashtable against future insertions (deletions are * still allowed) * * The reason for doing this is that by preventing any more bucket splits, * we no longer need to worry about registering hash_seq_search scans, * and thus caller need not be careful about ensuring hash_seq_term gets * called at the right times. * * Multiple calls to hash_freeze() are allowed, but you can't freeze a table * with active scans (since hash_seq_term would then do the wrong thing). */voidhash_freeze(HTAB *hashp){ if (hashp->isshared) elog(ERROR, "cannot freeze shared hashtable \"%s\"", hashp->tabname); if (!hashp->frozen && has_seq_scans(hashp)) elog(ERROR, "cannot freeze hashtable \"%s\" because it has active scans", hashp->tabname); hashp->frozen = true;}/********************************* UTILITIES ************************//* * Expand the table by adding one more hash bucket. */static boolexpand_table(HTAB *hashp){ HASHHDR *hctl = hashp->hctl; HASHSEGMENT old_seg, new_seg; long old_bucket, new_bucket; long new_segnum, new_segndx; long old_segnum, old_segndx; HASHBUCKET *oldlink, *newlink; HASHBUCKET currElement, nextElement; Assert(!IS_PARTITIONED(hctl));#ifdef HASH_STATISTICS hash_expansions++;#endif new_bucket = hctl->max_bucket + 1; new_segnum = new_bucket >> hashp->sshift; new_segndx = MOD(new_bucket, hashp->ssize); if (new_segnum >= hctl->nsegs) { /* Allocate new segment if necessary -- could fail if dir full */ if (new_segnum >= hctl->dsize) if (!dir_realloc(hashp)) return false; if (!(hashp->dir[new_segnum] = seg_alloc(hashp))) return false; hctl->nsegs++; } /* OK, we created a new bucket */ hctl->max_bucket++; /* * *Before* changing masks, find old bucket corresponding to same hash * values; values in that bucket may need to be relocated to new bucket. * Note that new_bucket is certainly larger than low_mask at this point, * so we can skip the first step of the regular hash mask calc. */ old_bucket = (new_bucket & hctl->low_mask); /* * If we crossed a power of 2, readjust masks. */ if ((uint32) new_bucket > hctl->high_mask) { hctl->low_mask = hctl->high_mask; hctl->high_mask = (uint32) new_bucket | hctl->low_mask; } /* * Relocate records to the new bucket. NOTE: because of the way the hash * masking is done in calc_bucket, only one old bucket can need to be * split at this point. With a different way of reducing the hash value, * that might not be true! */ old_segnum = old_bucket >> hashp->sshift; old_segndx = MOD(old_bucket, hashp->ssize); old_seg = hashp->dir[old_segnum]; new_seg = hashp->dir[new_segnum]; oldlink = &old_seg[old_segndx]; newlink = &new_seg[new_segndx]; for (currElement = *oldlink; currElement != NULL; currElement = nextElement) { nextElement = currElement->link; if ((long) calc_bucket(hctl, currElement->hashvalue) == old_bucket) { *oldlink = currElement; oldlink = &currElement->link; } else { *newlink = currElement; newlink = &currElement->link; } } /* don't forget to terminate the rebuilt hash chains... */ *oldlink = NULL; *newlink = NULL; return true;}static booldir_realloc(HTAB *hashp){ HASHSEGMENT *p; HASHSEGMENT *old_p; long new_dsize; long old_dirsize; long new_dirsize; if (hashp->hctl->max_dsize != NO_MAX_DSIZE) return false; /* Reallocate directory */ new_dsize = hashp->hctl->dsize << 1; old_dirsize = hashp->hctl->dsize * sizeof(HASHSEGMENT); new_dirsize = new_dsize * sizeof(HASHSEGMENT); old_p = hashp->dir; CurrentDynaHashCxt = hashp->hcxt; p = (HASHSEGMENT *) hashp->alloc((Size) new_dirsize); if (p != NULL) { memcpy(p, old_p, old_dirsize); MemSet(((char *) p) + old_dirsize, 0, new_dirsize - old_dirsize); hashp->dir = p; hashp->hctl->dsize = new_dsize; /* XXX assume the allocator is palloc, so we know how to free */ Assert(hashp->alloc == DynaHashAlloc); pfree(old_p); return true; } return false;}static HASHSEGMENTseg_alloc(HTAB *hashp){ HASHSEGMENT segp; CurrentDynaHashCxt = hashp->hcxt; segp = (HASHSEGMENT) hashp->alloc(sizeof(HASHBUCKET) * hashp->ssize); if (!segp) return NULL; MemSet(segp, 0, sizeof(HASHBUCKET) * hashp->ssize); return segp;}/* * allocate some new elements and link them into the free list */static boolelement_alloc(HTAB *hashp, int nelem){ /* use volatile pointer to prevent code rearrangement */ volatile HASHHDR *hctlv = hashp->hctl; Size elementSize; HASHELEMENT *firstElement; HASHELEMENT *tmpElement; HASHELEMENT *prevElement; int i; /* Each element has a HASHELEMENT header plus user data. */ elementSize = MAXALIGN(sizeof(HASHELEMENT)) + MAXALIGN(hctlv->entrysize); CurrentDynaHashCxt = hashp->hcxt; firstElement = (HASHELEMENT *) hashp->alloc(nelem * elementSize); if (!firstElement) return false; /* prepare to link all the new entries into the freelist */ prevElement = NULL; tmpElement = firstElement; for (i = 0; i < nelem; i++) { tmpElement->link = prevElement; prevElement = tmpElement; tmpElement = (HASHELEMENT *) (((char *) tmpElement) + elementSize); } /* if partitioned, must lock to touch freeList */ if (IS_PARTITIONED(hctlv)) SpinLockAcquire(&hctlv->mutex); /* freelist could be nonempty if two backends did this concurrently */ firstElement->link = hctlv->freeList; hctlv->freeList = prevElement; if (IS_PARTITIONED(hctlv)) SpinLockRelease(&hctlv->mutex); return true;}/* complain when we have detected a corrupted hashtable */static voidhash_corrupted(HTAB *hashp){ /* * If the corruption is in a shared hashtable, we'd better force a * systemwide restart. Otherwise, just shut down this one backend. */ if (hashp->isshared) elog(PANIC, "hash table \"%s\" corrupted", hashp->tabname); else elog(FATAL, "hash table \"%s\" corrupted", hashp->tabname);}/* calculate ceil(log base 2) of num */intmy_log2(long num){ int i; long limit; for (i = 0, limit = 1; limit < num; i++, limit <<= 1) ; return i;}/************************* SEQ SCAN TRACKING ************************//* * We track active hash_seq_search scans here. The need for this mechanism * comes from the fact that a scan will get confused if a bucket split occurs * while it's in progress: it might visit entries twice, or even miss some * entirely (if it's partway through the same bucket that splits). Hence * we want to inhibit bucket splits if there are any active scans on the * table being inserted into. This is a fairly rare case in current usage, * so just postponing the split until the next insertion seems sufficient. * * Given present usages of the function, only a few scans are likely to be * open concurrently; so a finite-size stack of open scans seems sufficient, * and we don't worry that linear search is too slow. Note that we do * allow multiple scans of the same hashtable to be open concurrently. * * This mechanism can support concurrent scan and insertion in a shared * hashtable if it's the same backend doing both. It would fail otherwise, * but locking reasons seem to preclude any such scenario anyway, so we don't * worry. * * This arrangement is reasonably robust if a transient hashtable is deleted * without notifying us. The absolute worst case is we might inhibit splits * in another table created later at exactly the same address. We will give * a warning at transaction end for reference leaks, so any bugs leading to * lack of notification should be easy to catch. */#define MAX_SEQ_SCANS 100static HTAB *seq_scan_tables[MAX_SEQ_SCANS]; /* tables being scanned */static int seq_scan_level[MAX_SEQ_SCANS]; /* subtransaction nest level */static int num_seq_scans = 0;/* Register a table as having an active hash_seq_search scan */static voidregister_seq_scan(HTAB *hashp){ if (num_seq_scans >= MAX_SEQ_SCANS) elog(ERROR, "too many active hash_seq_search scans, cannot start one on \"%s\"", hashp->tabname); seq_scan_tables[num_seq_scans] = hashp; seq_scan_level[num_seq_scans] = GetCurrentTransactionNestLevel(); num_seq_scans++;}/* Deregister an active scan */static voidderegister_seq_scan(HTAB *hashp){ int i; /* Search backward since it's most likely at the stack top */ for (i = num_seq_scans - 1; i >= 0; i--) { if (seq_scan_tables[i] == hashp) { seq_scan_tables[i] = seq_scan_tables[num_seq_scans - 1]; seq_scan_level[i] = seq_scan_level[num_seq_scans - 1]; num_seq_scans--; return; } } elog(ERROR, "no hash_seq_search scan for hash table \"%s\"", hashp->tabname);}/* Check if a table has any active scan */static boolhas_seq_scans(HTAB *hashp){ int i; for (i = 0; i < num_seq_scans; i++) { if (seq_scan_tables[i] == hashp) return true; } return false;}/* Clean up any open scans at end of transaction */voidAtEOXact_HashTables(bool isCommit){ /* * During abort cleanup, open scans are expected; just silently clean 'em * out. An open scan at commit means someone forgot a hash_seq_term() * call, so complain. * * Note: it's tempting to try to print the tabname here, but refrain for * fear of touching deallocated memory. This isn't a user-facing message * anyway, so it needn't be pretty. */ if (isCommit) { int i; for (i = 0; i < num_seq_scans; i++) { elog(WARNING, "leaked hash_seq_search scan for hash table %p", seq_scan_tables[i]); } } num_seq_scans = 0;}/* Clean up any open scans at end of subtransaction */voidAtEOSubXact_HashTables(bool isCommit, int nestDepth){ int i; /* * Search backward to make cleanup easy. Note we must check all entries, * not only those at the end of the array, because deletion technique * doesn't keep them in order. */ for (i = num_seq_scans - 1; i >= 0; i--) { if (seq_scan_level[i] >= nestDepth) { if (isCommit) elog(WARNING, "leaked hash_seq_search scan for hash table %p", seq_scan_tables[i]); seq_scan_tables[i] = seq_scan_tables[num_seq_scans - 1]; seq_scan_level[i] = seq_scan_level[num_seq_scans - 1]; num_seq_scans--; } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?