test_proactor.cpp
来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 684 行 · 第 1/2 页
CPP
684 行
// $Id: test_proactor.cpp 78962 2007-07-20 03:27:14Z sowayaa $// ============================================================================//// = LIBRARY// examples//// = FILENAME// test_proactor.cpp//// = DESCRIPTION// This program illustrates how the <ACE_Proactor> can be used to// implement an application that does various asynchronous// operations.//// = AUTHOR// Irfan Pyarali <irfan@cs.wustl.edu>//// ============================================================================#include "ace/OS_NS_string.h"#include "ace/OS_main.h"#include "ace/Service_Config.h"#include "ace/Proactor.h"#include "ace/Asynch_IO.h"#include "ace/Asynch_IO_Impl.h"#include "ace/Asynch_Acceptor.h"#include "ace/INET_Addr.h"#include "ace/SOCK_Connector.h"#include "ace/SOCK_Acceptor.h"#include "ace/SOCK_Stream.h"#include "ace/Message_Block.h"#include "ace/Get_Opt.h"#include "ace/Log_Msg.h"#include "ace/OS_NS_sys_stat.h"#include "ace/OS_NS_sys_socket.h"#include "ace/OS_NS_unistd.h"#include "ace/OS_NS_fcntl.h"ACE_RCSID(Proactor, test_proactor, "$Id: test_proactor.cpp 78962 2007-07-20 03:27:14Z sowayaa $")#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) // This only works on Win32 platforms and on Unix platforms supporting // POSIX aio calls.#include "test_proactor.h"// Host that we're connecting to.static ACE_TCHAR *host = 0;// Port that we're receiving connections on.static u_short port = ACE_DEFAULT_SERVER_PORT;// File that we're sending.static const ACE_TCHAR *file = ACE_TEXT("test_proactor.cpp");// Name of the output file.static const ACE_TCHAR *dump_file = ACE_TEXT("output");// Keep track of when we're done.static int done = 0;// Size of each initial asynchronous <read> operation.static int initial_read_size = BUFSIZ;Receiver::Receiver (void) : dump_file_ (ACE_INVALID_HANDLE), handle_ (ACE_INVALID_HANDLE){}Receiver::~Receiver (void){ ACE_OS::close (this->dump_file_); ACE_OS::closesocket (this->handle_);}voidReceiver::open (ACE_HANDLE handle, ACE_Message_Block &message_block){ ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open called\n")); // New connection, so initiate stuff. // Cache the new connection this->handle_ = handle; // File offset starts at zero this->file_offset_ = 0; // Open dump file (in OVERLAPPED mode) this->dump_file_ = ACE_OS::open (dump_file, O_CREAT | O_RDWR | O_TRUNC | \ FILE_FLAG_OVERLAPPED); if (this->dump_file_ == ACE_INVALID_HANDLE) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_OS::open")); return; } // Initiate <ACE_Asynch_Write_File>. if (this->wf_.open (*this, this->dump_file_) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::open")); return; } // Initiate <ACE_Asynch_Read_Stream>. if (this->rs_.open (*this, this->handle_) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open")); return; } // Fake the result and make the <handle_read_stream> get // called. But, not, if there is '0' is transferred. if (message_block.length () != 0) { // Duplicate the message block so that we can keep it around. ACE_Message_Block &duplicate = *message_block.duplicate (); // Fake the result so that we will get called back. ACE_Asynch_Read_Stream_Result_Impl *fake_result = ACE_Proactor::instance ()->create_asynch_read_stream_result (this->proxy (), this->handle_, duplicate, initial_read_size, 0, ACE_INVALID_HANDLE, 0, 0); size_t bytes_transferred = message_block.length (); // <complete> for Accept would have already moved the <wr_ptr> // forward. Update it to the beginning position. duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred); // This will call the callback. fake_result->complete (message_block.length (), 1, 0); // Zap the fake result. delete fake_result; } else // Otherwise, make sure we proceed. Initiate reading the socket // stream. if (this->initiate_read_stream () == -1) return;}intReceiver::initiate_read_stream (void){ // Create a new <Message_Block>. Note that this message block will // be used both to <read> data asynchronously from the socket and to // <write> data asynchronously to the file. ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); // Inititiate read if (this->rs_.read (*mb, mb->size () - 1) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1); return 0;}voidReceiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); // Reset pointers. result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n"));#if 0 // This can overrun the ACE_Log_Msg buffer and do bad things. // Re-enable it at your risk. ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));#endif /* 0 */ if (result.success () && result.bytes_transferred () != 0) { // Successful read: write the data to the file asynchronously. // Note how we reuse the <ACE_Message_Block> for the writing. // Therefore, we do not delete this buffer because it is handled // in <handle_write_stream>. if (this->wf_.write (result.message_block (), result.bytes_transferred (), this->file_offset_) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write")); return; } // Initiate new read from the stream. if (this->initiate_read_stream () == -1) return; } else { ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); // No need for this message block anymore. result.message_block ().release (); // Note that we are done with the test. done = 1; // We are done: commit suicide. delete this; }}voidReceiver::handle_write_file (const ACE_Asynch_Write_File::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_write_file called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); result.message_block ().release (); if (result.success ()) // Write successful: Increment file offset this->file_offset_ += result.bytes_transferred (); // This code is not robust enough to deal with short file writes // (which hardly ever happen) ;-) ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());}class Sender : public ACE_Handler{ // = TITLE // The class will be created by <main>. After connecting to the // host, this class will then read data from a file and send it // to the network connection.public: Sender (void); ~Sender (void); //FUZZ: disable check_for_lack_ACE_OS int open (const ACE_TCHAR *host, u_short port); //FUZZ: enable check_for_lack_ACE_OS ACE_HANDLE handle (void) const; void handle (ACE_HANDLE);protected: // These methods are called by the freamwork virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result); // This is called when asynchronous transmit files complete virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); // This is called when asynchronous writes from the socket complete virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); // This is called when asynchronous reads from the socket completeprivate: int transmit_file (void); // Transmit the entire file in one fell swoop. int initiate_read_file (void); // Initiate an asynchronous file read. ACE_SOCK_Stream stream_; // Network I/O handle ACE_Asynch_Write_Stream ws_; // ws (write stream): for writing to the socket ACE_Asynch_Read_File rf_; // rf (read file): for writing from the file ACE_Asynch_Transmit_File tf_; // Transmit file. ACE_HANDLE input_file_; // File to read from u_long file_offset_; // Current file offset u_long file_size_; // File size ACE_Message_Block welcome_message_; // Welcome message ACE_Asynch_Transmit_File::Header_And_Trailer header_and_trailer_; // Header and trailer which goes with transmit_file int stream_write_done_; int transmit_file_done_; // These flags help to determine when to close down the event loop};Sender::Sender (void) : input_file_ (ACE_INVALID_HANDLE), file_offset_ (0),
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?