sync.cpp

来自「FastDb是高效的内存数据库系统」· C++ 代码 · 共 856 行 · 第 1/2 页

CPP
856
字号
//-< SYNC.CPP >------------------------------------------------------*--------*
// FastDB                    Version 1.0         (c) 1999  GARRET    *     ?  *
// (Main Memory Database Management System)                          *   /\|  *
//                                                                   *  /  \  *
//                          Created:     20-Nov-98    K.A. Knizhnik  * / [] \ *
//                          Last update: 10-Dec-98    K.A. Knizhnik  * GARRET *
//-------------------------------------------------------------------*--------*
// Intertask synchronization primitives
//-------------------------------------------------------------------*--------*

#define INSIDE_FASTDB

#include "stdtp.h"
#include "sync.h"

#ifndef _WIN32

// Unix specific

// Returns the current time reported by gettimeofday(), converted to
// milliseconds and reduced to the range of `unsigned`.
// The value wraps around once it exceeds that range.
unsigned dbSystem::getCurrentTimeMsec()
{
  struct timeval tv;
  gettimeofday(&tv, NULL);
  // Convert to unsigned *before* multiplying: unsigned arithmetic wraps in a
  // well-defined way, whereas tv_sec*1000 overflows a signed 32-bit time_t.
  // The result is unchanged (modulo 2^32) where time_t is 64 bits wide.
  return (unsigned)tv.tv_sec * 1000u + (unsigned)(tv.tv_usec / 1000);
}


#if !defined(USE_POSIX_SEMAPHORES) || !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
#include <errno.h>

// Reports a failed call: prints `func` followed by the description of the
// current errno value (see perror()).
#define PRINT_ERROR(func)  perror(func)

// Directory prefix prepended to IPC key-file names that contain no '/'.
char const*  keyFileDir = "/tmp/";

#ifndef USE_POSIX_SEMAPHORES
#include <signal.h>
// SIGALRM handler that intentionally does nothing. Having a handler installed
// means an expiring timer interrupts a blocked system call (EINTR) instead of
// triggering the signal's default action.
static void alarm_handler(int /*signo*/)
{
}

// Installs alarm_handler for SIGALRM whenever an instance is constructed.
class moduleInitializer
{
public:
  moduleInitializer()
  {
    // Static storage duration: every field that is not assigned below
    // starts out zero-initialized.
    static struct sigaction action;
    action.sa_handler = alarm_handler;
    ::sigaction(SIGALRM, &action, NULL);
  }
};

// File-scope instance; its constructor installs the SIGALRM handler during
// static initialization of this translation unit.
static moduleInitializer initializer;
#endif // USE_POSIX_SEMAPHORES

#endif // use SysV primitives



#if !defined(USE_POSIX_MMAP) || !USE_POSIX_MMAP
bool dbSharedMemory::open(char const* name, size_t size)
{
  char* fileName = (char*)name;

  if (strchr(name, '/') == NULL)
  {
    fileName = new char[strlen(name)+strlen(keyFileDir)+1];
    sprintf(fileName, "%s%s", keyFileDir, name);
  }

  int fd = ::open(fileName, O_RDWR|O_CREAT, 0777);

  if (fd < 0)
  {
    if (fileName != name)
    {
      delete[] fileName;
    }

    return false;
  }

  ::close(fd);
  int key = ftok(fileName, '0');

  if (fileName != name)
  {
    delete[] fileName;
  }

  if (key < 0)
  {
    return false;
  }

  shm = shmget(key, DOALIGN(size, 4096), IPC_CREAT|0777);

  if (shm < 0)
  {
    return false;
  }

  ptr = (char*)shmat(shm, NULL, 0);
  return (ptr != (char*)-1);
}

// Detaches the segment mapped at ptr from this process. The segment itself
// is not removed here.
void dbSharedMemory::close()
{
  char* const attached = (char*)ptr;
  shmdt(attached);
}

// Detaches the segment and then asks the system to remove it (IPC_RMID).
void dbSharedMemory::erase()
{
  // Detach first ...
  close();

  // ... then request removal of the segment identified by shm.
  struct shmid_ds* const noInfo = NULL;
  shmctl(shm, IPC_RMID, noInfo);
}

#endif // use SysV shmat


////////////////////////////////////////////////////////////////////////
// If we are to use the local implementation of dbSemaphore and dbEvent
//    (which currently uses SysV based semaphores)
#ifndef  USE_POSIX_SEMAPHORES
// Creates (or opens) a set of two System V semaphores and initializes it
// exactly once.
//   sem        - receives the semaphore set id on success.
//   name       - key file passed to ftok(); when it contains no '/',
//                keyFileDir is prepended. NULL selects IPC_PRIVATE.
//   init_value - initial value of semaphore 0; applied only while
//                semaphore 1 (the "initialized" marker) is still zero.
// Returns 0 on success and -1 on failure, after reporting the failing call
// through PRINT_ERROR.
int sem_init(int& sem, char const* name, unsigned init_value)
{
  key_t key = IPC_PRIVATE;
  int semid;

  // The three operations are submitted in one semop() call and therefore
  // take effect together or not at all.
  struct sembuf sops[3];
  sops[0].sem_num = 1;
  sops[0].sem_op  = 0; /* check if semaphore was already initialized */
  sops[0].sem_flg = IPC_NOWAIT;
  sops[1].sem_num = 1;
  sops[1].sem_op  = 1; /* mark semaphore as initialized */
  sops[1].sem_flg = 0;
  sops[2].sem_num = 0;
  sops[2].sem_op  = init_value;
  sops[2].sem_flg = 0;

  if (name != NULL)
  {
    char* path = (char*)name;

    if (strchr(name, '/') == NULL)
    {
      path = new char[strlen(name)+strlen(keyFileDir)+1];
      sprintf(path, "%s%s", keyFileDir, name);
    }

    // ftok() needs an existing file, so create/open the key file first.
    bool ok = true;
    int fd = open(path, O_WRONLY|O_CREAT, 0777);

    if (fd < 0)
    {
      // Report while errno still belongs to the failed call: releasing
      // the path buffer first could overwrite it.
      PRINT_ERROR("open");
      ok = false;
    }
    else
    {
      close(fd);
      key = ftok(path, '0');

      if (key < 0)
      {
        PRINT_ERROR("ftok");
        ok = false;
      }
    }

    if (path != name)
    {
      delete[] path;
    }

    if (!ok)
    {
      return -1;
    }
  }

  semid = semget(key, 2, IPC_CREAT|0777);

  if (semid < 0)
  {
    PRINT_ERROR("semget");
    return -1;
  }

  // EAGAIN means the IPC_NOWAIT check on semaphore 1 failed, i.e. the set
  // was already initialized; that is not an error.
  if (semop(semid, sops, itemsof(sops)) && errno != EAGAIN)
  {
    PRINT_ERROR("semop");
    return -1;
  }

  sem = semid;
  return 0;
}

// Outcome of wait_semaphore(): the operation succeeded, the timeout elapsed
// first, or a system call failed.
enum wait_status { wait_ok, wait_timeout_expired, wait_error };

// Performs semop(sem, sops, n_sops), waiting at most `msec` milliseconds.
//   sem    - semaphore set id.
//   msec   - INFINITE: block until the operation succeeds.
//            0:        make a single non-blocking attempt.
//            other:    before each attempt ITIMER_REAL is armed with the
//                      time left, so that SIGALRM interrupts a blocked
//                      semop() with EINTR once the deadline is reached.
//   sops   - operations to perform (not modified).
//   n_sops - number of entries in sops.
// Returns wait_ok on success, wait_timeout_expired when the deadline passed
// first, and wait_error when a system call failed for any other reason.
// NOTE(review): as in the original code, the timer is left armed after a
// successful wait, so a late SIGALRM may still be delivered.
static wait_status wait_semaphore(int& sem, unsigned msec,
                                  struct sembuf* sops, int n_sops)
{
  if (msec == INFINITE)
  {
    // No deadline: simply retry whenever a signal interrupts the call.
    while (semop(sem, sops, n_sops) < 0)
    {
      if (errno != EINTR)
      {
        return wait_error;
      }
    }

    return wait_ok;
  }

  if (msec == 0)
  {
    // A zero it_value would disarm the timer instead of firing at once,
    // leaving semop() blocked forever. Poll with IPC_NOWAIT on a private
    // copy of the operations instead.
    struct sembuf* pollOps = new sembuf[n_sops];

    for (int i = 0; i < n_sops; i++)
    {
      pollOps[i] = sops[i];
      pollOps[i].sem_flg |= IPC_NOWAIT;
    }

    int rc;

    do
    {
      rc = semop(sem, pollOps, n_sops);
    }
    while (rc < 0 && errno == EINTR);

    const int err = errno; // save before the buffer is released
    delete[] pollOps;

    if (rc == 0)
    {
      return wait_ok;
    }

    return err == EAGAIN ? wait_timeout_expired : wait_error;
  }

  // Absolute deadline = now + msec.
  struct timeval now;
  struct timeval deadline;
  gettimeofday(&now, NULL);
  unsigned long usec = now.tv_usec + msec % 1000 * 1000;
  deadline.tv_usec = usec % 1000000;
  deadline.tv_sec = now.tv_sec + msec / 1000 + usec / 1000000;

  while (true)
  {
    // One-shot timer for the time remaining until the deadline. It is
    // strictly positive here: msec > 0 on the first pass and the expiry
    // check below guarantees deadline > now on later passes.
    struct itimerval it;
    it.it_interval.tv_sec = 0;
    it.it_interval.tv_usec = 0;
    it.it_value.tv_sec = deadline.tv_sec - now.tv_sec;
    it.it_value.tv_usec = deadline.tv_usec - now.tv_usec;

    if (deadline.tv_usec < now.tv_usec)
    {
      // Borrow one second for the microsecond field.
      it.it_value.tv_usec += 1000000;
      it.it_value.tv_sec -= 1;
    }

    if (setitimer(ITIMER_REAL, &it, NULL) < 0)
    {
      return wait_error;
    }

    if (semop(sem, sops, n_sops) == 0)
    {
      return wait_ok;
    }

    if (errno != EINTR)
    {
      return wait_error;
    }

    // Interrupted by a signal (the alarm or any other): give up only if
    // the deadline has really been reached. Seconds are compared first and
    // microseconds with microseconds; "<=" treats zero remaining time as
    // expired so that a zero (disarming) timer is never set.
    gettimeofday(&now, NULL);

    if (deadline.tv_sec < now.tv_sec ||
        (deadline.tv_sec == now.tv_sec && deadline.tv_usec <= now.tv_usec))
    {
      return wait_timeout_expired;
    }
  }
}


bool dbSemaphore::wait(unsigned msec)
{

  static struct sembuf sops[] =
    {
      {
        0, -1, 0
      }
    };

  wait_status ws = wait_semaphore(s, msec, sops, itemsof(sops));
  assert(ws != wait_error);
  return ws == wait_ok;
}

// Adds `inc` to the semaphore. A zero increment is a no-op.
void dbSemaphore::signal(unsigned inc)
{
  // sem_op == 0 would mean "wait until the semaphore is zero" to semop(),
  // so a zero increment must not be forwarded.
  if (inc == 0)
  {
    return;
  }

  struct sembuf release;
  release.sem_num = 0;
  release.sem_op  = inc;
  release.sem_flg = 0;

  int rc = semop(s, &release, 1);
  assert(rc == 0);
}

// `union semun` is the argument type passed to semctl() below. It is taken
// from <sys/sem.h> when the condition holds and declared here otherwise.
#if (defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)) || defined(__FreeBSD__)
/* union semun is defined by including <sys/sem.h> */
#else
union semun {
  int val;

  struct semid_ds* buf;
  unsigned short* array;
};
#endif
// File-wide semctl() argument shared by the reset()/erase() methods in this
// file; only its `val` member is ever assigned here.
static union semun u;


// Forces the semaphore value back to zero via semctl(SETVAL).
void dbSemaphore::reset()
{
  u.val = 0; // new value for semaphore 0
  const int status = semctl(s, 0, SETVAL, u);
  assert(status >= 0);
}

// Opens or creates the semaphore via the sem_init() helper.
//   name       - key-file name forwarded to sem_init().
//   init_value - initial value forwarded to sem_init().
// Returns true when sem_init() reports success (0).
bool dbSemaphore::open(char const* name, unsigned init_value)
{
  const int rc = sem_init(s, name, init_value);
  return rc == 0;
}

// Intentionally empty: this implementation performs no work on close.
void dbSemaphore::close()
{
}

// Removes the semaphore set from the system (IPC_RMID). The result of
// semctl() is not checked.
void dbSemaphore::erase()
{
  union semun* const arg = &u;
  semctl(s, 0, IPC_RMID, arg);
}

bool dbEvent::wait(unsigned msec)
{

  static struct sembuf sops[] =
    {
      {
        0, -1, 0
      }

      , {0, 1, 0}
    };

  wait_status ws = wait_semaphore(e, msec, sops, itemsof(sops));
  assert(ws != wait_error);
  return ws == wait_ok;
}

// Puts the event into the signaled state.
void dbEvent::signal()
{
  // "Require zero (without waiting), then add one": raises the semaphore
  // from 0 to 1. If it is already non-zero the first step fails with
  // EAGAIN and nothing changes.
  static struct sembuf raise[] = { { 0, 0, IPC_NOWAIT }, { 0, 1, 0 } };

  int rc = semop(e, raise, itemsof(raise));
  assert(rc == 0 || errno == EAGAIN);
}

// Puts the event into the non-signaled state.
void dbEvent::reset()
{
  // Non-blocking decrement: takes the semaphore down by one if it is
  // positive; if it is already zero semop() fails with EAGAIN, which is
  // accepted.
  static struct sembuf lower[] = { { 0, -1, IPC_NOWAIT } };

  int rc = semop(e, lower, itemsof(lower));
  assert(rc == 0 || errno == EAGAIN);
}

// Opens or creates the event.
//   name     - key-file name forwarded to sem_init().
//   signaled - initial state; forwarded as the initial value (true -> 1,
//              false -> 0).
// Returns true when sem_init() reports success (0).
bool dbEvent::open(char const* name, bool signaled)
{
  // XXX: sem_init is also the name of a POSIX function, while the other
  // calls made by this class are SysV.
  const unsigned initial = signaled;
  return sem_init(e, name, initial) == 0;
}

// Intentionally empty: this implementation performs no work on close.
void dbEvent::close()
{
}

// Removes the semaphore set backing the event from the system (IPC_RMID).
// The result of semctl() is not checked.
void dbEvent::erase()
{
  union semun* const arg = &u;
  semctl(e, 0, IPC_RMID, arg);
}


////////////////////////////////////////////////////////////////////
// dbGlobalCriticalSection local implementation
// If we are on an i386 based platform, we can use the processor primitives
//   (XXX: although do these work with multiprocessor?   shouldn't this be
//    under the non-pthread based implementation?)

#if defined(__GNUC__) && defined(i386)

// Enters the global critical section.
// A locked xadd atomically adds -1 to *count and leaves the previous value
// of *count in inc. When that previous value is not 1, the caller blocks on
// semaphore 0 of semid until it can be decremented.
// NOTE(review): presumably a previous value of 1 means the section was
// free - confirm against the matching leave() implementation.
void dbGlobalCriticalSection::enter()
{
  int inc = -1;
  __asm__ __volatile__(

    "lock; xadd %0,%1"
  :"=d" (inc), "=m" (*count)
        :"d" (inc), "m" (*count));

  // inc now holds the value *count had before the decrement.
  if (inc != 1)
  {

    static struct sembuf sops[] =
      {
        {
          0, -1, 0
        }
      };

    int rc;

    // Retry the decrement whenever semop() is interrupted by a signal.
    while ((rc = semop(semid, sops, 1)) < 0 && errno == EINTR)

      ;
    assert(rc == 0);
  }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?