test_proactor2.cpp
来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 825 行 · 第 1/2 页
CPP
825 行
// $Id: test_proactor2.cpp 78962 2007-07-20 03:27:14Z sowayaa $// ============================================================================//// = LIBRARY// examples//// = FILENAME// test_proactor2.cpp//// = DESCRIPTION// Alexander Libman <Alibman@baltimore.com> modified// <test_proactor> and made this test. Instead of writing received// data to the file, the receiver sends them back to the// sender,i.e. ACE_Asynch_Write_File wf_ has been changed to// ACE_Asynch_Write_Stream wf_.//// = AUTHOR// Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman// <Alibman@baltimore.com>.// ============================================================================#include "ace/Signal.h"#include "ace/Service_Config.h"#include "ace/Proactor.h"#include "ace/Asynch_IO.h"#include "ace/Asynch_IO_Impl.h"#include "ace/Asynch_Acceptor.h"#include "ace/INET_Addr.h"#include "ace/SOCK_Connector.h"#include "ace/SOCK_Acceptor.h"#include "ace/SOCK_Stream.h"#include "ace/Message_Block.h"#include "ace/Get_Opt.h"// FUZZ: disable check_for_streams_include#include "ace/streams.h"#include "ace/Task.h"#include "ace/OS_main.h"ACE_RCSID(Proactor, test_proactor2, "test_proactor2.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) // This only works on Win32 platforms and on Unix platforms supporting // POSIX aio calls.#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)#include "ace/WIN32_Proactor.h"#elif defined (ACE_HAS_AIO_CALLS)#include "ace/POSIX_Proactor.h"#endif // Some debug helper functions int DisableSignal ( int SigNum );int PrintSigMask ();#define COUT(X) cout << X ; cout.flush ();// Host that we're connecting to.static ACE_TCHAR *host = 0;// duplex mode: ==0 half-duplex// !=0 full duplexstatic int duplex = 0 ;// number threads in the Proactor thread poolstatic int nThreads = 1;// Port that we're receiving connections on.static u_short port = ACE_DEFAULT_SERVER_PORT;// Size of each initial asynchronous <read> operation.static int initial_read_size = BUFSIZ;#define MyMutex ACE_Recursive_Thread_Mutex//#define MyMutex ACE_Thread_Mutex//#define MyMutex ACE_Null_Mutex//--------------------------------------------------------------------------// MyTask plays role for Proactor threads pool//--------------------------------------------------------------------------class MyTask: public ACE_Task<ACE_MT_SYNCH>{public: int svc (void) ;};int MyTask::svc (void ){ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n")); while ( ACE_Proactor::event_loop_done () == 0 ) { ACE_Proactor::run_event_loop (); } ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n")); return 0 ;}//-----------------------------------------------------------// Receiver//-----------------------------------------------------------class Receiver : public ACE_Service_Handler{public: Receiver (void); ~Receiver (void); //FUZZ: disable check_for_lack_ACE_OS virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); // This is called after the new connection has been accepted. //FUZZ: enable check_for_lack_ACE_OSprotected: // These methods are called by the framework virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); // This is called when asynchronous <read> operation from the socket // complete. virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); // This is called when an asynchronous <write> to the file // completes.private: int initiate_read_stream (void); int initiate_write_stream (ACE_Message_Block & mb, int nBytes ); bool check_destroy () ; ACE_Asynch_Read_Stream rs_; ACE_Asynch_Write_Stream ws_; ACE_HANDLE handle_; MyMutex m_Mtx ; long nIOCount ; static long nSessions ;};long Receiver::nSessions = 0 ;Receiver::Receiver (void) : handle_ (ACE_INVALID_HANDLE), nIOCount ( 0 ){ ACE_Guard<MyMutex> locker (m_Mtx) ; nSessions ++ ; ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions ));}Receiver::~Receiver (void){ ACE_Guard<MyMutex> locker (m_Mtx) ; nSessions -- ; ACE_OS::closesocket (this->handle_); ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions ));}//---------------------------------------------------------------------// return true if we alive, false we commited suicide////---------------------------------------------------------------------bool Receiver::check_destroy (){ { ACE_Guard<MyMutex> locker (m_Mtx) ; if ( nIOCount > 0 ) { return true ; } } delete this ; return false ;}void Receiver::open (ACE_HANDLE handle, ACE_Message_Block &message_block){ ACE_UNUSED_ARG (message_block); ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open called\n")); this->handle_ = handle; if (this->ws_.open (*this, this->handle_ ) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::open")); } else if (this->rs_.open (*this, this->handle_) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open")); } else { initiate_read_stream (); } check_destroy ();}int Receiver::initiate_read_stream (void){ ACE_Guard<MyMutex> locker (m_Mtx) ; // Create a new <Message_Block>. Note that this message block will // be used both to <read> data asynchronously from the socket and to // <write> data asynchronously to the file. ACE_DEBUG ((LM_DEBUG, "initiate_read_stream called\n")); ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); // Inititiate read if (this->rs_.read (*mb, mb->size ()- 1) == -1) { mb->release () ; ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1); } nIOCount++ ; return 0;}int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes ){ ACE_Guard<MyMutex> locker (m_Mtx) ; if (this->ws_.write (mb , nBytes ) == -1) { mb.release (); ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"), -1); } nIOCount++ ; return 0;}voidReceiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); // Reset pointers. result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); if ( result.success () && result.bytes_transferred () != 0) { // Successful read: write the data to the file asynchronously. // Note how we reuse the <ACE_Message_Block> for the writing. // Therefore, we do not delete this buffer because it is handled // in <handle_write_stream>. if(this->initiate_write_stream (result.message_block (), result.bytes_transferred () ) == 0 ) { if ( duplex != 0 ) { // Initiate new read from the stream. this->initiate_read_stream () ; } } } else { result.message_block ().release (); ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); } { ACE_Guard<MyMutex> locker (m_Mtx) ; nIOCount-- ; } check_destroy () ;}voidReceiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); result.message_block ().release (); if (result.success ()) { // This code is not robust enough to deal with short file writes // (which hardly ever happen) ;-) //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ()); if ( duplex == 0 ) { initiate_read_stream () ; } } { ACE_Guard<MyMutex> locker (m_Mtx) ; nIOCount-- ; } check_destroy () ;}//-------------------------------------------------------------------------// Sender: sends indefinetely welcome message// and recieves it back//------------------------------------------------------------------------class Sender : public ACE_Handler{public: Sender (void); ~Sender (void); //FUZZ: disable check_for_lack_ACE_OS int open (const ACE_TCHAR *host, u_short port); void close (); //FUZZ: enable check_for_lack_ACE_OS ACE_HANDLE handle (void) const; void handle (ACE_HANDLE);protected:// These methods are called by the freamworkvirtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result&result);// This is called when asynchronous reads from the socket completevirtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result&result);// This is called when asynchronous writes from the socket completeprivate:int initiate_read_stream (void);int initiate_write_stream (void);ACE_SOCK_Stream stream_;// Network I/O handleACE_Asynch_Write_Stream ws_;// ws (write stream): for writing to the socketACE_Asynch_Read_Stream rs_;// rs (read file): for reading from the socketACE_Message_Block welcome_message_;// Welcome message
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?