⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 file.cpp

📁 最新版本!fastdb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 4 页
字号:
//-< FILE.CPP >------------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
//                          Last update: 10-Dec-98    K.A. Knizhnik  * GARRET *
//-------------------------------------------------------------------*--------*
// System dependent implementation of mapped on memory file
//-------------------------------------------------------------------*--------*

#ifdef UNIX
#include <unistd.h>
#include <fcntl.h>

// Mark descriptor "fd" so it is not inherited across exec() calls.
// Returns the result of the final fcntl(): >= 0 on success, a negative
// value (with errno set) if either fcntl() call fails.
static int set_fd_close_on_exec (int fd)
{
    int flags = ::fcntl(fd, F_GETFD, 0);

    if (flags >= 0) {
        flags |= FD_CLOEXEC;
        flags = ::fcntl(fd, F_SETFD, flags);
    }
    return flags;
}

#ifdef VXWORKS
#define ftruncate ftruncate_shim_implementation

// Emulation of ftruncate() for VxWorks, which lacks it: grow the file to
// "size" bytes by seeking to its last byte and writing one filler byte.
// NOTE: unlike POSIX ftruncate(), this can only extend a file, never
// shrink it, and it requires size > 0.
// The original seek position is saved and restored.
// Returns 0 on success, -1 on failure.
int ftruncate_shim_implementation(int fd, unsigned int size) {
    int cur = lseek(fd, 0, SEEK_CUR);   // remember current position
    if (cur == -1)
        return -1;

    // Seek to the last byte of the requested size, relative to the file
    // START.  (The previous code used SEEK_END here, which extended the
    // file BY "size" bytes instead of TO "size" bytes.)
    if (lseek(fd, size-1, SEEK_SET) == -1)
        return -1;

    // Writing a single byte at offset size-1 forces the file to grow
    if (::write(fd, "0", 1) != 1)
        return -1;

    // Set the seek position back
    return (lseek(fd, cur, SEEK_SET) == -1) ? -1 : 0;
}
#endif // VXWORKS

#endif // UNIX

#define INSIDE_FASTDB

#include "stdtp.h"
#include "file.h"
#include "database.h"

#ifdef VXWORKS
#include "fastdbShim.h"
#endif // VXWORKS

BEGIN_FASTDB_NAMESPACE

const size_t MAX_READ_BUFFER_SIZE = 16*1024*1024;

#if FUZZY_CHECKPOINT
// One queued page write for the background (fuzzy) checkpoint writer:
// an intrusive singly-linked node carrying the destination file offset
// and a private copy of the page contents.
struct dbWriteQuery {
    dbWriteQuery* next;             // next node in the pending queue or free list
    size_t        pos;              // file offset this page is written to
    char          page[dbPageSize]; // snapshot of the page data
};
    
// Default cap on the number of pending page writes before put() blocks
const int DEFAULT_QUEUE_LENGTH_LIMIT = 16*1024;

// Asynchronous page writer used for fuzzy checkpoints.  Producers enqueue
// page images with put(); a dedicated background thread dequeues them in
// FIFO order and writes them to the underlying dbFile.  Completed nodes
// are recycled on a free list to avoid repeated allocation.
//
// Backpressure: put() blocks while the queue holds queueLengthLimit
// entries; the writer thread blocks while the queue is empty.  All shared
// state is guarded by "mutex"; the single "event" semaphore serves both
// directions (overflow wakes a producer, underflow wakes the writer).
class dbFileWriter { 
  private:
    dbLocalSemaphore event;        // signalled on empty->nonempty and full->notfull transitions
    dbMutex          mutex;        // protects every field below
    dbWriteQuery*    last;         // tail of the pending-write queue
    dbWriteQuery*    first;        // head of the pending-write queue
    dbWriteQuery*    free;         // recycled nodes (NOTE: member name shadows ::free)
    int              queueLength;      // current number of pending queries
    int              queueLengthLimit; // queue size at which put() starts blocking
    int              underflow;    // writer thread is waiting for work
    int              overflow;     // a producer is waiting for queue space
    int              running;      // cleared by the destructor to stop the writer
    dbThread         thread;       // background writer thread
    dbFile*          file;         // destination file (not owned)

    // Thread entry point: forwards to the member write loop.
    static void thread_proc writeLoop(void* arg) { 
        ((dbFileWriter*)arg)->write();
    }

    // Writer loop: drain the queue, performing the file I/O with the
    // mutex released, and exit only once shutdown has been requested
    // AND the queue is fully drained.
    void write() { 
        mutex.lock();
        while (true) {
            dbWriteQuery* query = first;
            if (query != NULL) {
                queueLength -= 1;
                if (overflow) { 
                    // queue dropped below the limit: wake the blocked producer
                    overflow = false;
                    event.signal();
                }                    
                first = query->next;
                mutex.unlock();
                // actual disk write happens outside the critical section
                file->write(query->pos, query->page, dbPageSize);
                mutex.lock();
                // recycle the node onto the free list
                query->next = free;
                free = query;
            } else { 
                if (!running) { 
                    // shutdown requested and nothing left to flush
                    break;
                }
                underflow = true;
                event.wait(mutex);
            }
        }
        mutex.unlock();
    }

  public:
    // Start the background writer thread flushing pages to "file".
    dbFileWriter(dbFile* file) {
        this->file = file;
        queueLength = 0;
        queueLengthLimit = DEFAULT_QUEUE_LENGTH_LIMIT;
        event.open();
        first = last = NULL;
        free = NULL;
        overflow = underflow = false;
        running = true;
        thread.create(&writeLoop, this);
    }

    // Change the backpressure threshold (number of buffered pages).
    // NOTE(review): the size_t argument is narrowed into an int member.
    void setQueueLimit(size_t limit) { 
        queueLengthLimit = limit;
    }
         
    // Stop the writer thread (it drains all pending writes first),
    // then release the recycled nodes.  Pending nodes end up on the
    // free list because write() recycles each query after flushing it.
    ~dbFileWriter() {
        mutex.lock();        
        running = false;
        event.signal();
        mutex.unlock();
        thread.join();
        dbWriteQuery* query = free; 
        while (query != NULL) {
            dbWriteQuery* next = query->next;
            delete query;
            query = next;
        }
        event.close();
    }
    
    // Enqueue a multi-page region as individual page writes.
    // NOTE(review): assumes "size" is a multiple of dbPageSize — the final
    // memcpy always copies a full page.  TODO confirm callers guarantee this.
    void put(size_t pos, void* page, size_t size) { 
        char* beg = (char*)page;
        char* end = beg + size;
        while (beg < end) {
            put(pos, beg);
            pos += dbPageSize;
            beg += dbPageSize;
        }
    }

    // Enqueue one page image at offset "pos"; blocks while the queue is full.
    void put(size_t pos, void* page) { 
        mutex.lock();
        while (queueLength >= queueLengthLimit) { 
            overflow = true;
            event.wait(mutex);
        }
        // reuse a recycled node when available
        dbWriteQuery* query = free;
        if (query == NULL) { 
            query = new dbWriteQuery();
        } else {                                
            free = query->next;
        }
        if (first == NULL) { 
            first = query;
        } else { 
            last->next = query;
        }
        queueLength += 1;
        last = query;
        query->next = NULL;
        query->pos = pos;
        memcpy(query->page, page, dbPageSize);
        if (underflow) { 
            // queue became non-empty: wake the writer thread
            underflow = false;
            event.signal();
        }
        mutex.unlock();
    }
};

// Limit the number of dirty pages the background checkpoint writer may
// buffer; once reached, enqueueing blocks until the writer catches up.
void dbFile::setCheckpointBufferSize(size_t nPages) 
{ 
    writer->setQueueLimit(nPages);
}

#endif

// Construct an unopened file object: no mapping has been established yet
// and the file is writable by default.
dbFile::dbFile()
{
    readonly = false;
    mmapSize = 0;
    mmapAddr = NULL;
    sharedName = NULL;

#ifdef REPLICATION_SUPPORT
    // Replication bookkeeping is set up later, when replication starts
    db = NULL;
    rootPage = NULL;
    diskUpdateCount = NULL;
    currUpdateCount = NULL;
#endif
}

// Empty destructor: no teardown is performed here.  Presumably unmapping
// and handle cleanup are done by an explicit close() defined elsewhere —
// TODO confirm against the rest of the class.
dbFile::~dbFile()
{
}

#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)
// Maximum length of one synchronization segment, expressed in
// modification-map blocks (128Kb worth of data per transfer).
const size_t dbMaxSyncSegmentSize = 128*1024 / dbModMapBlockSize; 
#endif

#ifdef REPLICATION_SUPPORT

#include "database.h" // NOTE(review): already included above; redundant but harmless thanks to include guards

// Interval between disk synchronization passes, in milliseconds
int dbFile::dbSyncTimeout = 1000; // one second

// Apply a batch of page images received from the replication peer.
// Reads "size" bytes from socket "s" directly into the mapped file at
// offset "pos", growing the database first if the data lies beyond the
// current mapping, and stamps "pageUpdateCounter" on every
// modification-map block touched.  Returns false if the socket read fails.
bool dbFile::updatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
    if (pos + size > mmapSize) { 
        // Grow at least twofold to amortize repeated extensions
        size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
        setSize(newSize, sharedName);
        ((dbHeader*)mmapAddr)->size = newSize;
        // The mapping may have moved: refresh the database's cached
        // pointers and bump the version so readers re-validate
        db->baseAddr = (byte*)mmapAddr;
        db->header = (dbHeader*)mmapAddr;
        db->version = db->monitor->version += 1;
    }
    if (s->read(mmapAddr + pos, size)) {        
        int pageNo = pos >> dbModMapBlockBits;
        // Track the highest update counter observed so far
        if (updateCounter < pageUpdateCounter) { 
            updateCounter = pageUpdateCounter;
        }
        // Stamp each modification-map block covered by this transfer
        while (size > 0) { 
            currUpdateCount[pageNo++] = pageUpdateCounter;
            size -= dbModMapBlockSize;
        }
        return true;
    }
    return false;
}

// Replica-side variant of updatePages() that coordinates with concurrent
// readers.  The database root page (pos == 0) is first received into the
// side buffer "rootPage"; if its current-root indicator differs from the
// mapped header's, the copy into the mapping is performed under the
// commit lock and the new root is published via db->monitor->curr, so
// readers never observe a half-written root.  All other pages are copied
// straight into the mapping.  Returns false on socket read failure.
bool dbFile::concurrentUpdatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
    if (pos + size > mmapSize) { 
        // Extend the file (at least doubling) under the commit lock so
        // no transaction is in flight while the mapping may move
        size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
        TRACE_IMSG(("Extend database from %ld to %ld\n", (long)mmapSize, (long)newSize));
        db->beginTransaction(dbDatabase::dbCommitLock);
        setSize(newSize, sharedName);
        ((dbHeader*)mmapAddr)->size = newSize;
#ifdef PROTECT_DATABASE
        protect(0, mmapSize);
#endif
        db->baseAddr = (byte*)mmapAddr;
        db->header = (dbHeader*)mmapAddr;
        db->version = db->monitor->version += 1;
        db->endTransaction();
    }
    if (pos == 0 && size <= pageSize) { 
        // Root page: receive into the side buffer first
        if (!s->read(rootPage, size)) {         
            return false;
        }        
        if (((dbHeader*)rootPage)->curr != ((dbHeader*)mmapAddr)->curr) { 
            // Root switch: a transaction was committed at the master
            TRACE_MSG(("Commit transaction at replica\n"));
            db->beginTransaction(dbDatabase::dbCommitLock);
#ifdef PROTECT_DATABASE
            unprotect(pos, size);
#endif
            memcpy(mmapAddr, rootPage, size);
            // now readers will see updated data
            db->monitor->curr = ((dbHeader*)mmapAddr)->curr;
            db->endTransaction();
#ifdef SYNCHRONOUS_REPLICATION
            // acknowledge the commit back to the master
            ReplicationRequest rr;
            rr.op = ReplicationRequest::RR_COMMITTED;
            rr.size = 0;
            s->write(&rr, sizeof rr);
#endif
        } else { 
            // Same root: plain in-place refresh, no commit needed
#ifdef PROTECT_DATABASE
            unprotect(pos, size);
#endif
            memcpy(mmapAddr, rootPage, size);
        }
    } else { 
        // Non-root page: read straight into the mapping
#ifdef PROTECT_DATABASE
        unprotect(pos, size);
#endif
        if (!s->read(mmapAddr + pos, size)) {   
#ifdef PROTECT_DATABASE
            protect(pos, size);
#endif
            return false;
        }
        assert(((dbHeader*)mmapAddr)->curr == db->monitor->curr);
    }
#ifdef PROTECT_DATABASE
    protect(pos, size);
#endif
    // Stamp every modification-map block covered by this transfer and
    // track the highest update counter observed
    int pageNo = pos >> dbModMapBlockBits;
    if (updateCounter < pageUpdateCounter) { 
        updateCounter = pageUpdateCounter;
    }
    while (size > 0) { 
        currUpdateCount[pageNo++] = pageUpdateCounter;
        size -= dbModMapBlockSize;
    }
    return true;
}

// Number of meaningful entries in the on-disk update-count table:
// the index just past the last page with a non-zero counter
// (trailing zero entries are not counted).
int dbFile::getUpdateCountTableSize() { 
    int n = (int)(mmapSize >> dbModMapBlockBits);
    while (n > 0 && diskUpdateCount[n-1] == 0) { 
        n -= 1;
    }
    return n;
}

// Upper bound on the number of modification-map blocks a database can
// contain: total addressable offset space divided by the block size.
int dbFile::getMaxPages() 
{ 
    return 1 << (dbDatabaseOffsetBits - dbModMapBlockBits);
}


// Start the background disk-synchronization thread (no-op in a
// diskless configuration).  Flags are set before the thread is spawned
// so the new thread observes them.
void dbFile::startSync()
{
#ifndef DISKLESS_CONFIGURATION
    doSync = true;
    closing = false;
    syncEvent.reset();
    syncThread.create(startSyncToDisk, this);
#endif
}

// Ask the disk-synchronization thread to terminate and wait for it
// (no-op in a diskless configuration).
void dbFile::stopSync()
{
#ifndef DISKLESS_CONFIGURATION
    doSync = false;
    syncEvent.signal();   // wake the thread if it is waiting on the event
    syncThread.join();
#endif
}

// Thread entry point: forwards to the member syncToDisk() loop.
void thread_proc dbFile::startSyncToDisk(void* arg)
{
    dbFile* self = (dbFile*)arg;
    self->syncToDisk();
}

// Thread entry point for node recovery.  Runs the recovery described by
// the RecoveryRequest, then decrements the in-flight recovery counter,
// signalling recoveredEvent when the last recovery completes.
// Takes ownership of the request and deletes it when done.
void thread_proc dbFile::startRecovery(void* arg)
{    
    RecoveryRequest* req = (RecoveryRequest*)arg;
    dbFile* self = req->file;
    self->doRecovery(req->nodeId, req->updateCounters, req->nPages);
    { 
        dbCriticalSection cs(self->replCS);
        self->nRecovered -= 1;
        if (self->nRecovered == 0) { 
            self->recoveredEvent.signal();
        }
    }
    delete req;
}

// Launch asynchronous recovery of node "nodeId" driven by its per-page
// update counters ("updateCounters" with "nPages" entries).  Increments
// nRecovered under replCS so waiters on recoveredEvent know a recovery
// is in flight; the worker (startRecovery) decrements it when finished.
void dbFile::recovery(int nodeId, int* updateCounters, int nPages)
{
    RecoveryRequest* rr = new RecoveryRequest;  // deleted by startRecovery()
    rr->nodeId = nodeId;
    rr->updateCounters = updateCounters;
    rr->nPages = nPages;
    rr->file = this;
    { 
        dbCriticalSection cs(replCS);
        if (nRecovered++ == 0) { 
            // first recovery in this batch: arm the completion event
            recoveredEvent.reset();
        }
    }
    // NOTE(review): recoveryThread is a local that goes out of scope right
    // after create(); this assumes dbThread's destructor detaches rather
    // than joins or kills the thread — TODO confirm dbThread semantics.
    dbThread recoveryThread;
    recoveryThread.create(startRecovery, rr);
    recoveryThread.setPriority(dbThread::THR_PRI_HIGH);
}


int dbFile::sendChanges(int nodeId, int* updateCounters, int nPages)
{
    ReplicationRequest rr;
    int maxUpdateCount = 0;
    TRACE_MSG(("Database synchronization: mmapSize=%ld\n", mmapSize));
    size_t i, j, n;

    for (i = 0, j = 0, n = mmapSize >> dbModMapBlockBits; i < n; i++) { 
        if (updateCounters[i] > currUpdateCount[i]) { 
            updateCounters[i] = 0;
        } else { 
            if (updateCounters[i] > maxUpdateCount) { 
                maxUpdateCount = updateCounters[i];
            }
        }
        if (i > j && (currUpdateCount[i] <= updateCounters[i] || i-j >= dbMaxSyncSegmentSize
                      || currUpdateCount[i] != currUpdateCount[j])) 
        {                    
            db->con[nodeId].nRecoveredPages += (i - j);
            rr.op = ReplicationRequest::RR_RECOVER_PAGE;
            rr.nodeId = nodeId;
            rr.size = (i-j)*dbModMapBlockSize;
            rr.page.offs = j << dbModMapBlockBits;
            rr.page.updateCount = currUpdateCount[j];
            TRACE_MSG(("Send segment [%lx, %ld]\n", rr.page.offs, rr.size));
            if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -