database.cpp

来自「FastDb是高效的内存数据库系统」· C++ 代码 · 共 2,469 行 · 第 1/5 页

CPP
2,469
字号
    sprintf(databaseName, "%s.%d", dbName, version);
    int rc = file.open(fileName, databaseName,
                       accessType == dbReadOnly || accessType == dbConcurrentRead, fileSize, false);

    if (rc != dbFile::ok)
    {
      char msgbuf[64];
      file.errorText(rc, msgbuf, sizeof msgbuf);
      TRACE_MSG(("File open error: %s\n", msgbuf));
      handleError(DatabaseOpenError, "Failed to create database file");
      return false;
    }

    baseAddr = (byte*)file.getAddr();
    fileSize = file.getSize();
    header = (dbHeader*)baseAddr;
    updatedRecordId = 0;

    if ((unsigned)header->curr > 1)
    {
      handleError(DatabaseOpenError, "Database file was corrupted: "
                  "invalid root index");
      return false;
    }

    if (header->initialized != 1)
    {
      if (accessType == dbReadOnly || accessType == dbConcurrentRead)
      {
        handleError(DatabaseOpenError, "Can not open uninitialized "
                    "file in read only mode");
        return false;
      }

      monitor->curr = header->curr = 0;
      header->size = fileSize;
      size_t used = dbPageSize;
      header->root[0].index = used;
      header->root[0].indexSize = indexSize;
      header->root[0].indexUsed = dbFirstUserId;
      header->root[0].freeList = 0;
      used += indexSize*sizeof(offs_t);
      header->root[1].index = used;
      header->root[1].indexSize = indexSize;
      header->root[1].indexUsed = dbFirstUserId;
      header->root[1].freeList = 0;
      used += indexSize*sizeof(offs_t);

      header->root[0].shadowIndex = header->root[1].index;
      header->root[1].shadowIndex = header->root[0].index;
      header->root[0].shadowIndexSize = indexSize;
      header->root[1].shadowIndexSize = indexSize;

      header->majorVersion= FASTDB_MAJOR_VERSION;
      header->minorVersion = FASTDB_MINOR_VERSION;

      index[0] = (offs_t*)(baseAddr + header->root[0].index);
      index[1] = (offs_t*)(baseAddr + header->root[1].index);
      index[0][dbInvalidId] = dbFreeHandleMarker;

      size_t bitmapPages =
        (used + dbPageSize*(dbAllocationQuantum*8-1) - 1)
        / (dbPageSize*(dbAllocationQuantum*8-1));
      memset(baseAddr+used, 0xFF, (used + bitmapPages*dbPageSize)
             / (dbAllocationQuantum*8));
      size_t i;

      for (i = 0; i < bitmapPages; i++)
      {
        index[0][dbBitmapId + i] = used + dbPageObjectMarker;
        used += dbPageSize;
      }

      while (i < dbBitmapPages)
      {
        index[0][dbBitmapId+i] = dbFreeHandleMarker;
        i += 1;
      }

      currIndex = index[0];
      currIndexSize = dbFirstUserId;
      committedIndexSize = 0;
      initializeMetaTable();
      header->dirty = true;
      memcpy(index[1], index[0], currIndexSize*sizeof(offs_t));
      file.markAsDirty(0, used);
      file.flush(true);
      header->initialized = true;
      file.markAsDirty(0, sizeof(dbHeader));
      file.flush(true);
    }
    else
    {
      monitor->curr = header->curr;

      if (header->dirty)
      {
        TRACE_MSG(("Database was not normally closed: "
                   "start recovery\n"));

        if (accessType == dbReadOnly || accessType == dbConcurrentRead)
        {
          handleError(DatabaseOpenError,
                      "Can not open dirty file in read only moode");
          return false;
        }

        recovery();
        TRACE_MSG(("Recovery completed\n"));
      }
      else
      {
        if (file.getSize() != header->size)
        {
          handleError(DatabaseOpenError, "Database file was "
                      "corrupted: file size in header differs "
                      "from actual file size");
          return false;
        }
      }
    }

    if (!loadScheme(true))
    {
      return false;
    }

    initMutex.done();
  }
  else
  {
    sprintf(name, "%s.cs", dbName);

    if (!cs.open(name, &monitor->sem))
    {
      handleError(DatabaseOpenError, "Failed to open shared semaphore");
      return false;
    }

    if (accessType == dbConcurrentUpdate)
    {
      sprintf(name, "%s.mcs", dbName);

      if (!mutatorCS.open(name, &monitor->mutatorSem))
      {
        handleError(DatabaseOpenError, "Failed to open shared semaphore");
        return false;
      }
    }

    version = 0;

    if (!loadScheme(false))
    {
      return false;
    }
  }

  cs.enter();
  monitor->users += 1;
  cs.leave();
  opened = true;

  if (commitDelaySec != 0)
  {
    dbCriticalSection cs(delayedCommitStartTimerMutex);
    commitTimeout = commitDelay = commitDelaySec;
    commitThread.create((dbThread::thread_proc_t)delayedCommitProc, this);
    commitThreadSyncEvent.wait(delayedCommitStartTimerMutex);
  }

  return true;
}

//
// Register a periodic backup and start the thread that performs it.
//
// The call has an effect only while no backup file name is registered
// (backupFileName == NULL); otherwise it returns without doing anything.
//
// fileName - path of the backup file; a private copy is stored, so the
//            caller keeps ownership of its own buffer
// period   - value stored in backupPeriod (backupScheduler() multiplies it
//            by 1000 before waiting, so it is presumably given in seconds)
//
void dbDatabase::scheduleBackup(char const* fileName, time_t period)
{
  if (backupFileName != NULL)
  {
    return; // a backup schedule is already registered
  }

  // Build the private copy first and publish it once it is complete.
  char* ownedName = new char[strlen(fileName) + 1];
  strcpy(ownedName, fileName);

  backupFileName = ownedName;
  backupPeriod = period;
  backupThread.create((dbThread::thread_proc_t)backupSchedulerProc, this);
}

//
// Body of the background backup thread.
//
// Runs with backupMutex held (released only inside backupInitEvent.wait)
// and loops until backupFileName becomes NULL, which is treated as the
// request to stop.
//
// Two naming modes are supported:
//  - the name ends with '?': every backup goes to a new file whose name is
//    the configured name without the '?' plus "-YYYY.MM.DD_HH.MM.SS";
//  - otherwise: the backup is written to "<name>.new", the previous backup
//    is removed and the new file is renamed to <name>. In this mode the
//    wait before the next backup is shortened by the age of the existing
//    backup file.
//
void dbDatabase::backupScheduler()
{
  backupThread.setPriority(dbThread::THR_PRI_LOW);
  dbCriticalSection cs(backupMutex);

  while (true)
  {
    // NULL is the stop request (see the check after the wait). It may
    // already be set before this thread first obtains the mutex, so it
    // must be tested before the name is dereferenced.
    if (backupFileName == NULL)
    {
      return;
    }

    time_t timeout = backupPeriod;
    size_t nameLen = strlen(backupFileName);

    // nameLen is checked first: indexing [nameLen-1] on an empty name
    // would read outside the buffer.
    if (nameLen == 0 || backupFileName[nameLen-1] != '?')
    {

      struct stat st;

      if (::stat(backupFileName, &st) == 0)
      {
        // The age of a backup is the time since it was last written.
        // The access time would be refreshed by anybody merely reading
        // or copying the backup file and so would postpone the next run.
        time_t howOld = time(NULL) - st.st_mtime;

        if (timeout < howOld)
        {
          timeout = 0;
        }
        else
        {
          timeout -= howOld;
        }
      }
    }

    // Presumably the wait takes milliseconds, hence the factor 1000.
    backupInitEvent.wait(backupMutex, timeout*1000);

    if (backupFileName != NULL)
    {
      nameLen = strlen(backupFileName);

      if (nameLen != 0 && backupFileName[nameLen-1] == '?')
      {
        time_t currTime = time(NULL);
        // 32 extra bytes are enough for the 20 character time stamp
        // and the terminating zero.
        char* fileName = new char[nameLen + 32];

        struct tm* t = localtime(&currTime);
        sprintf(fileName, "%.*s-%04d.%02d.%02d_%02d.%02d.%02d",
                (int)(nameLen - 1), backupFileName,
                t->tm_year + 1900, t->tm_mon+1, t->tm_mday,
                t->tm_hour, t->tm_min, t->tm_sec);
        backup(fileName, false);
        delete[] fileName;
      }
      else
      {
        // ".new" plus the terminating zero need 5 extra bytes.
        char* newFileName = new char[nameLen + 5];
        sprintf(newFileName,"%s.new", backupFileName);
        backup(newFileName, false);

        // NOTE(review): the results of backup(), remove() and rename()
        // are not checked, so the old backup is removed even if the new
        // one could not be written - confirm whether this is intended.
        ::remove
          (backupFileName);

        ::rename(newFileName, backupFileName);

        delete[] newFileName;
      }
    }
    else
    {
      return;
    }
  }
}


void dbDatabase::recovery()
{
  int curr = header->curr;
  header->size = file.getSize();
  header->root[1-curr].indexUsed = header->root[curr].indexUsed;
  header->root[1-curr].freeList = header->root[curr].freeList;
  header->root[1-curr].index = header->root[curr].shadowIndex;
  header->root[1-curr].indexSize =
    header->root[curr].shadowIndexSize;
  header->root[1-curr].shadowIndex = header->root[curr].index;
  header->root[1-curr].shadowIndexSize =
    header->root[curr].indexSize;

  offs_t* dst = (offs_t*)(baseAddr+header->root[1-curr].index);
  offs_t* src = (offs_t*)(baseAddr+header->root[curr].index);
  currIndex = dst;

  for (int i = 0, n = header->root[curr].indexUsed; i < n; i++)
  {
    if (dst[i] != src[i])
    {
      dst[i] = src[i];
      file.markAsDirty(header->root[1-curr].index + i*sizeof(offs_t), sizeof(offs_t));
    }
  }

  //
  // Restore consistency of table rows l2-list
  //
  restoreTablesConsistency();

  file.markAsDirty(0, sizeof(dbHeader));
}

void dbDatabase::restoreTablesConsistency()
{
  dbTable* table = (dbTable*)getRow(dbMetaTableId);
  oid_t lastId = table->lastRow;

  if (lastId != 0)
  {
    dbRecord* record = getRow(lastId);

    if (record->next != 0)
    {
      record->next = 0;
      file.markAsDirty(currIndex[lastId], sizeof(dbTable));
    }
  }

  oid_t tableId = table->firstRow;

  while (tableId != 0)
  {
    table = (dbTable*)getRow(tableId);
    lastId = table->lastRow;

    if (lastId != 0)
    {
      dbRecord* record = getRow(lastId);

      if (record->next != 0)
      {
        record->next = 0;
        file.markAsDirty(currIndex[lastId], sizeof(dbTable));
      }
    }

    tableId = table->next;
  }
}

//
// Set the value of parThreads.
//
// nThreads - requested number of threads; 0 means "use the number of
//            processors reported by dbThread::numberOfProcessors()".
//            The result is limited to dbMaxParallelSearchThreads.
//
void dbDatabase::setConcurrency(unsigned nThreads)
{
  // 0 requests autodetection of the number of processors
  unsigned wanted = (nThreads != 0) ? nThreads
                                    : dbThread::numberOfProcessors();

  if (wanted > dbMaxParallelSearchThreads)
  {
    wanted = dbMaxParallelSearchThreads;
  }

  parThreads = wanted;
}


bool dbDatabase::loadScheme(bool alter)
{
  if (!beginTransaction(accessType != dbReadOnly && accessType != dbConcurrentRead
                        ? dbExclusiveLock : dbSharedLock))
  {
    return false;
  }

  dbTableDescriptor *desc, *next;
  dbTable* metaTable = (dbTable*)getRow(dbMetaTableId);
  oid_t first = metaTable->firstRow;
  oid_t last = metaTable->lastRow;
  int nTables = metaTable->nRows;
  oid_t tableId = first;

  for (desc = dbTableDescriptor::chain; desc != NULL; desc = next)
  {
    next = desc->next;

    if (desc->db != NULL && desc->db != DETACHED_TABLE && desc->db != this)
    {
      continue;
    }

    if (desc->db == DETACHED_TABLE)
    {
      desc = desc->clone();
    }

    dbFieldDescriptor* fd;

    for (fd = desc->firstField; fd != NULL; fd = fd->nextField)
    {
      fd->tTree = 0;
      fd->hashTable = 0;
      fd->attr &= ~dbFieldDescriptor::Updated;
    }

    int n = nTables;

    while (--n >= 0)
    {
      dbTable* table = (dbTable*)getRow(tableId);
      oid_t next = table->next;

      if (strcmp(desc->name, (char*)table + table->name.offs) == 0)
      {
        if (!desc->equal(table))
        {
          if (!alter)
          {
            handleError(DatabaseOpenError, "Incompatible class"
                        " definition in application");
            return false;
          }

          beginTransaction(dbExclusiveLock);
          modified = true;

          if (table->nRows == 0)
          {
            TRACE_MSG(("Replace definition of table '%s'\n",
                       desc->name));
            updateTableDescriptor(desc, tableId);
          }
          else
          {
            reformatTable(tableId, desc);
          }
        }
        else
        {
          linkTable(desc, tableId);
        }

        desc->setFlags();
        break;
      }

      if (tableId == last)
      {
        tableId = first;
      }
      else
      {
        tableId = next;
      }
    }

    if (n < 0)
    { // no match found

      if (accessType == dbReadOnly || accessType == dbConcurrentRead)
      {
        handleError(DatabaseOpenError, "New table definition can not "
                    "be added to read only database");
        return false;
      }
      else
      {
        TRACE_MSG(("Create new table '%s' in database\n", desc->name));
        addNewTable(desc);
        modified = true;
      }
    }

    if (accessType != dbReadOnly && accessType != dbConcurrentRead)
    {
      if (!addIndices(alter, desc))
      {
        handleError(DatabaseOpenError, "Failed to alter indices with"
                    " active applications");
        rollback();
        return false;
      }
    }
  }

  for (desc = tables; desc != NULL; desc = desc->nextDbTable)
  {
    if (desc->cloneOf != NULL)
    {
      for (dbFieldDescriptor *fd = desc->firstField; fd != NULL; fd = fd->nextField)
      {
        if (fd->refTable != NULL)
        {
          fd->refTable = lookupTable(fd->refTable);
        }
      }
    }

    desc->checkRelationship();
  }

  commit();
  return true;
}


void dbDatabase::reformatTable(oid_t tableId, dbTableDescriptor* desc)
{
  dbTable* table = (dbTable*)putRow(tableId);

  if (desc->match(table, confirmDeleteColumns))
  {
    TRACE_MSG(("New version of table '%s' is compatible with old one\n",
               desc->name));
    updateTableDescriptor(desc, tableId);
  }
  else
  {
    TRACE_MSG(("

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?