📄 sync.cpp
字号:
//-< SYNC.CPP >------------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
// Last update: 10-Dec-98 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// Intertask synchonization primitives
//-------------------------------------------------------------------*--------*
#define INSIDE_FASTDB
#include "stdtp.h"
#include "sync.h"
#ifndef _WIN32
unsigned dbSystem::getCurrentTimeMsec()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec*1000 + tv.tv_usec / 1000;
}
#ifndef USE_POSIX_API
#include <errno.h>
#include <signal.h>
#define PRINT_ERROR(func) perror(func)
char const* keyFileDir = "/tmp/";
static void alarm_handler(int){}
class moduleInitializer {
public:
moduleInitializer() {
static struct sigaction sigact;
sigact.sa_handler = alarm_handler;
::sigaction(SIGALRM, &sigact, NULL);
}
};
static moduleInitializer initializer; // install SIGLARM handler
int sem_init(int& sem, char const* name, unsigned init_value)
{
key_t key = IPC_PRIVATE;
int semid;
struct sembuf sops[3];
sops[0].sem_num = 1;
sops[0].sem_op = 0; /* check if semaphore was already initialized */
sops[0].sem_flg = IPC_NOWAIT;
sops[1].sem_num = 1;
sops[1].sem_op = 1; /* mark semaphore as initialized */
sops[1].sem_flg = 0;
sops[2].sem_num = 0;
sops[2].sem_op = init_value;
sops[2].sem_flg = 0;
if (name != NULL) {
int fd;
char* path = (char*)name;
if (*name != '/') {
path = new char[strlen(name)+strlen(keyFileDir)+1];
sprintf(path, "%s%s", keyFileDir, name);
}
fd = open(path, O_WRONLY|O_CREAT, 0777);
if (fd < 0) {
if (path != name) {
delete[] path;
}
PRINT_ERROR("open");
return -1;
}
close(fd);
key = ftok(path, '0');
if (path != name) {
delete[] path;
}
if (key < 0) {
PRINT_ERROR("ftok");
return -1;
}
}
semid = semget(key, 2, IPC_CREAT|0777);
if (semid < 0) {
PRINT_ERROR("semget");
return -1;
}
if (semop(semid, sops, items(sops)) && errno != EAGAIN) {
PRINT_ERROR("semop");
return -1;
}
sem = semid;
return 0;
}
enum wait_status { wait_ok, wait_timeout_expired, wait_error };
static wait_status wait_semaphore(int& sem, unsigned msec,
struct sembuf* sops, int n_sops)
{
if (msec != INFINITE) {
struct timeval start;
struct timeval stop;
gettimeofday(&start, NULL);
unsigned long usec = start.tv_usec + msec % 1000 * 1000;
stop.tv_usec = usec % 1000000;
stop.tv_sec = start.tv_sec + msec / 1000 + usec / 1000000;
while (true) {
struct itimerval it;
it.it_interval.tv_sec = 0;
it.it_interval.tv_usec = 0;
it.it_value.tv_sec = stop.tv_sec - start.tv_sec;
it.it_value.tv_usec = stop.tv_usec - start.tv_usec;
if (stop.tv_usec < start.tv_usec) {
it.it_value.tv_usec += 1000000;
it.it_value.tv_sec -= 1;
}
if (setitimer(ITIMER_REAL, &it, NULL) < 0) {
return wait_error;
}
if (semop(sem, sops, n_sops) == 0) {
break;
}
if (errno != EINTR) {
return wait_error;
}
gettimeofday(&start, NULL);
if (stop.tv_sec < start.tv_sec ||
(stop.tv_sec == start.tv_sec && stop.tv_usec < start.tv_sec))
{
return wait_timeout_expired;
}
}
} else {
while (semop(sem, sops, n_sops) < 0) {
if (errno != EINTR) {
return wait_error;
}
}
}
return wait_ok;
}
bool dbSemaphore::wait(unsigned msec)
{
static struct sembuf sops[] = {{0, -1, 0}};
wait_status ws = wait_semaphore(s, msec, sops, items(sops));
assert(ws != wait_error);
return ws == wait_ok;
}
void dbSemaphore::signal(unsigned inc)
{
if (inc != 0) {
struct sembuf sops[1];
sops[0].sem_num = 0;
sops[0].sem_op = inc;
sops[0].sem_flg = 0;
int rc = semop(s, sops, 1);
assert(rc == 0);
}
}
#if (defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)) || defined(__FreeBSD__)
/* union semun is defined by including <sys/sem.h> */
#else
union semun {
int val;
struct semid_ds* buf;
unsigned short* array;
};
#endif
static union semun u;
void dbSemaphore::reset()
{
u.val = 0;
int rc = semctl(s, 0, SETVAL, u);
assert(rc >= 0);
}
bool dbSemaphore::open(char const* name, unsigned init_value)
{
return sem_init(s, name, init_value) == 0;
}
void dbSemaphore::close() {}
void dbSemaphore::erase() {
semctl(s, 0, IPC_RMID, &u);
}
bool dbEvent::wait(unsigned msec)
{
static struct sembuf sops[] = {{0, -1, 0}, {0, 1, 0}};
wait_status ws = wait_semaphore(e, msec, sops, items(sops));
assert(ws != wait_error);
return ws == wait_ok;
}
void dbEvent::signal()
{
static struct sembuf sops[] = {{0, 0, IPC_NOWAIT}, {0, 1, 0}};
int rc = semop(e, sops, items(sops));
assert(rc == 0 || errno == EAGAIN);
}
void dbEvent::reset()
{
static struct sembuf sops[] = {{0, -1, IPC_NOWAIT}};
int rc = semop(e, sops, items(sops));
assert(rc == 0 || errno == EAGAIN);
}
bool dbEvent::open(char const* name, bool signaled)
{
return sem_init(e, name, signaled) == 0;
}
void dbEvent::close() {}
void dbEvent::erase() {
semctl(e, 0, IPC_RMID, &u);
}
#if !defined(__osf__) && !defined(__svr4__)
#if defined(__GNUC__) && defined(i386)
/*
* Make sure gcc doesn't try to be clever and move things around
* on us. We need to use _exactly_ the address the user gave us,
* not some alias that contains the same information.
*/
#define __atomic_fool_gcc(x) (*(struct { int a[100]; } *)x)
void dbGlobalCriticalSection::enter()
{
int inc = -1;
__asm__ __volatile__(
"lock; xadd %0,%1"
:"=r" (inc), "=m" (__atomic_fool_gcc(count))
:"r" (inc), "m" (__atomic_fool_gcc(count)));
if (inc != 1) {
static struct sembuf sops[] = {{0, -1, SEM_UNDO}};
int rc;
while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
assert(rc == 0);
}
}
void dbGlobalCriticalSection::leave()
{
int inc = 1;
__asm__ __volatile__(
"lock; xadd %0,%1"
:"=d" (inc), "=m" (__atomic_fool_gcc(count))
:"d" (inc), "m" (__atomic_fool_gcc(count)));
if (inc != 0) {
/* some other processes waiting to enter critical section */
static struct sembuf sops[] = {{0, 1, SEM_UNDO}};
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
}
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t* count)
{
this->count = count;
*count = 1;
return sem_init(semid, name, 0) == 0;
}
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t* count)
{
this->count = count;
return sem_init(semid, name, 0) == 0;
}
#else // defined(__GNUC__) && defined(i386)
void dbGlobalCriticalSection::enter()
{
static struct sembuf sops[] = {{0, -1, SEM_UNDO}};
int rc;
while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR);
assert(rc == 0);
}
void dbGlobalCriticalSection::leave()
{
static struct sembuf sops[] = {{0, 1, SEM_UNDO}};
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
bool dbGlobalCriticalSection::open(char const* name, sharedsem_t*)
{
return sem_init(semid, name, 1) == 0;
}
bool dbGlobalCriticalSection::create(char const* name, sharedsem_t*)
{
return sem_init(semid, name, 1) == 0;
}
#endif // defined(__GNUC__) && defined(i386)
void dbGlobalCriticalSection::erase()
{
semctl(semid, 0, IPC_RMID, &u);
}
#endif // !defined(__osf__) && !defined(__svr4__)
dbInitializationMutex::initializationStatus
dbInitializationMutex::initialize(char const* name)
{
struct sembuf sops[4];
char* path = (char*)name;
if (*name != '/') {
path = new char[strlen(name)+strlen(keyFileDir)+1];
sprintf(path, "%s%s", keyFileDir, name);
}
int fd = open(path, O_WRONLY|O_CREAT, 0777);
if (fd < 0) {
if (path != name) {
delete[] path;
}
PRINT_ERROR("open");
return InitializationError;
}
::close(fd);
int key = ftok(path, '0');
if (path != name) {
delete[] path;
}
if (key < 0) {
PRINT_ERROR("ftok");
return InitializationError;
}
while (true) {
semid = semget(key, 3, IPC_CREAT|0777);
if (semid < 0) {
PRINT_ERROR("semget");
return InitializationError;
}
// Semaphore 0 - number of active processes
// Semaphore 1 - intialization in progress (1 while initialization, 0 after it)
// Semaphore 2 - semaphore was destroyed
sops[0].sem_num = 0;
sops[0].sem_op = 0; /* check if semaphore was already initialized */
sops[0].sem_flg = IPC_NOWAIT;
sops[1].sem_num = 0;
sops[1].sem_op = 1; /* increment number of active processes */
sops[1].sem_flg = SEM_UNDO;
sops[2].sem_num = 1;
sops[2].sem_op = 1; /* initialization in process */
sops[2].sem_flg = SEM_UNDO;
sops[3].sem_num = 2;
sops[3].sem_op = 0; /* check if semaphore was destroyed */
sops[3].sem_flg = IPC_NOWAIT;
if (semop(semid, sops, 4) < 0) {
if (errno == EAGAIN) {
sops[0].sem_num = 0;
sops[0].sem_op = -1; /* check if semaphore was already initialized */
sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
sops[1].sem_num = 1;
sops[1].sem_op = 0; /* wait until inialization completed */
sops[1].sem_flg = 0;
sops[2].sem_num = 0;
sops[2].sem_op = 2; /* increment number of active processes */
sops[3].sem_flg = SEM_UNDO;
sops[3].sem_num = 2;
sops[3].sem_op = 0; /* check if semaphore was destroyed */
sops[3].sem_flg = IPC_NOWAIT;
if (semop(semid, sops, 4) == 0) {
return AlreadyInitialized;
}
if (errno == EAGAIN) {
sleep(1);
continue;
}
}
if (errno == EIDRM) {
continue;
}
PRINT_ERROR("semop");
return InitializationError;
} else {
return NotYetInitialized;
}
}
}
void dbInitializationMutex::done()
{
struct sembuf sops[1];
sops[0].sem_num = 1;
sops[0].sem_op = -1; /* initialization done */
sops[0].sem_flg = SEM_UNDO;
int rc = semop(semid, sops, 1);
assert(rc == 0);
}
bool dbInitializationMutex::close()
{
int rc;
struct sembuf sops[3];
while (true) {
sops[0].sem_num = 0;
sops[0].sem_op = -1; /* decrement process couter */
sops[0].sem_flg = SEM_UNDO;
sops[1].sem_num = 0;
sops[1].sem_op = 0; /* check if there are no more active processes */
sops[1].sem_flg = IPC_NOWAIT;
sops[2].sem_num = 2;
sops[2].sem_op = 1; /* mark as destructed */
sops[2].sem_flg = SEM_UNDO;
if ((rc = semop(semid, sops, 3)) == 0) {
return true;
} else {
assert(errno == EAGAIN);
}
sops[0].sem_num = 0;
sops[0].sem_op = -2; /* decrement process couter and check for non-zero */
sops[0].sem_flg = SEM_UNDO|IPC_NOWAIT;
sops[1].sem_num = 0;
sops[1].sem_op = 1;
sops[1].sem_flg = SEM_UNDO;
if ((rc = semop(semid, sops, 2)) == 0) {
return false;
} else {
assert(errno == EAGAIN);
}
}
}
void dbInitializationMutex::erase()
{
semctl(semid, 0, IPC_RMID, &u);
}
#endif // USE_POSIX_API
#ifndef NO_PTHREADS
#if defined(_SC_NPROCESSORS_ONLN)
int dbThread::numberOfProcessors() {
return sysconf(_SC_NPROCESSORS_ONLN);
}
#elif defined(__linux__)
#include <linux/smp.h>
int dbThread::numberOfProcessors() { return smp_num_cpus; }
#elif defined(__FreeBSD__)
#include <sys/sysctl.h>
int dbThread::numberOfProcessors() {
int mib[2],ncpus=0;
size_t len=sizeof(ncpus);
mib[0]= CTL_HW;
mib[1]= HW_NCPU;
sysctl(mib,2,&ncpus,&len,NULL,0);
return ncpus;
}
#else
int dbThread::numberOfProcessors() { return 1; }
#endif
#endif // NO_PTHREADS
#else // _WIN32
unsigned dbSystem::getCurrentTimeMsec()
{
return GetTickCount();
}
#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -