test_proactor.cpp

来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 684 行 · 第 1/2 页

CPP
684
字号
// $Id: test_proactor.cpp 78962 2007-07-20 03:27:14Z sowayaa $// ============================================================================//// = LIBRARY//    examples//// = FILENAME//    test_proactor.cpp//// = DESCRIPTION//    This program illustrates how the <ACE_Proactor> can be used to//    implement an application that does various asynchronous//    operations.//// = AUTHOR//    Irfan Pyarali <irfan@cs.wustl.edu>//// ============================================================================#include "ace/OS_NS_string.h"#include "ace/OS_main.h"#include "ace/Service_Config.h"#include "ace/Proactor.h"#include "ace/Asynch_IO.h"#include "ace/Asynch_IO_Impl.h"#include "ace/Asynch_Acceptor.h"#include "ace/INET_Addr.h"#include "ace/SOCK_Connector.h"#include "ace/SOCK_Acceptor.h"#include "ace/SOCK_Stream.h"#include "ace/Message_Block.h"#include "ace/Get_Opt.h"#include "ace/Log_Msg.h"#include "ace/OS_NS_sys_stat.h"#include "ace/OS_NS_sys_socket.h"#include "ace/OS_NS_unistd.h"#include "ace/OS_NS_fcntl.h"ACE_RCSID(Proactor, test_proactor, "$Id: test_proactor.cpp 78962 2007-07-20 03:27:14Z sowayaa $")#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)  // This only works on Win32 platforms and on Unix platforms supporting  // POSIX aio calls.#include "test_proactor.h"// Host that we're connecting to.static ACE_TCHAR *host = 0;// Port that we're receiving connections on.static u_short port = ACE_DEFAULT_SERVER_PORT;// File that we're sending.static const ACE_TCHAR *file = ACE_TEXT("test_proactor.cpp");// Name of the output file.static const ACE_TCHAR *dump_file = ACE_TEXT("output");// Keep track of when we're done.static int done = 0;// Size of each initial asynchronous <read> operation.static int initial_read_size = BUFSIZ;Receiver::Receiver (void)  : dump_file_ (ACE_INVALID_HANDLE),    handle_ (ACE_INVALID_HANDLE){}Receiver::~Receiver (void){  ACE_OS::close (this->dump_file_);  ACE_OS::closesocket (this->handle_);}voidReceiver::open (ACE_HANDLE handle,                ACE_Message_Block &message_block){  ACE_DEBUG ((LM_DEBUG,              "%N:%l:Receiver::open called\n"));  // New connection, so initiate stuff.  // Cache the new connection  this->handle_ = handle;  // File offset starts at zero  this->file_offset_ = 0;  // Open dump file (in OVERLAPPED mode)  this->dump_file_ = ACE_OS::open (dump_file,                                   O_CREAT | O_RDWR | O_TRUNC | \                                   FILE_FLAG_OVERLAPPED);  if (this->dump_file_ == ACE_INVALID_HANDLE)    {      ACE_ERROR ((LM_ERROR,                  "%p\n",                  "ACE_OS::open"));      return;    }  // Initiate <ACE_Asynch_Write_File>.  if (this->wf_.open (*this,                      this->dump_file_) == -1)    {      ACE_ERROR ((LM_ERROR,                  "%p\n",                  "ACE_Asynch_Write_File::open"));      return;    }  // Initiate <ACE_Asynch_Read_Stream>.  if (this->rs_.open (*this, this->handle_) == -1)    {      ACE_ERROR ((LM_ERROR,                  "%p\n",                  "ACE_Asynch_Read_Stream::open"));      return;    }  // Fake the result and make the <handle_read_stream> get  // called. But, not, if there is '0' is transferred.  if (message_block.length () != 0)    {      // Duplicate the message block so that we can keep it around.      ACE_Message_Block &duplicate =        *message_block.duplicate ();      // Fake the result so that we will get called back.      ACE_Asynch_Read_Stream_Result_Impl *fake_result =        ACE_Proactor::instance ()->create_asynch_read_stream_result (this->proxy (),                                                                     this->handle_,                                                                     duplicate,                                                                     initial_read_size,                                                                     0,                                                                     ACE_INVALID_HANDLE,                                                                     0,                                                                     0);      size_t bytes_transferred = message_block.length ();      // <complete> for Accept would have already moved the <wr_ptr>      // forward. Update it to the beginning position.      duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);      // This will call the callback.      fake_result->complete (message_block.length (),                             1,                             0);      // Zap the fake result.      delete fake_result;    }  else    // Otherwise, make sure we proceed. Initiate reading the socket    // stream.    if (this->initiate_read_stream () == -1)      return;}intReceiver::initiate_read_stream (void){  // Create a new <Message_Block>.  Note that this message block will  // be used both to <read> data asynchronously from the socket and to  // <write> data asynchronously to the file.  ACE_Message_Block *mb = 0;  ACE_NEW_RETURN (mb,                  ACE_Message_Block (BUFSIZ + 1),                  -1);  // Inititiate read  if (this->rs_.read (*mb,                      mb->size () - 1) == -1)    ACE_ERROR_RETURN ((LM_ERROR,                       "%p\n",                       "ACE_Asynch_Read_Stream::read"),                      -1);  return 0;}voidReceiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){  ACE_DEBUG ((LM_DEBUG,              "handle_read_stream called\n"));  // Reset pointers.  result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  ACE_DEBUG ((LM_DEBUG, "********************\n"));#if 0  // This can overrun the ACE_Log_Msg buffer and do bad things.  // Re-enable it at your risk.  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));#endif /* 0 */  if (result.success () && result.bytes_transferred () != 0)    {      // Successful read: write the data to the file asynchronously.      // Note how we reuse the <ACE_Message_Block> for the writing.      // Therefore, we do not delete this buffer because it is handled      // in <handle_write_stream>.      if (this->wf_.write (result.message_block (),                           result.bytes_transferred (),                           this->file_offset_) == -1)        {          ACE_ERROR ((LM_ERROR,                      "%p\n",                      "ACE_Asynch_Write_File::write"));          return;        }      // Initiate new read from the stream.      if (this->initiate_read_stream () == -1)        return;    }  else    {      ACE_DEBUG ((LM_DEBUG,                  "Receiver completed\n"));      // No need for this message block anymore.      result.message_block ().release ();      // Note that we are done with the test.      done = 1;      // We are done: commit suicide.      delete this;    }}voidReceiver::handle_write_file (const ACE_Asynch_Write_File::Result &result){  ACE_DEBUG ((LM_DEBUG, "handle_write_file called\n"));  ACE_DEBUG ((LM_DEBUG, "********************\n"));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));  ACE_DEBUG ((LM_DEBUG, "********************\n"));  result.message_block ().release ();  if (result.success ())    // Write successful:  Increment file offset    this->file_offset_ += result.bytes_transferred ();  // This code is not robust enough to deal with short file writes  // (which hardly ever happen) ;-)  ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());}class Sender : public ACE_Handler{  // = TITLE  //     The class will be created by <main>.  After connecting to the  //     host, this class will then read data from a file and send it  //     to the network connection.public:  Sender (void);  ~Sender (void);  //FUZZ: disable check_for_lack_ACE_OS  int open (const ACE_TCHAR *host,            u_short port);  //FUZZ: enable check_for_lack_ACE_OS  ACE_HANDLE handle (void) const;  void handle (ACE_HANDLE);protected:  // These methods are called by the freamwork  virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);  // This is called when asynchronous transmit files complete  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);  // This is called when asynchronous writes from the socket complete  virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);  // This is called when asynchronous reads from the socket completeprivate:  int transmit_file (void);  // Transmit the entire file in one fell swoop.  int initiate_read_file (void);  // Initiate an asynchronous file read.  ACE_SOCK_Stream stream_;  // Network I/O handle  ACE_Asynch_Write_Stream ws_;  // ws (write stream): for writing to the socket  ACE_Asynch_Read_File rf_;  // rf (read file): for writing from the file  ACE_Asynch_Transmit_File tf_;  // Transmit file.  ACE_HANDLE input_file_;  // File to read from  u_long file_offset_;  // Current file offset  u_long file_size_;  // File size  ACE_Message_Block welcome_message_;  // Welcome message  ACE_Asynch_Transmit_File::Header_And_Trailer header_and_trailer_;  // Header and trailer which goes with transmit_file  int stream_write_done_;  int transmit_file_done_;  // These flags help to determine when to close down the event loop};Sender::Sender (void)  : input_file_ (ACE_INVALID_HANDLE),    file_offset_ (0),

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?