⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 file.cpp

📁 用于嵌入式环境的数据库
💻 CPP
📖 第 1 页 / 共 3 页
字号:
//-< FILE.CPP >------------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
//                          Last update: 10-Dec-98    K.A. Knizhnik  * GARRET *
//-------------------------------------------------------------------*--------*
// System dependent implementation of mapped on memory file
//-------------------------------------------------------------------*--------*

#define INSIDE_FASTDB

#include "stdtp.h"
#include "file.h"

// Default constructor: leaves the object in the "nothing opened, nothing
// mapped" state. No file or mapping is created here.
dbFile::dbFile()
{
#ifdef REPLICATION_SUPPORT
    // Replication bookkeeping is not allocated and no database is attached.
    db = NULL;
    rootPage = NULL;
    diskUpdateCount = NULL;
    currUpdateCount = NULL; 
#endif

    // Empty mapping and no shared object name.
    mmapSize = 0;
    mmapAddr = NULL;
    sharedName = NULL;
}

#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)
// Segment size limit expressed in modification-map blocks: 128*1024 divided
// by dbModMapBlockSize. dbFile::doRecovery uses it to cap the number of
// consecutive blocks placed into a single update request.
const int dbMaxSyncSegmentSize = 128*1024 / dbModMapBlockSize; 
#endif

#ifdef REPLICATION_SUPPORT

#include "database.h"

// Definition and initial value of the static dbSyncTimeout setting.
// NOTE(review): the original note says 1000 means one second, so the unit is
// presumably milliseconds -- confirm against the code that waits on it.
int dbFile::dbSyncTimeout = 1000; // one second

// Reads `size` bytes from socket `s` directly into the mapping at offset
// `pos` and stamps every covered modification-map block with
// `pageUpdateCounter`.
//
// If the target range lies beyond the current mapping, the mapping is first
// grown to the larger of (pos + size) and twice its current size.
// The object's updateCounter is raised to pageUpdateCounter when the latter
// is greater.
//
// Returns true when the data was read, false when the socket read failed
// (in which case no counters are modified).
bool dbFile::updatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
    if (pos + size > mmapSize) { 
        size_t grownSize = mmapSize*2;
        if (pos + size > grownSize) { 
            grownSize = pos + size;
        }
        setSize(grownSize, sharedName);
    }

    // Guard: nothing else to do if the payload could not be received.
    if (!s->read(mmapAddr + pos, size)) { 
        return false;
    }

    if (updateCounter < pageUpdateCounter) { 
        updateCounter = pageUpdateCounter;
    }

    // Mark each block touched by the received range.
    for (int blockNo = pos >> dbModMapBlockBits; size > 0; size -= dbModMapBlockSize) { 
        currUpdateCount[blockNo++] = pageUpdateCounter;
    }
    return true;
}

// Variant of updatePages() that coordinates with the attached database
// object (`db`) while applying data received from socket `s`.
//
// Steps:
//  1. If the range [pos, pos+size) does not fit into the mapping, the mapping
//     is grown (to the larger of pos+size and twice the current size) under a
//     dbCommitLock transaction, the new size is stored in the header at the
//     start of the mapping and the database/monitor version is incremented.
//  2. A range starting at offset 0 and not larger than pageSize is first
//     read into the rootPage buffer and then copied into the mapping. When
//     the received header's `curr` differs from the mapped one, the copy and
//     the flip of monitor->curr are done inside a dbCommitLock transaction.
//     Any other range is read straight into the mapping.
//  3. updateCounter is raised if needed and all covered modification-map
//     blocks are stamped with pageUpdateCounter.
//
// Returns false if reading from the socket fails, true otherwise.
bool dbFile::concurrentUpdatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
    if (pos + size > mmapSize) { 
        size_t grownSize = mmapSize*2;
        if (pos + size > grownSize) { 
            grownSize = pos + size;
        }
        db->beginTransaction(dbDatabase::dbCommitLock);
        setSize(grownSize, sharedName);
        ((dbHeader*)mmapAddr)->size = grownSize;
        db->version = db->monitor->version += 1;
        // NOTE(review): unlike the root page branch below, this
        // beginTransaction() has no matching endTransaction() in this
        // function -- confirm whether that is intentional.
    }

    bool const isRootPage = (pos == 0 && size <= pageSize);
    if (!isRootPage) { 
        // Ordinary pages go directly into the mapping.
        if (!s->read(mmapAddr + pos, size)) {   
            return false;
        }
    } else { 
        // The root page is staged in a private buffer first.
        if (!s->read(rootPage, size)) {         
            return false;
        }        
        bool const currChanged = 
            ((dbHeader*)rootPage)->curr != ((dbHeader*)mmapAddr)->curr;
        if (currChanged) { 
            db->beginTransaction(dbDatabase::dbCommitLock);
        }
        memcpy(mmapAddr, rootPage, size);
        if (currChanged) { 
            // now readers will see updated data
            db->monitor->curr ^= 1;
            db->endTransaction();
        }
    }

    if (updateCounter < pageUpdateCounter) { 
        updateCounter = pageUpdateCounter;
    }
    for (int blockNo = pos >> dbModMapBlockBits; size > 0; size -= dbModMapBlockSize) { 
        currUpdateCount[blockNo++] = pageUpdateCounter;
    }
    return true;
}

// Returns the length of the used prefix of diskUpdateCount: one past the
// index of the last non-zero entry among the blocks covered by the current
// mapping, or 0 when all of them are zero.
int dbFile::getUpdateCountTableSize() { 
    int used = mmapSize >> dbModMapBlockBits;
    // Strip trailing zero counters.
    while (used > 0 && diskUpdateCount[used - 1] == 0) { 
        used -= 1;
    }
    return used;
}

// Returns 2^(dbDatabaseOffsetBits - dbModMapBlockBits). dbFile::open uses
// this value as the length of the currUpdateCount array.
int dbFile::getMaxPages() 
{ 
    int const blockIndexBits = dbDatabaseOffsetBits - dbModMapBlockBits;
    return 1 << blockIndexBits;
}


// Launches the background synchronization thread.
// Sets doSync, clears closing, resets syncEvent and then creates syncThread
// with startSyncToDisk as entry point and this object as its argument.
// Compiles to an empty function when DISKLESS_CONFIGURATION is defined.
void dbFile::startSync()
{
#ifndef DISKLESS_CONFIGURATION
    // Flags and event are prepared before the thread is created.
    doSync = true;
    closing = false;
    syncEvent.reset();
    syncThread.create(startSyncToDisk, this);
#endif
}

// Stops the background synchronization thread started by startSync():
// clears doSync, signals syncEvent and joins syncThread.
// Compiles to an empty function when DISKLESS_CONFIGURATION is defined.
void dbFile::stopSync()
{
#ifndef DISKLESS_CONFIGURATION
    doSync = false;
    // Presumably wakes the thread if it is waiting on the event so that it
    // can observe doSync == false -- confirm in syncToDisk().
    syncEvent.signal();
    syncThread.join();
#endif
}

// Thread entry point used by startSync(): `arg` is the dbFile whose
// syncToDisk() method is to be run.
void thread_proc dbFile::startSyncToDisk(void* arg)
{
    dbFile* const file = (dbFile*)arg;
    file->syncToDisk();
}

// Thread entry point used by recovery(): `arg` is a heap-allocated
// RecoveryRequest. Runs doRecovery() for the request, then decrements the
// file's nRecovered counter under replCS and signals recoveredEvent when it
// drops to zero. The request object is deleted before returning.
void thread_proc dbFile::startRecovery(void* arg)
{    
    RecoveryRequest* const request = (RecoveryRequest*)arg;
    dbFile* const file = request->file;

    file->doRecovery(request->nodeId, request->updateCounters, request->nPages);
    { 
        dbCriticalSection cs(file->replCS);
        file->nRecovered -= 1;
        if (file->nRecovered == 0) { 
            // Last running recovery has finished.
            file->recoveredEvent.signal();
        }
    }
    delete request;
}

void dbFile::recovery(int nodeId, int* updateCounters, int nPages)
{
    RecoveryRequest* rr = new RecoveryRequest;
    rr->nodeId = nodeId;
    rr->updateCounters = updateCounters;
    rr->nPages = nPages;
    rr->file = this;
    { 
        dbCriticalSection cs(replCS);
        if (nRecovered++ == 0) { 
            recoveredEvent.reset();
        }
    }
    dbThread recoveryThread;
    recoveryThread.create(startRecovery, rr);
    recoveryThread.setPriority(dbThread::THR_PRI_HIGH);
}

void dbFile::doRecovery(int nodeId, int* updateCounters, int nPages)
{
    ReplicationRequest rr;
    memset(updateCounters+nPages, 0, (getMaxPages() - nPages)*sizeof(int));
    int i, j, n; 

    if (db->con[nodeId].reqSock == NULL) { 
        char buf[256];
        socket_t* s = socket_t::connect(db->serverURL[nodeId], 
                                        socket_t::sock_global_domain, 
                                        dbReplicatedDatabase::dbRecoveryConnectionAttempts);
        if (!s->is_ok()) { 
            s->get_error_text(buf, sizeof buf);
            dbTrace("Failed to establish connection with node %d: %s\n",
                    nodeId, buf);
            delete s;
            return;
        } 
        rr.op = ReplicationRequest::RR_GET_STATUS;
        rr.nodeId = db->id;
        if (!s->write(&rr, sizeof rr) || !s->read(&rr, sizeof rr)) { 
            s->get_error_text(buf, sizeof buf);
            dbTrace("Connection with node %d is broken: %s\n",
                    nodeId, buf);
            delete s;
            return;
        }
        if (rr.op != ReplicationRequest::RR_STATUS && rr.status != dbReplicatedDatabase::ST_STANDBY) { 
            dbTrace("Unexpected response from standby node %d: code %d status %d\n", 
                     nodeId, rr.op, rr.status);
            delete s;
            return;
        } else {
            db->addConnection(nodeId, s);
        }
    }

    while (true) { 
        int maxUpdateCount = 0;
        { 
            dbCriticalSection cs(syncCS);
            for (i = 0, j = 0, n = mmapSize >> dbModMapBlockBits; i < n; i++) { 
                if (updateCounters[i] > currUpdateCount[i]) { 
                    updateCounters[i] = 0;
                } else { 
                    if (updateCounters[i] > maxUpdateCount) { 
                        maxUpdateCount = updateCounters[i];
                    }
                }
                if (i > j && (currUpdateCount[i] <= updateCounters[i] || i-j >= dbMaxSyncSegmentSize
                              || currUpdateCount[i] != currUpdateCount[j])) 
                {                   
                    rr.op = ReplicationRequest::RR_UPDATE_PAGE;
                    rr.nodeId = nodeId;
                    rr.size = (i-j)*dbModMapBlockSize;
                    rr.page.offs = (size_t)j << dbModMapBlockBits;
                    rr.page.updateCount = currUpdateCount[j];
                    if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) {
                        delete[] updateCounters;
                        return;
                    }
                    j = i;
                }
                if (currUpdateCount[i] > updateCounters[i]) { 
                    if (currUpdateCount[i] > maxUpdateCount) { 
                        maxUpdateCount = currUpdateCount[i];
                    }
                    updateCounters[i] = currUpdateCount[i];
                } else { 
                    j = i + 1;
                }
            }      
            if (i != j) { 
                rr.op = ReplicationRequest::RR_UPDATE_PAGE;
                rr.nodeId = nodeId;
                rr.size = (i-j)*dbModMapBlockSize;
                rr.page.offs = (size_t)j << dbModMapBlockBits;
                rr.page.updateCount = currUpdateCount[j];
                if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) {
                    delete[] updateCounters;
                    return;
                }
            }   
        }
        { 
            dbCriticalSection cs(replCS);
            if (maxUpdateCount == updateCounter) { 
                dbTrace("Complete recovery of node %d\n", nodeId);
                delete[] updateCounters;
                rr.op = ReplicationRequest::RR_STATUS;
                rr.nodeId = nodeId;
                db->con[nodeId].status = rr.status = dbReplicatedDatabase::ST_STANDBY;
                for (i = 0, n = db->nServers; i < n; i++) {                 
                    if (db->con[i].status != dbReplicatedDatabase::ST_OFFLINE && i != db->id) {
                        db->writeReq(i, rr); 
                    }
                }
                return;
            }
        }
    }
}


#endif

// Convenience wrapper around the three-argument write(): succeeds only when
// that call returns `ok` and reports that all `size` bytes were written.
bool dbFile::write(void const* buf, size_t size)
{
    size_t nWritten;
    if (write(buf, nWritten, size) != ok) { 
        return false;
    }
    return nWritten == size;
}

#ifdef _WIN32

class OS_info : public OSVERSIONINFO { 
  public: 
    OS_info() { 
        dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
        GetVersionEx(this);
    }
};

static OS_info osinfo;

#define BAD_POS 0xFFFFFFFF // returned by SetFilePointer and GetFileSize


// Windows implementation of erase(): performs no action and always
// reports success by returning `ok`.
int dbFile::erase()
{
    return ok;
}

int dbFile::open(char const* fileName, char const* sharedName, bool readonly,
                 size_t initSize, bool replicationSupport)
{
    int status;
    size_t fileSize;
#ifndef DISKLESS_CONFIGURATION
    fh = CreateFile(fileName, readonly ? GENERIC_READ : (GENERIC_READ|GENERIC_WRITE), 
                    FILE_SHARE_READ | FILE_SHARE_WRITE, FASTDB_SECURITY_ATTRIBUTES, 
                    readonly ? OPEN_EXISTING : OPEN_ALWAYS,
                    FILE_FLAG_RANDOM_ACCESS
#ifdef NO_MMAP
                    |FILE_FLAG_NO_BUFFERING
#endif
#if 0 // not needed as we do explicit flush ???
                    |FILE_FLAG_WRITE_THROUGH
#endif
                    , NULL);
    if (fh == INVALID_HANDLE_VALUE) {
        return GetLastError();
    }
    DWORD highSize;
    fileSize = GetFileSize(fh, &highSize);
    if (fileSize == BAD_POS && (status = GetLastError()) != ok) {
        CloseHandle(fh);
        return status;
    }
    assert(highSize == 0);
    
    mmapSize = fileSize;

    this->sharedName = new char[strlen(sharedName) + 1];
    strcpy(this->sharedName, sharedName);

    if (!readonly && fileSize == 0) { 
        mmapSize = initSize;
    }
#else
    fh = INVALID_HANDLE_VALUE;
    this->sharedName = NULL;
    mmapSize = fileSize = initSize;
#endif
#if defined(NO_MMAP)
    if (fileSize < mmapSize && !readonly) { 
        if (SetFilePointer(fh, mmapSize, NULL, FILE_BEGIN) != mmapSize || !SetEndOfFile(fh)) {
            status = GetLastError();
            CloseHandle(fh);
            return status;
        }
    }
    mmapAddr = (char*)VirtualAlloc(NULL, mmapSize, MEM_COMMIT|MEM_RESERVE, 
                                   PAGE_READWRITE);
           
#ifdef DISKLESS_CONFIGURATION
    if (mmapAddr == NULL) 
#else
    DWORD readBytes;
    if (mmapAddr == NULL
        || !ReadFile(fh, mmapAddr, fileSize, &readBytes, NULL) || readBytes != fileSize) 
#endif    
    {  
        status = GetLastError();
        if (fh != INVALID_HANDLE_VALUE) { 
            CloseHandle(fh);
        }
        return status;
    } 
    memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
    mh = NULL;
#else
    mh = CreateFileMapping(fh, FASTDB_SECURITY_ATTRIBUTES, readonly ? PAGE_READONLY : PAGE_READWRITE, 
                           0, mmapSize, sharedName);
    status = GetLastError();
    if (mh == NULL) { 
        if (fh != INVALID_HANDLE_VALUE) { 
            CloseHandle(fh);
        }
        return status;
    }
    mmapAddr = (char*)MapViewOfFile(mh, readonly 
                                    ? FILE_MAP_READ : FILE_MAP_ALL_ACCESS, 
                                    0, 0, 0);
    if (mmapAddr == NULL) { 
        status = GetLastError();
        CloseHandle(mh);
        if (fh != INVALID_HANDLE_VALUE) { 
            CloseHandle(fh);
        }
        return status;
    } 
    if (status != ERROR_ALREADY_EXISTS && mmapSize > fileSize)
        //      && osinfo.dwPlatformId != VER_PLATFORM_WIN32_NT) 
    { 
        // Windows 95 doesn't initialize pages
        memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
    }
#endif

#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
    SYSTEM_INFO systemInfo;
    GetSystemInfo(&systemInfo);
    pageSize = systemInfo.dwPageSize;
    pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);
    pageMap = new int[pageMapSize];
    memset(pageMap, 0, pageMapSize*sizeof(int));
#endif

#if defined(REPLICATION_SUPPORT)
    db = NULL;
    int nPages = getMaxPages();        
    currUpdateCount = new int[nPages];

    if (replicationSupport) { 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -