📄 test_proactor3.cpp
字号:
// test_proactor3.cpp,v 1.6 2003/11/10 01:48:03 dhinton Exp
// ============================================================================
//
// = LIBRARY
// examples
//
// = FILENAME
// test_proactor3.cpp
//
// = DESCRIPTION
// This program illustrates how the <ACE_Proactor> can be used to
// implement an application that does various asynchronous
// operations.
//
// = AUTHOR
// Irfan Pyarali <irfan@cs.wustl.edu>
// modified by Alexander Libman <alibman@baltimore.com>
// from original test_proactor.cpp
// ============================================================================
#include "ace/Signal.h"
#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"
// FUZZ: disable check_for_streams_include
#include "ace/streams.h"
#include "ace/Task.h"
ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")
#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
// This only works on Win32 platforms and on Unix platforms
// supporting POSIX aio calls.
#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
# include "ace/WIN32_Proactor.h"
#elif defined (ACE_HAS_AIO_CALLS)
# include "ace/POSIX_Proactor.h"
# include "ace/SUN_Proactor.h"
#endif /* defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) */
// Some debug helper functions
// disable_signal() is only declared here; its definition is outside
// this excerpt.  MyTask::svc() calls it with the range
// (ACE_SIGRTMIN, ACE_SIGRTMAX).
static int disable_signal (int sigmin, int sigmax);
#if 0
static int print_sigmask (void);
#endif
// Writes X to <cout> and flushes the stream.
// NOTE(review): the macro expands to two statements, so an unbraced
// "if (c) COUT (x)" would only guard the first one.
#define COUT(X) cout << X; cout.flush ();
// Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,
// 2-SIG, 3-SUN
// MyTask::create_proactor() switches on this value; any value without
// its own case (including 0) creates an ACE_POSIX_SIG_Proactor.
static int proactor_type = 0;
// POSIX : > 0 max number aio operations proactor,
// Passed to the ACE_POSIX_AIOCB_Proactor and ACE_SUN_Proactor
// constructors in MyTask::create_proactor().
static int max_aio_operations = 0;
// Host that we're connecting to.
static ACE_TCHAR *host = 0;
// number of Senders instances
static int senders = 1;
// presumably the upper bound for <senders> -- its use is outside this
// excerpt, TODO confirm
static const int MaxSenders = 100;
// duplex mode: ==0 half-duplex
// !=0 full duplex
// In the Receiver: when != 0 a new read is initiated as soon as the
// echo write has been started; when == 0 the next read is initiated
// only after the write has completed.
static int duplex = 0;
// number threads in the Proactor thread pool
static int threads = 1;
// Port that we're receiving connections on.
static u_short port = ACE_DEFAULT_SERVER_PORT;
class MyTask : public ACE_Task<ACE_MT_SYNCH>
{
  // = TITLE
  //   Thread pool whose threads all run the Proactor event loop.
  //
  // = DESCRIPTION
  //   The first thread entering svc() creates the proactor and the
  //   last thread leaving it deletes the proactor again; <threads_>
  //   counts the threads currently between those two points.
public:
  MyTask (void)
    : threads_ (0),
      proactor_ (0)
  {
  }

  // Entry point of every pool thread.
  int svc (void);

  // Blocks on <event_>, which create_proactor() signals once the
  // proactor has been created.
  void waitready (void)
  {
    event_.wait ();
  }

private:
  // Create the proactor on first use and count the calling thread.
  void create_proactor (void);

  // Uncount the calling thread; the last one deletes the proactor.
  void delete_proactor (void);

  // Serializes create_proactor() and delete_proactor().
  ACE_Recursive_Thread_Mutex mutex_;

  // Number of threads that have called create_proactor() and not yet
  // delete_proactor().
  int threads_;

  // Proactor created by create_proactor(); 0 while none exists.
  ACE_Proactor *proactor_;

  // Signaled by create_proactor() after the proactor was created.
  ACE_Manual_Event event_;
};
// Creates the proactor shared by the whole thread pool.
//
// Every pool thread calls this once from svc().  Only the first caller
// (<threads_> == 0) builds the implementation selected by
// <proactor_type>, wraps it in an ACE_Proactor, installs that object
// as the ACE_Proactor singleton and signals <event_> so that
// waitready() returns.  Every caller, first or not, increments
// <threads_>; delete_proactor() is the matching decrement.
void
MyTask::create_proactor (void)
{
  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);

  if (threads_ == 0)
    {
#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
      ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor;

      // Terminated with '\n' like every other message in this function
      // so that the next log record starts on its own line.
      ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32\n"));
#elif defined (ACE_HAS_AIO_CALLS)
      ACE_POSIX_Proactor *proactor = 0;

      switch (proactor_type)
        {
        case 1:
          proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations);
          ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n"));
          break;

# if defined (sun)
        case 3:
          proactor = new ACE_SUN_Proactor (max_aio_operations);
          ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SUN\n"));
          break;
# endif /* sun */

        case 2:
        default:
          // Type 2 and every unrecognized type (including the default
          // 0) use the signal based implementation.
          proactor = new ACE_POSIX_SIG_Proactor;
          ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
          break;
        }
#endif

      // The second argument (delete_implementation = 1) hands ownership
      // of <proactor> to the ACE_Proactor wrapper.
      proactor_ = new ACE_Proactor (proactor, 1);

      ACE_Proactor::instance(proactor_);
      event_.signal ();
    }

  threads_++;
}
void
MyTask::delete_proactor (void)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
if (--threads_ == 0)
{
ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n"));
ACE_Proactor::instance ((ACE_Proactor *) 0);
delete proactor_;
proactor_ = 0;
}
}
int
MyTask::svc (void)
{
ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));
create_proactor ();
disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
while (ACE_Proactor::event_loop_done () == 0)
ACE_Proactor::run_event_loop ();
delete_proactor ();
ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
return 0;
}
class Receiver : public ACE_Service_Handler
{
  // = TITLE
  //   Echoes the data it reads from an accepted connection back to
  //   the peer, using asynchronous reads and writes.
  //
  // = DESCRIPTION
  //   The object deletes itself (see check_destroy()) once no
  //   asynchronous operation is outstanding any more.
public:
  Receiver (void);
  ~Receiver (void);

  // This is called after the new connection has been accepted.
  virtual void open (ACE_HANDLE handle,
                     ACE_Message_Block &message_block);

  // Number of Receiver objects currently alive.
  static long get_number_sessions (void)
  {
    return sessions_;
  }

protected:
  // These methods are called by the framework

  // This is called when an asynchronous <read> operation from the
  // socket completes.
  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);

  // This is called when an asynchronous <write> to the socket
  // completes.
  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

private:
  // Start an asynchronous read into a freshly allocated block.
  int initiate_read_stream (void);

  // Start an asynchronous write of the first <nbytes> bytes of <mb>.
  int initiate_write_stream (ACE_Message_Block &mb, int nbytes);

  // Delete this object when no operation is pending.  Returns 1 if
  // the object is still alive and 0 if it has been deleted.
  int check_destroy (void);

  // Read and write sides of the connection, both opened on <handle_>.
  ACE_Asynch_Read_Stream rs_;
  ACE_Asynch_Write_Stream ws_;

  // Handle of the accepted connection; ACE_INVALID_HANDLE until
  // open() has been called.
  ACE_HANDLE handle_;

  // Protects <io_count_>.
  ACE_Recursive_Thread_Mutex mutex_;

  // Number of asynchronous operations started but not yet completed.
  long io_count_;

  // Number of live Receiver objects.
  static long sessions_;
};
// Number of live Receiver objects, maintained by the constructor and
// the destructor.
long Receiver::sessions_ = 0;

// Starts with no connection and no pending operation, and counts the
// new session.
//
// NOTE(review): <mutex_> is a per-object lock while <sessions_> is
// shared by all objects, so the guard below does not appear to
// serialize updates made by different Receivers -- confirm whether
// that matters for this test.
Receiver::Receiver (void)
  : handle_ (ACE_INVALID_HANDLE),
    io_count_ (0)
{
  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (this->mutex_);

  ++Receiver::sessions_;

  ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_));
}
// Uncounts the session and closes the connection.
//
// <handle_> is still ACE_INVALID_HANDLE when the object is destroyed
// without open() ever having been called; in that case there is
// nothing to close, so closesocket() is skipped instead of being
// handed an invalid handle.
Receiver::~Receiver (void)
{
  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);

  sessions_--;

  if (this->handle_ != ACE_INVALID_HANDLE)
    ACE_OS::closesocket (this->handle_);

  ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_));
}
// Deletes this object unless an asynchronous operation is still
// outstanding.  Returns 1 when the object is still alive and 0 when
// it has just been deleted -- after a 0 the caller must not touch the
// object any more.
int
Receiver::check_destroy (void)
{
  int busy = 0;

  {
    // Hold the lock only while reading the counter: it is a member
    // and has to be released before the object is destroyed.
    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
    busy = (io_count_ > 0);
  }

  if (busy)
    return 1;

  delete this;
  return 0;
}
// Called after the new connection has been accepted.  Remembers
// <handle>, opens the write side and then the read side on it and
// starts the first read.  If anything failed, no operation is pending
// and the final check_destroy() deletes this object.
void
Receiver::open (ACE_HANDLE handle,
                ACE_Message_Block &)
{
  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:Receiver::open called\n"));

  this->handle_ = handle;

  if (this->ws_.open (*this, this->handle_) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Write_Stream::open"));
    }
  else
    {
      // The read side is opened only after the write side succeeded.
      if (this->rs_.open (*this, this->handle_) == -1)
        {
          ACE_ERROR ((LM_ERROR,
                      "%p\n",
                      "ACE_Asynch_Read_Stream::open"));
        }
      else
        {
          this->initiate_read_stream ();
        }
    }

  this->check_destroy ();
}
int
Receiver::initiate_read_stream (void)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
ACE_Message_Block (BUFSIZ + 1),
-1);
// Inititiate read
if (this->rs_.read (*mb, mb->size ()- 1) == -1)
{
mb->release ();
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::read"),
-1);
}
io_count_++;
return 0;
}
// Starts an asynchronous write of the first <nbytes> bytes of <mb>.
//
// Returns 0 when the write was started (and counted in <io_count_>).
// Returns -1 when <nbytes> is not positive or the write request
// failed; in both failure cases <mb> has already been released and
// must not be used by the caller any more.
int
Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
{
  ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);

  if (nbytes <= 0)
    {
      mb.release ();
      // The message states the condition actually tested and ends the
      // log line.
      ACE_ERROR_RETURN((LM_ERROR,
                        "ACE_Asynch_Write_Stream::write nbytes <= 0\n"),
                       -1);
    }

  if (this->ws_.write (mb, nbytes) == -1)
    {
      mb.release ();
      ACE_ERROR_RETURN((LM_ERROR,
                        "%p\n",
                        "ACE_Asynch_Write_Stream::write"),
                       -1);
    }

  // The completion handler releases <mb> once the write has finished.
  io_count_++;
  return 0;
}
// Completion handler for the reads started by initiate_read_stream().
//
// Data that arrived is echoed back by handing the very same block to
// initiate_write_stream(); in full-duplex mode the next read is
// started right away.  On end of stream or error the block is
// released and nothing new is started.  In every case the completed
// read is uncounted and the object deletes itself when no operation
// is left.
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_Message_Block &block = result.message_block ();

  // Terminate the received bytes so they can be logged as a string;
  // initiate_read_stream() kept one byte of the block free for this.
  block.rd_ptr ()[result.bytes_transferred ()] = '\0';

  const int eof_or_error =
    result.bytes_transferred () == 0 || result.error () != 0;

  if (eof_or_error)
    {
      ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));
      ACE_DEBUG ((LM_DEBUG, "********************\n"));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
      ACE_DEBUG ((LM_DEBUG, "********************\n"));
      ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", block.rd_ptr ()));
    }

  const int got_data =
    result.success () && result.bytes_transferred () != 0;

  if (!got_data)
    {
      block.release ();
      ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
    }
  else if (this->initiate_write_stream (block,
                                        result.bytes_transferred ()) == 0
           && duplex != 0)
    {
      // The block now belongs to the pending write and is released in
      // <handle_write_stream>.  Full duplex: do not wait for the write
      // to finish before reading again.
      this->initiate_read_stream ();
    }

  {
    // Uncount this read only after any follow-up operation has been
    // counted.
    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
    --io_count_;
  }

  this->check_destroy ();
}
// Completion handler for the writes started by
// initiate_write_stream().  Releases the echoed block, starts the next
// read in half-duplex mode after a successful write, uncounts the
// completed write and deletes the object when no operation is left.
void
Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
  const int nothing_sent_or_error =
    result.bytes_transferred () == 0 || result.error () != 0;

  if (nothing_sent_or_error)
    {
      ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
      ACE_DEBUG ((LM_DEBUG, "********************\n"));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
      ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
      ACE_DEBUG ((LM_DEBUG, "********************\n"));
    }

  // The block was handed over by <handle_read_stream>; the write is
  // finished, so it is no longer needed.
  result.message_block ().release ();

  // This code is not robust enough to deal with short file writes
  // (which hardly ever happen);-)
  // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());

  // Half duplex: the next read starts only now that the echo is out.
  if (duplex == 0
      && result.success ()
      && result.bytes_transferred () != 0)
    {
      this->initiate_read_stream ();
    }

  {
    // Uncount this write only after any follow-up read has been
    // counted.
    ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
    --io_count_;
  }

  this->check_destroy ();
}
class Sender : public ACE_Handler
{
// = TITLE
// Sends welcome messages and receives them back.
public:
Sender (void);
~Sender (void);
int open (const ACE_TCHAR *host, u_short port);
void close (void);
ACE_HANDLE handle (void) const;
protected:
// These methods are called by the framework
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
// This is called when asynchronous reads from the socket complete
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
// This is called when asynchronous writes from the socket complete
private:
int initiate_read_stream (void);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -