📄 test_proactor.cpp
字号:
// test_proactor.cpp,v 1.33 2002/04/27 13:26:34 jwillemsen Exp
// ============================================================================
//
// = LIBRARY
// examples
//
// = FILENAME
// test_proactor.cpp
//
// = DESCRIPTION
// This program illustrates how the <ACE_Proactor> can be used to
// implement an application that does various asynchronous
// operations.
//
// = AUTHOR
// Irfan Pyarali <irfan@cs.wustl.edu>
//
// ============================================================================
#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"
#include "ace/streams.h"
ACE_RCSID(Proactor, test_proactor, "test_proactor.cpp,v 1.33 2002/04/27 13:26:34 jwillemsen Exp")
#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
// This only works on Win32 platforms and on Unix platforms supporting
// POSIX aio calls.
// Host that we're connecting to.
static ACE_TCHAR *host = 0;
// Port that we're receiving connections on.
static u_short port = ACE_DEFAULT_SERVER_PORT;
// File that we're sending.
static ACE_TCHAR *file = ACE_TEXT("test_proactor.cpp");
// Name of the output file.
static ACE_TCHAR *dump_file = ACE_TEXT("output");
// Keep track of when we're done.
static int done = 0;
// Size of each initial asynchronous <read> operation.
static int initial_read_size = BUFSIZ;
class Receiver : public ACE_Service_Handler
{
// = TITLE
// The class will be created by <ACE_Asynch_Acceptor> when new
// connections arrive. This class will then receive data from
// the network connection and dump it to a file.
public:
// = Initialization and termination.
Receiver (void);
~Receiver (void);
virtual void open (ACE_HANDLE handle,
ACE_Message_Block &message_block);
// This is called after the new connection has been accepted.
protected:
// These methods are called by the framework
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
// This is called when asynchronous <read> operation from the socket
// complete.
virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
// This is called when an asynchronous <write> to the file
// completes.
private:
int initiate_read_stream (void);
// Initiate an asynchronous <read> operation on the socket.
ACE_Asynch_Read_Stream rs_;
// rs (read stream): for reading from a socket.
ACE_HANDLE dump_file_;
// File for dumping data.
ACE_Asynch_Write_File wf_;
// wf (write file): for writing to a file.
u_long file_offset_;
// Offset for the file.
ACE_HANDLE handle_;
// Handle for IO to remote peer.
};
Receiver::Receiver (void)
: dump_file_ (ACE_INVALID_HANDLE),
handle_ (ACE_INVALID_HANDLE)
{
}
Receiver::~Receiver (void)
{
ACE_OS::close (this->dump_file_);
ACE_OS::closesocket (this->handle_);
}
void
Receiver::open (ACE_HANDLE handle,
ACE_Message_Block &message_block)
{
ACE_DEBUG ((LM_DEBUG,
"%N:%l:Receiver::open called\n"));
// New connection, so initiate stuff.
// Cache the new connection
this->handle_ = handle;
// File offset starts at zero
this->file_offset_ = 0;
// Open dump file (in OVERLAPPED mode)
this->dump_file_ = ACE_OS::open (dump_file,
O_CREAT | O_RDWR | O_TRUNC | FILE_FLAG_OVERLAPPED,
0644);
if (this->dump_file_ == ACE_INVALID_HANDLE)
{
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_OS::open"));
return;
}
// Initiate <ACE_Asynch_Write_File>.
if (this->wf_.open (*this,
this->dump_file_) == -1)
{
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_File::open"));
return;
}
// Initiate <ACE_Asynch_Read_Stream>.
if (this->rs_.open (*this, this->handle_) == -1)
{
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::open"));
return;
}
// Fake the result and make the <handle_read_stream> get
// called. But, not, if there is '0' is transferred.
if (message_block.length () != 0)
{
// Duplicate the message block so that we can keep it around.
ACE_Message_Block &duplicate =
*message_block.duplicate ();
// Fake the result so that we will get called back.
ACE_Asynch_Read_Stream_Result_Impl *fake_result =
ACE_Proactor::instance ()->create_asynch_read_stream_result (*this,
this->handle_,
duplicate,
initial_read_size,
0,
ACE_INVALID_HANDLE,
0,
0);
size_t bytes_transferred = message_block.length ();
// <complete> for Accept would have already moved the <wr_ptr>
// forward. Update it to the beginning position.
duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);
// This will call the callback.
fake_result->complete (message_block.length (),
1,
0);
// Zap the fake result.
delete fake_result;
}
else
// Otherwise, make sure we proceed. Initiate reading the socket
// stream.
if (this->initiate_read_stream () == -1)
return;
}
int
Receiver::initiate_read_stream (void)
{
// Create a new <Message_Block>. Note that this message block will
// be used both to <read> data asynchronously from the socket and to
// <write> data asynchronously to the file.
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
ACE_Message_Block (BUFSIZ + 1),
-1);
// Inititiate read
if (this->rs_.read (*mb,
mb->size () - 1) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::read"),
-1);
return 0;
}
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_DEBUG ((LM_DEBUG,
"handle_read_stream called\n"));
// Reset pointers.
result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
#if 0
// This can overrun the ACE_Log_Msg buffer and do bad things.
// Re-enable it at your risk.
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
#endif /* 0 */
if (result.success () && result.bytes_transferred () != 0)
{
// Successful read: write the data to the file asynchronously.
// Note how we reuse the <ACE_Message_Block> for the writing.
// Therefore, we do not delete this buffer because it is handled
// in <handle_write_stream>.
if (this->wf_.write (result.message_block (),
result.bytes_transferred (),
this->file_offset_) == -1)
{
ACE_ERROR ((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_File::write"));
return;
}
// Initiate new read from the stream.
if (this->initiate_read_stream () == -1)
return;
}
else
{
ACE_DEBUG ((LM_DEBUG,
"Receiver completed\n"));
// No need for this message block anymore.
result.message_block ().release ();
// Note that we are done with the test.
done = 1;
// We are done: commit suicide.
delete this;
}
}
void
Receiver::handle_write_file (const ACE_Asynch_Write_File::Result &result)
{
ACE_DEBUG ((LM_DEBUG, "handle_write_file called\n"));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
result.message_block ().release ();
if (result.success ())
// Write successful: Increment file offset
this->file_offset_ += result.bytes_transferred ();
// This code is not robust enough to deal with short file writes
// (which hardly ever happen) ;-)
ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
}
class Sender : public ACE_Handler
{
// = TITLE
// The class will be created by <main>. After connecting to the
// host, this class will then read data from a file and send it
// to the network connection.
public:
Sender (void);
~Sender (void);
int open (const ACE_TCHAR *host,
u_short port);
ACE_HANDLE handle (void) const;
void handle (ACE_HANDLE);
protected:
// These methods are called by the freamwork
virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);
// This is called when asynchronous transmit files complete
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
// This is called when asynchronous writes from the socket complete
virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
// This is called when asynchronous reads from the socket complete
private:
int transmit_file (void);
// Transmit the entire file in one fell swoop.
int initiate_read_file (void);
// Initiate an asynchronous file read.
ACE_SOCK_Stream stream_;
// Network I/O handle
ACE_Asynch_Write_Stream ws_;
// ws (write stream): for writing to the socket
ACE_Asynch_Read_File rf_;
// rf (read file): for writing from the file
ACE_Asynch_Transmit_File tf_;
// Transmit file.
ACE_HANDLE input_file_;
// File to read from
u_long file_offset_;
// Current file offset
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -