📄 sync.h
字号:
//-< SYNC.H >--------------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
// Last update: 20-Dec-98 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// Intertask synchonization primitives
//-------------------------------------------------------------------*--------*
#ifndef __SYNC_H__
#define __SYNC_H__
class FASTDB_DLL_ENTRY dbSystem {
public:
static unsigned getCurrentTimeMsec();
};
#ifdef _WIN32
class FASTDB_DLL_ENTRY dbMutex {
CRITICAL_SECTION cs;
public:
dbMutex() {
InitializeCriticalSection(&cs);
}
~dbMutex() {
DeleteCriticalSection(&cs);
}
void lock() {
EnterCriticalSection(&cs);
}
void unlock() {
LeaveCriticalSection(&cs);
}
};
#define thread_proc WINAPI
class FASTDB_DLL_ENTRY dbThread {
HANDLE h;
public:
void create(void (thread_proc* f)(void*), void* arg) {
DWORD threadid;
h = CreateThread(NULL, NULL, LPTHREAD_START_ROUTINE(f), arg,
0, &threadid);
}
enum ThreadPriority {
THR_PRI_LOW,
THR_PRI_HIGH
};
void setPriority(ThreadPriority pri) {
SetThreadPriority(h, pri == THR_PRI_LOW ? THREAD_PRIORITY_IDLE : THREAD_PRIORITY_HIGHEST);
}
void join() {
WaitForSingleObject(h, INFINITE);
CloseHandle(h);
h = NULL;
}
void detach() {
if (h != NULL) {
CloseHandle(h);
h = NULL;
}
}
dbThread() {
h = NULL;
}
~dbThread() {
if (h != NULL) {
CloseHandle(h);
}
}
static int numberOfProcessors() {
#ifdef PHAR_LAP
return 1;
#else
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
return sysinfo.dwNumberOfProcessors;
#endif
}
};
class FASTDB_DLL_ENTRY dbProcessId {
DWORD tid;
public:
bool operator != (dbProcessId const& other) const {
return tid != other.tid;
}
void clear() {
tid = 0;
}
static dbProcessId getCurrent() {
dbProcessId curr;
curr.tid = GetCurrentThreadId();
return curr;
}
};
class FASTDB_DLL_ENTRY dbInitializationMutex {
HANDLE m;
public:
enum initializationStatus {
InitializationError,
AlreadyInitialized,
NotYetInitialized
};
initializationStatus initialize(char const* name) {
initializationStatus status;
m = CreateMutex(NULL, true, name);
if (GetLastError() == ERROR_ALREADY_EXISTS) {
status = WaitForSingleObject(m, INFINITE) == WAIT_OBJECT_0
? AlreadyInitialized : InitializationError;
ReleaseMutex(m);
} else if (m != NULL) {
status = NotYetInitialized;
} else {
status = InitializationError;
}
return status;
}
void done() {
ReleaseMutex(m);
}
bool close() {
CloseHandle(m);
return false;
}
void erase() {
close();
}
};
const int dbMaxSemValue = 1000000;
class FASTDB_DLL_ENTRY dbSemaphore {
protected:
HANDLE s;
public:
bool wait(unsigned msec = INFINITE) {
int rc = WaitForSingleObject(s, msec);
assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
return rc == WAIT_OBJECT_0;
}
void signal(unsigned inc = 1) {
if (inc != 0) {
ReleaseSemaphore(s, inc, NULL);
}
}
void reset() {
while (WaitForSingleObject(s, 0) == WAIT_OBJECT_0);
}
bool open(char const* name, unsigned initValue = 0) {
s = CreateSemaphore(NULL, initValue, dbMaxSemValue, name);
return s != NULL;
}
void close() {
CloseHandle(s);
}
void erase() {
close();
}
};
class FASTDB_DLL_ENTRY dbEvent {
protected:
HANDLE e;
public:
bool wait(unsigned msec = INFINITE) {
int rc = WaitForSingleObject(e, msec);
assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
return rc == WAIT_OBJECT_0;
}
void signal() {
SetEvent(e);
}
void reset() {
ResetEvent(e);
}
bool open(char const* name, bool signaled = false) {
e = CreateEvent(NULL, true, signaled, name);
return e != NULL;
}
void close() {
CloseHandle(e);
}
void erase() {
close();
}
};
class FASTDB_DLL_ENTRY dbLocalSemaphore : public dbSemaphore {
public:
void wait(dbMutex& mutex, time_t timeoutSec) {
mutex.unlock();
int rc = WaitForSingleObject(s, timeoutSec*1000);
assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
mutex.lock();
}
void wait(dbMutex& mutex) {
mutex.unlock();
int rc = WaitForSingleObject(s, INFINITE);
assert(rc == WAIT_OBJECT_0);
mutex.lock();
}
bool open(unsigned initValue = 0) {
return dbSemaphore::open(NULL, initValue);
}
};
class FASTDB_DLL_ENTRY dbLocalEvent : public dbEvent {
public:
void wait(dbMutex& mutex, time_t timeoutSec) {
mutex.unlock();
int rc = WaitForSingleObject(e, timeoutSec*1000);
assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT);
mutex.lock();
}
void wait(dbMutex& mutex) {
mutex.unlock();
int rc = WaitForSingleObject(e, INFINITE);
assert(rc == WAIT_OBJECT_0);
mutex.lock();
}
bool open(bool signaled = false) {
return dbEvent::open(NULL, signaled);
}
};
template<class T>
class dbThreadContext {
int index;
public:
T* get() {
return (T*)TlsGetValue(index);
}
void set(T* value) {
TlsSetValue(index, value);
}
dbThreadContext() {
index = TlsAlloc();
assert(index != TLS_OUT_OF_INDEXES);
}
~dbThreadContext() {
TlsFree(index);
}
};
template<class T>
class dbSharedObject {
T* ptr;
HANDLE h;
public:
bool open(char* name) {
#ifdef NO_MMAP
ptr = new T();
#else
h = CreateFileMapping(INVALID_HANDLE_VALUE,
NULL, PAGE_READWRITE, 0,
sizeof(T), name);
if (h == NULL) {
return false;
}
ptr = (T*)MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0);
if (ptr == NULL) {
CloseHandle(h);
return false;
}
#endif
return true;
}
T* get() { return ptr; }
void close() {
#ifdef NO_MMAP
delete[] ptr;
#else
UnmapViewOfFile(ptr);
CloseHandle(h);
#endif
}
void erase() {
close();
}
};
typedef long sharedsem_t;
class FASTDB_DLL_ENTRY dbGlobalCriticalSection {
HANDLE event;
sharedsem_t* count;
public:
void enter() {
if (InterlockedDecrement(count) != 0) {
// another process is in critical section
int rc = WaitForSingleObject(event, INFINITE);
assert (rc == WAIT_OBJECT_0);
}
}
void leave() {
if (InterlockedIncrement(count) <= 0) {
// some other processes try to enter critical section
SetEvent(event);
}
}
bool open(char const* name, long* count) {
this->count = count;
event = OpenEvent(EVENT_ALL_ACCESS, FALSE, name);
return event != NULL;
}
bool create(char const* name, long* count) {
this->count = count;
*count = 1;
event = CreateEvent(NULL, false, false, name);
return event != NULL;
}
void close() {
CloseHandle(event);
}
void erase() {
close();
}
};
#else // Unix
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef USE_POSIX_API
#include <semaphore.h>
#include <sys/mman.h>
#include <errno.h>
#else
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/mman.h>
#endif
#define thread_proc
#ifndef NO_PTHREADS
#include <pthread.h>
class dbMutex {
friend class dbLocalEvent;
friend class dbLocalSemaphore;
pthread_mutex_t cs;
public:
dbMutex() {
pthread_mutex_init(&cs, NULL);
}
~dbMutex() {
pthread_mutex_destroy(&cs);
}
void lock() {
pthread_mutex_lock(&cs);
}
void unlock() {
pthread_mutex_unlock(&cs);
}
};
const size_t dbThreadStackSize = 1024*1024;
class dbThread {
pthread_t thread;
public:
void create(void (thread_proc* f)(void*), void* arg) {
pthread_attr_t attr;
pthread_attr_init(&attr);
#if !defined(__linux__)
pthread_attr_setstacksize(&attr, dbThreadStackSize);
#endif
#if defined(_AIX41)
// At AIX 4.1, 4.2 threads are by default created detached
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_UNDETACHED);
#endif
pthread_create(&thread, &attr, (void*(*)(void*))f, arg);
pthread_attr_destroy(&attr);
}
void join() {
void* result;
pthread_join(thread, &result);
}
void detach() {
pthread_detach(thread);
}
enum ThreadPriority {
THR_PRI_LOW,
THR_PRI_HIGH
};
void setPriority(ThreadPriority pri) {
#if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX)
struct sched_param sp;
sp.sched_priority = pri == THR_PRI_LOW ? PRI_OTHER_MIN : PRI_OTHER_MAX;
pthread_setschedparam(thread, SCHED_OTHER, &sp);
#endif
}
static int numberOfProcessors();
};
class dbLocalEvent {
pthread_cond_t cond;
int signaled;
public:
void wait(dbMutex& mutex) {
while (!signaled) {
pthread_cond_wait(&cond, &mutex.cs);
}
}
void wait(dbMutex& mutex, time_t timeout) {
while (!signaled) {
struct timespec abs_ts;
#ifdef PTHREAD_GET_EXPIRATION_NP
struct timespec rel_ts;
rel_ts.tv_sec = timeout;
rel_ts.tv_nsec = 0;
pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
struct timeval cur_tv;
gettimeofday(&cur_tv, NULL);
abs_ts.tv_sec = cur_tv.tv_sec + timeout;
abs_ts.tv_nsec = cur_tv.tv_usec * 1000;
#endif
pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
}
}
void signal() {
signaled = true;
pthread_cond_broadcast(&cond);
}
void reset() {
signaled = false;
}
void open(bool initValue = false) {
signaled = initValue;
pthread_cond_init(&cond, NULL);
}
void close() {
pthread_cond_destroy(&cond);
}
};
class dbLocalSemaphore {
pthread_cond_t cond;
int count;
public:
void wait(dbMutex& mutex) {
while (count == 0) {
pthread_cond_wait(&cond, &mutex.cs);
}
count -= 1;
}
void wait(dbMutex& mutex, time_t timeout) {
while (count == 0) {
struct timespec abs_ts;
#ifdef PTHREAD_GET_EXPIRATION_NP
struct timespec rel_ts;
rel_ts.tv_sec = timeout;
rel_ts.tv_nsec = 0;
pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
struct timeval cur_tv;
gettimeofday(&cur_tv, NULL);
abs_ts.tv_sec = cur_tv.tv_sec + timeout;
abs_ts.tv_nsec = cur_tv.tv_usec;
#endif
pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
}
count -= 1;
}
void signal(unsigned inc = 1) {
count += inc;
if (inc > 1) {
pthread_cond_broadcast(&cond);
} else if (inc == 1) {
pthread_cond_signal(&cond);
}
}
void open(unsigned initValue = 0) {
pthread_cond_init(&cond, NULL);
count = initValue;
}
void close() {
pthread_cond_destroy(&cond);
}
};
template<class T>
class dbThreadContext {
pthread_key_t key;
public:
T* get() {
return (T*)pthread_getspecific(key);
}
void set(T* value) {
pthread_setspecific(key, value);
}
dbThreadContext() {
pthread_key_create(&key, NULL);
}
~dbThreadContext() {
pthread_key_delete(key);
}
};
class dbProcessId {
int pid;
pthread_t tid;
public:
bool operator != (dbProcessId const& other) const {
return pid != other.pid || tid != other.tid;
}
void clear() {
pid = 0;
tid = 0;
}
static dbProcessId getCurrent() {
dbProcessId curr;
curr.pid = getpid();
curr.tid = pthread_self();
return curr;
}
};
#else
class dbMutex {
public:
void lock() {}
void unlock() {}
};
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -