📄 sync.cpp
字号:
//-< SYNC.CPP >------------------------------------------------------*--------*// FastDB Version 1.0 (c) 1999 GARRET * ? *// (Main Memory Database Management System) * /\| *// * / \ *// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *// Last update: 10-Dec-98 K.A. Knizhnik * GARRET *//-------------------------------------------------------------------*--------*// Intertask synchonization primitives//-------------------------------------------------------------------*--------*#define INSIDE_FASTDB#include "stdtp.h"#include "sync.h"BEGIN_FASTDB_NAMESPACE#ifndef _WIN32// Unix specificunsigned dbSystem::getCurrentTimeMsec(){ struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec*1000 + tv.tv_usec / 1000;}#if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAPEND_FASTDB_NAMESPACE#include <errno.h>BEGIN_FASTDB_NAMESPACE#define PRINT_ERROR(func) perror(func)char const* keyFileDir = "/tmp/";#ifndef USE_POSIX_SEMAPHORESEND_FASTDB_NAMESPACE#include <signal.h>BEGIN_FASTDB_NAMESPACEstatic void alarm_handler(int){}class moduleInitializer { public: moduleInitializer() { static struct sigaction sigact; sigact.sa_handler = alarm_handler; ::sigaction(SIGALRM, &sigact, NULL); }};static moduleInitializer initializer; // install SIGLARM handler#endif // USE_POSIX_SEMAPHORES#endif // use SysV primitives#if !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAPbool dbSharedMemory::open(char const* name, size_t size){ char* fileName = (char*)name; if (strchr(name, '/') == NULL) { fileName = new char[strlen(name)+strlen(keyFileDir)+1]; sprintf(fileName, "%s%s", keyFileDir, name); } int fd = ::open(fileName, O_RDWR|O_CREAT, 0777); if (fd < 0) { if (fileName != name) { delete[] fileName; } return false; } ::close(fd); int key = ftok(fileName, '0'); if (fileName != name) { delete[] fileName; } if (key < 0) { return false; } shm = shmget(key, DOALIGN(size, 4096), IPC_CREAT|0777); if (shm < 0) { return false; } ptr = (char*)shmat(shm, NULL, 0); return (ptr != (char*)-1);}void dbSharedMemory::close(){ shmdt((char*)ptr);}void dbSharedMemory::erase(){ close(); shmctl(shm, IPC_RMID, NULL);}#endif // use SysV shmat//////////////////////////////////////////////////////////////////////// // If we are to use the local implementation of dbSemaphore and dbEvent// (which currently uses SysV based semaphores)#ifndef USE_POSIX_SEMAPHORESint sem_init(int& sem, char const* name, unsigned init_value){ key_t key = IPC_PRIVATE; int semid; struct sembuf sops[3]; sops[0].sem_num = 1; sops[0].sem_op = 0; /* check if semaphore was already initialized */ sops[0].sem_flg = IPC_NOWAIT; sops[1].sem_num = 1; sops[1].sem_op = 1; /* mark semaphore as initialized */ sops[1].sem_flg = 0; sops[2].sem_num = 0; sops[2].sem_op = init_value; sops[2].sem_flg = 0; if (name != NULL) { int fd; char* path = (char*)name; if (strchr(name, '/') == NULL) { path = new char[strlen(name)+strlen(keyFileDir)+1]; sprintf(path, "%s%s", keyFileDir, name); } fd = open(path, O_WRONLY|O_CREAT, 0777); if (fd < 0) { if (path != name) { delete[] path; } PRINT_ERROR("open"); return -1; } close(fd); key = ftok(path, '0'); if (path != name) { delete[] path; } if (key < 0) { PRINT_ERROR("ftok"); return -1; } } semid = semget(key, 2, IPC_CREAT|0777); if (semid < 0) { PRINT_ERROR("semget"); return -1; } if (semop(semid, sops, itemsof(sops)) != 0 && errno != EAGAIN) { PRINT_ERROR("semop"); return -1; } sem = semid; return 0;}enum wait_status { wait_ok, wait_timeout_expired, wait_error };static wait_status wait_semaphore(int& sem, unsigned msec, struct sembuf* sops, int n_sops){ if (msec != INFINITE) { struct timeval start; struct timeval stop; gettimeofday(&start, NULL); unsigned long usec = start.tv_usec + msec % 1000 * 1000; stop.tv_usec = usec % 1000000; stop.tv_sec = start.tv_sec + msec / 1000 + usec / 1000000; while (true) { struct itimerval it; it.it_interval.tv_sec = 0; it.it_interval.tv_usec = 0; it.it_value.tv_sec = stop.tv_sec - start.tv_sec; it.it_value.tv_usec = stop.tv_usec - start.tv_usec; if (stop.tv_usec < start.tv_usec) { it.it_value.tv_usec += 1000000; it.it_value.tv_sec -= 1; } if (setitimer(ITIMER_REAL, &it, NULL) < 0) { return wait_error; } if (semop(sem, sops, n_sops) == 0) { break; } if (errno != EINTR) { return wait_error; } gettimeofday(&start, NULL); if (stop.tv_sec < start.tv_sec || (stop.tv_sec == start.tv_sec && stop.tv_usec < start.tv_sec)) { return wait_timeout_expired; } } } else { while (semop(sem, sops, n_sops) < 0) { if (errno != EINTR) { return wait_error; } } } return wait_ok;}bool dbSemaphore::wait(unsigned msec){ static struct sembuf sops[] = {{0, -1, 0}}; wait_status ws = wait_semaphore(s, msec, sops, itemsof(sops)); assert(ws != wait_error); return ws == wait_ok;}void dbSemaphore::signal(unsigned inc){ if (inc != 0) { struct sembuf sops[1]; sops[0].sem_num = 0; sops[0].sem_op = inc; sops[0].sem_flg = 0; int rc = semop(s, sops, 1); assert(rc == 0); }}bool dbWatchDog::watch(){ static struct sembuf sops[] = {{0, -1, SEM_UNDO}}; int rc; while ((rc = semop(id, sops, 1)) < 0 && errno == EINTR); return rc == 0;}void dbWatchDog::close(){ semctl(id, 0, IPC_RMID, NULL);}bool dbWatchDog::open(char const* name){ return open(name, 0777);}bool dbWatchDog::open(char const* name, int flags){ key_t key = IPC_PRIVATE; if (name != NULL) { int fd; char* path = (char*)name; if (strchr(name, '/') == NULL) { path = new char[strlen(name)+strlen(keyFileDir)+1]; sprintf(path, "%s%s", keyFileDir, name); } fd = ::open(path, O_WRONLY|O_CREAT, 0777); if (fd < 0) { if (path != name) { delete[] path; } PRINT_ERROR("open"); return -1; } ::close(fd); key = ftok(path, '0'); if (path != name) { delete[] path; } if (key < 0) { PRINT_ERROR("ftok"); return -1; } } return (id = semget(key, 1, flags)) >= 0;}bool dbWatchDog::create(char const* name) { static struct sembuf sops[] = {{0, 1, 0}, {0, -1, SEM_UNDO}}; if (open(name, IPC_CREAT|0777)) { return semop(id, sops, 2) == 0; } return false;}#if (defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)) || defined(__FreeBSD__) /* union semun is defined by including <sys/sem.h> */ #elseunion semun { int val; struct semid_ds* buf; unsigned short* array;};#endifstatic union semun u;void dbSemaphore::reset() { u.val = 0; int rc = semctl(s, 0, SETVAL, u); assert(rc >= 0);}bool dbSemaphore::open(char const* name, unsigned init_value){ return sem_init(s, name, init_value) == 0;}void dbSemaphore::close() {}void dbSemaphore::erase() { semctl(s, 0, IPC_RMID, &u);}bool dbEvent::wait(unsigned msec){ static struct sembuf sops[] = {{0, -1, 0}, {0, 1, 0}}; wait_status ws = wait_semaphore(e, msec, sops, itemsof(sops)); assert(ws != wait_error); return ws == wait_ok;}void dbEvent::signal(){ static struct sembuf sops[] = {{0, 0, IPC_NOWAIT}, {0, 1, 0}}; int rc = semop(e, sops, itemsof(sops)); assert(rc == 0 || errno == EAGAIN); }void dbEvent::reset(){ static struct sembuf sops[] = {{0, -1, IPC_NOWAIT}}; int rc = semop(e, sops, itemsof(sops)); assert(rc == 0 || errno == EAGAIN); }bool dbEvent::open(char const* name, bool signaled){// XXX: sem_init is POSIX, the rest of these calls are SysV. return sem_init(e, name, signaled) == 0;}void dbEvent::close() {}void dbEvent::erase() { semctl(e, 0, IPC_RMID, &u);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -