📄 test_proactor3.cpp
字号:
int initiate_write_stream (void);
ACE_SOCK_Stream stream_;
// Network I/O handle
ACE_Asynch_Write_Stream ws_;
// ws (write stream): for writing to the socket
ACE_Asynch_Read_Stream rs_;
// rs (read file): for reading from the socket
ACE_Message_Block welcome_message_;
// Welcome message
ACE_Recursive_Thread_Mutex mutex_;
long io_count_;
};
static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
Sender::Sender (void)
: io_count_ (0)
{
// Moment of inspiration... :-)
this->welcome_message_.init (data, ACE_OS::strlen (data));
}
Sender::~Sender (void)
{
close ();
}
void Sender::close (void)
{
this->stream_.close ();
}
ACE_HANDLE Sender::handle (void) const
{
return this->stream_.get_handle ();
}
int Sender::open (const ACE_TCHAR *host, u_short port)
{
// Initialize stuff
// Connect to remote host
ACE_INET_Addr address (port, host);
ACE_SOCK_Connector connector;
if (connector.connect (this->stream_,
address) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_SOCK_Connector::connect"),
-1);
}
// Open ACE_Asynch_Write_Stream
if (this->ws_.open (*this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_Stream::open"),
-1);
// Open ACE_Asynch_Read_Stream
if (this->rs_.open (*this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::open"),
-1);
// Start an asynchronous transmit file
if (this->initiate_write_stream () == -1)
return -1;
if (duplex != 0)
// Start an asynchronous read file
if (this->initiate_read_stream () == -1)
return -1;
return 0;
}
int
Sender::initiate_write_stream (void)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
welcome_message_.rd_ptr(welcome_message_.base ());
welcome_message_.wr_ptr(welcome_message_.base ());
welcome_message_.wr_ptr (ACE_OS::strlen (data));
if (this->ws_.write (welcome_message_,
welcome_message_.length ()) == -1)
ACE_ERROR_RETURN((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_Stream::write"),
-1);
io_count_++;
return 0;
}
int
Sender::initiate_read_stream (void)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
// Create a new <Message_Block>. Note that this message block will
// be used both to <read> data asynchronously from the socket and to
// <write> data asynchronously to the file.
ACE_DEBUG ((LM_DEBUG,
"initiate_read_stream called\n"));
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
ACE_Message_Block (BUFSIZ + 1),
-1);
// Inititiate read
if (this->rs_.read (*mb, mb->size ()- 1) == -1)
{
mb->release ();
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::read"),
-1);
}
io_count_++;
return 0;
}
void
Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
if (result.bytes_transferred () == 0 || result.error () != 0)
{
ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
// Reset pointers.
result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
}
// Simplify just for Test
if (result.success () && result.bytes_transferred () != 0)
{
if (duplex != 0) // full duplex, continue write
initiate_write_stream ();
else // half-duplex read reply, after read we will start write
initiate_read_stream ();
}
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
io_count_--;
}
}
void
Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
if (result.bytes_transferred () == 0 || result.error () != 0)
{
ACE_DEBUG ((LM_DEBUG,
"handle_read_stream called\n"));
// Reset pointers.
result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
}
result.message_block().release ();
if (result.success () && result.bytes_transferred () != 0)
{
// Successful read: write the data to the file asynchronously.
// Note how we reuse the <ACE_Message_Block> for the writing.
// Therefore, we do not delete this buffer because it is handled
// in <handle_write_stream>.
if (duplex != 0) // full duplex, continue read
initiate_read_stream ();
else // half-duplex writey, after write we will start read
initiate_write_stream ();
}
{
ACE_Guard<ACE_Recursive_Thread_Mutex> locker (mutex_);
io_count_--;
}
}
static int
set_proactor_type (const char *ptype)
{
if (!ptype)
return false;
switch (toupper (*ptype))
{
case 'D' : proactor_type = 0; return true;
case 'A' : proactor_type = 1; return true;
case 'I' : proactor_type = 2; return true;
#if defined (sun)
case 'S' : proactor_type = 3; return true;
#endif /* sun */
}
return false;
}
static int
parse_args (int argc, ACE_TCHAR *argv[])
{
ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:o:n:p:d:h:s:u"));
int c;
while ((c = get_opt ()) != EOF)
switch (c)
{
case 'd': // duplex
duplex = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 'h': // host for sender
host = get_opt.opt_arg ();
break;
case 'p': // port number
port = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 'n': // thread pool size
threads = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 's': // number of senders
senders = ACE_OS::atoi (get_opt.opt_arg ());
if (senders > MaxSenders)
senders = MaxSenders;
break;
case 'o': // max number of aio for proactor
max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 't': // Proactor Type
if (set_proactor_type (get_opt.opt_arg ()))
break;
case 'u':
default:
ACE_ERROR ((LM_ERROR, "%p.",
"\nusage:"
"\n-o <max number of started aio operations for Proactor>"
"\n-t <Proactor type> UNIX-only, Win32-default always:"
"\n a AIOCB"
"\n i SIG"
"\n s SUN"
"\n d default"
"\n-d <duplex mode 1-on/0-off>"
"\n-h <host> for Sender mode"
"\n-n <number threads for Proactor pool>"
"\n-p <port to listen/connect>"
"\n-s <number of sender's instances>"
"\n-u show this message"
"\n"));
return -1;
}
return 0;
}
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
#if defined (sun)
ACE_DEBUG ((LM_DEBUG, "\nSUN defined!\n"));
#endif
if (parse_args (argc, argv) == -1)
return -1;
disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
MyTask task1;
if (task1.activate (THR_NEW_LWP, threads) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p.\n",
"main"),
-1);
// wait for creation of Proactor
task1.waitready ();
Sender * send_list[MaxSenders];
ACE_Asynch_Acceptor<Receiver> acceptor;
int rc = -1;
int i;
char c;
if (host == 0) // Acceptor
{
// Simplify, initial read with zero size
if (acceptor.open (ACE_INET_Addr (port),0,1) == 0)
rc = 1;
}
else
{
for (i = 0; i < senders; ++i)
send_list[i] = new Sender;
for (i = 0; i < senders; ++i)
if (send_list[i]->open (host, port) == 0)
rc++;
}
if (rc > 0)
{
cout << "Press any key to stop=>" << flush;
cin.clear ();
cin >> c;
}
ACE_Proactor::end_event_loop ();
if (host != 0) // we are sender
{
for (i = 0; i < senders; ++i)
send_list[i]->close ();
}
ACE_Thread_Manager *tm =
ACE_Thread_Manager::instance();
tm->wait_task (&task1);
cout << "\nNumber of Receivers objects="
<< Receiver::get_number_sessions ()
<< flush;
for (i = 0; i < senders; ++i)
{
delete (send_list[i]);
send_list[i] = 0;
}
return 0;
}
static int
disable_signal (int sigmin, int sigmax)
{
#ifndef ACE_WIN32
sigset_t signal_set;
if (sigemptyset (&signal_set) == - 1)
ACE_ERROR ((LM_ERROR,
"Error:(%P | %t):%p\n",
"sigemptyset failed"));
for (int i = sigmin; i <= sigmax; i++)
sigaddset (&signal_set, i);
// Put the <signal_set>.
if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
ACE_ERROR ((LM_ERROR,
"Error:(%P | %t):%p\n",
"pthread_sigmask failed"));
#endif /* ACE_WIN32 */
return 1;
}
// Get the <signal_set> back from the OS.
#if 0
static int
print_sigmask (void)
{
#ifndef ACE_WIN32
sigset_t mask;
int member = 0;
COUT ("\n=============Signal Mask==========")
if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask) != 0)
ACE_ERROR ((LM_ERROR,
"Error:(%P | %t):%p\n",
"ACE_OS::pthread_sigmask failed"));
else
for (int i = 1; i < 1000; i++)
{
member = sigismember (&mask,i);
COUT ("\nSig ")
COUT (i)
COUT (" is ")
COUT (member)
if (member == -1)
break;
}
#endif /* ACE_WIN32 */
return 0;
}
#endif /* 0 */
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Asynch_Acceptor<Receiver>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Asynch_Acceptor<Receiver>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -