sync.h
来自「一个功能强大的内存数据库源代码,c++编写,有详细的注释」· C头文件 代码 · 共 1,164 行 · 第 1/2 页
H
1,164 行
curr.pid = getpid(); curr.tid = pthread_self(); return curr; }};#elseclass dbMutex { public: void lock() {} void unlock() {}};class dbThread { public: typedef void (thread_proc* thread_proc_t)(void*); void create(thread_proc_t f, void* arg) { f(arg); } void join() {} void detach() {} static int numberOfProcessors() { return 1; }};class dbLocalSemaphore { int count; public: void wait(dbMutex&) { assert (count > 0); count -= 1; } void signal(unsigned inc = 1) { count += inc; } void open(unsigned initValue = 0) { count = initValue; } void close() {}};class dbLocalEvent { bool signaled; public: void wait(dbMutex&) { assert(signaled); } void signal() { signaled = true; } void reset() { signaled = false; } void open(bool initValue = false) { signaled = initValue; } void close() {}};template<class T>class dbThreadContext { T* value; public: T* get() { return value; } void set(T* value) { this->value = value; } dbThreadContext() { value = NULL; }};class dbProcessId { int pid; public: bool operator != (dbProcessId const& other) const { return pid != other.pid; } void clear() { pid = 0; } static dbProcessId getCurrent() { dbProcessId curr; curr.pid = getpid(); return curr; }};#endif#define INFINITE (~0U)#ifdef USE_POSIX_APIclass dbInitializationMutex { sem_t* sem; public: enum initializationStatus { InitializationError, AlreadyInitialized, NotYetInitialized }; initializationStatus initialize(char const* name) { initializationStatus status; char* tmp = NULL; if (*name != '/') { tmp = new char[strlen(name)+2]; strcpy(tmp+1, name); *tmp = '/'; name = tmp; } while (true) { sem = sem_open(name, 0); if (sem == NULL) { if (errno == ENOENT) { sem = sem_open(name, O_CREAT|O_EXCL, 0777, 0); if (sem != NULL) { status = NotYetInitialized; break; } else if (errno != EEXIST) { status = InitializationError; break; } } else { status = InitializationError; break; } } else { status = (sem_wait(sem) == 0 && sem_post(sem) == 0) ? AlreadyInitialized : InitializationError; break; } } delete[] tmp; return status; } void done() { sem_post(sem); } bool close() { sem_close(sem); return false; } void erase() { close(); }};class dbSemaphore { protected: sem_t* sem; public: void wait() { int rc = sem_wait(sem); assert(rc == 0); } bool wait(unsigned msec) { #ifdef POSIX_1003_1d struct timespec abs_ts; struct timeval cur_tv; clock_gettime(CLOCK_REALTIME, &cur_tv); abs_ts.tv_sec = cur_tv.tv_sec + (msec + tv.tv_usec / 1000) / 1000000; abs_ts.tv_nsec = (msec + tv.tv_usec / 1000) % 1000000 * 1000; int rc = sem_timedwait(sem, &abs_ts); if (rc < 0) { assert(errno == ETIMEDOUT); return false; } return true;#else int rc = sem_wait(sem); assert(rc == 0); return true;#endif } void signal(unsigned inc = 1) { while (--inc > 0) { sem_post(sem); } } void reset() { while (sem_trywait(sem) == 0); } bool open(char const* name, unsigned initValue = 0) { char* tmp = NULL; if (*name != '/') { tmp = new char[strlen(name)+2]; strcpy(tmp+1, name); *tmp = '/'; name = tmp; } sem = sem_open(name, O_CREAT, 0777, initValue); delete[] tmp; return sem != NULL; } void close() { sem_close(sem); } void erase() { close(); }};class dbEvent : public dbSemaphore { public: void wait() { dbSemaphore::wait(); sem_post(sem); } bool wait(unsigned msec) { if (dbSemaphore::wait(msec)) { sem_post(sem); return true; } return false; } void signal() { while (sem_trywait(sem) == 0); sem_post(sem); } void reset() { while (sem_trywait(sem) == 0); } bool open(char const* name, bool signaled = false) { return dbSemaphore::open(name, (int)signaled); }};template<class T>class dbSharedObject { char* name; T* ptr; int fd; public: dbSharedObject() { name = NULL; ptr = NULL; fd = -1; } bool open(char* fileName) { delete[] name; name = new char[strlen(fileName) + 1]; strcpy(name, fileName); fd = ::open(fileName, O_RDWR|O_CREAT, 0777); if (fd < 0) { return false; } ptr = (T*)mmap(NULL, DOALIGN(sizeof(T), 4096), PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); if (ptr == MAP_FAILED) { ptr = NULL; ::close(fd); return false; } return true; } T* get() { return ptr; } void close() { if (ptr != NULL) { munmap(ptr, DOALIGN(sizeof(T), 4096)); } if (fd > 0) { ::close(fd); } } void erase() { close(); unlink(name); } ~dbSharedObject() { delete[] name; }};#else // USE_POSIX_APIextern char const* keyFileDir; // default value: "/tmp/" class dbInitializationMutex { int semid; public: enum initializationStatus { InitializationError, AlreadyInitialized, NotYetInitialized }; initializationStatus initialize(char const* name); void done(); bool close(); void erase();};class dbSemaphore { int s; public: bool wait(unsigned msec = INFINITE); void signal(unsigned inc = 1); bool open(char const* name, unsigned initValue = 0); void reset(); void close(); void erase();};class dbEvent { int e; public: bool wait(unsigned msec = INFINITE); void signal(); void reset(); bool open(char const* name, bool signaled = false); void close(); void erase();};class dbSharedMemory { protected: char* ptr; int shm; public: bool open(char const* name, size_t size); void close(); void erase(); char* get_base() { return ptr; }};template<class T>class dbSharedObject : public dbSharedMemory { public: bool open(char* name) { return dbSharedMemory::open(name, sizeof(T)); } T* get() { return (T*)ptr; }};#endif#if defined(__QNX__)typedef pthread_mutext_t sharedsem_t;class dbGlobalCriticalSection { pthread_mutexattr_t attr; sharedsem_t* sem; public: void enter() { int rc = pthread_mutex_lock(sem); assert(rc == 0); } void leave() { int rc = pthread_mutex_unlock(sem); assert(rc == 0); } bool open(char const*, sharedsem_t* shr) { sem = shr; return true; } bool create(char const*, sharedsem_t* shr) { sem = shr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutexattr_setrecursive(&attr, PTHREAD_RECURSIVE_ENABLE); pthread_mutex_init(sem, &attr); return true; } void close() {} void erase() { pthread_mutex_destroy(sem); }};#elif defined(__osf__)#include <errno.h>typedef msemaphore sharedsem_t;class dbGlobalCriticalSection { sharedsem_t* sem; public: void enter() { int rc; while ((rc = msem_lock(sem, 0)) < 0 && errno == EINTR); assert(rc == 0); } void leave() { int rc = msem_unlock(sem, 0); assert(rc == 0); } bool open(char const*, sharedsem_t* shr) { sem = shr; return true; } bool create(char const*, sharedsem_t* shr) { sem = shr; msem_init(shr, MSEM_UNLOCKED); return true; } void close() {} void erase() { msem_remove(sem); }}; #elif defined(__sun)#include <synch.h>#include <errno.h>typedef sema_t sharedsem_t;class dbGlobalCriticalSection { sharedsem_t* sem; public: void enter() { int rc; while ((rc = sema_wait(sem)) < 0 && errno == EINTR); assert(rc == 0); } void leave() { int rc = sema_post(sem); assert(rc == 0); } bool open(char const*, sharedsem_t* shr) { sem = shr; return true; } bool create(char const*, sharedsem_t* shr) { sem = shr; return sema_init(shr, 1, USYNC_PROCESS, NULL) == 0; } void close() {} void erase() { sema_destroy(sem); }};#elif defined(USE_POSIX_API)typedef sem_t sharedsem_t;class dbGlobalCriticalSection { sharedsem_t* sem; public: void enter() { int rc = sem_wait(sem); assert(rc == 0); } void leave() { int rc = sem_post(sem); assert(rc == 0); } bool open(char const* name, sharedsem_t* shr) { sem = shr; return true; } bool create(char const* name, sharedsem_t* shr) { sem = shr; return sem_init(sem, 1, 1) == 0; } void close() {} void erase() { sem_destroy(sem); }};#elsetypedef long sharedsem_t;class dbGlobalCriticalSection { int semid; sharedsem_t* count; public: void enter(); void leave(); bool open(char const* name, sharedsem_t* shr); bool create(char const* name, sharedsem_t* shr); void close() {} void erase();};#endif#endifclass FASTDB_DLL_ENTRY dbCriticalSection { private: dbMutex& mutex; public: dbCriticalSection(dbMutex& guard) : mutex(guard) { mutex.lock(); } ~dbCriticalSection() { mutex.unlock(); }}; #define SMALL_BUF_SIZE 1024class FASTDB_DLL_ENTRY dbSmallBuffer { protected: char* buf; char smallBuf[SMALL_BUF_SIZE]; public: dbSmallBuffer(size_t size) { if (size > SMALL_BUF_SIZE) { buf = new char[size]; } else { buf = smallBuf; } } operator char*() { return buf; } char* base() { return buf; } ~dbSmallBuffer() { if (buf != smallBuf) { delete[] buf; } }};class FASTDB_DLL_ENTRY dbPooledThread { private: friend class dbThreadPool; dbThread thread; dbThreadPool* pool; dbPooledThread* next; dbThread::thread_proc_t f; void* arg; bool running; dbLocalSemaphore startSem; dbLocalSemaphore readySem; static void thread_proc pooledThreadFunc(void* arg); void run(); void stop(); dbPooledThread(dbThreadPool* threadPool); ~dbPooledThread(); };class FASTDB_DLL_ENTRY dbThreadPool { friend class dbPooledThread; dbPooledThread* freeThreads; dbMutex mutex; public: dbPooledThread* create(dbThread::thread_proc_t f, void* arg); void join(dbPooledThread* thr); dbThreadPool(); ~dbThreadPool();}; #endif
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?