📄 file.cpp
字号:
//-< FILE.CPP >------------------------------------------------------*--------*// FastDB Version 1.0 (c) 1999 GARRET * ? *// (Main Memory Database Management System) * /\| *// * / \ *// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *// Last update: 10-Dec-98 K.A. Knizhnik * GARRET *//-------------------------------------------------------------------*--------*// System dependent implementation of mapped on memory file//-------------------------------------------------------------------*--------*#define INSIDE_FASTDB#include "stdtp.h"#include "file.h"#include "database.h"BEGIN_FASTDB_NAMESPACE#if FUZZY_CHECKPOINTstruct dbWriteQuery { dbWriteQuery* next; size_t pos; char page[dbPageSize];}; #define DEFAULT_QUEUE_LENGTH_LIMIT 16*1024class dbFileWriter { private: dbLocalSemaphore event; dbMutex mutex; dbWriteQuery* last; dbWriteQuery* first; dbWriteQuery* free; int queueLength; int queueLengthLimit; int underflow; int overflow; int running; dbThread thread; dbFile* file; static void thread_proc writeLoop(void* arg) { ((dbFileWriter*)arg)->write(); } void write() { mutex.lock(); while (true) { dbWriteQuery* query = first; if (query != NULL) { queueLength -= 1; if (overflow) { overflow = false; event.signal(); } first = query->next; mutex.unlock(); file->write(query->pos, query->page, dbPageSize); mutex.lock(); query->next = free; free = query; } else { if (!running) { break; } underflow = true; event.wait(mutex); } } mutex.unlock(); } public: dbFileWriter(dbFile* file) { this->file = file; queueLength = 0; queueLengthLimit = DEFAULT_QUEUE_LENGTH_LIMIT; event.open(); first = last = NULL; free = NULL; overflow = underflow = false; running = true; thread.create(&writeLoop, this); } void setQueueLimit(size_t limit) { queueLengthLimit = limit; } ~dbFileWriter() { mutex.lock(); running = false; event.signal(); mutex.unlock(); thread.join(); dbWriteQuery* query = first; while (query != NULL) { dbWriteQuery* next = query->next; delete query; query = next; } event.close(); } void put(size_t pos, void* page, size_t size) { char* beg = (char*)page; char* end = beg + size; while (beg < end) { put(pos, beg); pos += dbPageSize; beg += dbPageSize; } } void put(size_t pos, void* page) { mutex.lock(); while (queueLength >= queueLengthLimit) { overflow = true; event.wait(mutex); } dbWriteQuery* query = free; if (query == NULL) { query = new dbWriteQuery(); } else { free = query->next; } if (first == NULL) { first = query; } else { last->next = query; } queueLength += 1; last = query; query->next = NULL; query->pos = pos; memcpy(query->page, page, dbPageSize); if (underflow) { underflow = false; event.signal(); } mutex.unlock(); }};void dbFile::setCheckpointBufferSize(size_t nPages) { writer->setQueueLimit(nPages);}#endifdbFile::dbFile(){ sharedName = NULL; mmapAddr = NULL; mmapSize = 0; readonly = false;#ifdef REPLICATION_SUPPORT currUpdateCount = NULL; diskUpdateCount = NULL; rootPage = NULL; db = NULL;#endif}dbFile::~dbFile(){}#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)const int dbMaxSyncSegmentSize = 128*1024 / dbModMapBlockSize; #endif#ifdef REPLICATION_SUPPORT#include "database.h"int dbFile::dbSyncTimeout = 1000; // one secondbool dbFile::updatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size){ if (pos + size > mmapSize) { size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2; setSize(newSize, sharedName); } if (s->read(mmapAddr + pos, size)) { int pageNo = pos >> dbModMapBlockBits; if (updateCounter < pageUpdateCounter) { updateCounter = pageUpdateCounter; } while (size > 0) { currUpdateCount[pageNo++] = pageUpdateCounter; size -= dbModMapBlockSize; } return true; } return false;}bool dbFile::concurrentUpdatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size){ if (pos + size > mmapSize) { size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2; db->beginTransaction(dbDatabase::dbCommitLock); setSize(newSize, sharedName); ((dbHeader*)mmapAddr)->size = newSize; db->baseAddr = (byte*)mmapAddr; db->header = (dbHeader*)mmapAddr; db->version = db->monitor->version += 1; } if (pos == 0 && size <= pageSize) { if (!s->read(rootPage, size)) { return false; } if (((dbHeader*)rootPage)->curr != ((dbHeader*)mmapAddr)->curr) { db->beginTransaction(dbDatabase::dbCommitLock); memcpy(mmapAddr, rootPage, size); // now readers will see updated data db->monitor->curr ^= 1; db->endTransaction(); } else { memcpy(mmapAddr, rootPage, size); } } else { if (!s->read(mmapAddr + pos, size)) { return false; } } int pageNo = pos >> dbModMapBlockBits; if (updateCounter < pageUpdateCounter) { updateCounter = pageUpdateCounter; } while (size > 0) { currUpdateCount[pageNo++] = pageUpdateCounter; size -= dbModMapBlockSize; } return true;}int dbFile::getUpdateCountTableSize() { int nPages = mmapSize >> dbModMapBlockBits; while (--nPages >= 0 && diskUpdateCount[nPages] == 0); return nPages + 1;}int dbFile::getMaxPages() { return 1 << (dbDatabaseOffsetBits - dbModMapBlockBits);}void dbFile::startSync(){#ifndef DISKLESS_CONFIGURATION doSync = true; closing = false; syncEvent.reset(); syncThread.create(startSyncToDisk, this);#endif}void dbFile::stopSync(){#ifndef DISKLESS_CONFIGURATION doSync = false; syncEvent.signal(); syncThread.join();#endif}void thread_proc dbFile::startSyncToDisk(void* arg){ ((dbFile*)arg)->syncToDisk();}void thread_proc dbFile::startRecovery(void* arg){ RecoveryRequest* rr = (RecoveryRequest*)arg; rr->file->doRecovery(rr->nodeId, rr->updateCounters, rr->nPages); { dbCriticalSection cs(rr->file->replCS); if (--rr->file->nRecovered == 0) { rr->file->recoveredEvent.signal(); } } delete rr;}void dbFile::recovery(int nodeId, int* updateCounters, int nPages){ RecoveryRequest* rr = new RecoveryRequest; rr->nodeId = nodeId; rr->updateCounters = updateCounters; rr->nPages = nPages; rr->file = this; { dbCriticalSection cs(replCS); if (nRecovered++ == 0) { recoveredEvent.reset(); } } dbThread recoveryThread; recoveryThread.create(startRecovery, rr); recoveryThread.setPriority(dbThread::THR_PRI_HIGH);}void dbFile::doRecovery(int nodeId, int* updateCounters, int nPages){ ReplicationRequest rr; memset(updateCounters+nPages, 0, (getMaxPages() - nPages)*sizeof(int)); int i, j, n; if (db->con[nodeId].reqSock == NULL) { char buf[256]; socket_t* s = socket_t::connect(db->serverURL[nodeId], socket_t::sock_global_domain, dbReplicatedDatabase::dbRecoveryConnectionAttempts); if (!s->is_ok()) { s->get_error_text(buf, sizeof buf); dbTrace("Failed to establish connection with node %d: %s\n", nodeId, buf); delete s; return; } rr.op = ReplicationRequest::RR_GET_STATUS; rr.nodeId = db->id; if (!s->write(&rr, sizeof rr) || !s->read(&rr, sizeof rr)) { s->get_error_text(buf, sizeof buf); dbTrace("Connection with node %d is broken: %s\n", nodeId, buf); delete s; return; } if (rr.op != ReplicationRequest::RR_STATUS && rr.status != dbReplicatedDatabase::ST_STANDBY) { dbTrace("Unexpected response from standby node %d: code %d status %d\n", nodeId, rr.op, rr.status); delete s; return; } else { db->addConnection(nodeId, s); } } while (true) { int maxUpdateCount = 0; { dbCriticalSection cs(syncCS); TRACE_MSG(("Database synchronization: mmapSize=%ld\n", mmapSize)); for (i = 0, j = 0, n = mmapSize >> dbModMapBlockBits; i < n; i++) { if (updateCounters[i] > currUpdateCount[i]) { updateCounters[i] = 0; } else { if (updateCounters[i] > maxUpdateCount) { maxUpdateCount = updateCounters[i]; } } if (i > j && (currUpdateCount[i] <= updateCounters[i] || i-j >= dbMaxSyncSegmentSize || currUpdateCount[i] != currUpdateCount[j])) { rr.op = ReplicationRequest::RR_UPDATE_PAGE; rr.nodeId = nodeId; rr.size = (i-j)*dbModMapBlockSize; rr.page.offs = (size_t)j << dbModMapBlockBits; rr.page.updateCount = currUpdateCount[j]; TRACE_MSG(("Send segment [%lx, %ld]\n", rr.page.offs, rr.size)); if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) { delete[] updateCounters; return; } j = i; } if (i >= nPages || currUpdateCount[i] > updateCounters[i]) { if (currUpdateCount[i] > maxUpdateCount) { maxUpdateCount = currUpdateCount[i]; } updateCounters[i] = currUpdateCount[i]; } else { j = i + 1; } } if (i != j) { rr.op = ReplicationRequest::RR_UPDATE_PAGE; rr.nodeId = nodeId; rr.size = (i-j)*dbModMapBlockSize; rr.page.offs = (size_t)j << dbModMapBlockBits; rr.page.updateCount = currUpdateCount[j]; TRACE_MSG(("Send segment [%lx, %ld]\n", rr.page.offs, rr.size)); if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size)) { delete[] updateCounters; return; } } } { dbCriticalSection cs(replCS); if (maxUpdateCount == updateCounter) { dbTrace("Complete recovery of node %d\n", nodeId); delete[] updateCounters; rr.op = ReplicationRequest::RR_STATUS; rr.nodeId = nodeId; db->con[nodeId].status = rr.status = dbReplicatedDatabase::ST_STANDBY; for (i = 0, n = db->nServers; i < n; i++) { if (db->con[i].status != dbReplicatedDatabase::ST_OFFLINE && i != db->id) { db->writeReq(i, rr); } } return; } } }}#endifbool dbFile::write(void const* buf, size_t size){ size_t writtenBytes; bool result = write(buf, writtenBytes, size) == ok && writtenBytes == size; return result;}#ifdef _WIN32class OS_info : public OSVERSIONINFO { public: OS_info() { dwOSVersionInfoSize = sizeof(OSVERSIONINFO); GetVersionEx(this); }};static OS_info osinfo;#define BAD_POS 0xFFFFFFFF // returned by SetFilePointer and GetFileSizeint dbFile::erase(){ return ok;}int dbFile::open(char const* fileName, char const* sharedName, bool readonly, size_t initSize, bool replicationSupport){ int status; size_t fileSize; this->readonly = readonly;#ifndef DISKLESS_CONFIGURATION fh = CreateFile(W32_STRING(fileName), readonly ? GENERIC_READ : (GENERIC_READ|GENERIC_WRITE), FILE_SHARE_READ | FILE_SHARE_WRITE, FASTDB_SECURITY_ATTRIBUTES, readonly ? OPEN_EXISTING : OPEN_ALWAYS,#ifdef _WINCE FILE_ATTRIBUTE_NORMAL#else FILE_FLAG_RANDOM_ACCESS#ifdef NO_MMAP |FILE_FLAG_NO_BUFFERING#endif#if 0 // not needed as we do explicit flush ??? |FILE_FLAG_WRITE_THROUGH#endif#endif , NULL); if (fh == INVALID_HANDLE_VALUE) { return GetLastError(); } DWORD highSize; fileSize = GetFileSize(fh, &highSize); if (fileSize == BAD_POS && (status = GetLastError()) != ok) { CloseHandle(fh); return status; } assert(highSize == 0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -