// File: consumer_router.cpp
// (Code-viewer "font size" control -- web page residue, not part of the source.)
// Consumer_Router.cpp,v 4.10 2004/01/05 22:57:06 shuston Exp
#include "ace/os_include/os_assert.h"
#include "ace/OS_NS_stdio.h"
#include "ace/OS_NS_string.h"
#include "Consumer_Router.h"
#include "Options.h"
ACE_RCSID(Event_Server, Consumer_Router, "Consumer_Router.cpp,v 4.10 2004/01/05 22:57:06 shuston Exp")
// Build a Consumer_Router on top of the supplied <Peer_Router_Context>.
// The context is handed to the <Peer_Router> base, and its reference
// count is incremented via <duplicate>.
Consumer_Router::Consumer_Router (Peer_Router_Context *prc)
  : Peer_Router (prc)
{
  // Take our own reference on the context.
  context ()->duplicate ();
}
// Initialize the Router.
int
Consumer_Router::open (void *)
{
if (this->is_writer ())
{
// Set the <Peer_Router_Context> to point back to us so that if
// any Consumer's "accidentally" send us data we'll be able to
// handle it by passing it down the stream.
this->context ()->peer_router (this);
// Increment the reference count.
this->context ()->duplicate ();
// Make this an active object to handle the error cases in a
// separate thread. This is mostly just for illustration, i.e.,
// it's probably overkill to use a thread for this!
return this->activate (Options::instance ()->t_flags ());
}
else // if (this->is_reader ())
// Nothing to do since this side is primarily used to transmit to
// Consumers, rather than receive.
return 0;
}
// Shut down this side of the Router.
//
// Logs which side (reader or writer) is closing, deactivates the
// message queue on the writer side, and releases one reference on the
// <Peer_Router_Context>.  Always returns 0.
int
Consumer_Router::close (u_long)
{
  ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router %s\n",
              is_reader () ? "reader" : "writer"));

  if (is_writer ())
    {
      // Inform the thread to shut down.
      msg_queue ()->deactivate ();
    }

  // Both writer and reader call <release>, so the context knows when
  // to clean itself up.
  context ()->release ();

  return 0;
}
// Handle incoming messages in a separate thread.
//
// Runs only on the writer side (asserted).  Repeatedly dequeues
// messages that <put> queued and forwards each one to the next
// Module's writer Task via <put_next>.
//
// Returns 0 once <getq> reports failure, or -1 as soon as a
// <put_next> call fails.
int
Consumer_Router::svc (void)
{
  assert (this->is_writer ());

  ACE_DEBUG ((LM_DEBUG,
              "(%t) starting svc in Consumer_Router\n"));

  // <close> deactivates the message queue to tell this thread to shut
  // down; presumably that is what makes <getq> return a negative
  // value and ends the loop -- confirm against ACE_Message_Queue.
  ACE_Message_Block *mb = 0;

  while (this->getq (mb) >= 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t) warning: Consumer_Router is "
                  "forwarding a message to Supplier_Router\n"));

      // Pass this message down to the next Module's writer Task.
      // The log text names <put_next>, the call that actually failed
      // (it previously claimed <send_peers>, which is only used by
      // <put>).
      if (this->put_next (mb) == -1)
        ACE_ERROR_RETURN ((LM_ERROR,
                           "(%t) put_next failed in Consumer_Router\n"),
                          -1);
    }

  ACE_DEBUG ((LM_DEBUG,
              "(%t) stopping svc in Consumer_Router\n"));

  // Note the implicit ACE_OS::thr_exit() via destructor.
  return 0;
}
// Send a <Message_Block> to the supplier(s).
//
// Dispatches on message type and on which side of the Module we are:
//   - MB_IOCTL messages are run through <control> and then passed on
//     with <put_next>.
//   - On the reader side the message is broadcast with <send_peers>;
//     returns 0 on success and -1 (after logging) on failure.
//   - On the writer side the message is queued with <putq> for <svc>.
int
Consumer_Router::put (ACE_Message_Block *mb,
                      ACE_Time_Value *)
{
  // Perform the necessary control operations before passing the
  // message down the stream.
  if (mb->msg_type () == ACE_Message_Block::MB_IOCTL)
    {
      control (mb);
      return put_next (mb);
    }

  // Writer side: queue up the message to be processed by
  // <Consumer_Router::svc>.  Since we don't expect to be getting many
  // of these messages, we queue them up and run them in a separate
  // thread to avoid taxing the main thread.
  if (!is_reader ())
    return putq (mb);

  // Reader side: we're responsible for broadcasting messages to
  // Consumers.
  if (context ()->send_peers (mb) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "(%t) send_peers failed in Consumer_Router\n"),
                      -1);

  return 0;
}
// Return information about the <Consumer_Router>.
//
// Formats a one-line description of the form
//   "<name>\t <port>/tcp # consumer router (<reader|writer>)\n"
// and hands it back through <strp>:
//   - if <*strp> is 0, a heap copy is allocated with ACE_OS::strdup
//     and stored in <*strp> (<length> is not used in that case);
//   - otherwise at most <length> bytes, including the terminating
//     NUL, are written into the caller's buffer.
//
// Returns the length of the full description, or -1 if the local
// address cannot be obtained or the allocation fails.
int
Consumer_Router::info (char **strp, size_t length) const
{
  char buf[BUFSIZ];
  ACE_INET_Addr addr;
  const char *mod_name = this->name ();

  if (this->context ()->acceptor ().get_local_addr (addr) == -1)
    return -1;

  // NOTE(review): unbounded sprintf -- assumes the module name is
  // short enough for the whole line to fit in BUFSIZ; confirm.
  ACE_OS::sprintf (buf,
                   "%s\t %d/%s %s (%s)\n",
                   mod_name,
                   addr.get_port_number (),
                   "tcp",
                   "# consumer router",
                   this->is_reader () ? "reader" : "writer");

  if (*strp == 0)
    {
      // No caller buffer: return an exactly-sized copy.  We must not
      // strncpy <length> bytes into it afterwards, because strncpy
      // pads up to <length> and would overrun the allocation.
      if ((*strp = ACE_OS::strdup (buf)) == 0)
        return -1;
    }
  else if (length > 0)
    {
      // Caller buffer: copy what fits and force NUL termination,
      // which strncpy does not guarantee when it truncates.
      ACE_OS::strncpy (*strp, buf, length);
      (*strp)[length - 1] = '\0';
    }

  // Report the description that was actually produced (the formatted
  // line in <buf>), not just the module name.
  return static_cast<int> (ACE_OS::strlen (buf));
}
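// Code-viewer keyboard shortcuts (web page residue, not part of the source):
//   Copy code:             Ctrl + C
//   Search code:           Ctrl + F
//   Full-screen mode:      F11
//   Toggle theme:          Ctrl + Shift + D
//   Show shortcuts:        ?
//   Increase font size:    Ctrl + =
//   Decrease font size:    Ctrl + -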