⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 http_server.cpp

📁 最新的版本ACE-5.6.8,刚从外文网上搬下,与大家分享.
💻 CPP
字号:
// $Id: HTTP_Server.cpp 82543 2008-08-06 18:21:48Z parsons $

#ifndef ACE_BUILD_SVC_DLL
#define ACE_BUILD_SVC_DLL
#endif /* ACE_BUILD_SVC_DLL */

#include "ace/OS_NS_string.h"
#include "ace/Get_Opt.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/LOCK_SOCK_Acceptor.h"
#include "ace/Proactor.h"
#include "ace/Signal.h"
#include "ace/Auto_Ptr.h"

#include "IO.h"
#include "HTTP_Server.h"

ACE_RCSID(server, HTTP_Server, "$Id: HTTP_Server.cpp 82543 2008-08-06 18:21:48Z parsons $")

// class is overkill
class JAWS
{
public:
  enum
  {
    JAWS_POOL = 0,
    JAWS_PER_REQUEST = 1
  };

  enum
  {
    JAWS_SYNCH = 0,
    JAWS_ASYNCH = 2
  };
};

void
HTTP_Server::parse_args (int argc, ACE_TCHAR *argv[])
{
  int c;
  int thr_strategy = 0;
  int io_strategy = 0;
  const ACE_TCHAR *prog = argc > 0 ? argv[0] : ACE_TEXT ("HTTP_Server");

  // Set some defaults
  this->port_ = 0;
  this->threads_ = 0;
  this->backlog_ = 0;
  this->throttle_ = 0;
  this->caching_ = true;

  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:n:t:i:b:c:"));

  while ((c = get_opt ()) != -1)
    switch (c)
      {
      case 'p':
        this->port_ = ACE_OS::atoi (get_opt.opt_arg ());
        break;
      case 'n':
        this->threads_ = ACE_OS::atoi (get_opt.opt_arg ());
        break;
      case 't':
        // POOL        -> thread pool
        // PER_REQUEST -> thread per request
        // THROTTLE    -> thread per request with throttling
        if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("POOL")) == 0)
          thr_strategy = JAWS::JAWS_POOL;
        else if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("PER_REQUEST")) == 0)
          {
            thr_strategy = JAWS::JAWS_PER_REQUEST;
            this->throttle_ = 0;
          }
        else if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("THROTTLE")) == 0)
          {
            thr_strategy = JAWS::JAWS_PER_REQUEST;
            this->throttle_ = 1;
          }
        break;
      case 'f':
        if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("THR_BOUND")) == 0)
          {
            // What happened here?
          }
        else if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("THR_DAEMON")) == 0)
          {
          }
        else if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("THR_DETACHED")) == 0)
          {
          }
      case 'i':
        // SYNCH  -> synchronous I/O
        // ASYNCH -> asynchronous I/O
        if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("SYNCH")) == 0)
          io_strategy = JAWS::JAWS_SYNCH;
        else if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("ASYNCH")) == 0)
          io_strategy = JAWS::JAWS_ASYNCH;
        break;
      case 'b':
        this->backlog_ = ACE_OS::atoi (get_opt.opt_arg ());
        break;
      case 'c':
        if (ACE_OS::strcmp (get_opt.opt_arg (), ACE_TEXT ("NO_CACHE")) == 0)
          this->caching_ = false;
        else
          this->caching_ = true;
        break;
        default:
        break;
      }

  // No magic numbers.
  if (this->port_ <= 0)
    this->port_ = 5432;
  if (this->threads_ <= 0)
    this->threads_ = 5;
  // Don't use number of threads as default
  if (this->backlog_ <= 0)
    this->backlog_ = this->threads_;

  this->strategy_ = thr_strategy | io_strategy;

  ACE_UNUSED_ARG (prog);
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("in HTTP_Server::init, %s port = %d, ")
              ACE_TEXT ("number of threads = %d\n"),
              prog, this->port_, this->threads_));
}

int
HTTP_Server::init (int argc, ACE_TCHAR *argv[])
  // Document this function
{
  // Ignore signals generated when a connection is broken unexpectedly.
  ACE_Sig_Action sig ((ACE_SignalHandler) SIG_IGN, SIGPIPE);
  ACE_UNUSED_ARG (sig);

  // Parse arguments which sets the initial state.
  this->parse_args (argc, argv);

  //If the IO strategy is synchronous (SYNCH case), then choose a handler
  //factory based on the desired caching scheme
  HTTP_Handler_Factory *f = 0;

  if (this->strategy_ != (JAWS::JAWS_POOL | JAWS::JAWS_ASYNCH))
    {
      if (this->caching_)
        {
          ACE_NEW_RETURN (f, Synch_HTTP_Handler_Factory (), -1);
        }
      else
        {
          ACE_NEW_RETURN (f, No_Cache_Synch_HTTP_Handler_Factory (), -1);
        }
    }

  //NOTE: At this point f better not be a NULL pointer,
  //so please do not change the ACE_NEW_RETURN macros unless
  //you know what you are doing
  ACE_Auto_Ptr<HTTP_Handler_Factory> factory (f);


  // Choose what concurrency strategy to run.
  switch (this->strategy_)
    {
    case (JAWS::JAWS_POOL | JAWS::JAWS_ASYNCH) :
      return this->asynch_thread_pool ();

    case (JAWS::JAWS_PER_REQUEST | JAWS::JAWS_SYNCH) :
      return this->thread_per_request (*factory.get ());

    case (JAWS::JAWS_POOL | JAWS::JAWS_SYNCH) :
    default:
      return this->synch_thread_pool (*factory.get ());
    }

  ACE_NOTREACHED (return 0);
}

int
HTTP_Server::fini (void)
{
  this->tm_.close ();
  return 0;
}


int
HTTP_Server::synch_thread_pool (HTTP_Handler_Factory &factory)
{
  // Main thread opens the acceptor
  if (this->acceptor_.open (ACE_INET_Addr (this->port_), 1,
                            PF_INET, this->backlog_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("HTTP_Acceptor::open")), -1);

  // Create a pool of threads to handle incoming connections.
  Synch_Thread_Pool_Task t (this->acceptor_, this->tm_, this->threads_, factory);

  this->tm_.wait ();
  return 0;
}

Synch_Thread_Pool_Task::Synch_Thread_Pool_Task (HTTP_Acceptor &acceptor,
                                                ACE_Thread_Manager &tm,
                                                int threads,
                                                HTTP_Handler_Factory &factory)
  : ACE_Task<ACE_NULL_SYNCH> (&tm),
    acceptor_ (acceptor),
    factory_ (factory)
{
  if (this->activate (THR_DETACHED | THR_NEW_LWP, threads) == -1)
    ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
                ACE_TEXT ("Synch_Thread_Pool_Task::open")));
}

int
Synch_Thread_Pool_Task::svc (void)
{
  // Creates a factory of HTTP_Handlers binding to synchronous I/O strategy
  //Synch_HTTP_Handler_Factory factory;

  for (;;)
    {
      ACE_SOCK_Stream stream;

      // Lock in this accept.  When it returns, we have a connection.
      if (this->acceptor_.accept (stream) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("%p\n"),
                           ACE_TEXT ("HTTP_Acceptor::accept")), -1);

      ACE_Message_Block *mb;
      ACE_NEW_RETURN (mb,
                      ACE_Message_Block (HTTP_Handler::MAX_REQUEST_SIZE + 1),
                      -1);

      // Create an HTTP Handler to handle this request
      HTTP_Handler *handler = this->factory_.create_http_handler ();
      handler->open (stream.get_handle (), *mb);
      // Handler is destroyed when the I/O puts the Handler into the
      // done state.

      mb->release ();
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT (" (%t) in Synch_Thread_Pool_Task::svc, recycling\n")));
    }

  ACE_NOTREACHED(return 0);
}

int
HTTP_Server::thread_per_request (HTTP_Handler_Factory &factory)
{
  int grp_id = -1;

  // thread per request
  // Main thread opens the acceptor
  if (this->acceptor_.open (ACE_INET_Addr (this->port_), 1,
                            PF_INET, this->backlog_) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("HTTP_Acceptor::open")), -1);

  ACE_SOCK_Stream stream;

  // When we are throttling, this is the amount of time to wait before
  // checking for runnability again.
  const ACE_Time_Value wait_time (0, 10);

  for (;;)
    {
      if (this->acceptor_.accept (stream) == -1)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                           ACE_TEXT ("HTTP_Acceptor::accept")), -1);

      Thread_Per_Request_Task *t;
      // Pass grp_id as a constructor param instead of into open.
      ACE_NEW_RETURN (t, Thread_Per_Request_Task (stream.get_handle (),
                                                  this->tm_,
                                                  grp_id,
                                                  factory),
                      -1);


      if (t->open () != 0)
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                           ACE_TEXT ("Thread_Per_Request_Task::open")),
                          -1);

      // Throttling is not allowing too many threads to run away.
      // Should really use some sort of condition variable here.
      if (!this->throttle_)
        continue;

      // This works because each task has only one thread.
      while (this->tm_.num_tasks_in_group (grp_id) > this->threads_)
        this->tm_.wait (&wait_time);
    }

  ACE_NOTREACHED(return 0);
}

Thread_Per_Request_Task::Thread_Per_Request_Task (ACE_HANDLE handle,
                                                  ACE_Thread_Manager &tm,
                                                  int &grp_id,
                                                  HTTP_Handler_Factory &factory)
  : ACE_Task<ACE_NULL_SYNCH> (&tm),
    handle_ (handle),
    grp_id_ (grp_id),
    factory_ (factory)
{
}


// HEY!  Add a method to the thread_manager to return total number of
// threads managed in all the tasks.

int
Thread_Per_Request_Task::open (void *)
{
  int status = -1;

  if (this->grp_id_ == -1)
    status = this->grp_id_ = this->activate (THR_DETACHED | THR_NEW_LWP);
  else
    status = this->activate (THR_DETACHED | THR_NEW_LWP,
                             1, 0, -1, this->grp_id_, 0);

  if (status == -1)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("Thread_Per_Request_Task::open")),
                      -1);
  return 0;
}

int
Thread_Per_Request_Task::svc (void)
{
  ACE_Message_Block *mb;
  ACE_NEW_RETURN (mb, ACE_Message_Block (HTTP_Handler::MAX_REQUEST_SIZE + 1),
                  -1);
  //Synch_HTTP_Handler_Factory factory;
  HTTP_Handler *handler = this->factory_.create_http_handler ();
  handler->open (this->handle_, *mb);
  mb->release ();
  return 0;
}

int
Thread_Per_Request_Task::close (u_long)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT (" (%t) Thread_Per_Request_Task::svc, dying\n")));
  delete this;
  return 0;
}

// Understanding the code below requires understanding of the
// WindowsNT asynchronous completion notification mechanism and the
// Proactor Pattern.

// (1) The application submits an asynchronous I/O request to the
//     operating system and a special handle with it (Asynchronous
//     Completion Token).
// (2) The operating system commits to performing the I/O request,
//     while application does its own thing.
// (3) Operating system finishes the I/O request and places ACT onto
//     the I/O Completion Port, which is a queue of finished
//     asynchronous requests.
// (4) The application eventually checks to see if the I/O request
//     is done by checking the I/O Completion Port, and retrieves the
//     ACT.

int
HTTP_Server::asynch_thread_pool (void)
{
// This only works on Win32
#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
  // Create the appropriate acceptor for this concurrency strategy and
  // an appropriate handler for this I/O strategy
  ACE_Asynch_Acceptor<Asynch_HTTP_Handler_Factory> acceptor;

  // Tell the acceptor to listen on this->port_, which makes an
  // asynchronous I/O request to the OS.
  if (acceptor.open (ACE_INET_Addr (this->port_),
                     HTTP_Handler::MAX_REQUEST_SIZE + 1) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("ACE_Asynch_Acceptor::open")), -1);

  // Create the thread pool.
  // Register threads with the proactor and thread manager.
  Asynch_Thread_Pool_Task t (*ACE_Proactor::instance (),
                             this->tm_);

  // The proactor threads are waiting on the I/O Completion Port.

  // Wait for the threads to finish.
  return this->tm_.wait ();
#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
  return -1;
}

// This only works on Win32
#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)

Asynch_Thread_Pool_Task::Asynch_Thread_Pool_Task (ACE_Proactor &proactor,
                                                  ACE_Thread_Manager &tm)
  : ACE_Task<ACE_NULL_SYNCH> (&tm),
    proactor_ (proactor)
{
  if (this->activate () == -1)
    ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
                ACE_TEXT ("Asynch_Thread_Pool_Task::open")));
}

int
Asynch_Thread_Pool_Task::svc (void)
{
  for (;;)
    if (this->proactor_.handle_events () == -1)
      ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                         ACE_TEXT ("ACE_Proactor::handle_events")),
                        -1);

  return 0;
}

#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */

// Define the factory function.
ACE_SVC_FACTORY_DEFINE (HTTP_Server)

// Define the object that describes the service.
ACE_STATIC_SVC_DEFINE (HTTP_Server, ACE_TEXT ("HTTP_Server"), ACE_SVC_OBJ_T,
                       &ACE_SVC_NAME (HTTP_Server),
                       ACE_Service_Type::DELETE_THIS
                       | ACE_Service_Type::DELETE_OBJ, 0)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -