test_proactor3.cpp
来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 890 行 · 第 1/2 页
CPP
890 行
// $Id: test_proactor3.cpp 78962 2007-07-20 03:27:14Z sowayaa $// ============================================================================//// = LIBRARY// examples//// = FILENAME// test_proactor3.cpp//// = DESCRIPTION// This program illustrates how the <ACE_Proactor> can be used to// implement an application that does various asynchronous// operations.//// = AUTHOR// Irfan Pyarali <irfan@cs.wustl.edu>// modified by Alexander Libman <alibman@baltimore.com>// from original test_proactor.cpp// ============================================================================#include "ace/Signal.h"#include "ace/Service_Config.h"#include "ace/Proactor.h"#include "ace/Asynch_IO.h"#include "ace/Asynch_IO_Impl.h"#include "ace/Asynch_Acceptor.h"#include "ace/INET_Addr.h"#include "ace/Manual_Event.h"#include "ace/SOCK_Connector.h"#include "ace/SOCK_Acceptor.h"#include "ace/SOCK_Stream.h"#include "ace/Message_Block.h"#include "ace/Get_Opt.h"// FUZZ: disable check_for_streams_include#include "ace/streams.h"#include "ace/Task.h"ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) // This only works on Win32 platforms and on Unix platforms // supporting POSIX aio calls.#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)# include "ace/WIN32_Proactor.h"#elif defined (ACE_HAS_AIO_CALLS)# include "ace/POSIX_Proactor.h"# include "ace/SUN_Proactor.h"#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */// Some debug helper functionsstatic int disable_signal (int sigmin, int sigmax);#if 0static int print_sigmask (void);#endif#define COUT(X) cout << X; cout.flush ();// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,// 2-SIG, 3-SUNstatic int proactor_type = 0;// POSIX : > 0 max number aio operations proactor,static int max_aio_operations = 0;// Host that we're connecting to.static ACE_TCHAR *host = 0;// number of Senders instancesstatic int senders = 1;static const int MaxSenders = 100;// duplex mode: ==0 half-duplex// !=0 full duplexstatic int duplex = 0;// number threads in the Proactor thread poolstatic int threads = 1;// Port that we're receiving connections on.static u_short port = ACE_DEFAULT_SERVER_PORT;class MyTask: public ACE_Task<ACE_MT_SYNCH>{ // = TITLE // MyTask plays role for Proactor threads poolpublic: MyTask (void) : threads_ (0), proactor_ (0) {} int svc (void); void waitready (void) { event_.wait (); }private: ACE_Recursive_Thread_Mutex mutex_; int threads_; ACE_Proactor *proactor_; ACE_Manual_Event event_; void create_proactor (void); void delete_proactor (void);};voidMyTask::create_proactor (void){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); if (threads_ == 0) {#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor; ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32"));#elif defined (ACE_HAS_AIO_CALLS) ACE_POSIX_Proactor *proactor = 0; switch (proactor_type) { case 1: proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations); ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n")); break; case 2: proactor = new ACE_POSIX_SIG_Proactor; ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n")); break;# if defined (sun) case 3: proactor = new ACE_SUN_Proactor (max_aio_operations); ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SUN\n")); break;# endif /* sun */ default:proactor = new ACE_POSIX_SIG_Proactor; ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n")); break; }#endif proactor_ = new ACE_Proactor (proactor, 1); ACE_Proactor::instance(proactor_); event_.signal (); } threads_++;}voidMyTask::delete_proactor (void){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); if (--threads_ == 0) { ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n")); ACE_Proactor::instance ((ACE_Proactor *) 0); delete proactor_; proactor_ = 0; }}intMyTask::svc (void){ ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n")); create_proactor (); disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); while (ACE_Proactor::event_loop_done () == 0) ACE_Proactor::run_event_loop (); delete_proactor (); ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n")); return 0;}class Receiver : public ACE_Service_Handler{public: Receiver (void); ~Receiver (void); //FUZZ: disable check_for_lack_ACE_OS virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); // This is called after the new connection has been accepted. //FUZZ: enable check_for_lack_ACE_OS static long get_number_sessions (void) { return sessions_; }protected: // These methods are called by the framework virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); // This is called when asynchronous <read> operation from the socket // complete. virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); // This is called when an asynchronous <write> to the file // completes.private: int initiate_read_stream (void); int initiate_write_stream (ACE_Message_Block & mb, int nBytes); int check_destroy (void); ACE_Asynch_Read_Stream rs_; ACE_Asynch_Write_Stream ws_; ACE_HANDLE handle_; ACE_Recursive_Thread_Mutex mutex_; long io_count_; static long sessions_;};long Receiver::sessions_ = 0;Receiver::Receiver (void) : handle_ (ACE_INVALID_HANDLE), io_count_ (0){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); sessions_++; ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_));}Receiver::~Receiver (void){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); sessions_--; ACE_OS::closesocket (this->handle_); ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_));}// return true if we alive, false we commited suicideintReceiver::check_destroy (void){ { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); if (io_count_ > 0) return 1; } delete this; return 0;}voidReceiver::open (ACE_HANDLE handle, ACE_Message_Block &){ ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open called\n")); this->handle_ = handle; if (this->ws_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::open")); else if (this->rs_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::open")); else initiate_read_stream (); check_destroy ();}intReceiver::initiate_read_stream (void){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); // Inititiate read if (this->rs_.read (*mb, mb->size ()- 1) == -1) { mb->release (); ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Stream::read"), -1); } io_count_++; return 0;}intReceiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes){ ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); if (nbytes <= 0) { mb.release (); ACE_ERROR_RETURN((LM_ERROR, "ACE_Asynch_Write_Stream::write nbytes <0 "), -1); } if (this->ws_.write (mb, nbytes) == -1) { mb.release (); ACE_ERROR_RETURN((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"), -1); } io_count_++; return 0;}voidReceiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){ // Reset pointers. result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; if (result.bytes_transferred () == 0 || result.error () != 0) { ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); } if (result.success () && result.bytes_transferred () != 0) { // Successful read: write the data to the file asynchronously. // Note how we reuse the <ACE_Message_Block> for the writing. // Therefore, we do not delete this buffer because it is handled // in <handle_write_stream>. if(this->initiate_write_stream (result.message_block (), result.bytes_transferred ()) == 0) { if (duplex != 0) { // Initiate new read from the stream. this->initiate_read_stream (); } } } else { result.message_block ().release (); ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); } { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); io_count_--; } check_destroy ();}voidReceiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){ if (result.bytes_transferred () == 0 || result.error () != 0) { ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); } result.message_block ().release (); if (result.success () && result.bytes_transferred () != 0) { // This code is not robust enough to deal with short file writes // (which hardly ever happen);-) // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ()); if (duplex == 0) initiate_read_stream (); } { ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_); io_count_--; } check_destroy ();}class Sender : public ACE_Handler{ // = TITLE // Sends welcome messages receives them back.public: Sender (void); ~Sender (void); //FUZZ: disable check_for_lack_ACE_OS int open (const ACE_TCHAR *host, u_short port); void close (void); //FUZZ: enable check_for_lack_ACE_OS ACE_HANDLE handle (void) const; virtual void handle (ACE_HANDLE);protected: // These methods are called by the freamwork virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); // This is called when asynchronous reads from the socket complete virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); // This is called when asynchronous writes from the socket completeprivate: int initiate_read_stream (void); int initiate_write_stream (void);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?