⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 file.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 3 页
字号:
//-< FILE.CPP >------------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
//                          Last update: 10-Dec-98    K.A. Knizhnik  * GARRET *
//-------------------------------------------------------------------*--------*
// System dependent implementation of mapped on memory file
//-------------------------------------------------------------------*--------*

#define INSIDE_FASTDB

#include "stdtp.h"
#include "file.h"


dbFile::dbFile()
{
  sharedName = NULL;
  mmapAddr = NULL;
  mmapSize = 0;

#ifdef REPLICATION_SUPPORT

  currUpdateCount = NULL;
  diskUpdateCount = NULL;
  rootPage = NULL;
  db = NULL;
#endif
}

#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)
const int dbMaxSyncSegmentSize = 128*1024 / dbModMapBlockSize;
#endif

#ifdef REPLICATION_SUPPORT

#include "database.h"

int dbFile::dbSyncTimeout = 1000; // one second

bool dbFile::updatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
  if (pos + size > mmapSize)
  {
    size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
    setSize(newSize, sharedName);
  }

  if (s->read(mmapAddr + pos, size))
  {
    int pageNo = pos >> dbModMapBlockBits;

    if (updateCounter < pageUpdateCounter)
    {
      updateCounter = pageUpdateCounter;
    }

    while (size > 0)
    {
      currUpdateCount[pageNo++] = pageUpdateCounter;
      size -= dbModMapBlockSize;
    }

    return true;
  }

  return false;
}

bool dbFile::concurrentUpdatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
  if (pos + size > mmapSize)
  {
    size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
    db->beginTransaction(dbDatabase::dbCommitLock);
    setSize(newSize, sharedName);
    ((dbHeader*)mmapAddr)->size = newSize;
    db->version = db->monitor->version += 1;
  }

  if (pos == 0 && size <= pageSize)
  {
    if (!s->read(rootPage, size))
    {
      return false;
    }

    if (((dbHeader*)rootPage)->curr != ((dbHeader*)mmapAddr)->curr)
    {
      db->beginTransaction(dbDatabase::dbCommitLock);
      memcpy(mmapAddr, rootPage, size);
      // now readers will see updated data
      db->monitor->curr ^= 1;
      db->endTransaction();
    }
    else
    {
      memcpy(mmapAddr, rootPage, size);
    }
  }
  else
  {
    if (!s->read(mmapAddr + pos, size))
    {
      return false;
    }
  }

  int pageNo = pos >> dbModMapBlockBits;

  if (updateCounter < pageUpdateCounter)
  {
    updateCounter = pageUpdateCounter;
  }

  while (size > 0)
  {
    currUpdateCount[pageNo++] = pageUpdateCounter;
    size -= dbModMapBlockSize;
  }

  return true;
}

int dbFile::getUpdateCountTableSize()
{
  int nPages = mmapSize >> dbModMapBlockBits;

  while (--nPages >= 0 && diskUpdateCount[nPages] == 0)

    ;
  return nPages + 1;
}

int dbFile::getMaxPages()
{
  return 1 << (dbDatabaseOffsetBits - dbModMapBlockBits);
}


void dbFile::startSync()
{
#ifndef DISKLESS_CONFIGURATION
  doSync = true;
  closing = false;
  syncEvent.reset();
  syncThread.create(startSyncToDisk, this);
#endif
}

void dbFile::stopSync()
{
#ifndef DISKLESS_CONFIGURATION
  doSync = false;
  syncEvent.signal();
  syncThread.join();
#endif
}

void thread_proc dbFile::startSyncToDisk(void* arg)
{
  ((dbFile*)arg)->syncToDisk();
}

void thread_proc dbFile::startRecovery(void* arg)
{
  RecoveryRequest* rr = (RecoveryRequest*)arg;
  rr->file->doRecovery(rr->nodeId, rr->updateCounters, rr->nPages);
  {
    dbCriticalSection cs(rr->file->replCS);

    if (--rr->file->nRecovered == 0)
    {
      rr->file->recoveredEvent.signal();
    }
  }

  delete rr;
}

void dbFile::recovery(int nodeId, int* updateCounters, int nPages)
{
  RecoveryRequest* rr = new RecoveryRequest;
  rr->nodeId = nodeId;
  rr->updateCounters = updateCounters;
  rr->nPages = nPages;
  rr->file = this;
  {
    dbCriticalSection cs(replCS);

    if (nRecovered++ == 0)
    {
      recoveredEvent.reset();
    }
  }

  dbThread recoveryThread;
  recoveryThread.create(startRecovery, rr);
  recoveryThread.setPriority(dbThread::THR_PRI_HIGH);
}

void dbFile::doRecovery(int nodeId, int* updateCounters, int nPages)
{
  ReplicationRequest rr;
  memset(updateCounters+nPages, 0, (getMaxPages() - nPages)*sizeof(int));
  int i, j, n;

  if (db->con[nodeId].reqSock == NULL)
  {
    char buf[256];
    socket_t* s = socket_t::connect(db->serverURL[nodeId],
                                    socket_t::sock_global_domain,
                                    dbReplicatedDatabase::dbRecoveryConnectionAttempts);

    if (!s->is_ok())
    {
      s->get_error_text(buf, sizeof buf);
      dbTrace("Failed to establish connection with node %d: %s\n",
              nodeId, buf);
      delete s;
      return;
    }

    rr.op = ReplicationRequest::RR_GET_STATUS;
    rr.nodeId = db->id;

    if (!s->write(&rr, sizeof rr) || !s->read(&rr, sizeof rr))
    {
      s->get_error_text(buf, sizeof buf);
      dbTrace("Connection with node %d is broken: %s\n",
              nodeId, buf);
      delete s;
      return;
    }

    if (rr.op != ReplicationRequest::RR_STATUS && rr.status != dbReplicatedDatabase::ST_STANDBY)
    {
      dbTrace("Unexpected response from standby node %d: code %d status %d\n",
              nodeId, rr.op, rr.status);
      delete s;
      return;
    }
    else
    {
      db->addConnection(nodeId, s);
    }
  }

  while (true)
  {
    int maxUpdateCount = 0;
    {
      dbCriticalSection cs(syncCS);

      for (i = 0, j = 0, n = mmapSize >> dbModMapBlockBits; i < n; i++)
      {
        if (updateCounters[i] > currUpdateCount[i])
        {
          updateCounters[i] = 0;
        }
        else
        {
          if (updateCounters[i] > maxUpdateCount)
          {
            maxUpdateCount = updateCounters[i];
          }
        }

        if (i > j && (currUpdateCount[i] <= updateCounters[i] || i-j >= dbMaxSyncSegmentSize
                      || currUpdateCount[i] != currUpdateCount[j]))
        {
          rr.op = ReplicationRequest::RR_UPDATE_PAGE;
          rr.nodeId = nodeId;
          rr.size = (i-j)*dbModMapBlockSize;
          rr.page.offs = (size_t)j << dbModMapBlockBits;
          rr.page.updateCount = currUpdateCount[j];

          if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size))
          {
            delete[] updateCounters;
            return;
          }

          j = i;
        }

        if (currUpdateCount[i] > updateCounters[i])
        {
          if (currUpdateCount[i] > maxUpdateCount)
          {
            maxUpdateCount = currUpdateCount[i];
          }

          updateCounters[i] = currUpdateCount[i];
        }
        else
        {
          j = i + 1;
        }
      }

      if (i != j)
      {
        rr.op = ReplicationRequest::RR_UPDATE_PAGE;
        rr.nodeId = nodeId;
        rr.size = (i-j)*dbModMapBlockSize;
        rr.page.offs = (size_t)j << dbModMapBlockBits;
        rr.page.updateCount = currUpdateCount[j];

        if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size))
        {
          delete[] updateCounters;
          return;
        }
      }
    }

    {
      dbCriticalSection cs(replCS);

      if (maxUpdateCount == updateCounter)
      {
        dbTrace("Complete recovery of node %d\n", nodeId);
        delete[] updateCounters;
        rr.op = ReplicationRequest::RR_STATUS;
        rr.nodeId = nodeId;
        db->con[nodeId].status = rr.status = dbReplicatedDatabase::ST_STANDBY;

        for (i = 0, n = db->nServers; i < n; i++)
        {
          if (db->con[i].status != dbReplicatedDatabase::ST_OFFLINE && i != db->id)
          {
            db->writeReq(i, rr);
          }
        }

        return;
      }
    }
  }
}


#endif

bool dbFile::write(void const* buf, size_t size)
{
  size_t writtenBytes;
  bool result = write(buf, writtenBytes, size) == ok && writtenBytes == size;
  assert(result);
  return result;
}

#ifdef _WIN32

class OS_info : public OSVERSIONINFO
{

public:
  OS_info()
  {
    dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
    GetVersionEx(this);
  }
};

static OS_info osinfo;

#define BAD_POS 0xFFFFFFFF // returned by SetFilePointer and GetFileSize


int dbFile::erase()
{
  return ok;
}

int dbFile::open(char const* fileName, char const* sharedName, bool readonly,
                 size_t initSize, bool replicationSupport)
{
  int status;
  size_t fileSize;
#ifndef DISKLESS_CONFIGURATION

  fh = CreateFile(fileName, readonly ? GENERIC_READ : (GENERIC_READ|GENERIC_WRITE),
                  FILE_SHARE_READ | FILE_SHARE_WRITE, FASTDB_SECURITY_ATTRIBUTES,
                  readonly ? OPEN_EXISTING : OPEN_ALWAYS,
                  FILE_FLAG_RANDOM_ACCESS
#ifdef NO_MMAP
                  |FILE_FLAG_NO_BUFFERING
#endif
#if 0 // not needed as we do explicit flush ???
                  |FILE_FLAG_WRITE_THROUGH
#endif
                  , NULL);

  if (fh == INVALID_HANDLE_VALUE)
  {
    return GetLastError();
  }

  DWORD highSize;
  fileSize = GetFileSize(fh, &highSize);

  if (fileSize == BAD_POS && (status = GetLastError()) != ok)
  {
    CloseHandle(fh);
    return status;
  }

  assert(highSize == 0);

  mmapSize = fileSize;

  this->sharedName = new char[strlen(sharedName) + 1];
  strcpy(this->sharedName, sharedName);

  if (!readonly && fileSize == 0)
  {
    mmapSize = initSize;
  }

#else
  fh = INVALID_HANDLE_VALUE;

  this->sharedName = NULL;

  mmapSize = fileSize = initSize;

#endif
#if defined(NO_MMAP)

  if (fileSize < mmapSize && !readonly)
  {
    if (SetFilePointer(fh, mmapSize, NULL, FILE_BEGIN) != mmapSize || !SetEndOfFile(fh))
    {
      status = GetLastError();
      CloseHandle(fh);
      return status;
    }
  }

  mmapAddr = (char*)VirtualAlloc(NULL, mmapSize, MEM_COMMIT|MEM_RESERVE,
                                 PAGE_READWRITE);

#ifdef DISKLESS_CONFIGURATION

  if (mmapAddr == NULL)
#else

  DWORD readBytes;

  if (mmapAddr == NULL
      || !ReadFile(fh, mmapAddr, fileSize, &readBytes, NULL) || readBytes != fileSize)
#endif

  {
    status = GetLastError();

    if (fh != INVALID_HANDLE_VALUE)
    {
      CloseHandle(fh);
    }

    return status;
  }

  memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
  mh = NULL;
#else

  mh = CreateFileMapping(fh, FASTDB_SECURITY_ATTRIBUTES, readonly ? PAGE_READONLY : PAGE_READWRITE,
                         0, mmapSize, sharedName);
  status = GetLastError();

  if (mh == NULL)
  {
    if (fh != INVALID_HANDLE_VALUE)
    {
      CloseHandle(fh);
    }

    return status;
  }

  mmapAddr = (char*)MapViewOfFile(mh, readonly
                                  ? FILE_MAP_READ : FILE_MAP_ALL_ACCESS,
                                  0, 0, 0);

  if (mmapAddr == NULL)
  {
    status = GetLastError();
    CloseHandle(mh);

    if (fh != INVALID_HANDLE_VALUE)
    {
      CloseHandle(fh);
    }

    return status;
  }

  if (status != ERROR_ALREADY_EXISTS && mmapSize > fileSize)
    // && osinfo.dwPlatformId != VER_PLATFORM_WIN32_NT)
  {
    // Windows 95 doesn't initialize pages
    memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
  }
#endif

#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
  SYSTEM_INFO systemInfo;

  GetSystemInfo(&systemInfo);

  pageSize = systemInfo.dwPageSize;

  pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);

  pageMap = new int[pageMapSize];

  memset(pageMap, 0, pageMapSize*sizeof(int));

#endif

#if defined(REPLICATION_SUPPORT)

  db = NULL;

  int nPages = getMaxPages();

  currUpdateCount = new int[nPages];

  if (replicationSupport)
  {
    char* cFileName = new char[strlen(fileName) + 5];
    strcat(strcpy(cFileName, fileName), ".cnt");

#ifdef DISKLESS_CONFIGURATION

    cfh = INVALID_HANDLE_VALUE;
#else

    cfh = CreateFile(cFileName, GENERIC_READ|GENERIC_WRITE,
                     0, NULL, OPEN_ALWAYS,
                     FILE_FLAG_RANDOM_ACCESS|FILE_FLAG_WRITE_THROUGH,
                     NULL);
    delete[] cFileName;

    if (cfh == INVALID_HANDLE_VALUE)
    {
      status = errno;
      return status;
    }

#endif
    cmh = CreateFileMapping(cfh, NULL, PAGE_READWRITE, 0,
                            nPages*sizeof(int), NULL);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -