⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 file.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 3 页
字号:
}

#ifdef USE_SYSV_SHARED_MEMORY
if (!shmem.open(name, initSize))
{
  status = errno;

  if (fd >= 0)
  {
    ::close(fd);
  }

  return status;
}

mmapSize = initSize;
mmapAddr = shmem.get_base();
#else

mmapAddr = (char*)valloc(mmapSize);

if (mmapAddr == NULL)
{
  status = errno;

  if (fd >= 0)
  {
    ::close(fd);
  }

  return status;
}

#endif
lseek(fd, 0, SEEK_SET);

if ((size_t)::read(fd, mmapAddr, fileSize) != fileSize)
{
#ifdef USE_SYSV_SHARED_MEMORY
  shmem.close();
#else

free(mmapAddr);
#endif

  mmapAddr = NULL;
  status = errno;

  if (fd >= 0)
  {
    ::close(fd);
  }

  return status;
}

#else  // NO_MMAP
mmapAddr = (char*)mmap(NULL, mmapSize,
                       readonly ? PROT_READ : PROT_READ|PROT_WRITE,
                       mmap_attr, fd, 0);

if (mmapAddr == (char*)-1)
{
  status = errno;
  mmapAddr = NULL;

  if (fd >= 0)
  {
    ::close(fd);
  }

  return status;
}

#endif // NO_MMAP
#endif // USE_SYSV_SHARED_MEMORY && DISKLESS_CONFIGURATION
#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
  pageSize = getpagesize();

  pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);

  pageMap = new int[pageMapSize];

  memset(pageMap, 0, pageMapSize*sizeof(int));

#endif
#if defined(REPLICATION_SUPPORT)

  db = NULL;

  int nPages = getMaxPages();

  currUpdateCount = new int[nPages];

  if (replicationSupport)
  {
    char* cFileName = new char[strlen(name) + 5];
    strcat(strcpy(cFileName, name), ".cnt");
#ifndef DISKLESS_CONFIGURATION

    cfd = ::open(cFileName, O_RDWR|O_DSYNC|O_CREAT, 0666);
    delete[] cFileName;

    if (cfd < 0)
    {
      return errno;
    }

    if (ftruncate(cfd, nPages*sizeof(int)) != ok)
    {
      status = errno;
      ::close(cfd);
      return status;
    }

#else
int mmap_attr = MAP_SHARED;

#ifndef MAP_ANONYMOUS

cfd = ::open("/dev/zero", O_RDONLY, 0);

#else

cfd = -1;

mmap_attr |= MAP_ANONYMOUS;

#endif
#endif

    diskUpdateCount = (int*)mmap(NULL, nPages*sizeof(int),
                                 PROT_READ|PROT_WRITE, mmap_attr, cfd, 0);

    if (diskUpdateCount == (int*)-1)
    {
      int status = errno;
      diskUpdateCount = NULL;

      if (cfd >= 0)
      {
        ::close(cfd);
      }

      return status;
    }

    int maxCount = 0;
    rootPage = dbMalloc(pageSize);

    for (int i = 0; i < nPages; i++)
    {
      int count = diskUpdateCount[i];
      currUpdateCount[i] = count;

      if (count > maxCount)
      {
        maxCount = count;
      }
    }

    updateCounter = maxCount;
    nRecovered = 0;
    recoveredEvent.open(true);
    syncEvent.open();
    startSync();
  }

#endif
  return ok;
}


#if defined(REPLICATION_SUPPORT)
// Synchronization loop: lowers the priority of syncThread, takes syncCS and,
// while doSync stays set, repeatedly looks for runs of blocks whose
// currUpdateCount entry is greater than the matching diskUpdateCount entry.
// Each run (at most dbMaxSyncSegmentSize blocks) is written out -- with
// lseek/write under NO_MMAP, with msync otherwise -- and the counters that
// were captured before the write are then stored into diskUpdateCount.
// After a full pass the touched part of diskUpdateCount is msync'ed, and the
// loop either returns (closing is set and nothing was saved in this pass) or
// waits on syncEvent for up to dbSyncTimeout.
void dbFile::syncToDisk()
{
  syncThread.setPriority(dbThread::THR_PRI_LOW);
  dbCriticalSection cs(syncCS);

  while (doSync)
  {
    // Index just past the last block saved during this pass (0 = none).
    int lastSaved = 0;
    int first = 0;

    while (first < int(mmapSize >> dbModMapBlockBits))
    {
      // Counter values captured for the run [first, last).
      int captured[dbMaxSyncSegmentSize];
      int last = first;

      while (last < (int)(mmapSize >> dbModMapBlockBits)
             && last-first < dbMaxSyncSegmentSize
             && currUpdateCount[last] > diskUpdateCount[last])
      {
        captured[last-first] = currUpdateCount[last];
        last += 1;
      }

      if (first == last)
      {
        // Block is up to date: step over it.
        first += 1;
      }
      else
      {
        // Page-align the start offset and round the length up to whole pages.
        size_t pos = (first << dbModMapBlockBits) & ~(pageSize-1);
        size_t size = (((last-first) << dbModMapBlockBits) + pageSize - 1) & ~(pageSize-1);
#ifdef NO_MMAP

        if (lseek(fd, pos, SEEK_SET) != pos
            || ::write(fd, mmapAddr + pos, size) != size)
        {
          dbTrace("Failed to save page to the disk, position=%ld, size=%ld, error=%d\n",
                  (long)pos, (long)size, errno);
        }

#else
        msync(mmapAddr + pos, size, MS_SYNC);
#endif

        // Publish the captured counters; this also advances first to last.
        for (int slot = 0; first < last; slot++, first++)
        {
          diskUpdateCount[first] = captured[slot];
        }

        lastSaved = first;
      }

      if (!doSync)
      {
        return;
      }
    }

    if (lastSaved != 0)
    {
      msync(diskUpdateCount, lastSaved*sizeof(int), MS_SYNC);
    }

    if (closing && lastSaved == 0)
    {
      return;
    }

    syncEvent.wait(syncCS, dbSyncTimeout);
  }
}

#endif


// Creates (or truncates) the file `name` for reading and writing and stores
// the descriptor in fd; the mapping address is reset to NULL.
// The second (unnamed) parameter is ignored.
// Returns ok on success, otherwise the errno left by open().
int dbFile::create(const char* name, bool)
{
  const int openFlags = O_RDWR | O_TRUNC | O_CREAT;

  mmapAddr = NULL;
  fd = ::open(name, openFlags, 0666);

  return fd < 0 ? errno : ok;
}

// Reads up to `size` bytes from fd into `buf`.
// On success stores the number of bytes actually read in `readBytes` and
// returns ok; on failure stores 0 in `readBytes` and returns errno.
int dbFile::read(void* buf, size_t& readBytes, size_t size)
{
  long got = ::read(fd, buf, size);

  if (got >= 0)
  {
    readBytes = got;
    return ok;
  }

  readBytes = 0;
  return errno;
}

int dbFile::write(void const* buf, size_t& writtenBytes, size_t size)
{
  long rc = ::write(fd, buf, size);

  if (rc < 0)
  {
    writtenBytes = 0;
    return errno;
  }

  writtenBytes = rc;
  return ok;
}

// Changes the size of the file and of its in-memory image to `size` bytes.
// The two unnamed parameters are ignored.
//
// - REPLICATION_SUPPORT: syncCS and replCS are held for the whole call.
// - DISKLESS_CONFIGURATION / USE_SYSV_SHARED_MEMORY: resizing is not
//   supported; the call asserts.
// - NO_MMAP or REPLICATION_SUPPORT: the page map is reallocated for the new
//   size; existing words are preserved and any added words are zeroed.
// - NO_MMAP: a new buffer is allocated with valloc, the old contents are
//   copied into it and the file is truncated/extended with ftruncate.
// - otherwise: the mapping is dropped, the file is resized and mapped again.
//
// Returns ok on success, otherwise the errno of the failing call. If the
// mmap branch fails after the old mapping has been removed, mmapAddr is left
// NULL so that no later call operates on an address that is no longer mapped.
int dbFile::setSize(size_t size, char const*, bool)
{
#ifdef REPLICATION_SUPPORT
  dbCriticalSection cs1(syncCS);
  dbCriticalSection cs2(replCS);
#endif
#if defined(DISKLESS_CONFIGURATION) || defined(USE_SYSV_SHARED_MEMORY)

  assert(false);
#else
#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)

  int newPageMapSize = (size + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);
  int* newPageMap = new int[newPageMapSize];

  // Copy only as many words as fit in BOTH arrays: the file may be shrinking,
  // in which case copying pageMapSize words would overrun the new array and
  // the tail length passed to memset would be negative.
  int keptWords = pageMapSize < newPageMapSize ? pageMapSize : newPageMapSize;
  memcpy(newPageMap, pageMap, keptWords*sizeof(int));

  if (newPageMapSize > keptWords)
  {
    memset(newPageMap + keptWords, 0,
           (newPageMapSize-keptWords)*sizeof(int));
  }

  delete[] pageMap;
  pageMapSize = newPageMapSize;
  pageMap = newPageMap;
#endif
#ifdef NO_MMAP

  char* newBuf = (char*)valloc(size);

  if (newBuf == NULL)
  {
    return errno;
  }

  // Never copy more than the new buffer can hold (shrinking case).
  memcpy(newBuf, mmapAddr, mmapSize < size ? mmapSize : size);
  free(mmapAddr);
  mmapAddr = newBuf;
  mmapSize = size;

  if (ftruncate(fd, size) != ok)
  {
    return errno;
  }

#else

  if (munmap(mmapAddr, mmapSize) != ok)
  {
    return errno;
  }

  // The old mapping no longer exists: forget its address right away so that
  // a failure below cannot leave a stale pointer (or the mmap failure value
  // (char*)-1) in mmapAddr for close() to munmap later.
  mmapAddr = NULL;

  if (ftruncate(fd, size) != ok)
  {
    return errno;
  }

  char* remapped = (char*)mmap(NULL, size, PROT_READ|PROT_WRITE,
                               MAP_SHARED, fd, 0);

  if (remapped == (char*)-1)
  {
    return errno;
  }

  mmapAddr = remapped;

#endif
  mmapSize = size;

#endif

  return ok;
}

// Walks the page map and processes every run of consecutive set bits.
//
// pageMap is treated as a bitmap of 32-bit words: word i starts at byte
// offset i << (dbModMapBlockBits + 5) and each bit advances the offset by
// dbModMapBlockSize (presumably a set bit marks a block modified since the
// previous flush -- the code that sets the bits is not in this section).
//
// What happens to a run depends on the build configuration:
// - REPLICATION_SUPPORT: replCS is held for the whole call. When no database
//   is attached (db == NULL) the flush is forced to be physical. For a
//   non-physical flush updateCounter is incremented, stored into
//   currUpdateCount for every block of the run, and the run is sent with
//   db->writeReq as an RR_UPDATE_PAGE request to every server whose status
//   is ST_STANDBY. Nothing is written to fd by this function in this
//   configuration.
// - NO_MMAP without DISKLESS_CONFIGURATION (and without replication): the
//   run is written to fd at the same offset with lseek + write.
// Processed bitmap words are reset to 0.
//
// With plain mmap (none of NO_MMAP, DISKLESS_CONFIGURATION,
// REPLICATION_SUPPORT defined) the whole mapping is msync'ed instead.
//
// Returns ok, or the errno of a failed lseek/write/msync.
int dbFile::flush(bool physical)
{
#if defined(REPLICATION_SUPPORT)
  dbCriticalSection cs(replCS);

  // Without an attached database there are no standby nodes to notify.
  if (db == NULL)
  {
    physical = true;
  }

  // Every non-physical flush gets a fresh update counter value.
  if (!physical)
  {
    updateCounter += 1;
  }

#endif
#if defined(REPLICATION_SUPPORT) || (defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION))
  int* map = pageMap;

  for (int i = 0, n = pageMapSize; i < n; i++)
  {
    if (map[i] != 0)
    {
      // Byte offset of the first block covered by bitmap word i.
      size_t pos = (size_t)i << (dbModMapBlockBits + 5);
      unsigned mask = map[i];
      // Number of bits of the current word consumed so far.
      int count = 0;

      do
      {
        // Length in bytes of the run currently being collected.
        int size = 0;

        // Skip clear bits in front of the next run (mask is non-zero here).
        while ((mask & 1) == 0)
        {
          pos += dbModMapBlockSize;
          mask >>= 1;
          count += 1;
        }

        while (true)
        {
          // Consume the consecutive set bits of the current word.
          do
          {
#ifdef REPLICATION_SUPPORT

            if (!physical)
            {
              currUpdateCount[(pos + size) >> dbModMapBlockBits] = updateCounter;
            }

#endif
            size += dbModMapBlockSize;

            mask >>= 1;

            count += 1;
          }
          while ((mask & 1) != 0);

          // If the run reached the last bit of this word, the next word
          // starts with a set bit and the run is still below the segment
          // limit, continue the same run in the next word.
          if (i+1 < n && count == 32 && size < dbMaxSyncSegmentSize*dbModMapBlockSize
              && (map[i+1] & 1) != 0)
          {
            map[i] = 0;
            mask = map[++i];
            count = 0;
          }
          else
          {
            break;
          }
        }

#if defined(REPLICATION_SUPPORT)
        if (db != NULL)
        {
          if (!physical)
          {
            // Send the run to every standby node.
            for (int j = db->nServers; --j >= 0;)
            {
              if (db->con[j].status == dbReplicatedDatabase::ST_STANDBY)
              {
                ReplicationRequest rr;
                rr.op = ReplicationRequest::RR_UPDATE_PAGE;
                rr.nodeId = db->id;
                rr.page.updateCount = updateCounter;
                rr.page.offs = pos;
                rr.size = size;
                db->writeReq(j, rr, mmapAddr + pos, size);
              }
            }
          }

          pos += size;
          // Jumps to the `while (mask != 0)` test of the enclosing do/while.
          continue;
        }

#else
// Write the run to the file at the same offset it has in memory.
if ((size_t)lseek(fd, pos, SEEK_SET) != pos
    || ::write(fd, mmapAddr + pos, size) != size)
{
  return errno;
}

#endif
        pos += size;
      }
      while (mask != 0);

      // All runs of this word have been processed.
      map[i] = 0;
    }
  }

#endif
#if !defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION) && !defined(REPLICATION_SUPPORT)
  // Plain mmap configuration: synchronize the whole mapping.
  if (msync(mmapAddr, mmapSize, MS_SYNC) != ok)
  {
    return errno;
  }

#endif
  return ok;
}

// Erases the shared memory object when the build uses
// USE_SYSV_SHARED_MEMORY; does nothing in other configurations.
// Always returns ok.
int dbFile::erase()
{
#if defined(USE_SYSV_SHARED_MEMORY)
  shmem.erase();
#endif
  return ok;
}

// Releases everything this object holds for the open file.
//
// - REPLICATION_SUPPORT: when a database is attached, sets `closing`, stops
//   synchronization, waits on recoveredEvent while nRecovered is non-zero,
//   closes both events, unmaps diskUpdateCount and closes cfd. The
//   currUpdateCount array and rootPage are always released.
// - If the file image is present (mmapAddr != NULL): under NO_MMAP the
//   image is flushed first (a flush error is returned immediately), then the
//   image is released with shmem.close(), free() or munmap() depending on the
//   configuration, and the page map is deleted.
// - Finally the file descriptor is closed if it is valid.
//
// Returns ok on success, otherwise the error code of the failing step.
int dbFile::close()
{
#if defined(REPLICATION_SUPPORT)

  if (db != NULL)
  {
    closing = true;
    stopSync();
    {
      dbCriticalSection cs(replCS);

      if (nRecovered != 0)
      {
        recoveredEvent.wait(replCS);
      }
    }

    syncEvent.close();
    recoveredEvent.close();
    munmap(diskUpdateCount, getMaxPages()*sizeof(int));

    if (cfd >= 0)
    {
      ::close(cfd);
    }
  }

  delete[] currUpdateCount;
  currUpdateCount = NULL;
  dbFree(rootPage);
  rootPage = NULL;
#endif // REPLICATION_SUPPORT

  if (mmapAddr != NULL)
  {
#ifdef NO_MMAP
    int rc = flush();

    if (rc != ok)
    {
      return rc;
    }

#endif
#ifdef USE_SYSV_SHARED_MEMORY
    shmem.close();

#elif defined(NO_MMAP)

    free(mmapAddr);

#else

    if (munmap(mmapAddr, mmapSize) != ok)
    {
      return errno;
    }

#endif
    mmapAddr = NULL;

#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)

    delete[] pageMap;
    // Do not keep a pointer to the freed array around.
    pageMap = NULL;

#endif

  }

  // Only a valid descriptor may be closed. (The previous test `fd < 0` never
  // closed an open file and called ::close() on invalid descriptors instead.)
  if (fd >= 0 && ::close(fd) != ok)
  {
    return errno;
  }

  return ok;
}

// Copies the strerror() text for `code` into `buf`, which has room for
// `bufSize` bytes, and returns `buf`.
// The result is always NUL-terminated: strncpy() alone leaves the buffer
// unterminated when the message is bufSize characters or longer, so at most
// bufSize-1 characters are copied and the last byte is set to '\0'.
// A NULL buffer or a zero bufSize is returned untouched.
char* dbFile::errorText(int code, char* buf, size_t bufSize)
{
  if (buf == NULL || bufSize == 0)
  {
    return buf;
  }

  strncpy(buf, strerror(code), bufSize - 1);
  buf[bufSize - 1] = '\0';
  return buf;
}

#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -