📄 gdk_posix.mx
字号:
* Examples are sorting or clustering on huge datasets. * (c) if RSS pressure is due to large read-maps, rather than * intermediate results. * * Two crude suggestions: * - If we are under RSS pressure without unloadable tiles and with * savable tiles, we should consider suspending *all* other threads * until we manage to unload a tile. * - if there are no savable tiles (or in case of read-only maps) * we could resort to saving and unloading random tiles. * * To do better, our BAT algorithms should provide even more detailed * advise on their access patterns, which may even consist of pointers * to the cursors (i.e. pointers to b->batBuns->free or the cursors * in radix-cluster), which an enhanced version of this thread might * take into account. */#ifdef HAVE_PTHREAD_H/* pthread.h on Windows includes config.h if HAVE_CONFIG_H is set */#undef HAVE_CONFIG_H#include <sched.h>#include <pthread.h>#endif#ifdef HAVE_SEMAPHORE_H#include <semaphore.h>#endiftypedef struct { char path[128]; /* mapped file, retained for debugging */ char *base; /* base address */ size_t len; /* length of map */ size_t first_tile; /* from here we started saving tiles */ size_t save_tile; /* next tile to save */ size_t unload_tile; /* next tile to unload */ int last_tile; int fd; /* open fd (==-1 for anon vm), retained to give posix_fadvise */ int pincnt; /* incremented while a MIL command uses heap with a random pattern */ int writable; int next;} MT_mmap_t;#ifdef HAVE_POSIX_FADVISEstatic int do_not_use_posix_fadvise = 0;#endif#define MT_MMAP_TILE (1<<27)#define MT_MMAP_BUFSIZE 100MT_mmap_t MT_mmap_tab[MT_MMAP_BUFSIZE];int MT_mmap_cur = -1, MT_mmap_first = -1, MT_mmap_free = 0;pthread_mutex_t MT_mmap_lock;static voidMT_mmap_empty(int i){ MT_mmap_tab[i].path[0] = 0; MT_mmap_tab[i].base = NULL; MT_mmap_tab[i].len = 0; MT_mmap_tab[i].writable = 0; MT_mmap_tab[i].fd = -1; MT_mmap_tab[i].pincnt = -1;}static voidMT_mmap_init(void){ int i; /* create lock */ pthread_mutex_init(&MT_mmap_lock, 0); for (i = 0; i < MT_MMAP_BUFSIZE; i++) { MT_mmap_tab[i].next = i + 1; MT_mmap_empty(i); } MT_mmap_tab[i-1].next = -1;}/* returns previous element (to facilitate deletion) */static intMT_mmap_find(void *base){ /* maybe consider a hash table iso linked list?? */ int i, prev = MT_MMAP_BUFSIZE; for (i = MT_mmap_first; i >= 0; i = MT_mmap_tab[i].next) { if (MT_mmap_tab[i].base == base) { return prev; } prev = i; } return i;}static intMT_mmap_idx(void *base, size_t len){ if (len > MT_MMAP_TILE) { int i = MT_mmap_find(base); if (i >= 0) { if (i == MT_MMAP_BUFSIZE) { return MT_mmap_first; } else { return MT_mmap_tab[i].next; } } } return -1;}static intMT_mmap_new(char *path, void *base, size_t len, int fd, int writable){ (void) pthread_mutex_lock(&MT_mmap_lock); if (len > MT_MMAP_TILE && MT_mmap_free >= 0) { int i = MT_mmap_free; MT_mmap_free = MT_mmap_tab[i].next; MT_mmap_tab[i].next = MT_mmap_first; MT_mmap_first = i; if (MT_mmap_cur == -1) MT_mmap_cur = i;#ifdef MMAP_DEBUG stream_printf(GDKerr, "MT_mmap_new: %s fd=%d\n", path, fd);#endif strncpy(MT_mmap_tab[i].path, path, 128); MT_mmap_tab[i].base = base; MT_mmap_tab[i].len = len; MT_mmap_tab[i].save_tile = 1; MT_mmap_tab[i].last_tile = 0; MT_mmap_tab[i].first_tile = 0; MT_mmap_tab[i].unload_tile = 0; MT_mmap_tab[i].writable = writable; MT_mmap_tab[i].fd = fd; MT_mmap_tab[i].pincnt = 0; fd = -1; } (void) pthread_mutex_unlock(&MT_mmap_lock); return fd;}static voidMT_mmap_del(void *base, size_t len){ if (len > MT_MMAP_TILE) { int victim = 0, prev; (void) pthread_mutex_lock(&MT_mmap_lock); prev = MT_mmap_find(base); if (prev >= 0) { int ret; if (prev == MT_MMAP_BUFSIZE) { victim = MT_mmap_first; MT_mmap_first = MT_mmap_tab[MT_mmap_first].next; } else { victim = MT_mmap_tab[prev].next; MT_mmap_tab[prev].next = MT_mmap_tab[victim].next; } if (MT_mmap_cur == victim) { MT_mmap_cur = MT_mmap_first; }#ifdef HAVE_POSIX_FADVISE if (!do_not_use_posix_fadvise && MT_mmap_tab[victim].fd >= 0) { /* tell the OS quite clearly that you want to drop this */ ret = posix_fadvise(MT_mmap_tab[victim].fd, 0LL, MT_mmap_tab[victim].len, POSIX_FADV_DONTNEED);#ifdef MMAP_DEBUG stream_printf(GDKerr, "MT_mmap_del: posix_fadvise(%s,fd=%d,%uMB,POSIX_FADV_DONTNEED) = %d\n", MT_mmap_tab[victim].path, MT_mmap_tab[victim].fd, (unsigned int) (MT_mmap_tab[victim].len >> 20), ret);#endif }#endif ret = close(MT_mmap_tab[victim].fd);#ifdef MMAP_DEBUG stream_printf(GDKerr, "MT_mmap_del: close(%s fd=%d) = %d\n", MT_mmap_tab[victim].path, MT_mmap_tab[victim].fd, ret);#endif MT_mmap_tab[victim].next = MT_mmap_free; MT_mmap_empty(victim); MT_mmap_free = victim; (void) ret; } (void) pthread_mutex_unlock(&MT_mmap_lock); }}static intMT_fadvise(void *base, size_t len, int advice){ int ret = 0;#ifdef HAVE_POSIX_FADVISE if (!do_not_use_posix_fadvise) { int i; (void) pthread_mutex_lock(&MT_mmap_lock); i = MT_mmap_idx(base, len); if (i >= 0) { if (MT_mmap_tab[i].fd >= 0) { ret = posix_fadvise(MT_mmap_tab[i].fd, 0, len, advice);#ifdef MMAP_DEBUG stream_printf(GDKerr, "MT_fadvise: posix_fadvise(%s,fd=%d,%uMB,%d) = %d\n", MT_mmap_tab[i].path, MT_mmap_tab[i].fd, (unsigned int) (len >> 20), advice, ret);#endif } } (void) pthread_mutex_unlock(&MT_mmap_lock); }#else (void) base; (void) len; (void) advice;#endif return ret;}static voidMT_mmap_unload_tile(int i, size_t off, stream* err) { /* tell Linux to please stop caching this stuff */ int ret = posix_madvise(MT_mmap_tab[i].base+off, MT_MMAP_TILE, POSIX_MADV_DONTNEED); if (err) { stream_printf(err, "MT_mmap_unload_tile: posix_madvise(%s,off=%uMB,%uMB,fd=%d,POSIX_MADV_DONTNEED) = %d\n", MT_mmap_tab[i].path, (unsigned int) (off>>20), (unsigned int) (MT_MMAP_TILE>>20), MT_mmap_tab[i].fd, ret); }#ifdef HAVE_POSIX_FADVISE if (!do_not_use_posix_fadvise) { /* tell the OS quite clearly that you want to drop this */ ret = posix_fadvise(MT_mmap_tab[i].fd, off, MT_MMAP_TILE, POSIX_FADV_DONTNEED); if (err) { stream_printf(err, "MT_mmap_unload_tile: posix_fadvise(%s,off=%uMB,%uMB,fd=%d,POSIX_MADV_DONTNEED) = %d\n", MT_mmap_tab[i].path, (unsigned int) (off>>20), (unsigned int) (MT_MMAP_TILE>>20), MT_mmap_tab[i].fd, ret); } }#endif}static intMT_mmap_save_tile(int i, size_t tile, stream* err) { int t, ret; /* save to disk an 128MB tile, and observe how long this takes */ if (err) { stream_printf(err, "MT_mmap_save_tile: msync(%s,off=%uM,%u,SYNC)...\n", MT_mmap_tab[i].path, (unsigned int) (tile>>20), (unsigned int) (MT_MMAP_TILE>>20)); } t = GDKms(); ret = MT_msync(MT_mmap_tab[i].base+tile, MT_MMAP_TILE, MMAP_SYNC); t = GDKms() - t; if (err) { stream_printf(err, "MT_mmap_save_tile: msync(%s,tile=%uM,%uM,SYNC) = %d (%dms)\n", MT_mmap_tab[i].path, (unsigned int) (tile>>20), (unsigned int) (MT_MMAP_TILE>>20), ret, t); } if (t > 200) { /* this took time; so we should report back on our actions and await new orders */ if (MT_mmap_tab[i].save_tile == 1) { MT_mmap_tab[i].first_tile = tile; /* leave first tile for later sequential use pass (start unloading after it) */ MT_mmap_tab[i].unload_tile = tile + MT_MMAP_TILE; } MT_mmap_tab[i].save_tile = tile + MT_MMAP_TILE; (void) pthread_mutex_unlock(&MT_mmap_lock); return 1; } return 0;}/* round-robin next. this is to ensure some fairness if multiple large results are produced simultaneously */static intMT_mmap_next(int i) { if (i != -1) { i = MT_mmap_tab[i].next; if (i == -1) i = MT_mmap_first; } return i;}intMT_mmap_trim(size_t target, void *fp){ stream *err = (stream *) fp; size_t off, rss = MT_getrss(); int i, worry = (rss*4 > target*3); (void) pthread_mutex_lock(&MT_mmap_lock); if (err) { stream_printf(err, "MT_mmap_trim(%u MB): rss = %u MB\n", (unsigned int) ((target) >> 20), (unsigned int) (rss >> 20)); } if (rss > target) { /* try to selectively unload pages from the writable regions */ size_t delta = ((rss - target) + 4*MT_MMAP_TILE - 1) & ~(MT_MMAP_TILE-1); for(i = MT_mmap_next(MT_mmap_cur); delta && i != MT_mmap_cur; i = MT_mmap_next(i)) { if (MT_mmap_tab[i].fd >= 0 && MT_mmap_tab[i].writable) { if (MT_mmap_tab[i].unload_tile >= MT_mmap_tab[i].save_tile) MT_mmap_tab[i].unload_tile = MT_mmap_tab[i].first_tile; while(MT_mmap_tab[i].unload_tile < MT_mmap_tab[i].save_tile) { MT_mmap_unload_tile(i, MT_mmap_tab[i].unload_tile, err); MT_mmap_tab[i].unload_tile += MT_MMAP_TILE; if ((delta -= MT_MMAP_TILE) == 0) break; } } } } if (worry) { /* schedule background saves of tiles */ for(i = MT_mmap_next(MT_mmap_cur); i != MT_mmap_cur; i = MT_mmap_next(i)) { if (MT_mmap_tab[i].fd >= 0 && MT_mmap_tab[i].writable && (MT_mmap_tab[i].pincnt == 0 || MT_mmap_tab[i].len > target)) { if (MT_mmap_tab[i].save_tile == 1) { /* first run, walk backwards until we hit an unsaved tile */ for(off=MT_mmap_tab[i].len; off>=MT_MMAP_TILE; off-=MT_MMAP_TILE) if (MT_mmap_save_tile(i, off, err)) return 1; } else { /* save the next tile */ for(off=MT_mmap_tab[i].save_tile; off+MT_MMAP_TILE<MT_mmap_tab[i].len; off+=MT_MMAP_TILE) { if (MT_mmap_save_tile(i, off, err)) return 1; } /* we seem to have run through all savable tiles */ if (!MT_mmap_tab[i].last_tile) { MT_mmap_tab[i].last_tile = 1; MT_mmap_tab[i].unload_tile = MT_mmap_tab[i].first_tile; } } } } MT_mmap_cur = i; } (void) pthread_mutex_unlock(&MT_mmap_lock); return (worry);}voidMT_mmap_pin(void *base, size_t len){ int i; (void) pthread_mutex_lock(&MT_mmap_lock); i = MT_mmap_idx(base, len); if (i >= 0) { MT_mmap_tab[i].pincnt++; } (void) pthread_mutex_unlock(&MT_mmap_lock);}voidMT_mmap_unpin(void *base, size_t len){ int i; (void) pthread_mutex_lock(&MT_mmap_lock); i = MT_mmap_idx(base, len); if (i >= 0) { MT_mmap_tab[i].pincnt--; } (void) pthread_mutex_unlock(&MT_mmap_lock);}void *MT_mmap(char *path, int mode, off_t off, size_t len){ MT_mmap_hdl hdl; void *ret = MT_mmap_open(&hdl, path, mode, off, len, 0); MT_mmap_close(&hdl); return ret;}#ifndef NATIVE_WIN32#ifdef HAVE_POSIX_FADVISE#ifdef HAVE_UNAME#include <sys/utsname.h>#endif#endif#if !defined(WIN32) && defined(PROFILE) #ifdef HAVE_PTHREAD_H#undef pthread_create/* for profiling purposes (btw configure with --enable-profile *and* --disable-shared --enable-static) * without setting the ITIMER_PROF per thread, all profiling info for everything except the main thread is lost. */#include <stdlib.h>/* Our data structure passed to the wrapper */typedef struct wrapper_s{ void * (*start_routine)(void *); void * arg; pthread_mutex_t lock; pthread_cond_t wait; struct itimerval itimer;} wrapper_t;/* The wrapper function in charge for setting the itimer value */static void * wrapper_routine(void * data){ /* Put user data in thread-local variables */ void * (*start_routine)(void *) = ((wrapper_t*)data)->start_routine; void * arg = ((wrapper_t*)data)->arg; /* Set the profile timer value */ setitimer(ITIMER_PROF, &((wrapper_t*)data)->itimer, NULL); /* Tell the calling thread that we don't need its data anymore */ pthread_mutex_lock(&((wrapper_t*)data)->lock); pthread_cond_signal(&((wrapper_t*)data)->wait); pthread_mutex_unlock(&((wrapper_t*)data)->lock); /* Call the real function */ return start_routine(arg);}/* Our wrapper function for the real pthread_create() */int gprof_pthread_create(pthread_t *__restrict thread, __const pthread_attr_t *__restrict attr, void * (*start_routine)(void *), void *__restrict arg){ wrapper_t wrapper_data; int i_return; /* Initialize the wrapper structure */ wrapper_data.start_routine = start_routine; wrapper_data.arg = arg; getitimer(ITIMER_PROF, &wrapper_data.itimer); pthread_cond_init(&wrapper_data.wait, NULL); pthread_mutex_init(&wrapper_data.lock, NULL); pthread_mutex_lock(&wrapper_data.lock); /* The real pthread_create call */ i_return = pthread_create(thread, attr, &wrapper_routine, &wrapper_data); /* If the thread was successfully spawned, wait for the data to be released */ if(i_return == 0) { pthread_cond_wait(&wrapper_data.wait, &wrapper_data.lock); } pthread_mutex_unlock(&wrapper_data.lock); pthread_mutex_destroy(&wrapper_data.lock); pthread_cond_destroy(&wrapper_data.wait); return i_return;}#endif#endifvoidMT_init_posix(int alloc_map){#ifdef HAVE_POSIX_FADVISE#ifdef HAVE_UNAME struct utsname ubuf; /* do not use posix_fadvise on Linux systems running a 2.4 or older kernel */ do_not_use_posix_fadvise = uname(&ubuf) == 0 && strcmp(ubuf.sysname, "Linux") == 0 && strncmp(ubuf.release, "2.4", 3) <= 0;#endif#endif MT_heapbase = (char *) sbrk(0);#ifdef DEBUG_ALLOC static void MT_alloc_init(void); if (alloc_map) MT_alloc_init();#else (void)alloc_map;#endif MT_mmap_init();}size_tMT_getrss(void){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -