📄 file.cpp
字号:
bool rc = VirtualProtect(mmapAddr + pos, DOALIGN(size, pageSize), PAGE_READONLY, &oldProtect);
assert(rc);
}
void dbFile::unprotect(size_t pos, size_t size)
{
PDWORD oldProtect;
bool rc = VirtualProtect(mmapAddr + pos, DOALIGN(size, pageSize), PAGE_READWRITE, &oldProtect);
assert(rc);
}
#endif
// Open (or create) the database file and bring its contents into memory.
//
// fileName           - path of the database file
// sharedName         - name used for the file mapping object; a private copy is
//                      stored in this->sharedName (must not be NULL unless
//                      DISKLESS_CONFIGURATION is defined, because strlen() is applied to it)
// readonly           - open the file and the mapping for reading only
// initSize           - size used for the image when the file is empty and opened
//                      for writing (or always, in DISKLESS_CONFIGURATION)
// replicationSupport - when REPLICATION_SUPPORT is compiled in, additionally open
//                      and map the "<fileName>.cnt" update-counter file and start
//                      the synchronization thread
//
// Returns ok on success, otherwise the Win32 error code from GetLastError().
//
// Depending on the build configuration the image is either read into a
// VirtualAlloc'ed buffer (NO_MMAP) or mapped with CreateFileMapping/MapViewOfFile.
int dbFile::open(char const* fileName, char const* sharedName, bool readonly,
                 size_t initSize, bool replicationSupport)
{
    int status;
    size_t fileSize;

    this->readonly = readonly;
#ifndef DISKLESS_CONFIGURATION
#if defined(_WINCE) && !defined(NO_MMAP)
    fh = CreateFileForMapping
#else
    fh = CreateFile
#endif
        (W32_STRING(fileName),
         readonly ? GENERIC_READ : (GENERIC_READ|GENERIC_WRITE),
         FILE_SHARE_READ | FILE_SHARE_WRITE,
         FASTDB_SECURITY_ATTRIBUTES,
         readonly ? OPEN_EXISTING : OPEN_ALWAYS,
#ifdef _WINCE
         FILE_ATTRIBUTE_NORMAL
#else
         FILE_FLAG_RANDOM_ACCESS
#ifdef NO_MMAP
         |FILE_FLAG_NO_BUFFERING
#endif
#if 0 // not needed as we do explicit flush ???
         |FILE_FLAG_WRITE_THROUGH
#endif
#endif
         , NULL);
    if (fh == INVALID_HANDLE_VALUE) {
        return GetLastError();
    }
    DWORD highSize;
    fileSize = GetFileSize(fh, &highSize);
    // GetFileSize signals failure with a sentinel value that can also be a valid
    // low-order size, so the last-error code has to be consulted as well.
    if (fileSize == BAD_POS && (status = GetLastError()) != ok) {
        CloseHandle(fh);
        return status;
    }
    // Only the low 32 bits of the size are used below.
    assert(highSize == 0);
    mmapSize = fileSize;

    this->sharedName = new char[strlen(sharedName) + 1];
    strcpy(this->sharedName, sharedName);

    // A freshly created (empty) writable file gets the requested initial size.
    if (!readonly && fileSize == 0) {
        mmapSize = initSize;
    }
#else
    fh = INVALID_HANDLE_VALUE;
    this->sharedName = NULL;
    mmapSize = fileSize = initSize;
#endif

#if defined(NO_MMAP)
    // Extend the file on disk to the size of the in-memory image.
    if (fileSize < mmapSize && !readonly) {
        if (SetFilePointer(fh, mmapSize, NULL, FILE_BEGIN) != mmapSize || !SetEndOfFile(fh)) {
            status = GetLastError();
            CloseHandle(fh);
            return status;
        }
    }
    mmapAddr = (char*)VirtualAlloc(NULL, mmapSize, MEM_COMMIT|MEM_RESERVE,
                                   PAGE_READWRITE);

#ifdef DISKLESS_CONFIGURATION
    if (mmapAddr == NULL)
#else
    DWORD readBytes;
    if (mmapAddr == NULL
        || !ReadFile(fh, mmapAddr, fileSize, &readBytes, NULL) || readBytes != fileSize)
#endif
    {
        status = GetLastError();
        if (fh != INVALID_HANDLE_VALUE) {
            CloseHandle(fh);
        }
        return status;
    }
    // Zero the part of the buffer that is not backed by file contents.
    memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
    mh = NULL;
#else
    mh = CreateFileMapping(fh, FASTDB_SECURITY_ATTRIBUTES, readonly ? PAGE_READONLY : PAGE_READWRITE,
                           0, mmapSize, W32_STRING(sharedName));
    // Captured unconditionally: on success it tells (ERROR_ALREADY_EXISTS)
    // whether the mapping object existed before this call.
    status = GetLastError();
    if (mh == NULL) {
        if (fh != INVALID_HANDLE_VALUE) {
            CloseHandle(fh);
        }
        return status;
    }
    mmapAddr = (char*)MapViewOfFile(mh, readonly ? FILE_MAP_READ : FILE_MAP_ALL_ACCESS, 0, 0, 0);
    if (mmapAddr == NULL) {
        status = GetLastError();
        CloseHandle(mh);
        if (fh != INVALID_HANDLE_VALUE) {
            CloseHandle(fh);
        }
        return status;
    }
    if (status != ERROR_ALREADY_EXISTS && mmapSize > fileSize)
        // && osinfo.dwPlatformId != VER_PLATFORM_WIN32_NT)
    {
        // Windows 95 doesn't initialize pages
        memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
    }
#endif

#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
    SYSTEM_INFO systemInfo;
    GetSystemInfo(&systemInfo);
    pageSize = systemInfo.dwPageSize;
    // One bit per modification block, packed into 32-bit words (rounded up).
    pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);
    pageMap = new int[pageMapSize];
    memset(pageMap, 0, pageMapSize*sizeof(int));
#endif

#if defined(REPLICATION_SUPPORT)
    db = NULL;
    int nPages = getMaxPages();
    currUpdateCount = new int[nPages];

    if (replicationSupport) {
#ifdef DISKLESS_CONFIGURATION
        // No counter file on disk: the mapping below is created on an invalid
        // file handle, so no file name needs to be built at all.
        cfh = INVALID_HANDLE_VALUE;
#else
        // Update counters are kept in "<fileName>.cnt" (".cnt" + terminator = 5 chars).
        char* cFileName = new char[strlen(fileName) + 5];
        strcat(strcpy(cFileName, fileName), ".cnt");
        cfh = CreateFile(cFileName, GENERIC_READ|GENERIC_WRITE,
                         0, NULL, OPEN_ALWAYS,
                         FILE_FLAG_RANDOM_ACCESS|FILE_FLAG_WRITE_THROUGH,
                         NULL);
        // CreateFile reports failures through GetLastError(), not errno; fetch
        // the code before any other call can overwrite it.
        status = GetLastError();
        delete[] cFileName;
        if (cfh == INVALID_HANDLE_VALUE) {
            return status;
        }
#endif
        cmh = CreateFileMapping(cfh, NULL, PAGE_READWRITE, 0,
                                nPages*sizeof(int), NULL);
        status = GetLastError();
        if (cmh == NULL) {
            CloseHandle(cfh);
            return status;
        }
        diskUpdateCount = (int*)MapViewOfFile(cmh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
        if (diskUpdateCount == NULL) {
            status = GetLastError();
            CloseHandle(cmh);
            CloseHandle(cfh);
            return status;
        }
        rootPage = dbMalloc(pageSize);
        // Seed the in-memory counters from the persisted ones and continue
        // counting from the largest value found.
        int maxCount = 0;
        for (int i = 0; i < nPages; i++) {
            int count = diskUpdateCount[i];
            currUpdateCount[i] = count;
            if (count > maxCount) {
                maxCount = count;
            }
        }
        updateCounter = maxCount;
        nRecovered = 0;
        recoveredEvent.open(true);
        syncEvent.open();
        startSync();
    }
#endif
#ifdef FUZZY_CHECKPOINT
    writer = new dbFileWriter(this);
#endif
    return ok;
}
bool dbFile::write(size_t pos, void const* ptr, size_t size)
{
DWORD written;
if (SetFilePointer(fh, pos, NULL, FILE_BEGIN) != pos ||
!WriteFile(fh, ptr, size, &written, NULL)
|| written != (DWORD)size)
{
dbTrace("Failed to save page to the disk, position=%ld, size=%ld, error=%d\n",
(long)pos, (long)size, GetLastError());
return false;
}
return true;
}
#if defined(REPLICATION_SUPPORT)
// Body of the background synchronization loop (replication builds only).
//
// While doSync is set, repeatedly scans the per-block update counters and, for
// every run of consecutive blocks whose in-memory counter (currUpdateCount) is
// greater than the persisted one (diskUpdateCount), saves the corresponding
// part of the image and then records the saved counters in diskUpdateCount.
// Between passes it waits on syncEvent for at most dbSyncTimeout.
// The function returns when doSync is cleared, or when `closing` is set and a
// full pass found nothing to save.
void dbFile::syncToDisk()
{
syncThread.setPriority(dbThread::THR_PRI_LOW);
// syncCS is acquired here; syncEvent.wait() below is handed the same critical section.
dbCriticalSection cs(syncCS);
while (doSync) {
size_t i, j, k, n;
// Index one past the last block saved during this pass (0 = nothing saved).
int maxUpdated = 0;
for (i = 0, n = mmapSize >> dbModMapBlockBits; i < n;) {
// Snapshot of the counters of the blocks in the current run, taken before
// the data is saved, so that the values stored afterwards are the ones
// that were current when the run was collected.
int updateCounters[dbMaxSyncSegmentSize];
// Collect a run [i, j) of out-of-date blocks, at most dbMaxSyncSegmentSize long.
for (j=i; j < (mmapSize >> dbModMapBlockBits) && j-i < dbMaxSyncSegmentSize
&& currUpdateCount[j] > diskUpdateCount[j]; j++)
{
updateCounters[j-i] = currUpdateCount[j];
}
if (i != j) {
// Start is rounded down and the length rounded up to a multiple of pageSize.
// NOTE(review): the length is derived from (j-i) only and is not enlarged by
// the amount `pos` was rounded down; this looks fine only if blocks are
// page-aligned -- confirm the relation between dbModMapBlockSize and pageSize.
size_t pos = (i << dbModMapBlockBits) & ~(pageSize-1);
size_t size = (((j-i) << dbModMapBlockBits) + pageSize - 1) & ~(pageSize-1);
#ifdef NO_MMAP
write(pos, mmapAddr + pos, size);
#else
FlushViewOfFile(mmapAddr + pos, size);
#endif
// Publish the saved counters; this also advances i to j.
for (k = 0; i < j; k++, i++) {
diskUpdateCount[i] = updateCounters[k];
}
maxUpdated = i;
} else {
// Block i is up to date: move on to the next one.
i += 1;
}
// Re-checked after every run so that a stop request ends the pass early.
if (!doSync) {
return;
}
}
// Flush the prefix of the mapped counter array that may have been changed.
if (maxUpdated != 0) {
FlushViewOfFile(diskUpdateCount, maxUpdated*sizeof(int));
}
if (closing && maxUpdated == 0) {
return;
} else {
syncEvent.wait(syncCS, dbSyncTimeout);
}
}
}
#endif
// Create (or truncate) a plain file for sequential access, without any mapping.
//
// name        - path of the file to create
// noBuffering - additionally request FILE_FLAG_NO_BUFFERING
//
// Returns ok on success, in which case the mapping-related members are reset
// to NULL; otherwise returns the Win32 error code and leaves them untouched.
int dbFile::create(const char* name, bool noBuffering)
{
    fh = CreateFile(W32_STRING(name),
                    GENERIC_READ|GENERIC_WRITE,
                    0,
                    FASTDB_SECURITY_ATTRIBUTES,
                    CREATE_ALWAYS,
                    noBuffering ? (FILE_FLAG_NO_BUFFERING|FILE_FLAG_SEQUENTIAL_SCAN)
                                : FILE_FLAG_SEQUENTIAL_SCAN,
                    NULL);
    if (fh != INVALID_HANDLE_VALUE) {
        sharedName = NULL;
        mmapAddr = NULL;
        mh = NULL;
        return ok;
    }
    return GetLastError();
}
int dbFile::read(void* buf, size_t& readBytes, size_t size)
{
DWORD count;
if (ReadFile(fh, buf, size, &count, NULL)) {
readBytes = count;
return ok;
} else {
readBytes = 0;
return GetLastError();
}
}
int dbFile::write(void const* buf, size_t& writtenBytes, size_t size)
{
DWORD count;
if (WriteFile(fh, buf, size, &count, NULL)) {
writtenBytes = count;
return ok;
} else {
writtenBytes = 0;
return GetLastError();
}
}
// Save modified parts of the in-memory image.
//
// physical - in replication builds, false means "propagate to standby nodes and
//            stamp the blocks with a new update counter" rather than writing
//            locally; it is forced to true when no replicated database (db) is
//            attached. In other builds the parameter is not used.
//
// In NO_MMAP / replication builds the modification bitmap (pageMap) is scanned:
// every maximal run of set bits is turned into one contiguous byte range, which
// is then either sent to the standby nodes or written to the file, and the
// bitmap words are cleared. In the plain memory-mapped build the whole view is
// flushed with FlushViewOfFile (unless NO_FLUSH_ON_COMMIT is defined).
//
// Returns ok, or a Win32 error code if writing/flushing fails.
int dbFile::flush(bool physical)
{
#if defined(REPLICATION_SUPPORT)
dbCriticalSection cs(replCS);
if (db == NULL) {
physical = true;
}
if (!physical) {
// Every logical flush gets its own counter value.
updateCounter += 1;
}
#endif
#if defined(REPLICATION_SUPPORT) || (defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION))
int* map = pageMap;
for (size_t i = 0, n = pageMapSize; i < n; i++) {
if (map[i] != 0) {
// Byte offset of the first block described by word i (32 blocks per word).
size_t pos = i << (dbModMapBlockBits + 5);
unsigned mask = map[i];
// Number of bits of the current word consumed so far.
int count = 0;
do {
size_t size = 0;
// Skip clean blocks (bits are examined from the least significant one).
while ((mask & 1) == 0) {
pos += dbModMapBlockSize;
mask >>= 1;
count += 1;
}
// Accumulate a run of dirty blocks starting at pos; the run may continue
// into following bitmap words.
while (true) {
do {
#ifdef REPLICATION_SUPPORT
if (!physical) {
currUpdateCount[(pos + size) >> dbModMapBlockBits] = updateCounter;
}
#endif
size += dbModMapBlockSize;
mask >>= 1;
count += 1;
} while ((mask & 1) != 0);
// The run reached the top bit of this word: if the next word starts with a
// dirty block and the run is still below the segment limit, clear this
// word and keep extending the same run in the next one.
if (i+1 < n && count == 32 && size < dbMaxSyncSegmentSize*dbModMapBlockSize
&& (map[i+1] & 1) != 0)
{
map[i] = 0;
mask = map[++i];
count = 0;
} else {
break;
}
}
#if defined(REPLICATION_SUPPORT)
if (db != NULL) {
if (!physical) {
// Send the range to every node currently in standby state.
for (int j = db->nServers; --j >= 0;) {
if (db->con[j].status == dbReplicatedDatabase::ST_STANDBY) {
ReplicationRequest rr;
rr.op = ReplicationRequest::RR_UPDATE_PAGE;
rr.nodeId = db->id;
rr.page.updateCount = updateCounter;
rr.page.offs = pos;
rr.size = size;
db->writeReq(j, rr, mmapAddr + pos, size);
}
}
}
// With a replicated database attached nothing is written locally here;
// `continue` jumps to the `while (mask != 0)` test of the enclosing do-loop.
pos += size;
continue;
}
#endif
#ifndef DISKLESS_CONFIGURATION
#ifdef FUZZY_CHECKPOINT
writer->put(pos, mmapAddr + pos, size);
#else
// NOTE(review): write() calls dbTrace before returning false, so the value
// GetLastError() yields here may no longer be the original failure code.
if (!write(pos, mmapAddr + pos, size)) {
return GetLastError();
}
#endif
#endif
pos += size;
} while (mask != 0);
map[i] = 0;
}
}
#endif
#if !defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION) && !defined(REPLICATION_SUPPORT) && !defined(NO_FLUSH_ON_COMMIT)
if (!FlushViewOfFile(mmapAddr, mmapSize)) {
return GetLastError();
}
#endif
return ok;
}
// Change the size of the database image to `size` bytes.
//
// size       - new size of the image in bytes
// sharedName - name for the re-created file mapping object (memory-mapped build)
// initialize - in the memory-mapped build, zero the newly added tail unless the
//              mapping object already existed
//
// NO_MMAP build: a new buffer is allocated, the file is resized, the old
// contents are copied over and the old buffer is released.
// Memory-mapped build: the view and mapping are dropped and re-created with
// the new size.
// In both cases (and in replication builds) the modification bitmap is
// re-allocated to match the new size.
// Must not be called in DISKLESS_CONFIGURATION (asserts).
//
// Returns ok on success, otherwise a Win32 error code.
int dbFile::setSize(size_t size, char const* sharedName, bool initialize)
{
#if defined(REPLICATION_SUPPORT)
    dbCriticalSection cs1(syncCS);
    dbCriticalSection cs2(replCS);
#endif
#ifdef DISKLESS_CONFIGURATION
    assert(false);
#else
#ifdef NO_MMAP
    char* newBuf = (char*)VirtualAlloc(NULL, size, MEM_COMMIT|MEM_RESERVE, PAGE_READWRITE);
    if (newBuf == NULL) {
        return GetLastError();
    }
    if (SetFilePointer(fh, size, NULL, FILE_BEGIN) != size || !SetEndOfFile(fh)) {
        // Fetch the error code first (VirtualFree may overwrite it), then give
        // the not yet installed buffer back instead of leaking it.
        int status = GetLastError();
        VirtualFree(newBuf, 0, MEM_RELEASE);
        return status;
    }
    // Never copy more than the new buffer can hold.
    memcpy(newBuf, mmapAddr, mmapSize < size ? mmapSize : size);
    VirtualFree(mmapAddr, 0, MEM_RELEASE);
    mmapAddr = newBuf;
    mmapSize = size;
#else
    if (!UnmapViewOfFile(mmapAddr) || !CloseHandle(mh)) {
        return GetLastError();
    }
    mh = CreateFileMapping(fh, FASTDB_SECURITY_ATTRIBUTES, readonly ? PAGE_READONLY : PAGE_READWRITE,
                           0, size, W32_STRING(sharedName));
    // Captured unconditionally: ERROR_ALREADY_EXISTS is examined below even
    // when the call succeeded.
    int status = GetLastError();
    if (mh == NULL) {
        printf("CreateFileMapping failed: %d\n", status);
        return status;
    }
    mmapAddr = (char*)MapViewOfFile(mh, readonly ? FILE_MAP_READ : FILE_MAP_ALL_ACCESS, 0, 0, 0);
    if (mmapAddr == NULL) {
        return GetLastError();
    }
    // The tail only exists when the image grew; without this check a smaller
    // `size` would make the length wrap around.
    if (initialize && status != ERROR_ALREADY_EXISTS && size > mmapSize)
        //&& osinfo.dwPlatformId != VER_PLATFORM_WIN32_NT)
    {
        // Windows 95 doesn't initialize pages
        memset(mmapAddr+mmapSize, 0, size - mmapSize);
    }
    mmapSize = size;
#endif
#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
    // One bit per modification block, packed into 32-bit words (rounded up).
    int newPageMapSize = (size + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);
    int* newPageMap = new int[newPageMapSize];
    // Keep the words both maps have in common and clear whatever was added.
    size_t const oldWords = pageMapSize;
    size_t const newWords = newPageMapSize;
    size_t const keptWords = oldWords < newWords ? oldWords : newWords;
    memcpy(newPageMap, pageMap, keptWords*sizeof(int));
    memset(newPageMap + keptWords, 0, (newWords - keptWords)*sizeof(int));
    delete[] pageMap;
    pageMapSize = newPageMapSize;
    pageMap = newPageMap;
#endif
#endif
    return ok;
}
// Release everything acquired by open()/create(): the copied mapping name, the
// replication state (sync thread, events, counter file mapping), the in-memory
// image (flushed first in the NO_MMAP build), the mapping handle and finally
// the file handle.
//
// Returns ok on success, otherwise the first error encountered (the result of
// flush() or a Win32 error code). Note that on such an early return the
// remaining resources are not released.
int dbFile::close()
{
    delete[] sharedName;
    // Do not keep a dangling pointer around: a repeated close() would otherwise
    // free the same array twice.
    sharedName = NULL;
#if defined(REPLICATION_SUPPORT)
    if (db != NULL) {
        closing = true;
        stopSync();
        {
            dbCriticalSection cs(replCS);
            if (nRecovered != 0) {
                recoveredEvent.wait(replCS);
            }
        }
        syncEvent.close();
        recoveredEvent.close();
        UnmapViewOfFile(diskUpdateCount);
        CloseHandle(cmh);
        CloseHandle(cfh);
    }
    delete[] currUpdateCount;
    currUpdateCount = NULL;
    dbFree(rootPage);
    rootPage = NULL;
#endif
    if (mmapAddr != NULL) {
#if defined(NO_MMAP)
        // The buffer is the only copy of unsaved changes: write them out first.
        int rc = flush();
        if (rc != ok) {
            return rc;
        }
#ifdef FUZZY_CHECKPOINT
        delete writer;
#endif
        VirtualFree(mmapAddr, 0, MEM_RELEASE);
        delete[] pageMap;
        pageMap = NULL;
#else
        if (!UnmapViewOfFile(mmapAddr)) {
            return GetLastError();
        }
#if defined(REPLICATION_SUPPORT)
        delete[] pageMap;
        pageMap = NULL;
#endif
#endif
    }
    if (mh != NULL) {
        if (!CloseHandle(mh)) {
            return GetLastError();
        }
    }
    return fh == INVALID_HANDLE_VALUE || CloseHandle(fh) ? ok : GetLastError();
}
char* dbFile::errorText(int code, char* buf, size_t bufSize)
{
#ifndef PHAR_LAP
#if defined(_WINCE) || defined(UNICODE)
wchar_t cnvBuf[CNV_BUF_SIZE];
int len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
NULL,
code,
0,
cnvBuf,
CNV_BUF_SIZE-1,
NULL);
cnvBuf[CNV_BUF_SIZE-1] = '\0';
len = wcstombs(buf, cnvBuf, bufSize);
#else
int len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
NULL,
code,
0,
buf,
bufSize-1,
NULL);
#endif
if (len == 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -