sync.h
来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C头文件 代码 · 共 1,164 行 · 第 1/2 页
H
1,164 行
//-< SYNC.H >--------------------------------------------------------*--------*// FastDB Version 1.0 (c) 1999 GARRET * ? *// (Main Memory Database Management System) * /\| *// * / \ *// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *// Last update: 20-Dec-98 K.A. Knizhnik * GARRET *//-------------------------------------------------------------------*--------*// Intertask synchonization primitives//-------------------------------------------------------------------*--------*#ifndef __SYNC_H__#define __SYNC_H__class FASTDB_DLL_ENTRY dbSystem { public: static unsigned getCurrentTimeMsec();};#ifdef _WIN32#ifdef SET_NULL_DACLclass FASTDB_DLL_ENTRY dbNullSecurityDesciptor { public: SECURITY_DESCRIPTOR sd; SECURITY_ATTRIBUTES sa; dbNullSecurityDesciptor() { InitializeSecurityDescriptor(&sd, SECURITY_DESCRIPTOR_REVISION); SetSecurityDescriptorDacl(&sd, TRUE, NULL, FALSE); sa.nLength = sizeof(sa); sa.bInheritHandle = TRUE; sa.lpSecurityDescriptor = &sd; } static dbNullSecurityDesciptor instance;};#define FASTDB_SECURITY_ATTRIBUTES &dbNullSecurityDesciptor::instance.sa#else #define FASTDB_SECURITY_ATTRIBUTES NULL#endifclass FASTDB_DLL_ENTRY dbMutex { CRITICAL_SECTION cs; public: dbMutex() { InitializeCriticalSection(&cs); } ~dbMutex() { DeleteCriticalSection(&cs); } void lock() { EnterCriticalSection(&cs); } void unlock() { LeaveCriticalSection(&cs); }};#define thread_proc WINAPIclass FASTDB_DLL_ENTRY dbThread { HANDLE h; public: typedef void (thread_proc* thread_proc_t)(void*); void create(thread_proc_t f, void* arg) { DWORD threadid; h = CreateThread(FASTDB_SECURITY_ATTRIBUTES, NULL, LPTHREAD_START_ROUTINE(f), arg, 0, &threadid); } enum ThreadPriority { THR_PRI_LOW, THR_PRI_HIGH }; void setPriority(ThreadPriority pri) { SetThreadPriority(h, pri == THR_PRI_LOW ? THREAD_PRIORITY_IDLE : THREAD_PRIORITY_HIGHEST); } void join() { WaitForSingleObject(h, INFINITE); CloseHandle(h); h = NULL; } void detach() { if (h != NULL) { CloseHandle(h); h = NULL; } } dbThread() { h = NULL; } ~dbThread() { if (h != NULL) { CloseHandle(h); } } static int numberOfProcessors() { #ifdef PHAR_LAP return 1;#else SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); return sysinfo.dwNumberOfProcessors;#endif }}; class FASTDB_DLL_ENTRY dbProcessId { DWORD tid; public: bool operator != (dbProcessId const& other) const { return tid != other.tid; } void clear() { tid = 0; } static dbProcessId getCurrent() { dbProcessId curr; curr.tid = GetCurrentThreadId(); return curr; }};class FASTDB_DLL_ENTRY dbInitializationMutex { HANDLE m; public: enum initializationStatus { InitializationError, AlreadyInitialized, NotYetInitialized }; initializationStatus initialize(char const* name) { initializationStatus status; m = CreateMutex(FASTDB_SECURITY_ATTRIBUTES, true, name); if (GetLastError() == ERROR_ALREADY_EXISTS) { status = WaitForSingleObject(m, INFINITE) == WAIT_OBJECT_0 ? AlreadyInitialized : InitializationError; ReleaseMutex(m); } else if (m != NULL) { status = NotYetInitialized; } else { status = InitializationError; } return status; } void done() { ReleaseMutex(m); } bool close() { CloseHandle(m); return false; } void erase() { close(); }};const int dbMaxSemValue = 1000000;class FASTDB_DLL_ENTRY dbSemaphore { protected: HANDLE s; public: bool wait(unsigned msec = INFINITE) { int rc = WaitForSingleObject(s, msec); assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT); return rc == WAIT_OBJECT_0; } void signal(unsigned inc = 1) { if (inc != 0) { ReleaseSemaphore(s, inc, NULL); } } void reset() { while (WaitForSingleObject(s, 0) == WAIT_OBJECT_0); } bool open(char const* name, unsigned initValue = 0) { s = CreateSemaphore(FASTDB_SECURITY_ATTRIBUTES, initValue, dbMaxSemValue, name); return s != NULL; } void close() { CloseHandle(s); } void erase() { close(); }};class FASTDB_DLL_ENTRY dbEvent { protected: HANDLE e; public: bool wait(unsigned msec = INFINITE) { int rc = WaitForSingleObject(e, msec); assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT); return rc == WAIT_OBJECT_0; } void signal() { SetEvent(e); } void reset() { ResetEvent(e); } bool open(char const* name, bool signaled = false) { e = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, true, signaled, name); return e != NULL; } void close() { CloseHandle(e); } void erase() { close(); }};class FASTDB_DLL_ENTRY dbLocalSemaphore : public dbSemaphore { public: bool wait(dbMutex& mutex, time_t timeoutMsec) { mutex.unlock(); int rc = WaitForSingleObject(s, timeoutMsec); assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT); mutex.lock(); return rc == WAIT_OBJECT_0; } void wait(dbMutex& mutex) { mutex.unlock(); int rc = WaitForSingleObject(s, INFINITE); assert(rc == WAIT_OBJECT_0); mutex.lock(); } bool open(unsigned initValue = 0) { return dbSemaphore::open(NULL, initValue); }};class FASTDB_DLL_ENTRY dbLocalEvent : public dbEvent { public: bool wait(dbMutex& mutex, time_t timeoutMsec) { mutex.unlock(); int rc = WaitForSingleObject(e, timeoutMsec); assert(rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT); mutex.lock(); return rc == WAIT_OBJECT_0; } void wait(dbMutex& mutex) { mutex.unlock(); int rc = WaitForSingleObject(e, INFINITE); assert(rc == WAIT_OBJECT_0); mutex.lock(); } bool open(bool signaled = false) { return dbEvent::open(NULL, signaled); }};template<class T>class dbThreadContext { int index; public: T* get() { return (T*)TlsGetValue(index); } void set(T* value) { TlsSetValue(index, value); } dbThreadContext() { index = TlsAlloc(); assert(index != TLS_OUT_OF_INDEXES); } ~dbThreadContext() { TlsFree(index); }};template<class T>class dbSharedObject { T* ptr; HANDLE h; public: bool open(char* name) { #ifdef NO_MMAP ptr = new T();#else h = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE, 0, sizeof(T), name); if (h == NULL) { return false; } ptr = (T*)MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0); if (ptr == NULL) { CloseHandle(h); return false; }#endif return true; } T* get() { return ptr; } void close() { #ifdef NO_MMAP delete[] ptr;#else UnmapViewOfFile(ptr); CloseHandle(h);#endif } void erase() { close(); }};typedef long sharedsem_t;class FASTDB_DLL_ENTRY dbGlobalCriticalSection { HANDLE event; sharedsem_t* count; public: void enter() { if (InterlockedDecrement(count) != 0) { // another process is in critical section int rc = WaitForSingleObject(event, INFINITE); assert (rc == WAIT_OBJECT_0); } } void leave() { if (InterlockedIncrement(count) <= 0) { // some other processes try to enter critical section SetEvent(event); } } bool open(char const* name, long* count) { this->count = count; event = OpenEvent(EVENT_ALL_ACCESS, FALSE, name); return event != NULL; } bool create(char const* name, long* count) { this->count = count; *count = 1; event = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, name); return event != NULL; } void close() { CloseHandle(event); } void erase() { close(); }}; #else // Unix#include <unistd.h>#include <string.h>#include <fcntl.h>#include <sys/time.h>#include <sys/types.h>#include <errno.h>#ifdef USE_POSIX_API#include <semaphore.h>#include <sys/mman.h>#else#include <sys/ipc.h>#include <sys/sem.h>#include <sys/shm.h>#include <sys/mman.h>#endif#define thread_proc#ifndef NO_PTHREADS#include <pthread.h>class dbMutex { friend class dbLocalEvent; friend class dbLocalSemaphore; pthread_mutex_t cs; public: dbMutex() { pthread_mutex_init(&cs, NULL); } ~dbMutex() { pthread_mutex_destroy(&cs); } void lock() { pthread_mutex_lock(&cs); } void unlock() { pthread_mutex_unlock(&cs); }};const size_t dbThreadStackSize = 1024*1024;class dbThread { pthread_t thread; public: typedef void (thread_proc* thread_proc_t)(void*); void create(thread_proc_t f, void* arg) { pthread_attr_t attr; pthread_attr_init(&attr);#if !defined(__linux__) pthread_attr_setstacksize(&attr, dbThreadStackSize);#endif#if defined(_AIX41) // At AIX 4.1, 4.2 threads are by default created detached pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_UNDETACHED);#endif pthread_create(&thread, &attr, (void*(*)(void*))f, arg); pthread_attr_destroy(&attr); } void join() { void* result; pthread_join(thread, &result); } void detach() { pthread_detach(thread); } enum ThreadPriority { THR_PRI_LOW, THR_PRI_HIGH }; void setPriority(ThreadPriority pri) { #if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX) struct sched_param sp; sp.gsched_priority = pri == THR_PRI_LOW ? PRI_OTHER_MIN : PRI_OTHER_MAX; pthread_setschedparam(thread, SCHED_OTHER, &sp); #endif } static int numberOfProcessors();};class dbLocalEvent { pthread_cond_t cond; int signaled; public: void wait(dbMutex& mutex) { while (!signaled) { pthread_cond_wait(&cond, &mutex.cs); } } bool wait(dbMutex& mutex, time_t timeout) { while (!signaled) { struct timespec abs_ts; #ifdef PTHREAD_GET_EXPIRATION_NP struct timespec rel_ts; rel_ts.tv_sec = timeout/1000; rel_ts.tv_nsec = timeout%1000*1000000; pthread_get_expiration_np(&rel_ts, &abs_ts);#else struct timeval cur_tv; gettimeofday(&cur_tv, NULL); abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000; abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000; if (abs_ts.tv_nsec > 1000000000) { abs_ts.tv_nsec -= 1000000000; abs_ts.tv_sec += 1; }#endif int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts); if (rc == ETIMEDOUT) { return false; } } return true; } void signal() { signaled = true; pthread_cond_broadcast(&cond); } void reset() { signaled = false; } void open(bool initValue = false) { signaled = initValue; pthread_cond_init(&cond, NULL); } void close() { pthread_cond_destroy(&cond); }};class dbLocalSemaphore { pthread_cond_t cond; int count; public: void wait(dbMutex& mutex) { while (count == 0) { pthread_cond_wait(&cond, &mutex.cs); } count -= 1; } void wait(dbMutex& mutex, time_t timeout) { while (count == 0) { struct timespec abs_ts; #ifdef PTHREAD_GET_EXPIRATION_NP struct timespec rel_ts; rel_ts.tv_sec = timeout/1000; rel_ts.tv_nsec = timeout%1000*1000000; pthread_get_expiration_np(&rel_ts, &abs_ts);#else struct timeval cur_tv; gettimeofday(&cur_tv, NULL); abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000; abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000; if (abs_ts.tv_nsec > 1000000000) { abs_ts.tv_nsec -= 1000000000; abs_ts.tv_sec += 1; }#endif pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts); } count -= 1; } void signal(unsigned inc = 1) { count += inc; if (inc > 1) { pthread_cond_broadcast(&cond); } else if (inc == 1) { pthread_cond_signal(&cond); } } void open(unsigned initValue = 0) { pthread_cond_init(&cond, NULL); count = initValue; } void close() { pthread_cond_destroy(&cond); }};template<class T> class dbThreadContext { pthread_key_t key; public: T* get() { return (T*)pthread_getspecific(key); } void set(T* value) { pthread_setspecific(key, value); } dbThreadContext() { pthread_key_create(&key, NULL); } ~dbThreadContext() { pthread_key_delete(key); }};class dbProcessId { int pid; pthread_t tid; public: bool operator != (dbProcessId const& other) const { return pid != other.pid || tid != other.tid; } void clear() { pid = 0; tid = 0; } static dbProcessId getCurrent() { dbProcessId curr;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?