event_server.cpp
来自「最新的版本ACE-5.6.8,刚从外文网上搬下,与大家分享.」· C++ 代码 · 共 274 行
CPP
274 行
// $Id: event_server.cpp 82610 2008-08-12 19:46:36Z parsons $
// Test the event server.
#include "ace/OS_main.h"
#include "ace/Stream.h"
#include "ace/Service_Config.h"
#include "ace/UPIPE_Acceptor.h"
#include "ace/UPIPE_Connector.h"
#include "ace/Truncate.h"
// FUZZ: disable check_for_streams_include
#include "ace/streams.h"
#include "Options.h"
#include "Consumer_Router.h"
#include "Event_Analyzer.h"
#include "Supplier_Router.h"
#include "ace/Sig_Adapter.h"
#include "ace/OS_NS_unistd.h"
ACE_RCSID (UPIPE_Event_Server,
event_server,
"$Id: event_server.cpp 82610 2008-08-12 19:46:36Z parsons $")
#if defined (ACE_HAS_THREADS)
typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
// Handle SIGINT and terminate the entire application.
class Quit_Handler : public ACE_Sig_Adapter
{
public:
Quit_Handler (void);
virtual int handle_input (ACE_HANDLE fd);
};
Quit_Handler::Quit_Handler (void)
: ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop))
{
// Register to trap input from the user.
if (ACE_Event_Handler::register_stdin_handler (this,
ACE_Reactor::instance (),
ACE_Thread_Manager::instance ()) == -1)
ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
// Register to trap the SIGINT signal.
else if (ACE_Reactor::instance ()->register_handler
(SIGINT, this) == -1)
ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
}
int
Quit_Handler::handle_input (ACE_HANDLE)
{
options.stop_timer ();
ACE_DEBUG ((LM_INFO, " (%t) closing down the test\n"));
options.print_results ();
ACE_Reactor::end_event_loop();
return 0;
}
static void *
consumer (void *)
{
ACE_UPIPE_Stream c_stream;
ACE_UPIPE_Addr c_addr (ACE_TEXT ("/tmp/conupipe"));
int verb = options.verbose ();
int msiz = ACE_Utils::truncate_cast<int> (options.message_size ());
time_t secs, par1, par2;
time_t currsec;
if (verb)
cout << "consumer starting connect" << endl;
ACE_UPIPE_Connector con;
if (con.connect (c_stream, c_addr) == -1)
ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
else
cout << "consumer :we're connected" << endl;
ACE_Message_Block *mb_p;
int done = 0;
int cnt = 0;
ACE_OS::time (&currsec);
par1 = currsec;
while (done == 0
&& (c_stream.recv (mb_p) != -1))
if (mb_p->length () > 1)
{
cnt++;
if (verb)
cout << " consumer received message !!!!!! "
<< mb_p->rd_ptr () << endl;
}
else
{
if (verb)
cout << "consumer got last mb"
<< (char) * (mb_p->rd_ptr ()) << endl;
c_stream.close ();
done = 1;
}
ACE_OS::time (&currsec);
par2 = currsec;
secs = par2 - par1;
if (secs <= 0)
secs=1;
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("consumer got %d messages of size %d ")
ACE_TEXT ("within %: seconds\n"),
cnt, msiz, secs));
ACE_OS::sleep (2);
cout << "consumer terminating " << endl;
return 0;
}
static void *
supplier (void *dummy)
{
ACE_UPIPE_Stream s_stream;
ACE_UPIPE_Addr serv_addr (ACE_TEXT ("/tmp/supupipe"));
ACE_UPIPE_Connector con;
int iter = ACE_Utils::truncate_cast<int> (options.iterations ());
int verb = options.verbose ();
int msiz = ACE_Utils::truncate_cast<int> (options.message_size ());
cout << "supplier starting connect" << endl;
if (con.connect (s_stream, serv_addr) == -1)
ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
cout << "supplier : we're connected" << endl;
int n;
n = 0;
ACE_Message_Block * mb_p;
while (n < iter)
{
mb_p = new ACE_Message_Block (msiz);
ACE_OS::strcpy (mb_p->rd_ptr (), (char *) dummy);
mb_p->length (msiz);
if (verb)
cout << "supplier sending 1 message_block" << endl;
if (s_stream.send (mb_p) == -1)
{
cout << "supplier send failed" << endl;
return (void *) -1;
}
n++;
}
mb_p = new ACE_Message_Block (10);
mb_p->length (1);
*mb_p->rd_ptr () = 'g';
cout << "supplier sending last message_block" << endl;
if (s_stream.send (mb_p) == -1)
{
cout << "supplier send last mb failed" << endl;
return (void *) -1;
}
mb_p = new ACE_Message_Block (10);
mb_p->length (0);
if (verb)
cout << "supplier sending very last message_block" << endl;
if (s_stream.send (mb_p) == -1)
{
cout << "supplier send very last mb failed" << endl;
return (void *) -1;
}
ACE_OS::sleep (2);
cout << "supplier terminating" << endl;
return 0;
}
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
ACE_Service_Config daemon;
options.parse_args (argc, argv);
options.start_timer ();
// Primary ACE_Stream for EVENT_SERVER application.
MT_Stream event_server;
// Enable graceful shutdowns....
Quit_Handler quit_handler;
// Create the modules..
MT_Module *sr = new MT_Module (ACE_TEXT ("Supplier_Router"),
new Supplier_Router (ACE_Thread_Manager::instance ()));
MT_Module *ea = new MT_Module (ACE_TEXT ("Event_Analyzer"),
new Event_Analyzer,
new Event_Analyzer);
MT_Module *cr = new MT_Module (ACE_TEXT ("Consumer_Router"),
0, // 0 triggers the creation of a ACE_Thru_Task...
new Consumer_Router (ACE_Thread_Manager::instance ()));
// Push the modules onto the event_server stream.
if (event_server.push (sr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("push (Supplier_Router)")), -1);
if (event_server.push (ea) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("push (Event_Analyzer)")), -1);
if (event_server.push (cr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("push (Consumer_Router)")), -1);
// Set the high and low water marks appropriately.
int wm = ACE_Utils::truncate_cast<int> (options.low_water_mark ());
if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("push (setting low watermark)")), -1);
wm = ACE_Utils::truncate_cast<int> (options.high_water_mark ());
if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("push (setting high watermark)")), -1);
// spawn the two threads.
if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer), (void *) 0,
THR_NEW_LWP | THR_DETACHED) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
else if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier), (void *) "hello",
THR_NEW_LWP | THR_DETACHED) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
// Perform the main event loop waiting for the user to type ^C or to
// enter a line on the ACE_STDIN.
ACE_Reactor::instance ()->run_reactor_event_loop ();
ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("main exiting\n")));
return 0;
}
#else
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("test not defined for this platform\n")),
-1);
}
#endif /* ACE_HAS_THREADS */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?