test_proactor3.cpp

来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 890 行 · 第 1/2 页

CPP
890
字号
// $Id: test_proactor3.cpp 78962 2007-07-20 03:27:14Z sowayaa $// ============================================================================//// = LIBRARY//    examples//// = FILENAME//    test_proactor3.cpp//// = DESCRIPTION//    This program illustrates how the <ACE_Proactor> can be used to//    implement an application that does various asynchronous//    operations.//// = AUTHOR//    Irfan Pyarali <irfan@cs.wustl.edu>//    modified by  Alexander Libman <alibman@baltimore.com>//    from original test_proactor.cpp// ============================================================================#include "ace/Signal.h"#include "ace/Service_Config.h"#include "ace/Proactor.h"#include "ace/Asynch_IO.h"#include "ace/Asynch_IO_Impl.h"#include "ace/Asynch_Acceptor.h"#include "ace/INET_Addr.h"#include "ace/Manual_Event.h"#include "ace/SOCK_Connector.h"#include "ace/SOCK_Acceptor.h"#include "ace/SOCK_Stream.h"#include "ace/Message_Block.h"#include "ace/Get_Opt.h"// FUZZ: disable check_for_streams_include#include "ace/streams.h"#include "ace/Task.h"ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)  // This only works on Win32 platforms and on Unix platforms  // supporting POSIX aio calls.#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)#	include "ace/WIN32_Proactor.h"#elif defined (ACE_HAS_AIO_CALLS)#	include "ace/POSIX_Proactor.h"#	include "ace/SUN_Proactor.h"#endif /* ACE_HAS_WIN32_OVERLAPPED_IO *///  Some debug helper functionsstatic int disable_signal (int sigmin, int sigmax);#if 0static int print_sigmask (void);#endif#define  COUT(X)  cout << X; cout.flush ();// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,// 2-SIG, 3-SUNstatic int proactor_type = 0;// POSIX : > 0 max number aio operations  proactor,static int max_aio_operations = 0;// Host that we're connecting to.static ACE_TCHAR *host = 0;// number of Senders instancesstatic int senders = 1;static const int MaxSenders = 100;// duplex mode: ==0 half-duplex//              !=0 full duplexstatic int duplex = 0;// number threads in the Proactor thread poolstatic int threads = 1;// Port that we're receiving connections on.static u_short port = ACE_DEFAULT_SERVER_PORT;class MyTask: public ACE_Task<ACE_MT_SYNCH>{  // = TITLE  //   MyTask plays role for Proactor threads poolpublic:  MyTask (void) : threads_ (0), proactor_ (0) {}  int svc (void);  void waitready (void) { event_.wait (); }private:  ACE_Recursive_Thread_Mutex mutex_;  int threads_;  ACE_Proactor *proactor_;  ACE_Manual_Event event_;  void create_proactor (void);  void delete_proactor (void);};voidMyTask::create_proactor (void){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  if (threads_ == 0)    {#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)      ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor;      ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32"));#elif defined (ACE_HAS_AIO_CALLS)      ACE_POSIX_Proactor *proactor = 0;      switch (proactor_type)        {        case 1:	proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations);          ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n"));          break;        case 2:	proactor = new ACE_POSIX_SIG_Proactor;          ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));          break;#  if defined (sun)        case 3:	proactor = new ACE_SUN_Proactor (max_aio_operations);          ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SUN\n"));          break;#  endif /* sun */        default:proactor = new ACE_POSIX_SIG_Proactor;          ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));          break;        }#endif      proactor_ = new ACE_Proactor (proactor, 1);      ACE_Proactor::instance(proactor_);      event_.signal ();    }  threads_++;}voidMyTask::delete_proactor (void){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  if (--threads_ == 0)    {      ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n"));      ACE_Proactor::instance ((ACE_Proactor *) 0);      delete proactor_;      proactor_ = 0;    }}intMyTask::svc (void){  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));  create_proactor ();  disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);  while (ACE_Proactor::event_loop_done () == 0)    ACE_Proactor::run_event_loop ();  delete_proactor ();  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));  return 0;}class Receiver : public ACE_Service_Handler{public:  Receiver (void);  ~Receiver (void);  //FUZZ: disable check_for_lack_ACE_OS  virtual void open (ACE_HANDLE handle,                     ACE_Message_Block &message_block);  // This is called after the new connection has been accepted.  //FUZZ: enable check_for_lack_ACE_OS  static long get_number_sessions (void) { return sessions_; }protected:  // These methods are called by the framework  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);  // This is called when asynchronous <read> operation from the socket  // complete.  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);  // This is called when an asynchronous <write> to the file  // completes.private:  int  initiate_read_stream (void);  int  initiate_write_stream (ACE_Message_Block & mb, int nBytes);  int check_destroy (void);  ACE_Asynch_Read_Stream rs_;  ACE_Asynch_Write_Stream ws_;  ACE_HANDLE handle_;  ACE_Recursive_Thread_Mutex mutex_;  long io_count_;  static long sessions_;};long Receiver::sessions_ = 0;Receiver::Receiver (void)  : handle_ (ACE_INVALID_HANDLE),    io_count_ (0){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  sessions_++;  ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_));}Receiver::~Receiver (void){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  sessions_--;  ACE_OS::closesocket (this->handle_);  ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_));}//  return true if we alive, false  we commited suicideintReceiver::check_destroy (void){  {    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);    if (io_count_ > 0)      return 1;  }  delete this;  return 0;}voidReceiver::open (ACE_HANDLE handle,                ACE_Message_Block &){  ACE_DEBUG ((LM_DEBUG,              "%N:%l:Receiver::open called\n"));  this->handle_ = handle;  if (this->ws_.open (*this, this->handle_) == -1)    ACE_ERROR ((LM_ERROR,                "%p\n",                "ACE_Asynch_Write_Stream::open"));  else if (this->rs_.open (*this, this->handle_) == -1)    ACE_ERROR ((LM_ERROR,                "%p\n",                "ACE_Asynch_Read_Stream::open"));  else    initiate_read_stream ();  check_destroy ();}intReceiver::initiate_read_stream (void){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  ACE_Message_Block *mb = 0;  ACE_NEW_RETURN (mb,                  ACE_Message_Block (BUFSIZ + 1),                  -1);  // Inititiate read  if (this->rs_.read (*mb, mb->size ()- 1) == -1)    {      mb->release ();      ACE_ERROR_RETURN ((LM_ERROR,                         "%p\n",                         "ACE_Asynch_Read_Stream::read"),                        -1);    }  io_count_++;  return 0;}intReceiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes){  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);  if (nbytes <= 0)    {      mb.release ();      ACE_ERROR_RETURN((LM_ERROR,                        "ACE_Asynch_Write_Stream::write nbytes <0 "),                       -1);    }  if (this->ws_.write (mb, nbytes) == -1)    {      mb.release ();      ACE_ERROR_RETURN((LM_ERROR,                        "%p\n",                        "ACE_Asynch_Write_Stream::write"),                       -1);    }  io_count_++;  return 0;}voidReceiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){  // Reset pointers.  result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';  if (result.bytes_transferred () == 0 || result.error () != 0)    {      ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));      ACE_DEBUG ((LM_DEBUG, "********************\n"));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));      ACE_DEBUG ((LM_DEBUG, "********************\n"));      ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));    }  if (result.success () && result.bytes_transferred () != 0)    {      // Successful read: write the data to the file asynchronously.      // Note how we reuse the <ACE_Message_Block> for the writing.      // Therefore, we do not delete this buffer because it is handled      // in <handle_write_stream>.      if(this->initiate_write_stream (result.message_block (),                                      result.bytes_transferred ()) == 0)        {          if (duplex != 0)            {              // Initiate new read from the stream.              this->initiate_read_stream ();            }        }    }  else    {      result.message_block ().release ();      ACE_DEBUG ((LM_DEBUG,  "Receiver completed\n"));    }  {    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);    io_count_--;  }  check_destroy ();}voidReceiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){  if (result.bytes_transferred () == 0 || result.error () != 0)    {      ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));      ACE_DEBUG ((LM_DEBUG, "********************\n"));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));      ACE_DEBUG ((LM_DEBUG, "********************\n"));    }  result.message_block ().release ();  if (result.success () && result.bytes_transferred () != 0)    {      // This code is not robust enough to deal with short file writes      // (which hardly ever happen);-)      // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());      if (duplex == 0)        initiate_read_stream ();    }  {    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);    io_count_--;  }  check_destroy ();}class Sender : public ACE_Handler{  // = TITLE  //   Sends welcome messages receives them back.public:  Sender (void);  ~Sender (void);  //FUZZ: disable check_for_lack_ACE_OS  int open (const ACE_TCHAR *host, u_short port);  void close (void);  //FUZZ: enable check_for_lack_ACE_OS  ACE_HANDLE handle (void) const;  virtual void handle (ACE_HANDLE);protected:  // These methods are called by the freamwork  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);  // This is called when asynchronous reads from the socket complete  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);  // This is called when asynchronous writes from the socket completeprivate:  int initiate_read_stream (void);  int initiate_write_stream (void);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?