📄 nodehash.c
字号:
{ i++; sprintf(myPortalName, "<hashtable %d>", i); myPortal = GetPortalByName(myPortalName); } while (PortalIsValid(myPortal)); myPortal = CreatePortal(myPortalName); Assert(PortalIsValid(myPortal)); hashtable->myPortal = (void *) myPortal; /* kluge for circular * includes */ hashtable->hashCxt = (MemoryContext) PortalGetVariableMemory(myPortal); hashtable->batchCxt = (MemoryContext) PortalGetHeapMemory(myPortal); /* Allocate data that will live for the life of the hashjoin */ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); if (nbatch > 0) { /* --------------- * allocate and initialize the file arrays in hashCxt * --------------- */ hashtable->innerBatchFile = (BufFile **) palloc(nbatch * sizeof(BufFile *)); hashtable->outerBatchFile = (BufFile **) palloc(nbatch * sizeof(BufFile *)); hashtable->innerBatchSize = (long *) palloc(nbatch * sizeof(long)); hashtable->outerBatchSize = (long *) palloc(nbatch * sizeof(long)); for (i = 0; i < nbatch; i++) { hashtable->innerBatchFile[i] = NULL; hashtable->outerBatchFile[i] = NULL; hashtable->innerBatchSize[i] = 0; hashtable->outerBatchSize[i] = 0; } /* The files will not be opened until later... */ } /* * Prepare portal for the first-scan space allocations; allocate the * hashbucket array therein, and set each bucket "empty". */ MemoryContextSwitchTo(hashtable->batchCxt); StartPortalAllocMode(DefaultAllocMode, 0); hashtable->buckets = (HashJoinTuple *) palloc(nbuckets * sizeof(HashJoinTuple)); if (hashtable->buckets == NULL) elog(ERROR, "Insufficient memory for hash table."); for (i = 0; i < nbuckets; i++) hashtable->buckets[i] = NULL; MemoryContextSwitchTo(oldcxt); return hashtable;}/* ---------------------------------------------------------------- * ExecHashTableDestroy * * destroy a hash table * ---------------------------------------------------------------- */voidExecHashTableDestroy(HashJoinTable hashtable){ int i; /* Make sure all the temp files are closed */ for (i = 0; i < hashtable->nbatch; i++) { if (hashtable->innerBatchFile[i]) BufFileClose(hashtable->innerBatchFile[i]); if (hashtable->outerBatchFile[i]) BufFileClose(hashtable->outerBatchFile[i]); } /* Destroy the portal to release all working memory */ /* cast here is a kluge for circular includes... */ PortalDestroy((Portal *) &hashtable->myPortal); /* And drop the control block */ pfree(hashtable);}/* ---------------------------------------------------------------- * ExecHashTableInsert * * insert a tuple into the hash table depending on the hash value * it may just go to a tmp file for other batches * ---------------------------------------------------------------- */voidExecHashTableInsert(HashJoinTable hashtable, ExprContext *econtext, Var *hashkey){ int bucketno = ExecHashGetBucket(hashtable, econtext, hashkey); TupleTableSlot *slot = econtext->ecxt_innertuple; HeapTuple heapTuple = slot->val; /* ---------------- * decide whether to put the tuple in the hash table or a tmp file * ---------------- */ if (bucketno < hashtable->nbuckets) { /* --------------- * put the tuple in hash table * --------------- */ HashJoinTuple hashTuple; int hashTupleSize; hashTupleSize = MAXALIGN(sizeof(*hashTuple)) + heapTuple->t_len; hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, hashTupleSize); if (hashTuple == NULL) elog(ERROR, "Insufficient memory for hash table."); memcpy((char *) &hashTuple->htup, (char *) heapTuple, sizeof(hashTuple->htup)); hashTuple->htup.t_data = (HeapTupleHeader) (((char *) hashTuple) + MAXALIGN(sizeof(*hashTuple))); memcpy((char *) hashTuple->htup.t_data, (char *) heapTuple->t_data, heapTuple->t_len); hashTuple->next = hashtable->buckets[bucketno]; hashtable->buckets[bucketno] = hashTuple; } else { /* ----------------- * put the tuple into a tmp file for other batches * ----------------- */ int batchno = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) / (hashtable->totalbuckets - hashtable->nbuckets); hashtable->innerBatchSize[batchno]++; ExecHashJoinSaveTuple(heapTuple, hashtable->innerBatchFile[batchno]); }}/* ---------------------------------------------------------------- * ExecHashGetBucket * * Get the hash value for a tuple * ---------------------------------------------------------------- */intExecHashGetBucket(HashJoinTable hashtable, ExprContext *econtext, Var *hashkey){ int bucketno; Datum keyval; bool isNull; /* ---------------- * Get the join attribute value of the tuple * * ...It's quick hack - use ExecEvalExpr instead of ExecEvalVar: * hashkey may be T_ArrayRef, not just T_Var. - vadim 04/22/97 * ---------------- */ keyval = ExecEvalExpr((Node *) hashkey, econtext, &isNull, NULL); /* * keyval could be null, so we better point it to something valid * before trying to run hashFunc on it. --djm 8/17/96 */ if (isNull) { execConstByVal = 0; execConstLen = 0; keyval = (Datum) ""; } /* ------------------ * compute the hash function * ------------------ */ bucketno = hashFunc(keyval, execConstLen, execConstByVal) % hashtable->totalbuckets;#ifdef HJDEBUG if (bucketno >= hashtable->nbuckets) printf("hash(%d) = %d SAVED\n", keyval, bucketno); else printf("hash(%d) = %d\n", keyval, bucketno);#endif return bucketno;}/* ---------------------------------------------------------------- * ExecScanHashBucket * * scan a hash bucket of matches * ---------------------------------------------------------------- */HeapTupleExecScanHashBucket(HashJoinState *hjstate, List *hjclauses, ExprContext *econtext){ HashJoinTable hashtable = hjstate->hj_HashTable; HashJoinTuple hashTuple = hjstate->hj_CurTuple; /* * hj_CurTuple is NULL to start scanning a new bucket, or the address * of the last tuple returned from the current bucket. */ if (hashTuple == NULL) hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; else hashTuple = hashTuple->next; while (hashTuple != NULL) { HeapTuple heapTuple = &hashTuple->htup; TupleTableSlot *inntuple; bool qualResult; /* insert hashtable's tuple into exec slot so ExecQual sees it */ inntuple = ExecStoreTuple(heapTuple, /* tuple to store */ hjstate->hj_HashTupleSlot, /* slot */ InvalidBuffer, false); /* do not pfree this tuple */ econtext->ecxt_innertuple = inntuple; qualResult = ExecQual(hjclauses, econtext); if (qualResult) { hjstate->hj_CurTuple = hashTuple; return heapTuple; } hashTuple = hashTuple->next; } /* ---------------- * no match * ---------------- */ return NULL;}/* ---------------------------------------------------------------- * hashFunc * * the hash function, copied from Margo * ---------------------------------------------------------------- */static inthashFunc(Datum key, int len, bool byVal){ unsigned int h = 0; unsigned char *k; if (byVal) { /* * If it's a by-value data type, use the 'len' least significant * bytes of the Datum value. This should do the right thing on * either bigendian or littleendian hardware --- see the Datum * access macros in c.h. */ while (len-- > 0) { h = (h * PRIME1) ^ (key & 0xFF); key >>= 8; } } else { /* * If this is a variable length type, then 'k' points to a "struct * varlena" and len == -1. NOTE: VARSIZE returns the "real" data * length plus the sizeof the "vl_len" attribute of varlena (the * length information). 'k' points to the beginning of the varlena * struct, so we have to use "VARDATA" to find the beginning of * the "real" data. */ if (len == -1) { len = VARSIZE(key) - VARHDRSZ; k = (unsigned char *) VARDATA(key); } else k = (unsigned char *) key; while (len-- > 0) h = (h * PRIME1) ^ (*k++); } return h % PRIME2;}/* ---------------------------------------------------------------- * ExecHashTableReset * * reset hash table header for new batch * * ntuples is the number of tuples in the inner relation's batch * (which we currently don't actually use...) * ---------------------------------------------------------------- */voidExecHashTableReset(HashJoinTable hashtable, long ntuples){ MemoryContext oldcxt; int nbuckets = hashtable->nbuckets; int i; /* * Release all the hash buckets and tuples acquired in the prior pass, * and reinitialize the portal for a new pass. */ oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); EndPortalAllocMode(); StartPortalAllocMode(DefaultAllocMode, 0); /* * We still use the same number of physical buckets as in the first * pass. (It could be different; but we already decided how many * buckets would be appropriate for the allowed memory, so stick with * that number.) We MUST set totalbuckets to equal nbuckets, because * from now on no tuples will go out to temp files; there are no more * virtual buckets, only real buckets. (This implies that tuples will * go into different bucket numbers than they did on the first pass, * but that's OK.) */ hashtable->totalbuckets = nbuckets; /* Reallocate and reinitialize the hash bucket headers. */ hashtable->buckets = (HashJoinTuple *) palloc(nbuckets * sizeof(HashJoinTuple)); if (hashtable->buckets == NULL) elog(ERROR, "Insufficient memory for hash table."); for (i = 0; i < nbuckets; i++) hashtable->buckets[i] = NULL; MemoryContextSwitchTo(oldcxt);}voidExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent){ /* * if chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ if (((Plan *) node)->lefttree->chgParam == NULL) ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -