⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 w32sock.cpp

📁 FastDb是高效的内存数据库系统
💻 CPP
📖 第 1 页 / 共 2 页
字号:
  while (size < min_size && state == ss_open)
  {
    RcvBuf->RcvWaitFlag = true;
    serialize();
    size_t begin = RcvBuf->DataBeg;
    size_t end = RcvBuf->DataEnd;
    size_t rcv_size = (begin <= end)
                      ? end - begin : sizeof(RcvBuf->Data) - begin;

    if (rcv_size > 0)
    {
      RcvBuf->RcvWaitFlag = false;

      if (rcv_size >= max_size)
      {
        memcpy(dst, &RcvBuf->Data[begin], max_size);
        begin += max_size;
        size += max_size;
      }
      else
      {
        memcpy(dst, &RcvBuf->Data[begin], rcv_size);
        begin += rcv_size;
        dst += rcv_size;
        size += rcv_size;
        max_size -= rcv_size;
      }

      RcvBuf->DataBeg = (begin == sizeof(RcvBuf->Data)) ? 0 : begin;

      if (RcvBuf->SndWaitFlag)
      {
        SetEvent(Signal[RTR]);
      }
    }
    else
    {
      HANDLE h[2];
      h[0] = Signal[RD];
      h[1] = Mutex;
      int rc = WaitForMultipleObjects(2, h, false, timeout);
      RcvBuf->RcvWaitFlag = false;

      if (rc != WAIT_OBJECT_0)
      {
        if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1)
        {
          Error = broken_pipe;
          ReleaseMutex(Mutex);
        }
        else if (rc == WAIT_TIMEOUT)
        {
          return size;
        }
        else
        {
          Error = GetLastError();
        }

        return -1;
      }

      if (timeout != WAIT_FOREVER)
      {
        time_t now = time(NULL);
        timeout = timeout >= (now - start)*1000
                  ? timeout - (now - start)*1000 : 0;
      }
    }
  }

  return size < min_size ? -1 : (int)size;
}


// Copies `size` bytes from `buf` into the send ring buffer (SndBuf->Data),
// blocking whenever the buffer has no free contiguous space.
//
// Parameters:
//   buf  - source bytes to transmit.
//   size - number of bytes to transmit.
//
// Returns true when every byte was copied. Returns false when the wait for
// free space fails (Error is set to broken_pipe or to GetLastError()) or when
// the loop stops because `state` is no longer ss_open.
bool local_win_socket::write(const void* buf, size_t size)
{
  const char* src = (const char*)buf;
  Error = ok;

  while (size > 0 && state == ss_open)
  {
    // Announce that the writer may be about to wait before sampling the
    // indices. serialize() is defined elsewhere in the project.
    SndBuf->SndWaitFlag = true;
    serialize();
    size_t begin = SndBuf->DataBeg;
    size_t end = SndBuf->DataEnd;
    // Contiguous free space starting at `end`. One slot is always left
    // unused: when begin == 0 the last array slot is not filled, and when the
    // data has wrapped the writer stops one slot short of `begin`.
    size_t snd_size = (begin <= end)
                      ? sizeof(SndBuf->Data) - end - (begin == 0)
                      : begin - end - 1;

    if (snd_size > 0)
    {
      SndBuf->SndWaitFlag = false;

      if (snd_size >= size)
      {
        memcpy(&SndBuf->Data[end], src, size);
        end += size;
        size = 0;
      }
      else
      {
        memcpy(&SndBuf->Data[end], src, snd_size);
        end += snd_size;
        src += snd_size;
        size -= snd_size;
      }

      // Wrap the write index when it reaches the end of the array.
      SndBuf->DataEnd = (end == sizeof(SndBuf->Data)) ? 0 : end;

      if (SndBuf->RcvWaitFlag)
      {
        SetEvent(Signal[TD]);
      }
    }
    else
    {
      HANDLE h[2];
      h[0] = Signal[RTT];
      h[1] = Mutex;
      int rc = WaitForMultipleObjects(2, h, false, INFINITE);
      // Clear the flag this function raised above. The original cleared
      // RcvBuf->SndWaitFlag here, which is the flag belonging to the writer
      // of the opposite buffer.
      SndBuf->SndWaitFlag = false;

      if (rc != WAIT_OBJECT_0)
      {
        if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1)
        {
          // The wait was satisfied by Mutex rather than by the event; the
          // code treats this as a broken connection and gives the mutex back.
          Error = broken_pipe;
          ReleaseMutex(Mutex);
        }
        else
        {
          Error = GetLastError();
        }

        return false;
      }
    }
  }

  return size == 0;
}

// Size, in bytes, of the stack buffers in which open() and connect() format
// the event and file-mapping names derived from the socket address.
#define MAX_ADDRESS_LEN 64

// Builds an unopened socket bound to `address`: the address string is copied
// into a heap buffer owned by this object, no mutex is attached yet and the
// error status is not_opened.
local_win_socket::local_win_socket(const char* address)
{
  const size_t name_bytes = strlen(address) + 1;  // include terminating NUL
  char* name_copy = new char[name_bytes];
  memcpy(name_copy, address, name_bytes);

  Name = name_copy;
  Mutex = NULL;
  Error = not_opened;
}

bool local_win_socket::open(int)
{
  char buf[MAX_ADDRESS_LEN];
  int  i;

  for (i = RD; i <= RTT; i++)
  {
    sprintf(buf, "%s.%c", Name, i + '0');
    Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, buf);

    if (GetLastError() == ERROR_ALREADY_EXISTS)
    {
      WaitForSingleObject(Signal[i], 0);
    }

    if (!Signal[i])
    {
      Error = GetLastError();

      while (--i >= 0)
      {
        CloseHandle(Signal[i]);
      }

      return false;
    }
  }

  sprintf(buf, "%s.shr", Name);
  BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
                             0, sizeof(socket_buf)*2, buf);

  if (!BufHnd)
  {
    Error = GetLastError();

    for (i = RD; i <= RTT; i++)
    {
      CloseHandle(Signal[i]);
    }

    return false;
  }

  RcvBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);

  if (!RcvBuf)
  {
    Error = GetLastError();
    CloseHandle(BufHnd);

    for (i = RD; i <= RTT; i++)
    {
      CloseHandle(Signal[i]);
    }

    return false;
  }

  SndBuf = RcvBuf+1;
  RcvBuf->DataBeg = RcvBuf->DataEnd = 0;
  SndBuf->DataBeg = SndBuf->DataEnd = 0;
  Error = ok;
  state = ss_open;
  return true;
}

// Builds a socket backed by anonymous kernel objects: one unnamed event per
// signal slot and an unnamed shared-memory section holding two socket_buf
// structures. On success Error is ok and state is ss_open. On any failure
// Error receives GetLastError(), everything created so far is closed, and the
// constructor returns early.
local_win_socket::local_win_socket()
{
  Name = NULL;
  Mutex = NULL;
  BufHnd = NULL;

  for (int slot = RD; slot <= RTT; slot++)
  {
    HANDLE event = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, NULL);
    Signal[slot] = event;

    if (event)
    {
      continue;
    }

    Error = GetLastError();

    // Roll back the events that were created before the failure.
    for (int made = slot - 1; made >= 0; made--)
    {
      CloseHandle(Signal[made]);
    }

    return;
  }

  // create anonymous shared memory section
  HANDLE section = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES,
                                     PAGE_READWRITE, 0, sizeof(socket_buf)*2, NULL);
  BufHnd = section;

  if (section == NULL)
  {
    Error = GetLastError();

    for (int slot = RD; slot <= RTT; slot++)
    {
      CloseHandle(Signal[slot]);
    }

    return;
  }

  socket_buf* view = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);
  RcvBuf = view;

  if (view == NULL)
  {
    Error = GetLastError();
    CloseHandle(BufHnd);

    for (int slot = RD; slot <= RTT; slot++)
    {
      CloseHandle(Signal[slot]);
    }

    BufHnd = NULL;
    return;
  }

  // First structure in the view receives, second one sends; both start empty.
  SndBuf = RcvBuf + 1;
  RcvBuf->DataEnd = 0;
  RcvBuf->DataBeg = 0;
  SndBuf->DataEnd = 0;
  SndBuf->DataBeg = 0;
  Error = ok;
  state = ss_open;
}

// Releases the socket's handles via close() and then frees the address copy.
local_win_socket::~local_win_socket()
{
  this->close();

  // Name is NULL for sockets built by the default constructor; delete[] on a
  // null pointer is a no-op.
  delete[] Name;
}

// Waits for a peer to connect and returns a new socket for that connection.
//
// Sequence, as implemented below:
//   1. Publish this process id and WatchDogMutex in the send buffer.
//   2. Repeatedly signal RTR and wait up to ACCEPT_TIMEOUT for RD.
//   3. Create an anonymous socket, take the peer's mutex handle from the
//      receive buffer, and publish the new socket's section and event
//      handles in the send buffer.
//   4. Signal TD and wait for the peer's acknowledgement on RD, or for the
//      peer's mutex to become available.
//
// Returns the new socket (allocated with `new`), or NULL when this socket is
// not open, the accept was cancelled, a wait failed, or the handshake was
// broken. Error is set in every failing case except the initial
// `state != ss_open` check.
socket_t* local_win_socket::accept()
{
  HANDLE h[2];

  if (state != ss_open)
  {
    return NULL;
  }

  connect_data* cdp = (connect_data*)SndBuf->Data;
  cdp->Pid = GetCurrentProcessId();
  cdp->Mutex = WatchDogMutex;

  while (true)
  {
    SetEvent(Signal[RTR]);
    int rc = WaitForSingleObject(Signal[RD], ACCEPT_TIMEOUT);

    if (rc == WAIT_OBJECT_0)
    {
      // cancel_accept() and shutdown() also signal RD after changing state,
      // so a wake-up with state != ss_open means the accept was cancelled.
      if (state != ss_open)
      {
        Error = not_opened;
        return NULL;
      }

      Error = ok;
      break;
    }
    else if (rc != WAIT_TIMEOUT)
    {
      Error = GetLastError();
      return NULL;
    }
  }

  local_win_socket* sock = new local_win_socket();
  sock->Mutex = ((connect_data*)RcvBuf->Data)->Mutex;
  accept_data* adp = (accept_data*)SndBuf->Data;
  adp->BufHnd = sock->BufHnd;

  // Publish the event handles rotated by (TD - RD), masked with RTT.
  for (int i = RD; i <= RTT; i++)
  {
    adp->Signal[(i + TD - RD) & RTT] = sock->Signal[i];
  }

  SetEvent(Signal[TD]);
  h[0] = Signal[RD];
  h[1] = sock->Mutex;
  int rc = WaitForMultipleObjects(2, h, false, INFINITE);

  if (rc != WAIT_OBJECT_0)
  {
    if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1)
    {
      Error = broken_pipe;
      // The wait acquired sock->Mutex (h[1]), so that is the mutex to give
      // back. The original released this->Mutex, which is a different handle.
      ReleaseMutex(sock->Mutex);
    }
    else
    {
      Error = GetLastError();
    }

    delete sock;
    return NULL;
  }

  return sock;
}

// Marks the socket as shut down and signals the RD and RTT events, in that
// order, so that threads waiting on them wake up. Always returns true.
bool local_win_socket::cancel_accept()
{
  static const int wake_order[] = { RD, RTT };

  state = ss_shutdown;

  for (size_t k = 0; k < sizeof(wake_order) / sizeof(wake_order[0]); k++)
  {
    SetEvent(Signal[wake_order[k]]);
  }

  return true;
}

// Returns the peer address as a newly allocated string. This implementation
// always reports "127.0.0.1".
//
// The result is allocated with new[], so the caller should release it with
// delete[]. Returns NULL and sets Error to not_opened when the socket is not
// open; otherwise sets Error to ok.
char* local_win_socket::get_peer_name()
{
  if (state != ss_open)
  {
    Error = not_opened;
    return NULL;
  }

  // A const array instead of `char* addr = "..."`: binding a string literal
  // to a non-const char* is ill-formed since C++11.
  static const char addr[] = "127.0.0.1";
  char* addr_copy = new char[sizeof(addr)];
  memcpy(addr_copy, addr, sizeof(addr));  // sizeof includes the terminating NUL
  Error = ok;
  return addr_copy;
}

bool local_win_socket::is_ok()
{
  return !Error;
}

bool local_win_socket::close()
{
  if (state != ss_close)
  {
    state = ss_close;

    if (Mutex)
    {
      CloseHandle(Mutex);
    }

    for (int i = RD; i <= RTT; i++)
    {
      CloseHandle(Signal[i]);
    }

    UnmapViewOfFile(RcvBuf < SndBuf ? RcvBuf : SndBuf);
    CloseHandle(BufHnd);
    Error = not_opened;
  }

  return true;
}

void local_win_socket::get_error_text(char* buf, size_t buf_size)
{
  switch (Error)
  {

  case ok:
    strncpy(buf, "ok", buf_size);
    break;

  case not_opened:
    strncpy(buf, "socket not opened", buf_size);
    break;

  case broken_pipe:
    strncpy(buf, "connection is broken", buf_size);
    break;

  case timeout_expired:
    strncpy(buf, "connection timeout expired", buf_size);
    break;

  default:
#ifndef PHAR_LAP

    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
                  NULL,
                  Error,
                  0,
                  buf,
                  buf_size,
                  NULL);
#else

    strncpy(buf, "Unknown socket error", buf_size);
#endif

  }
}


// Moves an open socket to the ss_shutdown state and signals the RD and RTT
// events so that threads waiting on them wake up. A socket that is not open
// is left untouched. Always returns true.
bool local_win_socket::shutdown()
{
  if (state != ss_open)
  {
    return true;
  }

  state = ss_shutdown;
  SetEvent(Signal[RD]);
  SetEvent(Signal[RTT]);
  return true;
}

bool local_win_socket::connect(int max_attempts, time_t timeout)
{
  char buf[MAX_ADDRESS_LEN];
  int  rc, i, error_code;
  HANDLE h[2];

  for (i = RD; i <= RTT; i++)
  {
    sprintf(buf, "%s.%c", Name, ((i + TD - RD) & RTT) + '0');
    Signal[i] = CreateEvent(FASTDB_SECURITY_ATTRIBUTES, false, false, buf);

    if (!Signal[i])
    {
      Error = GetLastError();

      while (--i >= 0)
      {
        CloseHandle(Signal[i]);
      }

      return false;
    }
  }

  sprintf(buf, "%s.shr", Name);
  BufHnd = CreateFileMapping(INVALID_HANDLE_VALUE, FASTDB_SECURITY_ATTRIBUTES, PAGE_READWRITE,
                             0, sizeof(socket_buf)*2, buf);

  if (!BufHnd)
  {
    Error = GetLastError();

    for (i = RD; i <= RTT; i++)
    {
      CloseHandle(Signal[i]);
    }

    return false;
  }

  SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);

  if (!SndBuf)
  {
    Error = GetLastError();

    for (i = RD; i <= RTT; i++)
    {
      CloseHandle(Signal[i]);
    }

    CloseHandle(BufHnd);
    return false;
  }

  RcvBuf = SndBuf+1;
  state = ss_shutdown;
  Mutex = NULL;

  rc = WaitForSingleObject(Signal[RTT],timeout*max_attempts*MILLISECOND);

  if (rc != WAIT_OBJECT_0)
  {
    error_code = rc == WAIT_TIMEOUT ? timeout_expired : GetLastError();
    close();
    Error = error_code;
    return false;
  }

  connect_data* cdp = (connect_data*)RcvBuf->Data;
  HANDLE hServer = OpenProcess(STANDARD_RIGHTS_REQUIRED|PROCESS_DUP_HANDLE,
                               false, cdp->Pid);

  if (!hServer)
  {
    error_code = GetLastError();
    close();
    Error = error_code;
    return false;
  }

  HANDLE hSelf = GetCurrentProcess();

  if (!DuplicateHandle(hServer, cdp->Mutex, hSelf, &Mutex,
                       0, false, DUPLICATE_SAME_ACCESS) ||
      !DuplicateHandle(hSelf, WatchDogMutex, hServer,
                       &((connect_data*)SndBuf->Data)->Mutex,
                       0, false, DUPLICATE_SAME_ACCESS))
  {
    error_code = GetLastError();
    CloseHandle(hServer);
    close();
    Error = error_code;
    return false;
  }

  SetEvent(Signal[TD]);
  h[0] = Signal[RD];
  h[1] = Mutex;
  rc = WaitForMultipleObjects(2, h, false, INFINITE);

  if (rc != WAIT_OBJECT_0)
  {
    if (rc == WAIT_OBJECT_0+1 || rc == WAIT_ABANDONED+1)
    {
      error_code = broken_pipe;
      ReleaseMutex(Mutex);
    }
    else
    {
      error_code = GetLastError();
    }

    CloseHandle(hServer);
    close();
    Error = error_code;
    return false;
  }

  accept_data ad = *(accept_data*)RcvBuf->Data;

  SetEvent(Signal[TD]);

  for (i = RD; i <= RTT; i++)
  {
    CloseHandle(Signal[i]);
  }

  UnmapViewOfFile(SndBuf);
  CloseHandle(BufHnd);
  BufHnd = NULL;

  if (!DuplicateHandle(hServer, ad.BufHnd, hSelf, &BufHnd,
                       0, false, DUPLICATE_SAME_ACCESS))
  {
    Error = GetLastError();
    CloseHandle(hServer);
    CloseHandle(Mutex);
    return false;
  }
  else
  {
    for (i = RD; i <= RTT; i++)
    {
      if (!DuplicateHandle(hServer, ad.Signal[i],
                           hSelf, &Signal[i],
                           0, false, DUPLICATE_SAME_ACCESS))
      {
        Error = GetLastError();
        CloseHandle(hServer);
        CloseHandle(BufHnd);
        CloseHandle(Mutex);

        while (--i >= 0)
          CloseHandle(Signal[1]);

        return false;
      }
    }
  }

  CloseHandle(hServer);

  SndBuf = (socket_buf*)MapViewOfFile(BufHnd, FILE_MAP_ALL_ACCESS, 0, 0, 0);

  if (!SndBuf)
  {
    Error = GetLastError();
    CloseHandle(BufHnd);
    CloseHandle(Mutex);

    for (i = RD; i <= RTT; i++)
    {
      CloseHandle(Signal[i]);
    }

    return false;
  }

  RcvBuf = SndBuf+1;
  Error = ok;
  state = ss_open;
  return true;
}

// This socket type has no integer descriptor to expose, so the function
// always returns -1.
int local_win_socket::get_handle()
{
  const int no_descriptor = -1;
  return no_descriptor;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -