📄 test_proactor2.cpp
字号:
long nIOCount ;
};
static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
Sender::Sender (void)
:nIOCount ( 0 )
{
// Moment of inspiration... :-)
this->welcome_message_.init (data, ACE_OS::strlen (data));
}
Sender::~Sender (void)
{
close ();
}
void Sender::close ()
{
this->stream_.close ();
}
ACE_HANDLE Sender::handle (void) const
{
return this->stream_.get_handle ();
}
void Sender::handle (ACE_HANDLE handle)
{
this->stream_.set_handle (handle);
}
int Sender::open (const ACE_TCHAR *host, u_short port)
{
// Initialize stuff
// Connect to remote host
ACE_INET_Addr address (port, host);
ACE_SOCK_Connector connector;
if (connector.connect (this->stream_,
address) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_SOCK_Connector::connect"),
-1);
}
// Open ACE_Asynch_Write_Stream
if (this->ws_.open (*this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_Stream::open"),
-1);
// Open ACE_Asynch_Read_Stream
if (this->rs_.open (*this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_File::open"),
-1);
// Start an asynchronous transmit file
if ( this->initiate_write_stream () == -1)
return -1;
if ( duplex != 0 )
{
// Start an asynchronous read file
if (this->initiate_read_stream () == -1)
return -1;
}
return 0;
}
int Sender::initiate_write_stream (void)
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
welcome_message_.rd_ptr( welcome_message_.base ());
welcome_message_.wr_ptr( welcome_message_.base ());
welcome_message_.wr_ptr (ACE_OS::strlen (data));
if (this->ws_.write (welcome_message_,
welcome_message_.length ()
) == -1)
{
ACE_ERROR_RETURN((LM_ERROR,
"%p\n",
"ACE_Asynch_Write_File::write"),
-1);
}
nIOCount++ ;
return 0;
}
int Sender::initiate_read_stream (void)
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
// Create a new <Message_Block>. Note that this message block will
// be used both to <read> data asynchronously from the socket and to
// <write> data asynchronously to the file.
ACE_DEBUG ((LM_DEBUG,
"initiate_read_stream called\n"));
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
ACE_Message_Block (BUFSIZ + 1),
-1);
// Inititiate read
if (this->rs_.read (*mb, mb->size ()- 1) == -1)
{
mb->release () ;
ACE_ERROR_RETURN ((LM_ERROR,
"%p\n",
"ACE_Asynch_Read_Stream::read"),
-1);
}
nIOCount++ ;
return 0;
}
void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result)
{
ACE_DEBUG ((LM_DEBUG,
"handle_write_stream called\n"));
// Reset pointers.
result.message_block ().rd_ptr (result.message_block ().rd_ptr () -
result.bytes_transferred ());
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
result.bytes_to_write ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
result.message_block ().rd_ptr ()));
// Simplify just for Test
if (result.success () && result.bytes_transferred () != 0)
{
if ( duplex != 0 ) // full duplex, continue write
{
initiate_write_stream () ;
}
else // half-duplex read reply, after read we will start
// write
{
initiate_read_stream () ;
}
}
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
nIOCount-- ;
}
}
void
Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
ACE_DEBUG ((LM_DEBUG,
"handle_read_stream called\n"));
// Reset pointers.
result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
'\0';
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
result.bytes_transferred ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
result.completion_key ()));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
result.message_block ().rd_ptr ()));
result.message_block().release ();
if ( result.success () && result.bytes_transferred () != 0)
{
// Successful read: write the data to the file asynchronously.
// Note how we reuse the <ACE_Message_Block> for the writing.
// Therefore, we do not delete this buffer because it is handled
// in <handle_write_stream>.
if ( duplex != 0 ) // full duplex, continue read
{
initiate_read_stream () ;
}
else // half-duplex writey, after write we will start read
{
initiate_write_stream () ;
}
}
{
ACE_Guard<MyMutex> locker (m_Mtx) ;
nIOCount-- ;
}
}
//--------------------------------------------------------------------------
static int
parse_args (int argc, ACE_TCHAR *argv[])
{
ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:p:d:h:"));
int c;
while ((c = get_opt ()) != EOF)
switch (c)
{
case 'h':
host = get_opt.opt_arg ();
break;
case 'n':
nThreads = ACE_OS::atoi (get_opt.opt_arg ()) ;
break;
case 'p':
port = ACE_OS::atoi (get_opt.opt_arg ());
break;
case 'd':
duplex = ACE_OS::atoi (get_opt.opt_arg ());
break;
default:
ACE_ERROR ((LM_ERROR, "%p.\n",
"usage :\n"
"-h <host> for Sender mode\n"
"-d <duplex mode 1-on/0-off>\n"
"-p <port to listen/connect>\n"
"-n <number threads for Proactor pool>\n"));
return -1;
}
return 0;
}
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
ACE_UNUSED_ARG (initial_read_size);
if (parse_args (argc, argv) == -1)
return -1;
#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
ACE_WIN32_Proactor * pImpl = new ACE_WIN32_Proactor;
#elif defined (ACE_HAS_AIO_CALLS)
// ACE_POSIX_AIOCB_Proactor * pImpl = new ACE_POSIX_AIOCB_Proactor;
ACE_POSIX_SIG_Proactor * pImpl = new ACE_POSIX_SIG_Proactor;
#endif
ACE_Proactor Proactor ( pImpl ,1 );
ACE_Proactor::instance( & Proactor );
MyTask Task1 ;
if (Task1.activate (THR_NEW_LWP, nThreads ) == -1)
{
ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
}
Sender sender;
ACE_Asynch_Acceptor<Receiver> acceptor;
int Rc = -1 ;
if ( host == NULL ) // Acceptor
{
// Simplify , initial read with zero size
Rc = acceptor.open (ACE_INET_Addr (port),0,1);
}
else
{
Rc = sender.open (host, port);
}
if ( Rc == 0 )
{
char c ;
cout << "Press any key to stop and exit=>\n" << flush ;
cin.clear ();
cin >> c ;
}
ACE_Proactor::end_event_loop () ;
if ( host != NULL ) // we are sender
{
sender.close () ; // disconnect to get reciever error !!!
}
ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
pTM->wait_task ( & Task1 ) ;
ACE_Proactor::instance( ( ACE_Proactor* )NULL );
return 0;
}
//--------------------------------------------------------------------
//
//--------------------------------------------------------------------
int DisableSignal ( int SigNum )
{
#ifndef ACE_WIN32
sigset_t signal_set;
if ( sigemptyset (&signal_set) == - 1 )
{
ACE_ERROR ((LM_ERROR,
"Error:(%P | %t):%p\n",
"sigemptyset failed"));
}
sigaddset (&signal_set, SigNum);
// Put the <signal_set>.
if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
{
ACE_ERROR ((LM_ERROR,
"Error:(%P | %t):%p\n",
"pthread_sigmask failed"));
}
#else
ACE_UNUSED_ARG(SigNum);
#endif
return 1;
}
//--------------------------------------------------------------------
// Get the <signal_set> back from the OS.
//--------------------------------------------------------------------
int PrintSigMask ()
{
#ifndef ACE_WIN32
sigset_t mask ;
int member = 0;
COUT ( "\n=============Signal Mask==========" )
if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0)
{
ACE_ERROR ((LM_ERROR,
"Error:(%P | %t):%p\n",
"ACE_OS::pthread_sigmask failed"));
}
else for (int i = 1 ; i < 1000; i++)
{
member = sigismember (&mask,i);
COUT ( "\nSig " )
COUT ( i )
COUT ( " is " )
COUT (member )
if (member == -1)
{
break ;
}
}
#endif
return 0;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Asynch_Acceptor<Receiver>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Asynch_Acceptor<Receiver>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -