⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 file.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 3 页
字号:
#ifndef NO_MMAP#include <sys/mman.h>#endif#ifndef O_SYNC#define O_SYNC  O_FSYNC#endif#ifndef O_DSYNC#define O_DSYNC O_SYNC#endif#ifndef O_DIRECT#define O_DIRECT 0#endifint dbFile::open(char const* name, char const*, bool readonly, size_t initSize, bool replicationSupport){    this->readonly = readonly;#if defined(USE_SYSV_SHARED_MEMORY) && defined(DISKLESS_CONFIGURATION)    if (!shmem.open(name, initSize)) {         return errno;    }    mmapSize = initSize;               mmapAddr = shmem.get_base();    fd = -1;#else#ifndef NO_MMAP    int mmap_attr = MAP_SHARED;#endif    int status;#ifdef DISKLESS_CONFIGURATION#ifndef MAP_ANONYMOUS     fd = ::open("/dev/zero", O_RDWR, 0);#else    fd = -1;     mmap_attr |= MAP_ANONYMOUS;#endif // MAP_ANONYMOUS     mmapSize = initSize;#else // DISKLESS_CONFIGURATION    fd = ::open(name, readonly ? O_RDONLY : O_RDWR/*|O_DSYNC*/|O_CREAT#ifdef NO_MMAP                |O_DIRECT#endif                , 0666);    if (fd < 0) {         return errno;    }#if defined(__sun)    directio(fd, DIRECTIO_ON);#endif    mmapSize = lseek(fd, 0, SEEK_END);     if (!readonly && mmapSize == 0) {         mmapSize = initSize;        if (ftruncate(fd, mmapSize) != ok) {            status = errno;            if (fd >= 0) {                 ::close(fd);            }            return status;        }    }#endif // DISKLESS_CONFIGURATION#ifdef NO_MMAP    size_t fileSize = mmapSize;    if (!readonly && mmapSize < initSize) {         mmapSize = initSize;    }#ifdef USE_SYSV_SHARED_MEMORY    if (!shmem.open(name, initSize)) {         status = errno;        if (fd >= 0) {             ::close(fd);        }        return status;    }    mmapSize = initSize;               mmapAddr = shmem.get_base();#else    mmapAddr = (char*)valloc(mmapSize);    if (mmapAddr == NULL) {         status = errno;        if (fd >= 0) {             ::close(fd);        }        return status;    }#endif    lseek(fd, 0, SEEK_SET);     if ((size_t)::read(fd, mmapAddr, fileSize) != fileSize) { #ifdef USE_SYSV_SHARED_MEMORY        shmem.close();#else        free(mmapAddr);#endif        mmapAddr = NULL;        status = errno;        if (fd >= 0) {             ::close(fd);        }        return status;    }#else  // NO_MMAP    mmapAddr = (char*)mmap(NULL, mmapSize,                            readonly ? PROT_READ : PROT_READ|PROT_WRITE,                            mmap_attr, fd, 0);    if (mmapAddr == (char*)-1) {         status = errno;        mmapAddr = NULL;        if (fd >= 0) {             ::close(fd);        }        return status;    }#endif // NO_MMAP#endif // USE_SYSV_SHARED_MEMORY && DISKLESS_CONIFIGURATION#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)    pageSize = getpagesize();    pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);    pageMap = new int[pageMapSize];    memset(pageMap, 0, pageMapSize*sizeof(int));#endif#if defined(REPLICATION_SUPPORT)    db = NULL;    int nPages = getMaxPages();            currUpdateCount = new int[nPages];    if (replicationSupport) {         char* cFileName = new char[strlen(name) + 5];        strcat(strcpy(cFileName, name), ".cnt");#ifndef DISKLESS_CONFIGURATION        cfd = ::open(cFileName, O_RDWR|O_DSYNC|O_CREAT, 0666);        delete[] cFileName;        if (cfd < 0) {             return errno;        }        if (ftruncate(cfd, nPages*sizeof(int)) != ok) {            status = errno;            ::close(cfd);            return status;              }#else        int mmap_attr = MAP_SHARED;#ifndef MAP_ANONYMOUS         cfd = ::open("/dev/zero", O_RDONLY, 0);#else        cfd = -1;         mmap_attr |= MAP_ANONYMOUS;#endif    #endif        diskUpdateCount = (int*)mmap(NULL, nPages*sizeof(int),                                      PROT_READ|PROT_WRITE, mmap_attr, cfd, 0);        if (diskUpdateCount == (int*)-1) {             int status = errno;            diskUpdateCount = NULL;            if (cfd >= 0) {                 ::close(cfd);            }            return status;        }        int maxCount = 0;        rootPage = dbMalloc(pageSize);        for (int i = 0; i < nPages; i++) {                  int count = diskUpdateCount[i];            currUpdateCount[i] = count;            if (count > maxCount) {                 maxCount = count;            }        }        updateCounter = maxCount;        nRecovered = 0;        recoveredEvent.open(true);        syncEvent.open();        startSync();    }#endif#ifdef FUZZY_CHECKPOINT    writer = new dbFileWriter(this);#endif    return ok;}bool dbFile::write(size_t pos, void const* ptr, size_t size) {    if ((size_t)lseek(fd, pos, SEEK_SET) != pos        || (size_t)::write(fd, mmapAddr + pos, size) != size)     {         dbTrace("Failed to save page to the disk, position=%ld, size=%ld, error=%d\n",                (long)pos, (long)size, errno);        return false;    }    return true;}#if defined(REPLICATION_SUPPORT)void dbFile::syncToDisk(){    syncThread.setPriority(dbThread::THR_PRI_LOW);    dbCriticalSection cs(syncCS);    while (doSync) {         int i, j, k;         int maxUpdated = 0;        for (i = 0; i < int(mmapSize >> dbModMapBlockBits);) {             int updateCounters[dbMaxSyncSegmentSize];            for (j=i; j < (int)(mmapSize >> dbModMapBlockBits) && j-i < dbMaxSyncSegmentSize                      && currUpdateCount[j] > diskUpdateCount[j]; j++)            {                updateCounters[j-i] = currUpdateCount[j];            }            if (i != j) {                size_t pos = (i << dbModMapBlockBits) & ~(pageSize-1);                size_t size = (((j-i) << dbModMapBlockBits) + pageSize - 1) & ~(pageSize-1);#ifdef NO_MMAP                write(pos, mmapAddr + pos, size);#else                 msync(mmapAddr + pos, size, MS_SYNC);#endif                for (k = 0; i < j; k++, i++) {                      diskUpdateCount[i] = updateCounters[k];                }                maxUpdated = i;            } else {                 i += 1;            }            if (!doSync) {                 return;            }        }        if (maxUpdated != 0) {             msync((char*)diskUpdateCount, maxUpdated*sizeof(int), MS_SYNC);        }        if (closing && maxUpdated == 0) {             return;        } else {             syncEvent.wait(syncCS, dbSyncTimeout);        }    }}#endifint dbFile::create(const char* name, bool){    mmapAddr = NULL;    fd = ::open(name, O_RDWR|O_TRUNC|O_CREAT, 0666);    if (fd < 0) {         return errno;    }    return ok;}int dbFile::read(void* buf, size_t& readBytes, size_t size){      long rc = ::read(fd, buf, size);    if (rc < 0) {         readBytes = 0;        return errno;    }    readBytes = rc;    return ok;}int dbFile::write(void const* buf, size_t& writtenBytes, size_t size){      long rc = ::write(fd, buf, size);    if (rc < 0) {         writtenBytes = 0;        return errno;    }    writtenBytes = rc;    return ok;}int dbFile::setSize(size_t size, char const*, bool){#ifdef REPLICATION_SUPPORT    dbCriticalSection cs1(syncCS);    dbCriticalSection cs2(replCS);#endif#if defined(DISKLESS_CONFIGURATION) || defined(USE_SYSV_SHARED_MEMORY)    assert(false);#else#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)    int newPageMapSize = (size + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);    int* newPageMap = new int[newPageMapSize];    memcpy(newPageMap, pageMap, pageMapSize*sizeof(int));    memset(newPageMap + pageMapSize, 0,            (newPageMapSize-pageMapSize)*sizeof(int));    delete[] pageMap;    pageMapSize = newPageMapSize;        pageMap = newPageMap;    #endif#ifdef NO_MMAP    char* newBuf = (char*)valloc(size);    if (newBuf == NULL) {         return errno;    }    memcpy(newBuf, mmapAddr, mmapSize);    free(mmapAddr);    mmapAddr = newBuf;    mmapSize = size;    if (ftruncate(fd, size) != ok) {         return errno;    }#else    if (munmap(mmapAddr, mmapSize) != ok ||        (!readonly && ftruncate(fd, size) != ok) ||        (mmapAddr = (char*)mmap(NULL, size, readonly ? PROT_READ : PROT_READ|PROT_WRITE,                                MAP_SHARED, fd, 0)) == (char*)-1)    {           return errno;    }#endif    mmapSize = size;#endif    return ok;}int dbFile::flush(bool physical) {#if defined(REPLICATION_SUPPORT)    dbCriticalSection cs(replCS);    if (db == NULL) {         physical = true;    }       if (!physical) {        updateCounter += 1;     }#endif#if defined(REPLICATION_SUPPORT) || (defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION))    int* map = pageMap;    for (int i = 0, n = pageMapSize; i < n; i++) {         if (map[i] != 0) {             size_t pos = (size_t)i << (dbModMapBlockBits + 5);            unsigned mask = map[i];            int count = 0;            do {                 int size = 0;                while ((mask & 1) == 0) {                     pos += dbModMapBlockSize;                    mask >>= 1;                    count += 1;                }                while (true) {                      do { #ifdef REPLICATION_SUPPORT                        if (!physical) {                            currUpdateCount[(pos + size) >> dbModMapBlockBits] = updateCounter;                        }#endif                        size += dbModMapBlockSize;                        mask >>= 1;                        count += 1;                    } while ((mask & 1) != 0);                    if (i+1 < n && count == 32 && size < dbMaxSyncSegmentSize*dbModMapBlockSize                         && (map[i+1] & 1) != 0)                     {                         map[i] = 0;                        mask = map[++i];                        count = 0;                    } else {                         break;                    }                }#if defined(REPLICATION_SUPPORT)                if (db != NULL) {                     if (!physical) {                         for (int j = db->nServers; --j >= 0;) {                             if (db->con[j].status == dbReplicatedDatabase::ST_STANDBY) {                                 ReplicationRequest rr;                                rr.op = ReplicationRequest::RR_UPDATE_PAGE;                                rr.nodeId = db->id;                                rr.page.updateCount = updateCounter;                                rr.page.offs = pos;                                rr.size = size;                                db->writeReq(j, rr, mmapAddr + pos, size);                            }                        }                    }                    pos += size;                    continue;                }#else#ifdef FUZZY_CHECKPOINT                writer->put(pos, mmapAddr + pos, size);#else                if (!write(pos, mmapAddr + pos, size)) {                     return errno;                }#endif#endif                pos += size;            } while (mask != 0);            map[i] = 0;        }    }#endif#if !defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION) && !defined(REPLICATION_SUPPORT) && !defined(NO_FLUSH_ON_COMMIT)    if (msync(mmapAddr, mmapSize, MS_SYNC) != ok) {         return errno;    }#endif    return ok;}int dbFile::erase(){#ifdef USE_SYSV_SHARED_MEMORY    shmem.erase();#endif    return ok;}int dbFile::close(){#if defined(REPLICATION_SUPPORT)    if (db != NULL) {         closing = true;        stopSync();        {             dbCriticalSection cs(replCS);            if (nRecovered != 0) {                 recoveredEvent.wait(replCS);            }        }        syncEvent.close();        recoveredEvent.close();        munmap((char*)diskUpdateCount, getMaxPages()*sizeof(int));        if (cfd >= 0) {             ::close(cfd);        }    }    delete[] currUpdateCount;    currUpdateCount = NULL;    dbFree(rootPage);    rootPage = NULL;#endif // REPLICATION_SUPPORT    if (mmapAddr != NULL) { #ifdef NO_MMAP        int rc = flush();        if (rc != ok) {             return rc;        }#ifdef FUZZY_CHECKPOINT        delete writer;#endif#endif#ifdef USE_SYSV_SHARED_MEMORY        shmem.close();#elif defined(NO_MMAP)        free(mmapAddr);    #else        if (munmap(mmapAddr, mmapSize) != ok) {             return errno;        }#endif        mmapAddr = NULL;#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)        delete[] pageMap;#endif    }    return fd >= 0 && ::close(fd) != ok ? errno : ok;}char* dbFile::errorText(int code, char* buf, size_t bufSize){    strncpy(buf, strerror(code), bufSize-1);    buf[bufSize-1] = '\0';    return buf;}#endifEND_FASTDB_NAMESPACE

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -