⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 file.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 3 页
字号:
//-< FILE.CPP >------------------------------------------------------*--------*// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *// (Main Memory Database Management System)                          *   /\|  *//                                                                   *  /  \  *//                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *//                          Last update: 10-Dec-98    K.A. Knizhnik  * GARRET *//-------------------------------------------------------------------*--------*// System dependent implementation of mapped on memory file//-------------------------------------------------------------------*--------*#define INSIDE_FASTDB#include "stdtp.h"#include "file.h"#include "database.h"BEGIN_FASTDB_NAMESPACE#if FUZZY_CHECKPOINTstruct dbWriteQuery {    dbWriteQuery* next;    size_t        pos;    char          page[dbPageSize];};    #define DEFAULT_QUEUE_LENGTH_LIMIT 16*1024class dbFileWriter {   private:    dbLocalSemaphore event;    dbMutex          mutex;    dbWriteQuery*    last;    dbWriteQuery*    first;    dbWriteQuery*    free;    int              queueLength;    int              queueLengthLimit;    int              underflow;    int              overflow;    int              running;    dbThread         thread;    dbFile*          file;    static void thread_proc writeLoop(void* arg) {         ((dbFileWriter*)arg)->write();    }    void write() {         mutex.lock();        while (true) {            dbWriteQuery* query = first;            if (query != NULL) {                queueLength -= 1;                if (overflow) {                     overflow = false;                    event.signal();                }                                    first = query->next;                mutex.unlock();                file->write(query->pos, query->page, dbPageSize);                mutex.lock();                query->next = free;                free = query;            } else {                 if (!running) {                     break;                }                underflow = true;                event.wait(mutex);            }        }        mutex.unlock();    }  public:    dbFileWriter(dbFile* file) {        this->file = file;        queueLength = 0;        queueLengthLimit = DEFAULT_QUEUE_LENGTH_LIMIT;        event.open();        first = last = NULL;        free = NULL;        overflow = underflow = false;        running = true;        thread.create(&writeLoop, this);    }    void setQueueLimit(size_t limit) {         queueLengthLimit = limit;    }             ~dbFileWriter() {        mutex.lock();                running = false;        event.signal();        mutex.unlock();        thread.join();        dbWriteQuery* query = first;         while (query != NULL) {            dbWriteQuery* next = query->next;            delete query;            query = next;        }        event.close();    }        void put(size_t pos, void* page, size_t size) {         char* beg = (char*)page;        char* end = beg + size;        while (beg < end) {            put(pos, beg);            pos += dbPageSize;            beg += dbPageSize;        }    }    void put(size_t pos, void* page) {         mutex.lock();        while (queueLength >= queueLengthLimit) {             overflow = true;            event.wait(mutex);        }        dbWriteQuery* query = free;        if (query == NULL) {             query = new dbWriteQuery();        } else {                                            free = query->next;        }        if (first == NULL) {             first = query;        } else {             last->next = query;        }        queueLength += 1;        last = query;        query->next = NULL;        query->pos = pos;        memcpy(query->page, page, dbPageSize);        if (underflow) {             underflow = false;            event.signal();        }        mutex.unlock();    }};void dbFile::setCheckpointBufferSize(size_t nPages) {     writer->setQueueLimit(nPages);}#endifdbFile::dbFile(){    sharedName = NULL;    mmapAddr = NULL;    mmapSize = 0;    readonly = false;#ifdef REPLICATION_SUPPORT    currUpdateCount = NULL;     diskUpdateCount = NULL;    rootPage = NULL;    db = NULL;#endif}dbFile::~dbFile(){}#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)const int dbMaxSyncSegmentSize = 128*1024 / dbModMapBlockSize; #endif#ifdef REPLICATION_SUPPORT#include "database.h"int dbFile::dbSyncTimeout = 1000; // one secondbool dbFile::updatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size){    if (pos + size > mmapSize) {         size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;        setSize(newSize, sharedName);    }    if (s->read(mmapAddr + pos, size)) {                int pageNo = pos >> dbModMapBlockBits;        if (updateCounter < pageUpdateCounter) {             updateCounter = pageUpdateCounter;        }        while (size > 0) {             currUpdateCount[pageNo++] = pageUpdateCounter;            size -= dbModMapBlockSize;        }        return true;    }    return false;}bool dbFile::concurrentUpdatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size){    if (pos + size > mmapSize) {         size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;        db->beginTransaction(dbDatabase::dbCommitLock);        setSize(newSize, sharedName);        ((dbHeader*)mmapAddr)->size = newSize;        db->baseAddr = (byte*)mmapAddr;        db->header = (dbHeader*)mmapAddr;        db->version = db->monitor->version += 1;    }    if (pos == 0 && size <= pageSize) {         if (!s->read(rootPage, size)) {                     return false;        }                if (((dbHeader*)rootPage)->curr != ((dbHeader*)mmapAddr)->curr) {             db->beginTransaction(dbDatabase::dbCommitLock);            memcpy(mmapAddr, rootPage, size);            // now readers will see updated data            db->monitor->curr ^= 1;            db->endTransaction();        } else {             memcpy(mmapAddr, rootPage, size);        }    } else {         if (!s->read(mmapAddr + pos, size)) {               return false;        }    }    int pageNo = pos >> dbModMapBlockBits;    if (updateCounter < pageUpdateCounter) {         updateCounter = pageUpdateCounter;    }    while (size > 0) {         currUpdateCount[pageNo++] = pageUpdateCounter;        size -= dbModMapBlockSize;    }    return true;}int dbFile::getUpdateCountTableSize() {     int nPages = mmapSize >> dbModMapBlockBits;    while (--nPages >= 0 && diskUpdateCount[nPages] == 0);    return nPages + 1;}int dbFile::getMaxPages() {     return 1 << (dbDatabaseOffsetBits - dbModMapBlockBits);}void dbFile::startSync(){#ifndef DISKLESS_CONFIGURATION    doSync = true;    closing = false;    syncEvent.reset();    syncThread.create(startSyncToDisk, this);#endif}void dbFile::stopSync(){#ifndef DISKLESS_CONFIGURATION    doSync = false;    syncEvent.signal();    syncThread.join();#endif}void thread_proc dbFile::startSyncToDisk(void* arg){    ((dbFile*)arg)->syncToDisk();}void thread_proc dbFile::startRecovery(void* arg){        RecoveryRequest* rr = (RecoveryRequest*)arg;    rr->file->doRecovery(rr->nodeId, rr->updateCounters, rr->nPages);    {         dbCriticalSection cs(rr->file->replCS);        if (--rr->file->nRecovered == 0) {             rr->file->recoveredEvent.signal();        }    }    delete rr;}void dbFile::recovery(int nodeId, int* updateCounters, int nPages){    RecoveryRequest* rr = new RecoveryRequest;    rr->nodeId = nodeId;    rr->updateCounters = updateCounters;    rr->nPages = nPages;    rr->file = this;    {         dbCriticalSection cs(replCS);        if (nRecovered++ == 0) {             recoveredEvent.reset();        }    }    dbThread recoveryThread;    recoveryThread.create(startRecovery, rr);    recoveryThread.setPriority(dbThread::THR_PRI_HIGH);}void dbFile::doRecovery(int nodeId, int* updateCounters, int nPages){    ReplicationRequest rr;    memset(updateCounters+nPages, 0, (getMaxPages() - nPages)*sizeof(int));    int i, j, n;     if (db->con[nodeId].reqSock == NULL) {         char buf[256];        socket_t* s = socket_t::connect(db->serverURL[nodeId],                                         socket_t::sock_global_domain,                                         dbReplicatedDatabase::dbRecoveryConnectionAttempts);        if (!s->is_ok()) {             s->get_error_text(buf, sizeof buf);            dbTrace("Failed to establish connection with node %d: %s\n",                    nodeId, buf);            delete s;            return;        }         rr.op = ReplicationRequest::RR_GET_STATUS;        rr.nodeId = db->id;        if (!s->write(&rr, sizeof rr) || !s->read(&rr, sizeof rr)) {             s->get_error_text(buf, sizeof buf);            dbTrace("Connection with node %d is broken: %s\n",                    nodeId, buf);            delete s;            return;        }        if (rr.op != ReplicationRequest::RR_STATUS && rr.status != dbReplicatedDatabase::ST_STANDBY) {             dbTrace("Unexpected response from standby node %d: code %d status %d\n",                      nodeId, rr.op, rr.status);            delete s;            return;        } else {            db->addConnection(nodeId, s);        }    }    while (true) {         int maxUpdateCount = 0;        {             dbCriticalSection cs(syncCS);            TRACE_MSG(("Database synchronization: mmapSize=%ld\n", mmapSize));            for (i = 0, j = 0, n = mmapSize >> dbModMapBlockBits; i < n; i++) {                 if (updateCounters[i] > currUpdateCount[i]) {                     updateCounters[i] = 0;                } else {                     if (updateCounters[i] > maxUpdateCount) {                         maxUpdateCount = updateCounters[i];                    }                }                if (i > j && (currUpdateCount[i] <= updateCounters[i] || i-j >= dbMaxSyncSegmentSize                              || currUpdateCount[i] != currUpdateCount[j]))                 {                                       rr.op = ReplicationRequest::RR_UPDATE_PAGE;                    rr.nodeId = nodeId;                    rr.size = (i-j)*dbModMapBlockSize;                    rr.page.offs = (size_t)j << dbModMapBlockBits;                    rr.page.updateCount = currUpdateCount[j];                    TRACE_MSG(("Send segment [%lx, %ld]\n", rr.page.offs, rr.size));                     if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) {                        delete[] updateCounters;                        return;                    }                    j = i;                }                if (i >= nPages || currUpdateCount[i] > updateCounters[i]) {                     if (currUpdateCount[i] > maxUpdateCount) {                         maxUpdateCount = currUpdateCount[i];                    }                    updateCounters[i] = currUpdateCount[i];                } else {                     j = i + 1;                }            }                  if (i != j) {                 rr.op = ReplicationRequest::RR_UPDATE_PAGE;                rr.nodeId = nodeId;                rr.size = (i-j)*dbModMapBlockSize;                rr.page.offs = (size_t)j << dbModMapBlockBits;                rr.page.updateCount = currUpdateCount[j];                TRACE_MSG(("Send segment [%lx, %ld]\n", rr.page.offs, rr.size));                if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) {                    delete[] updateCounters;                    return;                }            }           }        {             dbCriticalSection cs(replCS);            if (maxUpdateCount == updateCounter) {                 dbTrace("Complete recovery of node %d\n", nodeId);                delete[] updateCounters;                rr.op = ReplicationRequest::RR_STATUS;                rr.nodeId = nodeId;                db->con[nodeId].status = rr.status = dbReplicatedDatabase::ST_STANDBY;                for (i = 0, n = db->nServers; i < n; i++) {                                     if (db->con[i].status != dbReplicatedDatabase::ST_OFFLINE && i != db->id) {                        db->writeReq(i, rr);                     }                }                return;            }        }    }}#endifbool dbFile::write(void const* buf, size_t size){    size_t writtenBytes;    bool result = write(buf, writtenBytes, size) == ok && writtenBytes == size;    return result;}#ifdef _WIN32class OS_info : public OSVERSIONINFO {   public:     OS_info() {         dwOSVersionInfoSize = sizeof(OSVERSIONINFO);        GetVersionEx(this);    }};static OS_info osinfo;#define BAD_POS 0xFFFFFFFF // returned by SetFilePointer and GetFileSizeint dbFile::erase(){    return ok;}int dbFile::open(char const* fileName, char const* sharedName, bool readonly,                 size_t initSize, bool replicationSupport){    int status;    size_t fileSize;    this->readonly = readonly;#ifndef DISKLESS_CONFIGURATION    fh = CreateFile(W32_STRING(fileName), readonly ? GENERIC_READ : (GENERIC_READ|GENERIC_WRITE),                     FILE_SHARE_READ | FILE_SHARE_WRITE, FASTDB_SECURITY_ATTRIBUTES,                     readonly ? OPEN_EXISTING : OPEN_ALWAYS,#ifdef _WINCE                    FILE_ATTRIBUTE_NORMAL#else                    FILE_FLAG_RANDOM_ACCESS#ifdef NO_MMAP                    |FILE_FLAG_NO_BUFFERING#endif#if 0 // not needed as we do explicit flush ???                    |FILE_FLAG_WRITE_THROUGH#endif#endif                    , NULL);    if (fh == INVALID_HANDLE_VALUE) {        return GetLastError();    }    DWORD highSize;    fileSize = GetFileSize(fh, &highSize);    if (fileSize == BAD_POS && (status = GetLastError()) != ok) {        CloseHandle(fh);        return status;    }    assert(highSize == 0);    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -