📄 sync_unix.h
字号:
//-< SYNC_UNIX.H >---------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
// Last update: 20-Dec-98 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// Intertask synchronization primitives for Unix platforms
//-------------------------------------------------------------------*--------*
#ifndef __SYNC_UNIX_H__
#define __SYNC_UNIX_H__
// Standard includes for all Unix platforms
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#ifdef VXWORKS
#include "fastdbShim.h"
#else
#include <sys/time.h>
#endif // VXWORKS
#include <sys/types.h>
#include <assert.h>
#include <errno.h>
#if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
#ifndef VXWORKS
#include <sys/ipc.h>
#endif // ndef VXWORKS
extern char const* keyFileDir; // default value: "/tmp/"
#endif
#if defined(USE_POSIX_SEMAPHORES)
#include <semaphore.h> // For POSIX style semaphores
#else
#include <sys/sem.h> // For SysV style semaphores
#endif
#if defined(USE_POSIX_MMAP) && USE_POSIX_MMAP
#include <sys/mman.h> // For mmap()
#else
#ifndef VXWORKS
#include <sys/shm.h>
#endif // ndef VXWORKS
#include <sys/mman.h>
#endif
BEGIN_FASTDB_NAMESPACE
#define thread_proc
//////////////////////////////////////////////////////////
// If this system uses pthread based threads, then define
// dbMutex(), dbThread(), dbLocalEvent(), etc. as pthread-based implementations
#ifndef NO_PTHREADS
// Use pthread based implementation
#include <pthread.h>
// Wrapper around a pthread mutex created with default attributes.
// The `initialized` flag is set by the constructor and cleared by the
// destructor; lock() and unlock() do nothing while the flag is clear.
class dbMutex {
    friend class dbLocalEvent;
    friend class dbLocalSemaphore;

    pthread_mutex_t cs;
    bool initialized;

    // Checks a pthread return code: asserted to be zero in debug builds,
    // ignored when NDEBUG is defined.
    static void verify(int rc) {
#ifndef NDEBUG
        assert(rc == 0);
#else
        (void)rc;
#endif
    }

  public:
    // Initializes the underlying pthread mutex and marks the object usable.
    dbMutex() {
#if !defined(NDEBUG) && defined(VXWORKS)
        // Debug VxWorks builds zero the mutex storage before initializing it.
        memset(&cs, '\0', sizeof(cs));
#endif
        verify(pthread_mutex_init(&cs, NULL));
        initialized = true;
    }

    // Destroys the underlying pthread mutex and marks the object unusable.
    ~dbMutex() {
        verify(pthread_mutex_destroy(&cs));
        initialized = false;
    }

    // Returns true between construction and destruction.
    bool isInitialized() {
        return initialized;
    }

    // Acquires the mutex; a no-op if the object is not initialized.
    void lock() {
        if (!initialized) {
            return;
        }
        verify(pthread_mutex_lock(&cs));
    }

    // Releases the mutex; a no-op if the object is not initialized.
    void unlock() {
        if (!initialized) {
            return;
        }
        verify(pthread_mutex_unlock(&cs));
    }
};
// Stack size in bytes (1 MB) requested for new threads on non-Linux platforms.
const size_t dbThreadStackSize = 1024*1024;

// Handle for a single pthread-based thread.
class dbThread {
    pthread_t thread;

  public:
    // Signature of a thread entry point: takes one opaque argument, returns nothing.
    typedef void (thread_proc* thread_proc_t)(void*);

    // Suspends the calling thread for `sec` seconds via ::sleep().
    static void sleep(time_t sec) {
        ::sleep(sec);
    }

    // Starts a new thread running f(arg). Return codes of the pthread calls
    // are not checked.
    void create(thread_proc_t f, void* arg) {
        typedef void* (*pthread_entry_t)(void*);

        pthread_attr_t attributes;
        pthread_attr_init(&attributes);
#if !defined(__linux__)
        pthread_attr_setstacksize(&attributes, dbThreadStackSize);
#endif
#if defined(_AIX41)
        // At AIX 4.1, 4.2 threads are by default created detached
        pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_UNDETACHED);
#endif
        // The entry point is cast to the pointer type pthread_create expects.
        pthread_create(&thread, &attributes, (pthread_entry_t)f, arg);
        pthread_attr_destroy(&attributes);
    }

    // Waits for the thread to finish; its exit value is discarded.
    void join() {
        void* exitValue;
        pthread_join(thread, &exitValue);
    }

    // Detaches the thread.
    void detach() {
        pthread_detach(thread);
    }

    enum ThreadPriority {
        THR_PRI_LOW,
        THR_PRI_HIGH
    };

    // Sets the SCHED_OTHER priority to the platform minimum or maximum.
    // Compiles to nothing unless PRI_OTHER_MIN and PRI_OTHER_MAX are defined.
    void setPriority(ThreadPriority pri) {
#if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX)
        struct sched_param sp;
        if (pri == THR_PRI_LOW) {
            sp.sched_priority = PRI_OTHER_MIN;
        } else {
            sp.sched_priority = PRI_OTHER_MAX;
        }
        pthread_setschedparam(thread, SCHED_OTHER, &sp);
#endif
    }

    // Declared here; the definition is not in this header section.
    static int numberOfProcessors();
};
// Manual-reset event built on a pthread condition variable.
// Once signal() is called the event stays signaled until reset().
// Must be set up with open() before use and released with close().
class dbLocalEvent {
    pthread_cond_t cond;
    int signaled;

    // Converts a relative timeout in milliseconds into the absolute
    // deadline expected by pthread_cond_timedwait().
    static void makeDeadline(time_t timeout, struct timespec& abs_ts) {
#ifdef PTHREAD_GET_EXPIRATION_NP
        struct timespec rel_ts;
        rel_ts.tv_sec = timeout/1000;
        rel_ts.tv_nsec = timeout%1000*1000000;
        pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
        struct timeval cur_tv;
        gettimeofday(&cur_tv, NULL);
        abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000;
        abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000;
        // tv_nsec must be strictly less than one second. A value of exactly
        // 1000000000 has to be carried too, otherwise pthread_cond_timedwait()
        // rejects the deadline and the wait is reported as timed out.
        if (abs_ts.tv_nsec >= 1000000000) {
            abs_ts.tv_nsec -= 1000000000;
            abs_ts.tv_sec += 1;
        }
#endif
    }

  public:
    // Blocks until the event is signaled. `mutex` is handed to
    // pthread_cond_wait(), so it is released while waiting and re-acquired
    // before returning.
    void wait(dbMutex& mutex) {
        while (!signaled) {
            pthread_cond_wait(&cond, &mutex.cs);
        }
    }

    // Waits up to `timeout` milliseconds for the event to be signaled.
    // Returns true if the event is signaled, false if
    // pthread_cond_timedwait() returned a non-zero code (timeout or error).
    bool wait(dbMutex& mutex, time_t timeout) {
        if (!signaled) {
            struct timespec abs_ts;
            makeDeadline(timeout, abs_ts);
            // Loop to absorb wakeups that arrive while still not signaled.
            do {
                int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
                if (rc != 0) {
                    return false;
                }
            } while (!signaled);
        }
        return true;
    }

    // Marks the event signaled and wakes every waiting thread.
    void signal() {
        signaled = true;
        pthread_cond_broadcast(&cond);
    }

    // Clears the signaled state.
    void reset() {
        signaled = false;
    }

    // Initializes the condition variable and sets the initial state.
    void open(bool initValue = false) {
        signaled = initValue;
        pthread_cond_init(&cond, NULL);
    }

    // Destroys the condition variable.
    void close() {
        pthread_cond_destroy(&cond);
    }
};
// Counting semaphore built on a pthread condition variable.
// The counter is not protected internally: it is only read and modified
// by these methods, and waiting uses the mutex supplied by the caller.
// Must be set up with open() before use and released with close().
class dbLocalSemaphore {
    pthread_cond_t cond;
    int count;

    // Converts a relative timeout in milliseconds into the absolute
    // deadline expected by pthread_cond_timedwait().
    static void makeDeadline(time_t timeout, struct timespec& abs_ts) {
#ifdef PTHREAD_GET_EXPIRATION_NP
        struct timespec rel_ts;
        rel_ts.tv_sec = timeout/1000;
        rel_ts.tv_nsec = timeout%1000*1000000;
        pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
        struct timeval cur_tv;
        gettimeofday(&cur_tv, NULL);
        abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000;
        abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000;
        // tv_nsec must be strictly less than one second. A value of exactly
        // 1000000000 has to be carried too, otherwise pthread_cond_timedwait()
        // rejects the deadline and the wait is reported as timed out.
        if (abs_ts.tv_nsec >= 1000000000) {
            abs_ts.tv_nsec -= 1000000000;
            abs_ts.tv_sec += 1;
        }
#endif
    }

  public:
    // Blocks until the counter is non-zero, then decrements it. `mutex` is
    // handed to pthread_cond_wait(), so it is released while waiting and
    // re-acquired before returning.
    void wait(dbMutex& mutex) {
        while (count == 0) {
            pthread_cond_wait(&cond, &mutex.cs);
        }
        count -= 1;
    }

    // Waits up to `timeout` milliseconds for the counter to become non-zero.
    // On success decrements the counter and returns true; returns false
    // (counter untouched) if pthread_cond_timedwait() returned a non-zero
    // code (timeout or error).
    bool wait(dbMutex& mutex, time_t timeout) {
        if (count == 0) {
            struct timespec abs_ts;
            makeDeadline(timeout, abs_ts);
            // Loop to absorb wakeups that arrive while the counter is still zero.
            do {
                int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
                if (rc != 0) {
                    return false;
                }
            } while (count == 0);
        }
        count -= 1;
        return true;
    }

    // Adds `inc` to the counter. Wakes all waiters when inc > 1, a single
    // waiter when inc == 1, and nobody when inc == 0.
    void signal(unsigned inc = 1) {
        count += inc;
        if (inc > 1) {
            pthread_cond_broadcast(&cond);
        } else if (inc == 1) {
            pthread_cond_signal(&cond);
        }
    }

    // Initializes the condition variable and sets the initial counter value.
    void open(unsigned initValue = 0) {
        pthread_cond_init(&cond, NULL);
        count = initValue;
    }

    // Destroys the condition variable.
    void close() {
        pthread_cond_destroy(&cond);
    }
};
// Per-thread pointer slot backed by a pthread key. The key is created
// without a destructor callback, so stored pointers are never freed here.
template<class T>
class dbThreadContext {
    pthread_key_t key;

  public:
    // Creates the pthread key for this slot.
    dbThreadContext() {
        pthread_key_create(&key, NULL);
    }

    // Deletes the pthread key.
    ~dbThreadContext() {
        pthread_key_delete(key);
    }

    // Returns the pointer stored for the calling thread.
    T* get() {
        T* stored = (T*)pthread_getspecific(key);
#ifdef VXWORKS
        // HACK - on VXWORKS, setting to NULL releases the key for _all_ threads.
        // when windriver defect number WIND00119383 is fixed - this hack may be
        // removed.
        // (T*)-1 is the stand-in written by set() for NULL; map it back.
        if (stored == (T*)-1) {
            stored = (T*)NULL;
        }
#endif // VXWORKS
        return stored;
    }

    // Stores `value` for the calling thread.
    void set(T* value) {
#ifdef VXWORKS
        // HACK - on VXWORKS, setting to NULL releases the key for _all_ threads.
        // when windriver defect number WIND00119383 is fixed - this hack may be
        // removed.
        // Store (T*)-1 instead of NULL; get() translates it back.
        if (value == NULL) {
            value = (T*)-1;
        }
#endif // VXWORKS
        pthread_setspecific(key, value);
    }
};
// Identifies an execution context by process id plus pthread id.
// Members are left uninitialized until clear() or getCurrent() is used.
class dbProcessId {
    int pid;
    pthread_t tid;

  public:
    // True when either the process id or the thread id differs.
    bool operator != (dbProcessId const& other) const {
        if (pid != other.pid) {
            return true;
        }
        return tid != other.tid;
    }

    // Resets both ids to zero.
    void clear() {
        tid = 0;
        pid = 0;
    }

    // Builds the identifier of the calling process and thread.
    static dbProcessId getCurrent() {
        dbProcessId self;
        self.pid = getpid();
        self.tid = pthread_self();
        return self;
    }
};
#else // NO_PTHREAD
// Non pthread based threads, mutexes, etc.
// Maps to skeleton functions, this implementation isn't using threads.
// Mutex stand-in for builds without pthreads: locking is a no-op and only
// the constructed/destroyed flag is tracked.
class dbMutex {
    bool initialized;

  public:
    // Marks the object as initialized.
    dbMutex() : initialized(true) {}

    // Marks the object as no longer initialized.
    ~dbMutex() { initialized = false; }

    // Returns true between construction and destruction.
    bool isInitialized() { return initialized; }

    // Nothing to acquire in this implementation.
    void lock() {
    }

    // Nothing to release in this implementation.
    void unlock() {
    }
};
class dbThread {
public:
typedef void (thread_proc* thread_proc_t)(void*);
void create(thread_proc_t f, void* arg) { f(arg); }
void join() {}
void detach() {}
enum ThreadPriority {
THR_PRI_LOW,
THR_PRI_HIGH
};
void setPriority(ThreadPriority pri) { }
static int numberOfProcessors() { return 1; }
};
// Counting semaphore stand-in for builds without pthreads. It never blocks:
// wait() asserts that a unit is available instead of waiting for one.
class dbLocalSemaphore {
    int count;

  public:
    // Sets the initial counter value.
    void open(unsigned initValue = 0) {
        count = initValue;
    }

    // Nothing to release in this implementation.
    void close() {
    }

    // Consumes one unit; asserts (in debug builds) that the counter is positive.
    void wait(dbMutex&) {
        assert (count > 0);
        --count;
    }

    // Adds `inc` units to the counter.
    void signal(unsigned inc = 1) {
        count = count + inc;
    }
};
// Event stand-in for builds without pthreads. It never blocks: the untimed
// wait asserts the event is already signaled, and the timed wait always
// reports success.
class dbLocalEvent {
    bool signaled;

  public:
    // Sets the initial state of the event.
    void open(bool initValue = false) {
        signaled = initValue;
    }

    // Nothing to release in this implementation.
    void close() {
    }

    // Asserts (in debug builds) that the event is signaled; never blocks.
    void wait(dbMutex&) {
        assert(signaled);
    }

    // Returns true unconditionally; neither argument nor the event state is consulted.
    bool wait(dbMutex& mutex, time_t timeout) {
        return true;
    }

    // Marks the event signaled.
    void signal() {
        signaled = true;
    }

    // Clears the signaled state.
    void reset() {
        signaled = false;
    }
};
// Thread-context stand-in for builds without pthreads: a single plain
// pointer held in the object, initially NULL.
template<class T>
class dbThreadContext {
    T* value;

  public:
    // Starts with no stored pointer.
    dbThreadContext() : value(NULL) {}

    // Returns the stored pointer.
    T* get() { return value; }

    // Replaces the stored pointer.
    void set(T* value) {
        // The parameter shadows the member, hence the explicit this->.
        this->value = value;
    }
};
// Identifies an execution context by process id only (no-threads build).
// The id is left uninitialized until clear() or getCurrent() is used.
class dbProcessId {
    int pid;

  public:
    // True when the process ids differ.
    bool operator != (dbProcessId const& other) const {
        return !(pid == other.pid);
    }

    // Resets the id to zero.
    void clear() { pid = 0; }

    // Builds the identifier of the calling process.
    static dbProcessId getCurrent() {
        dbProcessId self;
        self.pid = getpid();
        return self;
    }
};
#endif // NO_PTHREAD
// All-ones unsigned int value (~0U).
// NOTE(review): presumably the "no timeout / wait forever" sentinel for the
// wait primitives -- confirm against callers.
#define INFINITE (~0U)
#ifdef USE_POSIX_SEMAPHORES
// Initialization Mutex using Posix based semaphores
class dbInitializationMutex {
sem_t* sem;
char* name;
public:
enum initializationStatus {
InitializationError,
AlreadyInitialized,
NotYetInitialized
};
initializationStatus initialize(char const* name) {
initializationStatus status;
this->name = new char[strlen(name)+2];
if (*name != '/') {
strcpy(this->name+1, name);
*this->name = '/';
} else {
strcpy(this->name, name);
}
while (true) {
sem = sem_open(this->name, 0);
if (sem == SEM_FAILED) {
if (errno == ENOENT) {
sem = sem_open(this->name, O_CREAT|O_EXCL, 0777, 0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -