📄 nodeHash.c
    *numbuckets = nbuckets;
    *numbatches = nbatch;
}

/* ----------------------------------------------------------------
 *      ExecHashTableDestroy
 *
 *      destroy a hash table
 * ----------------------------------------------------------------
 */
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
    int         i;

    /*
     * Make sure all the temp files are closed.  We skip batch 0, since it
     * can't have any temp files (and the arrays might not even exist if
     * nbatch is only 1).
     */
    for (i = 1; i < hashtable->nbatch; i++)
    {
        if (hashtable->innerBatchFile[i])
            BufFileClose(hashtable->innerBatchFile[i]);
        if (hashtable->outerBatchFile[i])
            BufFileClose(hashtable->outerBatchFile[i]);
    }

    /* Release working memory (batchCxt is a child, so it goes away too) */
    MemoryContextDelete(hashtable->hashCxt);

    /* And drop the control block */
    pfree(hashtable);
}

/*
 * ExecHashIncreaseNumBatches
 *      increase the original number of batches in order to reduce
 *      current memory consumption
 */
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
    int         oldnbatch = hashtable->nbatch;
    int         curbatch = hashtable->curbatch;
    int         nbatch;
    int         i;
    MemoryContext oldcxt;
    long        ninmemory;
    long        nfreed;

    /* do nothing if we've decided to shut off growth */
    if (!hashtable->growEnabled)
        return;

    /* safety check to avoid overflow */
    if (oldnbatch > INT_MAX / 2)
        return;

    nbatch = oldnbatch * 2;
    Assert(nbatch > 1);

#ifdef HJDEBUG
    printf("Increasing nbatch to %d because space = %lu\n",
           nbatch, (unsigned long) hashtable->spaceUsed);
#endif

    oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

    if (hashtable->innerBatchFile == NULL)
    {
        /* we had no file arrays before */
        hashtable->innerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));
        hashtable->outerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));
    }
    else
    {
        /* enlarge arrays and zero out added entries */
        hashtable->innerBatchFile = (BufFile **)
            repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
        hashtable->outerBatchFile = (BufFile **)
            repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
        MemSet(hashtable->innerBatchFile + oldnbatch, 0,
               (nbatch - oldnbatch) * sizeof(BufFile *));
        MemSet(hashtable->outerBatchFile + oldnbatch, 0,
               (nbatch - oldnbatch) * sizeof(BufFile *));
    }

    MemoryContextSwitchTo(oldcxt);

    hashtable->nbatch = nbatch;

    /*
     * Scan through the existing hash table entries and dump out any that are
     * no longer of the current batch.
     */
    ninmemory = nfreed = 0;

    for (i = 0; i < hashtable->nbuckets; i++)
    {
        HashJoinTuple prevtuple;
        HashJoinTuple tuple;

        prevtuple = NULL;
        tuple = hashtable->buckets[i];

        while (tuple != NULL)
        {
            /* save link in case we delete */
            HashJoinTuple nexttuple = tuple->next;
            int         bucketno;
            int         batchno;

            ninmemory++;
            ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
                                      &bucketno, &batchno);
            Assert(bucketno == i);
            if (batchno == curbatch)
            {
                /* keep tuple */
                prevtuple = tuple;
            }
            else
            {
                /* dump it out */
                Assert(batchno > curbatch);
                ExecHashJoinSaveTuple(&tuple->htup, tuple->hashvalue,
                                      &hashtable->innerBatchFile[batchno]);
                /* and remove from hash table */
                if (prevtuple)
                    prevtuple->next = nexttuple;
                else
                    hashtable->buckets[i] = nexttuple;
                /* prevtuple doesn't change */
                hashtable->spaceUsed -= MAXALIGN(sizeof(HashJoinTupleData)) +
                    tuple->htup.t_len;
                pfree(tuple);
                nfreed++;
            }

            tuple = nexttuple;
        }
    }

#ifdef HJDEBUG
    printf("Freed %ld of %ld tuples, space now %lu\n",
           nfreed, ninmemory, (unsigned long) hashtable->spaceUsed);
#endif

    /*
     * If we dumped out either all or none of the tuples in the table, disable
     * further expansion of nbatch.  This situation implies that we have
     * enough tuples of identical hashvalues to overflow spaceAllowed.
     * Increasing nbatch will not fix it since there's no way to subdivide the
     * group any more finely.  We have to just gut it out and hope the server
     * has enough RAM.
     */
    if (nfreed == 0 || nfreed == ninmemory)
    {
        hashtable->growEnabled = false;
#ifdef HJDEBUG
        printf("Disabling further increase of nbatch\n");
#endif
    }
}

/*
 * ExecHashTableInsert
 *      insert a tuple into the hash table depending on the hash value
 *      it may just go to a temp file for later batches
 */
void
ExecHashTableInsert(HashJoinTable hashtable,
                    HeapTuple tuple,
                    uint32 hashvalue)
{
    int         bucketno;
    int         batchno;

    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &bucketno, &batchno);

    /*
     * decide whether to put the tuple in the hash table or a temp file
     */
    if (batchno == hashtable->curbatch)
    {
        /*
         * put the tuple in hash table
         */
        HashJoinTuple hashTuple;
        int         hashTupleSize;

        hashTupleSize = MAXALIGN(sizeof(HashJoinTupleData)) + tuple->t_len;
        hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
                                                       hashTupleSize);
        hashTuple->hashvalue = hashvalue;
        memcpy((char *) &hashTuple->htup,
               (char *) tuple,
               sizeof(hashTuple->htup));
        hashTuple->htup.t_datamcxt = hashtable->batchCxt;
        hashTuple->htup.t_data = (HeapTupleHeader)
            (((char *) hashTuple) + MAXALIGN(sizeof(HashJoinTupleData)));
        memcpy((char *) hashTuple->htup.t_data,
               (char *) tuple->t_data,
               tuple->t_len);
        hashTuple->next = hashtable->buckets[bucketno];
        hashtable->buckets[bucketno] = hashTuple;
        hashtable->spaceUsed += hashTupleSize;
        if (hashtable->spaceUsed > hashtable->spaceAllowed)
            ExecHashIncreaseNumBatches(hashtable);
    }
    else
    {
        /*
         * put the tuple into a temp file for later batches
         */
        Assert(batchno > hashtable->curbatch);
        ExecHashJoinSaveTuple(tuple, hashvalue,
                              &hashtable->innerBatchFile[batchno]);
    }
}

/*
 * ExecHashGetHashValue
 *      Compute the hash value for a tuple
 *
 * The tuple to be tested must be in either econtext->ecxt_outertuple or
 * econtext->ecxt_innertuple.  Vars in the hashkeys expressions reference
 * either OUTER or INNER.
 */
uint32
ExecHashGetHashValue(HashJoinTable hashtable,
                     ExprContext *econtext,
                     List *hashkeys)
{
    uint32      hashkey = 0;
    ListCell   *hk;
    int         i = 0;
    MemoryContext oldContext;

    /*
     * We reset the eval context each time to reclaim any memory leaked in the
     * hashkey expressions.
     */
    ResetExprContext(econtext);

    oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

    foreach(hk, hashkeys)
    {
        ExprState  *keyexpr = (ExprState *) lfirst(hk);
        Datum       keyval;
        bool        isNull;

        /* rotate hashkey left 1 bit at each step */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

        /*
         * Get the join attribute value of the tuple
         */
        keyval = ExecEvalExpr(keyexpr, econtext, &isNull, NULL);

        /*
         * Compute the hash function
         */
        if (!isNull)            /* treat nulls as having hash key 0 */
        {
            uint32      hkey;

            hkey = DatumGetUInt32(FunctionCall1(&hashtable->hashfunctions[i],
                                                keyval));
            hashkey ^= hkey;
        }

        i++;
    }

    MemoryContextSwitchTo(oldContext);

    return hashkey;
}

/*
 * ExecHashGetBucketAndBatch
 *      Determine the bucket number and batch number for a hash value
 *
 * Note: on-the-fly increases of nbatch must not change the bucket number
 * for a given hash code (since we don't move tuples to different hash
 * chains), and must only cause the batch number to remain the same or
 * increase.  Our algorithm is
 *      bucketno = hashvalue MOD nbuckets
 *      batchno = (hashvalue DIV nbuckets) MOD nbatch
 * where nbuckets should preferably be prime so that all bits of the
 * hash value can affect both bucketno and batchno.
 *      nbuckets doesn't change over the course of the join.
 *
 * nbatch is always a power of 2; we increase it only by doubling it.  This
 * effectively adds one more bit to the top of the batchno.
 */
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                          uint32 hashvalue,
                          int *bucketno,
                          int *batchno)
{
    uint32      nbuckets = (uint32) hashtable->nbuckets;
    uint32      nbatch = (uint32) hashtable->nbatch;

    if (nbatch > 1)
    {
        *bucketno = hashvalue % nbuckets;
        /* since nbatch is a power of 2, can do MOD by masking */
        *batchno = (hashvalue / nbuckets) & (nbatch - 1);
    }
    else
    {
        *bucketno = hashvalue % nbuckets;
        *batchno = 0;
    }
}

/*
 * ExecScanHashBucket
 *      scan a hash bucket for matches to the current outer tuple
 *
 * The current outer tuple must be stored in econtext->ecxt_outertuple.
 */
HeapTuple
ExecScanHashBucket(HashJoinState *hjstate,
                   ExprContext *econtext)
{
    List       *hjclauses = hjstate->hashclauses;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
    uint32      hashvalue = hjstate->hj_CurHashValue;

    /*
     * hj_CurTuple is NULL to start scanning a new bucket, or the address of
     * the last tuple returned from the current bucket.
     */
    if (hashTuple == NULL)
        hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
    else
        hashTuple = hashTuple->next;

    while (hashTuple != NULL)
    {
        if (hashTuple->hashvalue == hashvalue)
        {
            HeapTuple   heapTuple = &hashTuple->htup;
            TupleTableSlot *inntuple;

            /* insert hashtable's tuple into exec slot so ExecQual sees it */
            inntuple = ExecStoreTuple(heapTuple,
                                      hjstate->hj_HashTupleSlot,
                                      InvalidBuffer,
                                      false);   /* do not pfree */
            econtext->ecxt_innertuple = inntuple;

            /* reset temp memory each time to avoid leaks from qual expr */
            ResetExprContext(econtext);

            if (ExecQual(hjclauses, econtext, false))
            {
                hjstate->hj_CurTuple = hashTuple;
                return heapTuple;
            }
        }

        hashTuple = hashTuple->next;
    }

    /*
     * no match
     */
    return NULL;
}

/*
 * ExecHashTableReset
 *
 *      reset hash table header for new batch
 */
void
ExecHashTableReset(HashJoinTable hashtable)
{
    MemoryContext oldcxt;
    int         nbuckets = hashtable->nbuckets;

    /*
     * Release all the hash buckets and tuples acquired in the prior pass, and
     * reinitialize the context for a new pass.
     */
    MemoryContextReset(hashtable->batchCxt);
    oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);

    /* Reallocate and reinitialize the hash bucket headers. */
    hashtable->buckets = (HashJoinTuple *)
        palloc0(nbuckets * sizeof(HashJoinTuple));

    hashtable->spaceUsed = 0;

    MemoryContextSwitchTo(oldcxt);
}

void
ExecReScanHash(HashState *node, ExprContext *exprCtxt)
{
    /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
    if (((PlanState *) node)->lefttree->chgParam == NULL)
        ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
}
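For reference, the standalone sketch below is not part of nodeHash.c. It re-implements the bucketno/batchno arithmetic described in the ExecHashGetBucketAndBatch comment with a made-up nbuckets value and a few sample hash values, to show the invariant that ExecHashIncreaseNumBatches relies on: when nbatch doubles, a hash value's bucket never changes and its batch number can only stay the same or increase.

/*
 * Illustrative sketch only -- NOT part of nodeHash.c.  The value of
 * nbuckets and the sample hash values below are made up for the demo.
 */
#include <stdio.h>
#include <stdint.h>

static void
get_bucket_and_batch(uint32_t hashvalue, uint32_t nbuckets, uint32_t nbatch,
                     int *bucketno, int *batchno)
{
    *bucketno = (int) (hashvalue % nbuckets);
    /* nbatch is a power of 2, so MOD can be done by masking, as above */
    *batchno = (nbatch > 1) ? (int) ((hashvalue / nbuckets) & (nbatch - 1)) : 0;
}

int
main(void)
{
    const uint32_t nbuckets = 1021;     /* prime, per the comment's advice */
    const uint32_t samples[] = {42u, 123456789u, 3141592653u, 4294967295u};
    size_t      i;

    for (i = 0; i < sizeof(samples) / sizeof(samples[0]); i++)
    {
        int     bucket4, batch4, bucket8, batch8;

        get_bucket_and_batch(samples[i], nbuckets, 4, &bucket4, &batch4);
        get_bucket_and_batch(samples[i], nbuckets, 8, &bucket8, &batch8);

        /* bucket stays put; batch either stays the same or jumps up by 4 */
        printf("hash=%10lu  nbatch=4 -> bucket=%4d batch=%d   "
               "nbatch=8 -> bucket=%4d batch=%d\n",
               (unsigned long) samples[i],
               bucket4, batch4, bucket8, batch8);
    }
    return 0;
}

Because the extra bit gained by doubling nbatch sits above the bits already in use, a tuple that was in memory for the current batch can only map to the same batch or a later one afterwards, which is why the repartitioning loop in ExecHashIncreaseNumBatches can assert batchno > curbatch for every tuple it dumps to a temp file.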
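A second sketch, likewise not part of the original file and using made-up per-column hash values in place of what FunctionCall1 on a real hash function would return, isolates the rotate-left-one-bit-then-XOR combining step used in ExecHashGetHashValue; the rotation makes each join key's contribution depend on its position in the hashkeys list.

/*
 * Illustrative sketch only -- NOT part of nodeHash.c.  The per-column
 * "hashes" are made-up stand-ins for real hash function results.
 */
#include <stdio.h>
#include <stdint.h>

/* rotate left by one bit, matching the step used in ExecHashGetHashValue */
static uint32_t
rotate_left_1(uint32_t x)
{
    return (x << 1) | ((x & 0x80000000u) ? 1u : 0u);
}

static uint32_t
combine_hashes(const uint32_t *colhashes, int ncols)
{
    uint32_t    hashkey = 0;
    int         i;

    for (i = 0; i < ncols; i++)
    {
        hashkey = rotate_left_1(hashkey);
        hashkey ^= colhashes[i];    /* a NULL column would contribute 0 */
    }
    return hashkey;
}

int
main(void)
{
    const uint32_t ab[] = {0xDEADBEEFu, 0x12345678u};
    const uint32_t ba[] = {0x12345678u, 0xDEADBEEFu};

    /* swapping the key order changes the combined hash for these inputs */
    printf("combine(a,b) = %08lX\n", (unsigned long) combine_hashes(ab, 2));
    printf("combine(b,a) = %08lX\n", (unsigned long) combine_hashes(ba, 2));
    return 0;
}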