📄 file.cpp
字号:
#ifndef NO_MMAP#include <sys/mman.h>#endif#ifndef O_SYNC#define O_SYNC O_FSYNC#endif#ifndef O_DSYNC#define O_DSYNC O_SYNC#endif#ifndef O_DIRECT#define O_DIRECT 0#endifint dbFile::open(char const* name, char const*, bool readonly, size_t initSize, bool replicationSupport){ this->readonly = readonly;#if defined(USE_SYSV_SHARED_MEMORY) && defined(DISKLESS_CONFIGURATION) if (!shmem.open(name, initSize)) { return errno; } mmapSize = initSize; mmapAddr = shmem.get_base(); fd = -1;#else#ifndef NO_MMAP int mmap_attr = MAP_SHARED;#endif int status;#ifdef DISKLESS_CONFIGURATION#ifndef MAP_ANONYMOUS fd = ::open("/dev/zero", O_RDWR, 0);#else fd = -1; mmap_attr |= MAP_ANONYMOUS;#endif // MAP_ANONYMOUS mmapSize = initSize;#else // DISKLESS_CONFIGURATION fd = ::open(name, readonly ? O_RDONLY : O_RDWR/*|O_DSYNC*/|O_CREAT#ifdef NO_MMAP |O_DIRECT#endif , 0666); if (fd < 0) { return errno; }#if defined(__sun) directio(fd, DIRECTIO_ON);#endif mmapSize = lseek(fd, 0, SEEK_END); if (!readonly && mmapSize == 0) { mmapSize = initSize; if (ftruncate(fd, mmapSize) != ok) { status = errno; if (fd >= 0) { ::close(fd); } return status; } }#endif // DISKLESS_CONFIGURATION#ifdef NO_MMAP size_t fileSize = mmapSize; if (!readonly && mmapSize < initSize) { mmapSize = initSize; }#ifdef USE_SYSV_SHARED_MEMORY if (!shmem.open(name, initSize)) { status = errno; if (fd >= 0) { ::close(fd); } return status; } mmapSize = initSize; mmapAddr = shmem.get_base();#else mmapAddr = (char*)valloc(mmapSize); if (mmapAddr == NULL) { status = errno; if (fd >= 0) { ::close(fd); } return status; }#endif lseek(fd, 0, SEEK_SET); if ((size_t)::read(fd, mmapAddr, fileSize) != fileSize) { #ifdef USE_SYSV_SHARED_MEMORY shmem.close();#else free(mmapAddr);#endif mmapAddr = NULL; status = errno; if (fd >= 0) { ::close(fd); } return status; }#else // NO_MMAP mmapAddr = (char*)mmap(NULL, mmapSize, readonly ? PROT_READ : PROT_READ|PROT_WRITE, mmap_attr, fd, 0); if (mmapAddr == (char*)-1) { status = errno; mmapAddr = NULL; if (fd >= 0) { ::close(fd); } return status; }#endif // NO_MMAP#endif // USE_SYSV_SHARED_MEMORY && DISKLESS_CONIFIGURATION#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT) pageSize = getpagesize(); pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5); pageMap = new int[pageMapSize]; memset(pageMap, 0, pageMapSize*sizeof(int));#endif#if defined(REPLICATION_SUPPORT) db = NULL; int nPages = getMaxPages(); currUpdateCount = new int[nPages]; if (replicationSupport) { char* cFileName = new char[strlen(name) + 5]; strcat(strcpy(cFileName, name), ".cnt");#ifndef DISKLESS_CONFIGURATION cfd = ::open(cFileName, O_RDWR|O_DSYNC|O_CREAT, 0666); delete[] cFileName; if (cfd < 0) { return errno; } if (ftruncate(cfd, nPages*sizeof(int)) != ok) { status = errno; ::close(cfd); return status; }#else int mmap_attr = MAP_SHARED;#ifndef MAP_ANONYMOUS cfd = ::open("/dev/zero", O_RDONLY, 0);#else cfd = -1; mmap_attr |= MAP_ANONYMOUS;#endif #endif diskUpdateCount = (int*)mmap(NULL, nPages*sizeof(int), PROT_READ|PROT_WRITE, mmap_attr, cfd, 0); if (diskUpdateCount == (int*)-1) { int status = errno; diskUpdateCount = NULL; if (cfd >= 0) { ::close(cfd); } return status; } int maxCount = 0; rootPage = dbMalloc(pageSize); for (int i = 0; i < nPages; i++) { int count = diskUpdateCount[i]; currUpdateCount[i] = count; if (count > maxCount) { maxCount = count; } } updateCounter = maxCount; nRecovered = 0; recoveredEvent.open(true); syncEvent.open(); startSync(); }#endif#ifdef FUZZY_CHECKPOINT writer = new dbFileWriter(this);#endif return ok;}bool dbFile::write(size_t pos, void const* ptr, size_t size) { if ((size_t)lseek(fd, pos, SEEK_SET) != pos || (size_t)::write(fd, mmapAddr + pos, size) != size) { dbTrace("Failed to save page to the disk, position=%ld, size=%ld, error=%d\n", (long)pos, (long)size, errno); return false; } return true;}#if defined(REPLICATION_SUPPORT)void dbFile::syncToDisk(){ syncThread.setPriority(dbThread::THR_PRI_LOW); dbCriticalSection cs(syncCS); while (doSync) { int i, j, k; int maxUpdated = 0; for (i = 0; i < int(mmapSize >> dbModMapBlockBits);) { int updateCounters[dbMaxSyncSegmentSize]; for (j=i; j < (int)(mmapSize >> dbModMapBlockBits) && j-i < dbMaxSyncSegmentSize && currUpdateCount[j] > diskUpdateCount[j]; j++) { updateCounters[j-i] = currUpdateCount[j]; } if (i != j) { size_t pos = (i << dbModMapBlockBits) & ~(pageSize-1); size_t size = (((j-i) << dbModMapBlockBits) + pageSize - 1) & ~(pageSize-1);#ifdef NO_MMAP write(pos, mmapAddr + pos, size);#else msync(mmapAddr + pos, size, MS_SYNC);#endif for (k = 0; i < j; k++, i++) { diskUpdateCount[i] = updateCounters[k]; } maxUpdated = i; } else { i += 1; } if (!doSync) { return; } } if (maxUpdated != 0) { msync((char*)diskUpdateCount, maxUpdated*sizeof(int), MS_SYNC); } if (closing && maxUpdated == 0) { return; } else { syncEvent.wait(syncCS, dbSyncTimeout); } }}#endifint dbFile::create(const char* name, bool){ mmapAddr = NULL; fd = ::open(name, O_RDWR|O_TRUNC|O_CREAT, 0666); if (fd < 0) { return errno; } return ok;}int dbFile::read(void* buf, size_t& readBytes, size_t size){ long rc = ::read(fd, buf, size); if (rc < 0) { readBytes = 0; return errno; } readBytes = rc; return ok;}int dbFile::write(void const* buf, size_t& writtenBytes, size_t size){ long rc = ::write(fd, buf, size); if (rc < 0) { writtenBytes = 0; return errno; } writtenBytes = rc; return ok;}int dbFile::setSize(size_t size, char const*, bool){#ifdef REPLICATION_SUPPORT dbCriticalSection cs1(syncCS); dbCriticalSection cs2(replCS);#endif#if defined(DISKLESS_CONFIGURATION) || defined(USE_SYSV_SHARED_MEMORY) assert(false);#else#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT) int newPageMapSize = (size + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5); int* newPageMap = new int[newPageMapSize]; memcpy(newPageMap, pageMap, pageMapSize*sizeof(int)); memset(newPageMap + pageMapSize, 0, (newPageMapSize-pageMapSize)*sizeof(int)); delete[] pageMap; pageMapSize = newPageMapSize; pageMap = newPageMap; #endif#ifdef NO_MMAP char* newBuf = (char*)valloc(size); if (newBuf == NULL) { return errno; } memcpy(newBuf, mmapAddr, mmapSize); free(mmapAddr); mmapAddr = newBuf; mmapSize = size; if (ftruncate(fd, size) != ok) { return errno; }#else if (munmap(mmapAddr, mmapSize) != ok || (!readonly && ftruncate(fd, size) != ok) || (mmapAddr = (char*)mmap(NULL, size, readonly ? PROT_READ : PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0)) == (char*)-1) { return errno; }#endif mmapSize = size;#endif return ok;}int dbFile::flush(bool physical) {#if defined(REPLICATION_SUPPORT) dbCriticalSection cs(replCS); if (db == NULL) { physical = true; } if (!physical) { updateCounter += 1; }#endif#if defined(REPLICATION_SUPPORT) || (defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION)) int* map = pageMap; for (int i = 0, n = pageMapSize; i < n; i++) { if (map[i] != 0) { size_t pos = (size_t)i << (dbModMapBlockBits + 5); unsigned mask = map[i]; int count = 0; do { int size = 0; while ((mask & 1) == 0) { pos += dbModMapBlockSize; mask >>= 1; count += 1; } while (true) { do { #ifdef REPLICATION_SUPPORT if (!physical) { currUpdateCount[(pos + size) >> dbModMapBlockBits] = updateCounter; }#endif size += dbModMapBlockSize; mask >>= 1; count += 1; } while ((mask & 1) != 0); if (i+1 < n && count == 32 && size < dbMaxSyncSegmentSize*dbModMapBlockSize && (map[i+1] & 1) != 0) { map[i] = 0; mask = map[++i]; count = 0; } else { break; } }#if defined(REPLICATION_SUPPORT) if (db != NULL) { if (!physical) { for (int j = db->nServers; --j >= 0;) { if (db->con[j].status == dbReplicatedDatabase::ST_STANDBY) { ReplicationRequest rr; rr.op = ReplicationRequest::RR_UPDATE_PAGE; rr.nodeId = db->id; rr.page.updateCount = updateCounter; rr.page.offs = pos; rr.size = size; db->writeReq(j, rr, mmapAddr + pos, size); } } } pos += size; continue; }#else#ifdef FUZZY_CHECKPOINT writer->put(pos, mmapAddr + pos, size);#else if (!write(pos, mmapAddr + pos, size)) { return errno; }#endif#endif pos += size; } while (mask != 0); map[i] = 0; } }#endif#if !defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION) && !defined(REPLICATION_SUPPORT) && !defined(NO_FLUSH_ON_COMMIT) if (msync(mmapAddr, mmapSize, MS_SYNC) != ok) { return errno; }#endif return ok;}int dbFile::erase(){#ifdef USE_SYSV_SHARED_MEMORY shmem.erase();#endif return ok;}int dbFile::close(){#if defined(REPLICATION_SUPPORT) if (db != NULL) { closing = true; stopSync(); { dbCriticalSection cs(replCS); if (nRecovered != 0) { recoveredEvent.wait(replCS); } } syncEvent.close(); recoveredEvent.close(); munmap((char*)diskUpdateCount, getMaxPages()*sizeof(int)); if (cfd >= 0) { ::close(cfd); } } delete[] currUpdateCount; currUpdateCount = NULL; dbFree(rootPage); rootPage = NULL;#endif // REPLICATION_SUPPORT if (mmapAddr != NULL) { #ifdef NO_MMAP int rc = flush(); if (rc != ok) { return rc; }#ifdef FUZZY_CHECKPOINT delete writer;#endif#endif#ifdef USE_SYSV_SHARED_MEMORY shmem.close();#elif defined(NO_MMAP) free(mmapAddr); #else if (munmap(mmapAddr, mmapSize) != ok) { return errno; }#endif mmapAddr = NULL;#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT) delete[] pageMap;#endif } return fd >= 0 && ::close(fd) != ok ? errno : ok;}char* dbFile::errorText(int code, char* buf, size_t bufSize){ strncpy(buf, strerror(code), bufSize-1); buf[bufSize-1] = '\0'; return buf;}#endifEND_FASTDB_NAMESPACE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -