📄 group.mx
字号:
id++; } *dst++ = buf->o; *dst++ = id; buf++; } *idp = id; return dst;}static intrefine(BAT **res, BAT *b, BAT *a, int rv){ str rev = rv ? "_rev" : ""; BAT *bn = NULL; if (BATcount(b) != BATcount(a)) { GDKerror("CTrefine%s: both BATs must have the same cardinality and their heads must form a 1-1 match.\n", rev); return GDK_FAIL; }@( /* checking only the key property is too strict, * as it might not be set although it does hold; * exhaustively checking keyness is too expensive; * hence, we just don't check, and keep our fingers crossed... */ if (!(b->hkey && a->hkey)) { if (a->hkey) { GDKerror("CTrefine%s: head of first BAT is not unique (key);", rev); } else if (b->hkey) { GDKerror("CTrefine%s: head of second BAT is not unique (key);", rev); } else { GDKerror("CTrefine%s: heads of both BATs are not unique (key);", rev); } GDKerror("CTrefine%s: heads of both BATs must be unique (key) to form a 1-1 match.\n", rev); return GDK_FAIL; }@) if (b->tkey) { /* if key, no further refinements can take place */ bn = BATmark(b, 0); } else { int (*cmp) (ptr, ptr) = BATatoms[b->ttype].atomCmp; BUN p, q, r, last = BUNfirst(b), base = a->theap ? NULL : a->batBuns->base, this = NULL; struct refine *buf, *cur, *end; bit a_void = (a->ttype == TYPE_void); int xx, tpe = a_void ? TYPE_oid : a->ttype; size_t size = DEFAULT_SIZE; oid *dst, o, *op = &o, id = 0; /* create tmp BAT that holds one cluster; estimate required size using sampling */ if (BATcount(b) > DEFAULT_SIZE) { BAT *histo = NULL, *sample = BATsample(b, DEFAULT_SIZE); if (sample) { histo = BAThistogram(sample); if (histo) { BATmax(histo, &xx); if (xx > 1) size = MAX(size, (size_t) (xx * (((float) BATcount(b)) / DEFAULT_SIZE))); BBPreclaim(histo); } BBPreclaim(sample); } if (histo == NULL) return GDK_FAIL; } /* create a temporary BAT of the estimated size holding pointers to the a tail atoms */ buf = cur = (struct refine *) GDKmalloc(size * sizeof(struct refine)); end = buf + size; if (buf == NULL) return GDK_FAIL; if (a_void) { base = this = (BUN) GDKmalloc(size * sizeof(oid)); if (base == NULL) { GDKfree(buf); return GDK_FAIL; } } /* create result BAT */ bn = BATnew(TYPE_oid, TYPE_oid, BATcount(b)); if (bn == NULL) { GDKfree(buf); if (a_void) GDKfree(base); return GDK_FAIL; } bn->hsorted = bn->tsorted = FALSE; dst = (oid *) BUNfirst(bn); if (a_void) { @:refine_loop(@:refine_void_1@,@:refine_void_2@,GDKfree(base);,this = base;)@ } else { @:refine_loop(r += a->tloc;)@ }@= refine_void_2 off = (size_t) (this - base); base = (BUN) GDKrealloc(base, size * sizeof(oid)); this = base + off;@= refine_void_1 /* TYPE_void: BUNtail(b,p) = (BUN)BUNtpos(b,p) */ *(oid*)this = *(oid*)(BUN)BUNtpos(a, r); r = this;@= refine_loop /* merge-scan tail of b, finding chunks with equal values; then sort each chunk on a */ BATloopFast(b, p, q, xx) { if ((*cmp) (BUNtail(b, last), BUNtail(b, p))) { dst = sort_flush(buf, (size_t) (cur - buf), tpe, base ? base : a->theap->base, dst, &id, rv); last = p; cur = buf; @4 } o = *(oid *) BUNhead(b, p); BUNfndOID(r, a, op); if (r == NULL) { GDKerror("CTrefine%s: value "SZFMT"@0 not found in head of second BAT;\n" "CTrefine%s: heads of both BATs do not form a 1-1 match.\n", rev, o, rev); BBPreclaim(bn); GDKfree(buf); @3 return GDK_FAIL; } if (cur >= end) { size_t off = (size_t) (cur - buf); buf = (struct refine *) GDKrealloc(buf, (size *= 2) * sizeof(struct refine)); end = buf + size; cur = buf + off; @2 } @1 cur->off = base ? (var_t) (r - base) : *(var_t *) r; cur->o = o; cur++; this += sizeof(oid); }@c dst = sort_flush(buf, (size_t) (cur - buf), tpe, base ? base : a->theap->base, dst, &id, rv); GDKfree(buf); if (a_void) GDKfree(base); bn->batBuns->free = ((BUN) dst) - bn->batBuns->base; BATsetcount(bn, bn->batBuns->free/BUNsize(bn)); bn->tsorted = GDK_SORTED; } *res = bn; return GDK_SUCCEED;}intCTrefine(BAT **res, BAT *b, BAT *a){ return refine(res, b, a, FALSE);}intCTrefine_rev(BAT **res, BAT *b, BAT *a){ return refine(res, b, a, TRUE);}@- WrappingWrapping the version 4 xtables code base@c#include "mal.h"#include "mal_exception.h"#include "mal_atom.h"#ifdef WIN32#ifndef LIBGROUP#define group_export extern __declspec(dllimport)#else#define group_export extern __declspec(dllexport)#endif#else#define group_export extern#endif@-Recall that we only support void- and oid- typed heads.This is captured as a runtime check in the implementation.@= chkHeader if(@1->htype != TYPE_oid && @1->htype != TYPE_void) throw(MAL, "group.@2", "(v)oid head required\n");@cgroup_export str GRPprelude(void);strGRPprelude(void){ /* printf("#init group\n"); */ TYPE_mapentry = malAtomFixed(sizeof(mapentry_t), sizeof(oid), "mapentry"); TYPE_idxentry = malAtomFixed(sizeof(idxentry_t), sizeof(oid), "idxentry"); return MAL_SUCCEED;}group_export str GRPgroup0(int *ret, int *bid, int *start, int *incr, int *grpsize);strGRPgroup0(int *ret, int *bid, int *start, int *incr, int *grpsize){ BAT *result, *b; if ((b = BATdescriptor(*bid)) == NULL) { throw(MAL, "group.group", "Cannot access descriptor"); } result = BATgroup(b, *start, *incr, *grpsize); if (result == 0) throw(MAL, "GRPgroup0", "Failed to group"); *ret = result->batCacheid; BBPkeepref(*ret); BBPreleaseref(b->batCacheid); return MAL_SUCCEED;}group_export str GRPgroup_custom(int *rethisto, int *retbid, int *bid, int *N, int *rng);group_export str GRPgroup(int *rethisto, int *retbid, int *bid);strGRPgroup(int *rethisto, int *retbid, int *bid){ BAT *histo = 0, *b, *bn = 0; if ((b = BATdescriptor(*bid)) == NULL) { throw(MAL, "group.group", "Cannot access descriptor"); } if (BATcount(b) > 1024*1024 && (b->ttype == TYPE_int || b->ttype == TYPE_lng)) { int N = BATcount(b), one = 1; assert(N); if (b->ttype == TYPE_int) { int h, l; BATmax(b,&h); BATmin(b,&l); if (h != int_nil && l != int_nil && h-l < N && h-l > 0) N = h-l; } else if (b->ttype == TYPE_lng) { lng h, l; BATmax(b,&h); BATmin(b,&l); if (h != lng_nil && l != lng_nil && (int)(h-l) < N && (int)(h-l) > 0) N = (int)(h-l); } N = bits(N); assert(N); BBPunfix(b->batCacheid); return GRPgroup_custom(rethisto, retbid, bid, &N, &one); } @:chkHeader(b,GRPgroup)@ CTgroup(&bn, &histo, b); *rethisto = histo ? histo->batCacheid : 0; *retbid = bn->batCacheid; BBPreleaseref(b->batCacheid); return MAL_SUCCEED;}strGRPgroup_custom(int *rethisto, int *retbid, int *bid, int *N, int *rng){ BAT *histo = 0, *b, *bn = 0; if ((b = BATdescriptor(*bid)) == NULL) { throw(MAL, "group.group", "Cannot access descriptor"); } @:chkHeader(b,GRPgroup)@ CTgroup_custom(&bn, &histo, b, N, rng); *rethisto = histo ? histo->batCacheid : 0; *retbid = bn->batCacheid; BBPreleaseref(b->batCacheid); return MAL_SUCCEED;}group_export str GRPderive(int *hid, int *mid, int *ct_histoid, int *ct_mapid, int *bid);strGRPderive(int *hid, int *mid, int *ct_histoid, int *ct_mapid, int *bid){ BAT *ct_map, *ct_histo, *b; BAT *bn = NULL, *histo = NULL; if ((ct_map = BATdescriptor(*ct_mapid)) == NULL) { throw(MAL, "group.derive", "Cannot access descriptor"); } @:chkHeader(ct_map,GRPderive)@ if ((ct_histo = BATdescriptor(*ct_histoid)) == NULL) { BBPreleaseref(ct_map->batCacheid); throw(MAL, "group.derive", "Cannot access descriptor"); } @:chkHeader(ct_histo,GRPderive)@ if ((b = BATdescriptor(*bid)) == NULL) { BBPreleaseref(ct_map->batCacheid); BBPreleaseref(ct_histo->batCacheid); throw(MAL, "group.derive", "Cannot access descriptor"); } @:chkHeader(b,GRPderive)@ if( derive(&histo, &bn, ct_histo, ct_map, b, tailtype(b, TRUE), NULL) == GDK_FAIL){ BBPreleaseref(b->batCacheid); BBPreleaseref(ct_map->batCacheid); BBPreleaseref(ct_histo->batCacheid); throw(MAL, "group.derive","Could not derive group"); } *mid = bn->batCacheid; *hid = histo->batCacheid; BBPreleaseref(b->batCacheid); BBPreleaseref(ct_map->batCacheid); BBPreleaseref(ct_histo->batCacheid); return MAL_SUCCEED;}group_export str GRPsubhisto(int *retid, int *selid, int *grpid, int *domid);strGRPsubhisto(int *retid, int *selid, int *grpid, int *domid){ BAT *sel, *grp, *dom; BAT *bn; if ((sel = BATdescriptor(*selid)) == NULL) { throw(MAL, "group.subhisto", "Cannot access descriptor"); } if ((grp = BATdescriptor(*grpid)) == NULL) { BBPreleaseref(sel->batCacheid); throw(MAL, "group.subhisto", "Cannot access descriptor"); } @:chkHeader(grp,GRPsubhisto)@ if ((dom = BATdescriptor(*domid)) == NULL) { throw(MAL, "group.subhisto", "Cannot access descriptor"); } if ((dom = BATdescriptor(*domid)) == NULL) { BBPreleaseref(sel->batCacheid); BBPreleaseref(grp->batCacheid); throw(MAL, "group.subhisto", "Cannot access descriptor"); } CTsubhisto(&bn, sel, grp, dom); *retid = bn->batCacheid; BBPkeepref(*retid); BBPreleaseref(sel->batCacheid); BBPreleaseref(grp->batCacheid); BBPreleaseref(dom->batCacheid); return MAL_SUCCEED;}group_export str GRPrefine(int *retid, int *bid, int *aid);strGRPrefine(int *retid, int *bid, int *aid){ BAT *b, *a; BAT *bn; if ((b = BATdescriptor(*bid)) == NULL) { throw(MAL, "group.refine", "Cannot access descriptor"); } @:chkHeader(b,GRPrefine)@ if ((a = BATdescriptor(*aid)) == NULL) { BBPreleaseref(b->batCacheid); throw(MAL, "group.refine", "Cannot access descriptor"); } @:chkHeader(a,GRPrefine)@ CTrefine(&bn, b, a); *retid = bn->batCacheid; BBPkeepref(*retid); BBPreleaseref(b->batCacheid); BBPreleaseref(a->batCacheid); return MAL_SUCCEED;}group_export str GRPrefine_rev(int *retid, int *bid, int *aid);strGRPrefine_rev(int *retid, int *bid, int *aid){ BAT *b, *a; BAT *bn; if ((b = BATdescriptor(*bid)) == NULL) { throw(MAL, "group.refine", "Cannot access descriptor"); } @:chkHeader(b,GRPrefine_rev)@ if ((a = BATdescriptor(*aid)) == NULL) { BBPreleaseref(b->batCacheid); throw(MAL, "group.refine", "Cannot access descriptor"); } @:chkHeader(a,GRPrefine_rev)@ CTrefine_rev(&bn, b, a); *retid = bn->batCacheid; BBPkeepref(*retid); BBPreleaseref(b->batCacheid); BBPreleaseref(a->batCacheid); return MAL_SUCCEED;}@-These implementations need just one scan and a simple hash-maintained datastructure to compute a group of common aggregates.With group OIDs spanning a range of less SMALL_AGGR_MAX (the actual numberof groups might be even less, in case there are "holes" in the group OIDrange), we use a simple array as temporary sum/cnt table on order to benefitfrom positional lookups; with size of sum <= 8 bytes and size of cnt == 4bytes, we stay below 16 KBytes, i.e., within (almost) any L1 cache@c#define SMALL_AGGR_MAX 1024@-@= large_aggr_sum (void) BATprepareHash(bn); BATloopFast(b, p, q, xx) { oid *h = (oid*) BUNhead(b,p); @1 *t = (@1*) BUN@2(b,p); BUNfndOID(r, bn, h); if (r) { @1 *dst = (@1*) BUN@2(bn, r); if (*dst != @1_nil) { if (*t == @1_nil) { *dst = @1_nil; } else { *dst += *t; } } } }@-@= small_aggr_sum sums = (@1*) GDKmalloc(range*sizeof(@1)); for (i = 0; i < range; i++) sums[i] = zero; BATloopFast(b, p, q, xx) { int h = (int)(*(oid*) BUNhead(b,p)) - min; @1 *t = (@1*) BUN@2(b,p); if (h >= 0 && h < range) { @1 *dst = sums + h; if (*dst != @1_nil) { if (*t == @1_nil) { *dst = @1_nil; } else { *dst += *t; } } } } BATloopFast(bn, p, q, xx) { int h = (int)(*(oid*) BUNhead(bn,p)) - min; *(@1*)BUN@2(bn, p) = sums[h]; } GDKfree(sums);@-@= large_aggr_avg cnt = (int*) GDKmalloc(BATcount(e)*sizeof(int)); memset(cnt, 0, BATcount(e)*sizeof(int)); (void) BATprepareHash(bn); BATloopFast(b, p, q, xx) { oid *h = (oid*) BUNhead(b,p); @1 *t = (@1*) BUN@2(b,p); BUNfndOID(r,bn,h); if (r) { @1 *dst = (@1*) BUN@2(bn, r); if (*dst != @1_nil) { if (*t == @1_nil) { *dst = @1_nil; } else { *dst += *t; } cnt[BUNindex(bn,r)-off]++; } } } /* postprocess by dividing sums by counts */ BATloopFast(bn, p, q, xx) { @1 *dst = (@1*) BUN@2(bn, p); if (cnt[yy] == 0) { *dst = @1_nil; } else if (*dst != @1_nil) { *dst /= cnt[yy]; } yy++; } GDKfree(cnt);@-@= small_aggr_avg sums = (@1*) GDKmalloc(range*sizeof(@1)); cnt = (int*) GDKmalloc(range*sizeof(int)); for (i = 0; i < range; i++) sums[i] = zero; memset(cnt, 0, range*sizeof(int)); BATloopFast(b, p, q, xx) { int h = (int)(*(oid*) BUNhead(b,p)) - min; @1 *t = (@1*) BUN@2(b,p); if (h >= 0 && h < range) { @1 *dst = sums + h; if (*dst != @1_nil) { if (*t == @1_nil) { *dst = @1_nil; } else { *dst += *t;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -