wwwapi.cpp

来自「FastDb是高效的内存数据库系统」· C++ 代码 · 共 1,062 行 · 第 1/2 页

CPP
1,062
字号
    *(int4*)con.reply_buf = con.reply_buf_used;
    con.sock->write(con.reply_buf, con.reply_buf_used);
  }

  delete[] con.peer;
  con.peer = NULL;
  delete con.sock;
  con.sock = NULL; // close connection
  return result;
}

// Case-insensitive (ASCII letters only) substring search.
//
// Returns a pointer to the first position in `s` at which `p` occurs,
// ignoring the case of 'A'-'Z' / 'a'-'z', or NULL when there is no such
// position. An empty `s` always yields NULL; an empty `p` matches at the
// start of any non-empty `s`.
//
// Only letters are case-folded. Folding by clearing bit 0x20 on every byte
// would also make ' ' equal to '\0' (running the comparison past the end of
// `s`) and digits/punctuation equal to control characters.
inline char* stristr(char* s, char* p)
{
  for (; *s != '\0'; s++)
  {
    int i = 0;

    while (p[i] != '\0')
    {
      char a = s[i];
      char b = p[i];

      if (a >= 'A' && a <= 'Z')
      {
        a += 'a' - 'A';
      }

      if (b >= 'A' && b <= 'Z')
      {
        b += 'a' - 'A';
      }

      // b is never '\0' here, so reaching the terminator of s (a == '\0')
      // is a mismatch and we never index beyond it.
      if (a != b)
      {
        break;
      }

      i += 1;
    }

    if (p[i] == '\0')
    {
      return s;
    }
  }

  return NULL;
}

// Serve HTTP requests arriving on an accepted connection.
//
// Request data is accumulated in a fixed 16K stack buffer. After the header
// section has been read, a GET request is answered either by dispatching a
// page through handleRequest() (URI contains '?', or index.html is missing)
// or by sending the named file; a POST request has its body passed to
// handleRequest(). The loop continues with the next request only when the
// client sent "Connection: keep-alive".
//
// con    - connection to serve; con.sock is deleted and set to NULL on every
//          return path.
// Returns the `result` flag set by handleRequest() when that call asks to
// stop serving the connection, and true in all other cases.
bool HTTPapi::serve(WWWconnection& con)
{
  const size_t inputBufferSize = 16*1024;
  char buf[inputBufferSize];
  bool result = false;
  size_t size = 0; // number of request bytes currently held in buf

  con.peer = con.sock->get_peer_name();

  while (true)
  {
    con.reset();
    char* p = buf;
    char prev_ch = 0;

    // Read until the "\n\r" that starts the blank line ending the headers
    // has been seen.
    do
    {
      if (p == buf + size)
      {
        int rc = con.sock->read(buf + size, 5, sizeof(buf) - size - 1,
                                connectionHoldTimeout);

        if (rc < 0)
        {
          delete con.sock;
          con.sock = NULL;
          return true;
        }

        if (rc < 5)
        {
          // NOTE(review): buf[size] has not been NUL-terminated on this path
          // before *p is examined after the loop.
          con.append(ERROR_TEXT("200 OK")); // connection closed due to timeout expiration
          break;
        }

        size += rc;
      }

      buf[size] = '\0';

      while (*p != '\0' && (prev_ch != '\n' || *p != '\r'))
      {
        prev_ch = *p++;
      }
    }
    while (*p == '\0' && p == buf + size); // p now points to the message body

    if (*p != '\r' || *(p+1) != '\n')
    {
      con.append(ERROR_TEXT("400 Bad Request"));
      break;
    }

    p += 2;
    int length = INT_MAX;
    char* lenptr = stristr(buf, "content-length: ");
    bool  persistentConnection =
      stristr(buf, "Connection: keep-alive") != NULL;
    char* host = stristr(buf, "host: ");

    if (host != NULL)
    {
      // Cut the Host header value out of the buffer in place.
      char* q = host += 6;

      while (*q != '\n' && *q != '\r' && *q != '\0')
        q += 1;

      *q = '\0';
    }

    if (lenptr != NULL)
    {
      // lenptr+15 is the blank before the number; %d skips it.
      sscanf(lenptr+15, "%d", &length);
    }

    if (strncmp(buf, "GET ", 4) == 0)
    {
      char* file, *uri = buf;
      file = strchr(uri, '/');

      if (file == NULL)
      {
        con.append(ERROR_TEXT("400 Bad Request"));
        break;
      }

      if (*++file == '/')
      {
        // Absolute URI ("scheme://host/path"): file is on the second '/'.
        if (host == NULL)
        {
          host = file+1;
        }

        // Look for the '/' that ends the host part. Searching from uri
        // again would find the first slash once more, which made the NULL
        // check below unreachable and left the host inside the file name.
        file = strchr(file + 1, '/');

        if (file == NULL)
        {
          con.append(ERROR_TEXT("400 Bad Request"));
          break;
        }

        *file++ = '\0';
      }

      char* file_end = strchr(file, ' ');
      char index_html[] = "index.html";

      if (file_end == NULL)
      {
        con.append(ERROR_TEXT("400 Bad Request"));
        break;
      }

      if (file_end == file)
      {
        file = index_html;
      }
      else
      {
        *file_end = '\0';
      }

      if (host == NULL)
      {
        host = "localhost";
      }

      char* params = strchr(file, '?');

      if (params != NULL)
      {
        if (!handleRequest(con, params+1, file_end, host, result))
        {
          delete con.sock;
          con.sock = NULL;
          return result;
        }
      }
      else
      {
        URL2ASCII(file);
        FILE* f = fopen(file, "rb");

        if (f == NULL)
        {
          if (strcmp(file, index_html) == 0)
          {
            static char defaultPage[] = "page=defaultPage";

            if (!handleRequest(con, defaultPage, defaultPage + strlen(defaultPage), host, result))
            {
              delete con.sock;
              con.sock = NULL;
              return result;
            }
          }
          else
          {
            con.append(ERROR_TEXT("404 File Not Found"));
            break;
          }
        }
        else
        {
          fseek(f, 0, SEEK_END);
          size_t file_size = ftell(f);
          fseek(f, 0, SEEK_SET);
          char reply[1024];
          // size_t does not match %u on LP64 targets; print it as
          // unsigned long.
          sprintf(reply, "HTTP/1.1 200 OK\r\nContent-Length: %lu\r\n"
                  "Content-Type: text/html\r\nConnection: %s\r\n\r\n",
                  (unsigned long)file_size,
                  keepConnectionAlive ? "Keep-Alive" : "close");
          con.append(reply);
          size_t pos = con.reply_buf_used;
          char* dst = con.extendBuffer(file_size);

          if (dst == NULL)
          {
            fclose(f); // do not leak the handle on the error path
            con.reset();
            con.append(ERROR_TEXT("413 Request Entity Too Large"));
            break;
          }

          if (fread(dst + pos, 1, file_size, f) != file_size)
          {
            fclose(f); // do not leak the handle on the error path
            con.reset();
            con.append(ERROR_TEXT("500 Internal server error"));
            break;
          }

          fclose(f);

          if (!con.sock->write(dst, con.reply_buf_used)
              || !keepConnectionAlive)
          {
            delete con.sock;
            con.sock = NULL;
            return true;
          }
        }
      }
    }
    else if (strncmp(buf, "POST ", 5) == 0)
    {
      char* body = p;

ScanNextPart:
      // n = bytes still to scan: limited by Content-Length and by what has
      // been received so far.
      int n = length < buf + size - p
              ? length : buf + size - p;

      while (--n >= 0 && *p != '\r' && *p != '\n')
      {
        p += 1;
      }

      if (n < 0 && p - body != length)
      {
        // Ran out of buffered data before the body was complete.
        if (size >= sizeof(buf) - 1)
        {
          con.append(ERROR_TEXT("413 Request Entity Too Large"));
          break;
        }

        int rc = con.sock->read(p, 1, sizeof(buf) - size - 1,
                                connectionHoldTimeout);

        if (rc < 0)
        {
          delete con.sock;
          con.sock = NULL;
          return true;
        }

        size += rc;
        goto ScanNextPart;
      }
      else
      {
        if (host == NULL)
        {
          host = "localhost";
        }

        if (!handleRequest(con, body, p, host, result))
        {
          delete con.sock;
          con.sock = NULL;
          return result;
        }

        // Skip the line terminators that follow the body.
        while (n >= 0 && (*p == '\n' || *p == '\r'))
        {
          p += 1;
          n -= 1;
        }
      }
    }
    else
    {
      con.append(ERROR_TEXT("405 Method not allowed"));
      break;
    }

    if (!persistentConnection)
    {
      delete con.sock;
      con.sock = NULL;
      return true;
    }

    if (p - buf < (long)size)
    {
      // Move the bytes of the next request to the front of the buffer.
      // Source and destination may overlap, so memmove is required.
      size -= p - buf;
      memmove(buf, p, size);
    }
    else
    {
      size = 0;
    }
  }

  // Reached through `break`: send whatever reply was prepared and close.
  if (con.sock != NULL)
  {
    con.sock->write(con.reply_buf, con.reply_buf_used);
    con.sock->shutdown();
    delete con.sock;
    con.sock = NULL;
  }

  return true;
}


// Decode the parameters of one request, dispatch the requested page and send
// the reply on con.sock.
//
// con    - connection used to build and send the reply.
// begin  - start of the encoded parameter string.
// end    - one past its last character; the byte at *end is saved before
//          unpacking and written back on the success and "Not acceptable"
//          paths.
// host   - host name used to build the "http://<host>/" stub stored in
//          con.stub for the duration of the call.
// result - receives the value returned by dispatch(); set to true when the
//          parameters cannot be unpacked.
// Returns true only when the page was dispatched, the reply was written,
// dispatch() returned true and keepConnectionAlive is set; false means the
// caller should stop serving this connection.
bool HTTPapi::handleRequest(WWWconnection& con, char* begin, char* end,
                            char* host, bool& result)
{
  char buf[64];
  char ch = *end;
  char* page = con.unpack(begin, end - begin);

  if (page != NULL)
  {
    // Reserve a blank Content-Length value that is patched once the body
    // size is known. Ten characters hold any 32-bit unsigned value, so the
    // digits can never run into the terminating CRLF.
    con.append("HTTP/1.1 200 OK\r\nContent-Length: "
               "          "
               "\r\n");
    int length_pos = con.reply_buf_used - 12; // 10 blanks + "\r\n"
    con.append(keepConnectionAlive
               ? "Connection: Keep-Alive\r\n"
               : "Connection: close\r\n");

    // host comes from the request, so size the stub to fit instead of
    // formatting into a fixed stack buffer. sizeof("http:///") covers the
    // prefix, the trailing '/' and the terminator.
    char* stub = new char[strlen(host) + sizeof("http:///")];
    sprintf(stub, "http://%s/", host);
    con.stub = stub;
    result = dispatch(con, page);
    char* body = con.reply_buf + length_pos;
    char prev_ch = 0;
    con.reply_buf[con.reply_buf_used] = '\0';

    // Find the blank line that separates the reply headers from the body.
    while ((*body != '\n' || prev_ch != '\n') &&
           (*body != '\r' || prev_ch != '\n') &&
           *body != '\0')
    {
      prev_ch = *body++;
    }

    if (*body == '\0')
    {
      // The page produced no header/body separator.
      con.reset();
      con.append(ERROR_TEXT("404 Not found"));
      con.sock->write(con.reply_buf, con.reply_buf_used);
      delete[] stub;
      return false;
    }

    body += *body == '\n' ? 1 : 2;
    sprintf(buf, "%u",
            (unsigned)(con.reply_buf_used - (body - con.reply_buf)));
    memcpy(con.reply_buf + length_pos,
           buf, strlen(buf));

    if (!con.sock->write(con.reply_buf, con.reply_buf_used))
    {
      delete[] stub;
      return false;
    }

    *end = ch;
    // con.stub is not valid after this point; it previously pointed at a
    // local buffer of this function and was equally unusable after return.
    delete[] stub;
    return result && keepConnectionAlive;
  }
  else
  {
    con.append(ERROR_TEXT("Not acceptable"));
    con.sock->write(con.reply_buf, con.reply_buf_used);
    result = true;
    *end = ch;
    return false;
  }
}


//----------------------------------------------------

// Thread entry point: `arg` is the QueueManager whose handle() loop this
// thread runs.
void thread_proc QueueManager::handleThread(void* arg)
{
  QueueManager* const manager = static_cast<QueueManager*>(arg);
  manager->handle();
}


// Build the connection pool and start the worker threads.
//
// api                - server whose connect()/serve() are used by start()
//                      and handle().
// dbase              - database each worker thread attaches to.
// nThreads           - number of worker threads to create (>= 1).
// connectionQueueLen - number of WWWconnection slots in the pool (>= 1).
QueueManager::QueueManager(WWWapi&     api,
                           dbDatabase& dbase,
                           int         nThreads,
                           int         connectionQueueLen)
    : db(dbase)
{
  assert(nThreads >= 1 && connectionQueueLen >= 1);
  this->nThreads = nThreads;
  go.open();
  done.open();

  // Chain every pool slot into the free list: slot i points to slot i+1,
  // the last one terminates the list.
  connectionPool = new WWWconnection[connectionQueueLen];
  connectionPool[--connectionQueueLen].next = NULL;

  while (--connectionQueueLen >= 0)
  {
    connectionPool[connectionQueueLen].next =
      &connectionPool[connectionQueueLen+1];
  }

  freeList = connectionPool;
  waitList = NULL;
  server = &api;

  // Start the workers only after all state they read in handle() has been
  // initialized.
  threads = new dbThread[nThreads];

  while (--nThreads >= 0)
  {
    threads[nThreads].create(handleThread, this);
    threads[nThreads].detach();
  }
}

void QueueManager::start()
{
  mutex.lock();

  while (server != NULL)
  {
    if (freeList == NULL)
    {
      done.reset();
      done.wait(mutex);

      if (server == NULL)
      {
        break;
      }

      assert(freeList != NULL);
    }

    WWWconnection* con = freeList;
    freeList = con->next;
    WWWapi* srv = server;
    mutex.unlock();

    if (!srv->connect(*con) || server == NULL)
    {
      return;
    }

    mutex.lock();
    con->next = waitList;
    waitList = con;
    go.signal();
  }

  mutex.unlock();
}


void QueueManager::handle()
{
  db.attach();
  mutex.lock();

  while (true)
  {
    go.wait(mutex);
    WWWapi* api = server;

    if (api == NULL)
    {
      break;
    }

    WWWconnection* con = waitList;
    assert(con != NULL);
    waitList = con->next;
    mutex.unlock();

    if (!api->serve(*con))
    {
      stop();
    }

    mutex.lock();

    if (freeList == NULL)
    {
      done.signal();
    }

    con->next = freeList;
    freeList = con;
  }

  mutex.unlock();
  db.detach();
}


// Stop the manager: clears `server`, cancels the server object and signals
// `go` once per worker thread plus `done`, so that handle() and start() see
// the cleared pointer. Calling stop() again after that is a no-op.
void QueueManager::stop()
{
  mutex.lock();
  WWWapi* server = this->server;

  if (server == NULL)
  {
    // Already stopped (for example by another worker thread whose serve()
    // also returned false); there is nothing left to cancel.
    mutex.unlock();
    return;
  }

  this->server = NULL;
  server->cancel();

  while (--nThreads >= 0)
  {
    go.signal();
  }

  done.signal();
  mutex.unlock();
}


// Closes both events, then frees the thread array and the connection pool
// allocated by the constructor.
QueueManager::~QueueManager()
{
  this->go.close();
  this->done.close();

  delete[] this->threads;
  delete[] this->connectionPool;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?