📄 test_proactor2.cpp
字号:
// test_proactor2.cpp,v 1.8 2003/11/10 01:48:03 dhinton Exp
// ============================================================================
//
// = LIBRARY
// examples
//
// = FILENAME
// test_proactor2.cpp
//
// = DESCRIPTION
// Alexander Libman <Alibman@baltimore.com> modified
// <test_proactor> and made this test. Instead of writing the received
// data to a file, the receiver sends it back to the
// sender, i.e. ACE_Asynch_Write_File wf_ has been replaced by
// ACE_Asynch_Write_Stream ws_.
//
// = AUTHOR
// Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman
// <Alibman@baltimore.com>.
// ============================================================================
#include "ace/Signal.h"
#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"
// FUZZ: disable check_for_streams_include
#include "ace/streams.h"
#include "ace/Task.h"
#include "ace/OS_main.h"
ACE_RCSID(Proactor, test_proactor2, "test_proactor2.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")
#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
// This only works on Win32 platforms and on Unix platforms supporting
// POSIX aio calls.
#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
#include "ace/WIN32_Proactor.h"
#elif defined (ACE_HAS_AIO_CALLS)
#include "ace/POSIX_Proactor.h"
#endif
// Debug helper functions.  Only the declarations are visible here.
int DisableSignal ( int SigNum );
int PrintSigMask ();
// Streams X to cout and then flushes cout.  NOTE: this expands to two
// complete statements (each with its own semicolon), so it must not be
// used as the unbraced body of an if/else or a loop.
#define COUT(X) cout << X ; cout.flush ();
// Host that we're connecting to.
static ACE_TCHAR *host = 0;
// Duplex mode used by the Receiver:
//   == 0  half-duplex: the next read is started only after the echo
//         write has completed (see Receiver::handle_write_stream).
//   != 0  full duplex: the next read is started right after the echo
//         write has been initiated (see Receiver::handle_read_stream).
static int duplex = 0 ;
// Number of threads in the Proactor thread pool.
static int nThreads = 1;
// Port that we're receiving connections on.
static u_short port = ACE_DEFAULT_SERVER_PORT;
// Size of each initial asynchronous read operation.
// NOTE(review): the Receiver code uses BUFSIZ directly rather than this
// variable -- confirm whether it is still needed.
static int initial_read_size = BUFSIZ;
// Mutex type used for the per-object lock (m_Mtx).  The commented-out
// lines below are alternative choices.
#define MyMutex ACE_Recursive_Thread_Mutex
//#define MyMutex ACE_Thread_Mutex
//#define MyMutex ACE_Null_Mutex
//--------------------------------------------------------------------------
// MyTask: svc () is the body executed by the threads of the Proactor
// thread pool.
//--------------------------------------------------------------------------
class MyTask : public ACE_Task<ACE_MT_SYNCH>
{
public:
  // Thread entry point; see the definition below.
  int svc (void);
};

// Keeps calling ACE_Proactor::run_event_loop () until
// ACE_Proactor::event_loop_done () reports a non-zero value, logging
// when the thread starts and when it finishes.  Always returns 0.
int
MyTask::svc (void)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));

  for (;;)
    {
      // Leave as soon as the event loop has been flagged as done.
      if (ACE_Proactor::event_loop_done () != 0)
        break;

      ACE_Proactor::run_event_loop ();
    }

  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
  return 0;
}
//-----------------------------------------------------------
// Receiver
//
// Service handler for one accepted connection.  It reads data from the
// socket asynchronously and writes every received buffer back to the
// same socket.  The object deletes itself once no asynchronous
// operation is outstanding (see check_destroy).
//-----------------------------------------------------------
class Receiver : public ACE_Service_Handler
{
public:
Receiver (void);
~Receiver (void);
virtual void open (ACE_HANDLE handle,
ACE_Message_Block &message_block);
// This is called after the new connection has been accepted.
// <message_block> is ignored by this implementation.
protected:
// These methods are called by the framework
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result);
// This is called when an asynchronous read operation from the socket
// completes.
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result);
// This is called when an asynchronous write to the socket
// completes.
private:
// Start one asynchronous read into a freshly allocated message block.
// Returns 0 on success, -1 on failure.
int initiate_read_stream (void);
// Start an asynchronous write of the first nBytes of mb.  Returns 0 on
// success, -1 on failure (mb is released in that case).
int initiate_write_stream (ACE_Message_Block & mb, int nBytes );
// Returns true if I/O is still pending; otherwise deletes this object
// and returns false.
bool check_destroy () ;
// Asynchronous read and write factories, both opened on handle_.
ACE_Asynch_Read_Stream rs_;
ACE_Asynch_Write_Stream ws_;
// Socket handle of the connection; closed in the destructor.
ACE_HANDLE handle_;
// Per-object lock guarding nIOCount.
MyMutex m_Mtx ;
// Number of asynchronous operations initiated and not yet completed.
long nIOCount ;
// Number of Receiver objects currently alive (incremented in the
// constructor, decremented in the destructor).
static long nSessions ;
};
long Receiver::nSessions = 0 ;
Receiver::Receiver (void)
: handle_ (ACE_INVALID_HANDLE),
nIOCount ( 0 )
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
nSessions ++ ;
ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions ));
}
Receiver::~Receiver (void)
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
nSessions -- ;
ACE_OS::closesocket (this->handle_);
ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions ));
}
//---------------------------------------------------------------------
// return true if we alive, false we commited suicide
//
//---------------------------------------------------------------------
bool Receiver::check_destroy ()
{
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
if ( nIOCount > 0 )
{
return true ;
}
}
delete this ;
return false ;
}
// Called after a new connection has been accepted.
//
// Stores the socket handle, opens the asynchronous write and read
// streams on it and starts the first read.  If nothing could be
// started (or everything has already finished) the object deletes
// itself before returning.
//
// handle        - socket of the accepted connection.
// message_block - ignored.
void Receiver::open (ACE_HANDLE handle,
                     ACE_Message_Block &message_block)
{
  ACE_UNUSED_ARG (message_block);
  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:Receiver::open called\n"));

  // Hold one count on ourselves for the duration of open ().  Without
  // it, the read started below could complete on another Proactor
  // thread and delete the object before this function has finished
  // using it.
  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    ++nIOCount ;
  }

  this->handle_ = handle;

  if (this->ws_.open (*this, this->handle_ ) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Write_Stream::open"));
    }
  else if (this->rs_.open (*this, this->handle_) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Read_Stream::open"));
    }
  else
    {
      initiate_read_stream ();
    }

  // Give up our temporary count.  The decrement and the zero test are
  // done under a single lock acquisition so that exactly one thread
  // observes the count reaching zero and performs the delete.
  bool last_ref = false;
  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    last_ref = (--nIOCount <= 0);
  }

  if (last_ref)
    {
      delete this;
    }
}
int Receiver::initiate_read_stream (void)
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
// Create a new <Message_Block>. Note that this message block will
// be used both to <read> data asynchronously from the socket and to
// <write> data asynchronously to the file.
ACE_DEBUG ((LM_DEBUG,
"initiate_read_stream called\n"));
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
ACE_Message_Block (BUFSIZ + 1),
-1);
// Inititiate read
if (this->rs_.read (*mb, mb->size ()- 1) == -1)
{
mb->release () ;
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::read"),
-1);
}
nIOCount++ ;
return 0;
}
// Starts an asynchronous write of the first nBytes of mb to the
// socket.
//
// mb     - message block holding the data to send.  It is released
//          here if the write cannot be started.
// nBytes - number of bytes to write.
//
// Returns 0 if the write was initiated (the pending-I/O counter is
// incremented), -1 otherwise.  The whole function runs under m_Mtx.
int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes )
{
  ACE_Guard<MyMutex> locker (m_Mtx) ;

  if (this->ws_.write (mb , nBytes ) == -1)
    {
      mb.release ();
      // The failing call is ACE_Asynch_Write_Stream::write (ws_ is a
      // write *stream*), so label the error accordingly.
      ACE_ERROR_RETURN((LM_ERROR,
                        "%p\n",
                        "ACE_Asynch_Write_Stream::write"),
                       -1);
    }

  nIOCount++ ;
  return 0;
}
// Completion callback for an asynchronous read from the socket.
//
// Logs the fields of the result.  If the read succeeded and delivered
// data, the same message block is written back to the socket; in
// full-duplex mode (duplex != 0) the next read is started at once.
// Otherwise (failure or zero bytes) the message block is released.
// Finally the count held by the completed read is dropped, and the
// object deletes itself if that was the last outstanding operation.
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_read_stream called\n"));

  // NUL-terminate the received bytes so the buffer can be logged as a
  // C string below.  The block was allocated one byte larger than the
  // requested read size (see initiate_read_stream), so this write
  // stays inside the buffer.
  result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
    '\0';

  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
              ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
              result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
              result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
              result.message_block ().rd_ptr ()));

  if ( result.success () && result.bytes_transferred () != 0)
    {
      // Successful read: echo the data back to the socket
      // asynchronously, reusing the same ACE_Message_Block.  The
      // block is therefore not released here; that happens in
      // handle_write_stream (or in initiate_write_stream if the write
      // cannot be started).
      if (this->initiate_write_stream (result.message_block (),
                                       result.bytes_transferred () ) == 0 )
        {
          if ( duplex != 0 )
            {
              // Full duplex: start the next read without waiting for
              // the write to complete.
              this->initiate_read_stream () ;
            }
        }
    }
  else
    {
      result.message_block ().release ();
      ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
    }

  // Drop the count held by the read that has just completed.  The
  // decrement and the zero test are done under ONE lock acquisition.
  // If they were separate steps, two completion handlers running on
  // different threads could both decrement first and then both see a
  // zero count, and the object would be deleted twice.
  bool last_ref = false;
  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    last_ref = (--nIOCount <= 0);
  }

  if (last_ref)
    {
      delete this;
    }
}
// Completion callback for an asynchronous write to the socket.
//
// Logs the fields of the result and releases the message block that
// was written.  In half-duplex mode (duplex == 0) a successful write
// triggers the next read.  Finally the count held by the completed
// write is dropped, and the object deletes itself if that was the
// last outstanding operation.
void
Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result
                               &result)
{
  ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
              result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
              result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
              result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));

  // The block was handed over by handle_read_stream; the write is
  // finished, so it is no longer needed.
  result.message_block ().release ();

  if (result.success ())
    {
      // This code is not robust enough to deal with short writes
      // (which hardly ever happen) ;-)
      //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
      if ( duplex == 0 )
        {
          // Half duplex: the next read starts only now that the echo
          // has been sent.
          initiate_read_stream () ;
        }
    }

  // Drop the count held by the write that has just completed.  The
  // decrement and the zero test are done under ONE lock acquisition.
  // If they were separate steps, two completion handlers running on
  // different threads could both decrement first and then both see a
  // zero count, and the object would be deleted twice.
  bool last_ref = false;
  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    last_ref = (--nIOCount <= 0);
  }

  if (last_ref)
    {
      delete this;
    }
}
//-------------------------------------------------------------------------
// Sender: sends the welcome message indefinitely
// and receives it back
//------------------------------------------------------------------------
class Sender : public ACE_Handler
{
public:
Sender (void);
~Sender (void);
int open (const ACE_TCHAR *host, u_short port);
void close ();
ACE_HANDLE handle (void) const;
void handle (ACE_HANDLE);
protected:
// These methods are called by the freamwork
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result);
// This is called when asynchronous reads from the socket complete
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result);
// This is called when asynchronous writes from the socket complete
private:
int initiate_read_stream (void);
int initiate_write_stream (void);
ACE_SOCK_Stream stream_;
// Network I/O handle
ACE_Asynch_Write_Stream ws_;
// ws (write stream): for writing to the socket
ACE_Asynch_Read_Stream rs_;
// rs (read file): for reading from the socket
ACE_Message_Block welcome_message_;
// Welcome message
MyMutex m_Mtx ;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -