📄 sync_unix.h
字号:
//-< SYNC_UNIX.H >---------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
// Last update: 20-Dec-98 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// Intertask synchonization primitives for Unix platforms
//-------------------------------------------------------------------*--------*
#ifndef __SYNC_UNIX_H__
#define __SYNC_UNIX_H__
// Standard includes for all Unix platforms
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#include <errno.h>
#if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
#include <sys/ipc.h>
extern char const* keyFileDir; // default value: "/tmp/"
#endif
#if defined(USE_POSIX_SEMAPHORES)
#include <semaphore.h> // For POSIX style semaphores
#else
#include <sys/sem.h> // For SysV style semaphores
#endif
#if defined(USE_POSIX_MMAP) && USE_POSIX_MMAP
#include <sys/mman.h> // For mmap()
#else
#include <sys/shm.h>
#include <sys/mman.h>
#endif
#define thread_proc
//////////////////////////////////////////////////////////
// If this system uses pthread based threads, then define
// dbMutex(), dbThread(), dbLocalEvent(), etc as pthread-based implemenations
#ifndef NO_PTHREADS
// Use pthread based implementation
#include <pthread.h>
class dbMutex
{
friend class dbLocalEvent;
friend class dbLocalSemaphore;
pthread_mutex_t cs;
bool initialized;
public:
dbMutex()
{
pthread_mutex_init(&cs, NULL);
initialized = true;
}
~dbMutex()
{
pthread_mutex_destroy(&cs);
initialized = false;
}
bool isInitialized()
{
return initialized;
}
void lock()
{
if (initialized)
{
pthread_mutex_lock(&cs);
}
}
void unlock()
{
if (initialized)
{
pthread_mutex_unlock(&cs);
}
}
};
const size_t dbThreadStackSize = 1024*1024;
class dbThread
{
pthread_t thread;
public:
typedef void (thread_proc* thread_proc_t)(void*);
void create(thread_proc_t f, void* arg)
{
pthread_attr_t attr;
pthread_attr_init(&attr);
#if !defined(__linux__)
pthread_attr_setstacksize(&attr, dbThreadStackSize);
#endif
#if defined(_AIX41)
// At AIX 4.1, 4.2 threads are by default created detached
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_UNDETACHED);
#endif
pthread_create(&thread, &attr, (void*(*)(void*))f, arg);
pthread_attr_destroy(&attr);
}
void join()
{
void* result;
pthread_join(thread, &result);
}
void detach()
{
pthread_detach(thread);
}
enum ThreadPriority {
THR_PRI_LOW,
THR_PRI_HIGH
};
void setPriority(ThreadPriority pri)
{
#if defined(PRI_OTHER_MIN) && defined(PRI_OTHER_MAX)
struct sched_param sp;
sp.sched_priority = pri == THR_PRI_LOW ? PRI_OTHER_MIN : PRI_OTHER_MAX;
pthread_setschedparam(thread, SCHED_OTHER, &sp);
#endif
}
static int numberOfProcessors();
};
class dbLocalEvent
{
pthread_cond_t cond;
int signaled;
public:
void wait(dbMutex& mutex)
{
while (!signaled)
{
pthread_cond_wait(&cond, &mutex.cs);
}
}
bool wait(dbMutex& mutex, time_t timeout)
{
while (!signaled)
{
struct timespec abs_ts;
#ifdef PTHREAD_GET_EXPIRATION_NP
struct timespec rel_ts;
rel_ts.tv_sec = timeout/1000;
rel_ts.tv_nsec = timeout%1000*1000000;
pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
struct timeval cur_tv;
gettimeofday(&cur_tv, NULL);
abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000;
abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000;
if (abs_ts.tv_nsec > 1000000000)
{
abs_ts.tv_nsec -= 1000000000;
abs_ts.tv_sec += 1;
}
#endif
int rc = pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
if (rc == ETIMEDOUT)
{
return false;
}
}
return true;
}
void signal()
{
signaled = true;
pthread_cond_broadcast(&cond);
}
void reset()
{
signaled = false;
}
void open(bool initValue = false)
{
signaled = initValue;
pthread_cond_init(&cond, NULL);
}
void close()
{
pthread_cond_destroy(&cond);
}
};
class dbLocalSemaphore
{
pthread_cond_t cond;
int count;
public:
void wait(dbMutex& mutex)
{
while (count == 0)
{
pthread_cond_wait(&cond, &mutex.cs);
}
count -= 1;
}
void wait(dbMutex& mutex, time_t timeout)
{
while (count == 0)
{
struct timespec abs_ts;
#ifdef PTHREAD_GET_EXPIRATION_NP
struct timespec rel_ts;
rel_ts.tv_sec = timeout/1000;
rel_ts.tv_nsec = timeout%1000*1000000;
pthread_get_expiration_np(&rel_ts, &abs_ts);
#else
struct timeval cur_tv;
gettimeofday(&cur_tv, NULL);
abs_ts.tv_sec = cur_tv.tv_sec + timeout/1000;
abs_ts.tv_nsec = cur_tv.tv_usec*1000 + timeout%1000*1000000;
if (abs_ts.tv_nsec > 1000000000)
{
abs_ts.tv_nsec -= 1000000000;
abs_ts.tv_sec += 1;
}
#endif
pthread_cond_timedwait(&cond, &mutex.cs, &abs_ts);
}
count -= 1;
}
void signal(unsigned inc = 1)
{
count += inc;
if (inc > 1)
{
pthread_cond_broadcast(&cond);
}
else if (inc == 1)
{
pthread_cond_signal(&cond);
}
}
void open(unsigned initValue = 0)
{
pthread_cond_init(&cond, NULL);
count = initValue;
}
void close()
{
pthread_cond_destroy(&cond);
}
};
template<class T>
class dbThreadContext
{
pthread_key_t key;
public:
T* get
()
{
return (T*)pthread_getspecific(key);
}
void set
(T* value)
{
pthread_setspecific(key, value);
}
dbThreadContext()
{
pthread_key_create(&key, NULL);
}
~dbThreadContext()
{
pthread_key_delete(key);
}
};
class dbProcessId
{
int pid;
pthread_t tid;
public:
bool operator != (dbProcessId const& other) const
{
return pid != other.pid || tid != other.tid;
}
void clear()
{
pid = 0;
tid = 0;
}
static dbProcessId getCurrent()
{
dbProcessId curr;
curr.pid = getpid();
curr.tid = pthread_self();
return curr;
}
};
#else // NO_PTHREAD
// Non pthread based threads, mutexes, etc.
// Maps to skeleton functions, this implementation isn't using threads.
class dbMutex
{
bool initialized;
public:
dbMutex()
{
initialized = true;
}
~dbMutex()
{
initialized = false;
}
bool isInitialized()
{
return initialized;
}
void lock()
{}
void unlock()
{}
}
;
class dbThread
{
public:
typedef void (thread_proc* thread_proc_t)(void*);
void create(thread_proc_t f, void* arg)
{
f(arg);
}
void join()
{}
void detach()
{}
enum ThreadPriority {
THR_PRI_LOW,
THR_PRI_HIGH
};
void setPriority(ThreadPriority pri)
{ }
static int numberOfProcessors()
{
return 1;
}
};
class dbLocalSemaphore
{
int count;
public:
void wait(dbMutex&)
{
assert (count > 0);
count -= 1;
}
void signal(unsigned inc = 1)
{
count += inc;
}
void open(unsigned initValue = 0)
{
count = initValue;
}
void close()
{}
}
;
class dbLocalEvent
{
bool signaled;
public:
void wait(dbMutex&)
{
assert(signaled);
}
bool wait(dbMutex& mutex, time_t timeout)
{
return true;
}
void signal()
{
signaled = true;
}
void reset()
{
signaled = false;
}
void open(bool initValue = false)
{
signaled = initValue;
}
void close()
{}
}
;
template<class T>
class dbThreadContext
{
T* value;
public:
T* get
()
{
return value;
}
void set
(T* value)
{
this->value = value;
}
dbThreadContext()
{
value = NULL;
}
};
class dbProcessId
{
int pid;
public:
bool operator != (dbProcessId const& other) const
{
return pid != other.pid;
}
void clear()
{
pid = 0;
}
static dbProcessId getCurrent()
{
dbProcessId curr;
curr.pid = getpid();
return curr;
}
};
#endif // NO_PTHREAD
#define INFINITE (~0U)
#ifdef USE_POSIX_SEMAPHORES
// Initialization Mutex using Posix based semaphores
class dbInitializationMutex
{
sem_t* sem;
public:
enum initializationStatus {
InitializationError,
AlreadyInitialized,
NotYetInitialized
};
initializationStatus initialize(char const* name)
{
initializationStatus status;
char* tmp = NULL;
if (*name != '/')
{
tmp = new char[strlen(name)+2];
strcpy(tmp+1, name);
*tmp = '/';
name = tmp;
}
while (true)
{
sem = sem_open(name, 0);
if (sem == SEM_FAILED)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -