📄 sync_unix.h
字号:
//-< SYNC_UNIX.H >---------------------------------------------------*--------*// FastDB Version 1.0 (c) 1999 GARRET * ? *// (Main Memory Database Management System) * /\| *// * / \ *// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *// Last update: 20-Dec-98 K.A. Knizhnik * GARRET *//-------------------------------------------------------------------*--------*// Intertask synchonization primitives for Unix platforms//-------------------------------------------------------------------*--------*#ifndef __SYNC_UNIX_H__#define __SYNC_UNIX_H__// Standard includes for all Unix platforms#include <unistd.h>#include <string.h>#include <fcntl.h>#include <sys/time.h>#include <sys/types.h>#include <errno.h>#if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP#include <sys/ipc.h> extern char const* keyFileDir; // default value: "/tmp/" #endif#if defined(USE_POSIX_SEMAPHORES)#include <semaphore.h> // For POSIX style semaphores#else#include <sys/sem.h> // For SysV style semaphores#endif#if defined(USE_POSIX_MMAP) && USE_POSIX_MMAP#include <sys/mman.h> // For mmap()#else#include <sys/shm.h> #include <sys/mman.h>#endif#define thread_proc//////////////////////////////////////////////////////////// If this system uses pthread based threads, then define// dbMutex(), dbThread(), dbLocalEvent(), etc as pthread-based implemenations#ifndef NO_PTHREADS// Use pthread based implementation#include <pthread.h>class dbMutex { friend class dbLocalEvent; friend class dbLocalSemaphore; pthread_mutex_t cs; bool initialized; public: dbMutex() { pthread_mutex_init(&cs, NULL); initialized = true; } ~dbMutex() { pthread_mutex_destroy(&cs); initialized = false; } bool isInitialized() { return initialized; } void lock() { if (initialized) { pthread_mutex_lock(&cs); } } void unlock() { if (initialized) { pthread_mutex_unlock(&cs); } }};const size_t dbThreadStackSize = 1024*1024;class dbThread { pthread_t thread; public: typedef void (thread_proc* thread_proc_t)(void*); void create(thread_proc_t f, void* arg) { pthread_attr_t attr; pthread_attr_init(&attr);#if !defined(__linux__) pthread_attr_setstacksize(&attr, dbThreadStackSize);#endif#if defined(_AIX41) // At AIX 4.1, 4.2 threads are by default created detached pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_UNDETACHED);#endif pthread_create(&thread, &attr, (void*(*)(void*))f, arg); pthread_attr_destroy(&attr); } void join() { void* result; pthread_join(thread, &result); } void detach() { pthread_detach(thread); } enum ThreadPriority { THR_PRI_LOW, THR_PRI_HIGH }; void setPriority(ThreadPriority pri) { #if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX) struct sched_param sp; sp.sched_priority = pri == THR_PRI_LOW ? PRI_OTHER_MIN : PRI_OTHER_MAX; pthread_setschedparam(thread, SCHED_OTHER, &sp); #endif } static int numberOfProcessors();};class dbLocalEvent { pthread_cond_t cond; int signaled; public: void wait(dbMutex& mutex) { while (!signaled) { pthread_cond_wait(&cond, &mutex.cs); } } bool wait(dbMutex& mutex, time_t timeout) { while (!signaled) { struct timespec abs_ts; #ifdef PTHREAD_GET_EXPIRATION_NP struct timespec rel_ts; rel_ts.tv_sec = timeout/1000; rel_ts.tv_nsec = timeout%1000*1000000; pthread_get_expiration_np(&rel_ts, &abs_ts);#else struct timeval cur_tv; gettimeofday(&cur_tv, NULL); abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000; abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000; if (abs_ts.tv_nsec > 1000000000) { abs_ts.tv_nsec -= 1000000000; abs_ts.tv_sec += 1; }#endif int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts); if (rc == ETIMEDOUT) { return false; } } return true; } void signal() { signaled = true; pthread_cond_broadcast(&cond); } void reset() { signaled = false; } void open(bool initValue = false) { signaled = initValue; pthread_cond_init(&cond, NULL); } void close() { pthread_cond_destroy(&cond); }};class dbLocalSemaphore { pthread_cond_t cond; int count; public: void wait(dbMutex& mutex) { while (count == 0) { pthread_cond_wait(&cond, &mutex.cs); } count -= 1; } void wait(dbMutex& mutex, time_t timeout) { while (count == 0) { struct timespec abs_ts; #ifdef PTHREAD_GET_EXPIRATION_NP struct timespec rel_ts; rel_ts.tv_sec = timeout/1000; rel_ts.tv_nsec = timeout%1000*1000000; pthread_get_expiration_np(&rel_ts, &abs_ts);#else struct timeval cur_tv; gettimeofday(&cur_tv, NULL); abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000; abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000; if (abs_ts.tv_nsec > 1000000000) { abs_ts.tv_nsec -= 1000000000; abs_ts.tv_sec += 1; }#endif pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts); } count -= 1; } void signal(unsigned inc = 1) { count += inc; if (inc > 1) { pthread_cond_broadcast(&cond); } else if (inc == 1) { pthread_cond_signal(&cond); } } void open(unsigned initValue = 0) { pthread_cond_init(&cond, NULL); count = initValue; } void close() { pthread_cond_destroy(&cond); }};template<class T> class dbThreadContext { pthread_key_t key; public: T* get() { return (T*)pthread_getspecific(key); } void set(T* value) { pthread_setspecific(key, value); } dbThreadContext() { pthread_key_create(&key, NULL); } ~dbThreadContext() { pthread_key_delete(key); }};class dbProcessId { int pid; pthread_t tid; public: bool operator != (dbProcessId const& other) const { return pid != other.pid || tid != other.tid; } void clear() { pid = 0; tid = 0; } static dbProcessId getCurrent() { dbProcessId curr; curr.pid = getpid(); curr.tid = pthread_self(); return curr; }};#else // NO_PTHREAD// Non pthread based threads, mutexes, etc.// Maps to skeleton functions, this implementation isn't using threads.class dbMutex { bool initialized; public: dbMutex() { initialized = true; } ~dbMutex() { initialized = false; } bool isInitialized() { return initialized; } void lock() {} void unlock() {}};class dbThread { public: typedef void (thread_proc* thread_proc_t)(void*); void create(thread_proc_t f, void* arg) { f(arg); } void join() {} void detach() {} enum ThreadPriority { THR_PRI_LOW, THR_PRI_HIGH }; void setPriority(ThreadPriority pri) { } static int numberOfProcessors() { return 1; }};class dbLocalSemaphore { int count; public: void wait(dbMutex&) { assert (count > 0); count -= 1; } void signal(unsigned inc = 1) { count += inc; } void open(unsigned initValue = 0) { count = initValue; } void close() {}};class dbLocalEvent { bool signaled; public: void wait(dbMutex&) { assert(signaled); } bool wait(dbMutex& mutex, time_t timeout) { return true; } void signal() { signaled = true; } void reset() { signaled = false; } void open(bool initValue = false) { signaled = initValue; } void close() {}};template<class T>class dbThreadContext { T* value; public: T* get() { return value; } void set(T* value) { this->value = value; } dbThreadContext() { value = NULL; }};class dbProcessId { int pid; public: bool operator != (dbProcessId const& other) const { return pid != other.pid; } void clear() { pid = 0; } static dbProcessId getCurrent() { dbProcessId curr; curr.pid = getpid(); return curr; }};#endif // NO_PTHREAD#define INFINITE (~0U)#ifdef USE_POSIX_SEMAPHORES// Initialization Mutex using Posix based semaphoresclass dbInitializationMutex { sem_t* sem; public: enum initializationStatus { InitializationError, AlreadyInitialized, NotYetInitialized }; initializationStatus initialize(char const* name) { initializationStatus status; char* tmp = NULL; if (*name != '/') { tmp = new char[strlen(name)+2]; strcpy(tmp+1, name); *tmp = '/'; name = tmp; } while (true) { sem = sem_open(name, 0); if (sem == SEM_FAILED) { if (errno == ENOENT) { sem = sem_open(name, O_CREAT|O_EXCL, 0777, 0); if (sem != SEM_FAILED) { status = NotYetInitialized; break; } else if (errno != EEXIST) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -