test_proactor2.cpp

来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 825 行 · 第 1/2 页

CPP
825
字号
// $Id: test_proactor2.cpp 78962 2007-07-20 03:27:14Z sowayaa $// ============================================================================//// = LIBRARY//    examples//// = FILENAME//    test_proactor2.cpp//// = DESCRIPTION//    Alexander Libman <Alibman@baltimore.com> modified//    <test_proactor> and made this test. Instead of writing received//    data to the file, the receiver sends them back to the//    sender,i.e. ACE_Asynch_Write_File wf_  has been changed to//    ACE_Asynch_Write_Stream wf_.//// = AUTHOR//    Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman//    <Alibman@baltimore.com>.// ============================================================================#include "ace/Signal.h"#include "ace/Service_Config.h"#include "ace/Proactor.h"#include "ace/Asynch_IO.h"#include "ace/Asynch_IO_Impl.h"#include "ace/Asynch_Acceptor.h"#include "ace/INET_Addr.h"#include "ace/SOCK_Connector.h"#include "ace/SOCK_Acceptor.h"#include "ace/SOCK_Stream.h"#include "ace/Message_Block.h"#include "ace/Get_Opt.h"// FUZZ: disable check_for_streams_include#include "ace/streams.h"#include "ace/Task.h"#include "ace/OS_main.h"ACE_RCSID(Proactor, test_proactor2, "test_proactor2.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)  // This only works on Win32 platforms and on Unix platforms supporting  // POSIX aio calls.#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)#include "ace/WIN32_Proactor.h"#elif defined (ACE_HAS_AIO_CALLS)#include "ace/POSIX_Proactor.h"#endif  //  Some debug helper functions  int DisableSignal ( int SigNum );int PrintSigMask ();#define  COUT(X)  cout << X ; cout.flush ();// Host that we're connecting to.static ACE_TCHAR *host = 0;// duplex mode: ==0 half-duplex//              !=0 full duplexstatic int duplex = 0 ;// number threads in the Proactor thread poolstatic int nThreads = 1;// Port that we're receiving connections on.static u_short port = ACE_DEFAULT_SERVER_PORT;// Size of each initial asynchronous <read> operation.static int initial_read_size = BUFSIZ;#define MyMutex ACE_Recursive_Thread_Mutex//#define MyMutex ACE_Thread_Mutex//#define MyMutex ACE_Null_Mutex//--------------------------------------------------------------------------// MyTask plays role for Proactor threads pool//--------------------------------------------------------------------------class MyTask: public ACE_Task<ACE_MT_SYNCH>{public:  int svc (void) ;};int MyTask::svc (void ){  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));  while ( ACE_Proactor::event_loop_done () == 0 )    {      ACE_Proactor::run_event_loop ();    }  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));  return 0 ;}//-----------------------------------------------------------//  Receiver//-----------------------------------------------------------class Receiver : public ACE_Service_Handler{public:  Receiver (void);  ~Receiver (void);  //FUZZ: disable check_for_lack_ACE_OS  virtual void open (ACE_HANDLE handle,                     ACE_Message_Block &message_block);  // This is called after the new connection has been accepted.  //FUZZ: enable check_for_lack_ACE_OSprotected:  // These methods are called by the framework  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result                                   &result);  // This is called when asynchronous <read> operation from the socket  // complete.  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result                                    &result);  // This is called when an asynchronous <write> to the file  // completes.private:  int  initiate_read_stream  (void);  int  initiate_write_stream (ACE_Message_Block & mb, int nBytes );  bool check_destroy () ;  ACE_Asynch_Read_Stream rs_;  ACE_Asynch_Write_Stream ws_;  ACE_HANDLE handle_;  MyMutex m_Mtx ;  long    nIOCount ;  static long nSessions ;};long Receiver::nSessions = 0 ;Receiver::Receiver (void)  : handle_ (ACE_INVALID_HANDLE),    nIOCount ( 0 ){  ACE_Guard<MyMutex> locker (m_Mtx) ;  nSessions ++ ;  ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions ));}Receiver::~Receiver (void){  ACE_Guard<MyMutex> locker (m_Mtx) ;  nSessions -- ;  ACE_OS::closesocket (this->handle_);  ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions ));}//---------------------------------------------------------------------//  return true if we alive, false  we commited suicide////---------------------------------------------------------------------bool Receiver::check_destroy (){  {    ACE_Guard<MyMutex> locker (m_Mtx) ;    if ( nIOCount > 0 )      {        return true ;      }  }  delete this ;  return false ;}void Receiver::open (ACE_HANDLE handle,                     ACE_Message_Block &message_block){  ACE_UNUSED_ARG (message_block);  ACE_DEBUG ((LM_DEBUG,              "%N:%l:Receiver::open called\n"));  this->handle_ = handle;  if (this->ws_.open (*this, this->handle_ ) == -1)    {      ACE_ERROR ((LM_ERROR,                  "%p\n",                  "ACE_Asynch_Write_Stream::open"));    }  else if (this->rs_.open (*this, this->handle_) == -1)    {      ACE_ERROR ((LM_ERROR,                  "%p\n",                  "ACE_Asynch_Read_Stream::open"));    }  else    {      initiate_read_stream ();    }  check_destroy ();}int Receiver::initiate_read_stream (void){  ACE_Guard<MyMutex> locker (m_Mtx) ;  // Create a new <Message_Block>.  Note that this message block will  // be used both to <read> data asynchronously from the socket and to  // <write> data asynchronously to the file.  ACE_DEBUG ((LM_DEBUG,              "initiate_read_stream called\n"));  ACE_Message_Block *mb = 0;  ACE_NEW_RETURN (mb,                  ACE_Message_Block (BUFSIZ + 1),                  -1);  // Inititiate read  if (this->rs_.read (*mb, mb->size ()- 1) == -1)    {      mb->release () ;      ACE_ERROR_RETURN ((LM_ERROR,                         "%p\n",                         "ACE_Asynch_Read_Stream::read"),                        -1);    }  nIOCount++ ;  return 0;}int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes ){  ACE_Guard<MyMutex> locker (m_Mtx) ;  if (this->ws_.write (mb , nBytes ) == -1)    {      mb.release ();      ACE_ERROR_RETURN((LM_ERROR,                        "%p\n",                        "ACE_Asynch_Write_File::write"),                       -1);    }  nIOCount++ ;  return 0;}voidReceiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){  ACE_DEBUG ((LM_DEBUG,              "handle_read_stream called\n"));  // Reset pointers.  result.message_block ().rd_ptr ()[result.bytes_transferred ()] =    '\0';  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read              ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",              result.bytes_transferred ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)              result.completion_key ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",              result.message_block ().rd_ptr ()));  if ( result.success () && result.bytes_transferred () != 0)    {      // Successful read: write the data to the file asynchronously.      // Note how we reuse the <ACE_Message_Block> for the writing.      // Therefore, we do not delete this buffer because it is handled      // in <handle_write_stream>.      if(this->initiate_write_stream (result.message_block (),                                      result.bytes_transferred () ) == 0 )        {          if ( duplex != 0 )            {              // Initiate new read from the stream.              this->initiate_read_stream () ;            }        }    }  else    {      result.message_block ().release ();      ACE_DEBUG ((LM_DEBUG,  "Receiver completed\n"));    }  {    ACE_Guard<MyMutex> locker (m_Mtx) ;    nIOCount-- ;  }  check_destroy () ;}voidReceiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result                               &result){  ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",              result.bytes_to_write ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",              result.bytes_transferred ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)              result.completion_key ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  ACE_DEBUG ((LM_DEBUG, "********************\n"));  result.message_block ().release ();  if (result.success ())    {      // This code is not robust enough to deal with short file writes      // (which hardly ever happen) ;-)      //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());      if ( duplex == 0 )        {          initiate_read_stream () ;        }    }  {    ACE_Guard<MyMutex> locker (m_Mtx) ;    nIOCount-- ;  }  check_destroy () ;}//-------------------------------------------------------------------------// Sender:  sends indefinetely welcome message//  and  recieves it back//------------------------------------------------------------------------class Sender : public ACE_Handler{public:  Sender (void);  ~Sender (void);  //FUZZ: disable check_for_lack_ACE_OS  int open (const ACE_TCHAR *host, u_short port);  void close ();  //FUZZ: enable check_for_lack_ACE_OS  ACE_HANDLE handle (void) const;  void handle (ACE_HANDLE);protected:// These methods are called by the freamworkvirtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result&result);// This is called when asynchronous reads from the socket completevirtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result&result);// This is called when asynchronous writes from the socket completeprivate:int initiate_read_stream (void);int initiate_write_stream (void);ACE_SOCK_Stream stream_;// Network I/O handleACE_Asynch_Write_Stream ws_;// ws (write stream): for writing to the socketACE_Asynch_Read_Stream rs_;// rs (read file): for reading from the socketACE_Message_Block welcome_message_;// Welcome message

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?