📄 message_queue.cpp
字号:
// Message_Queue.cpp,v 1.2 2004/01/02 23:12:19 shuston Exp
#include "ace/SOCK_Acceptor.h"
#include "ace/Acceptor.h"
#include "Message_Receiver.h"
// Listing 5 code/ch12
int
HA_CommandHandler::svc (void)
{
while(1)
{
ACE_Message_Block *mb;
if (this->getq (mb) == -1)
break;
if (mb->msg_type () == ACE_Message_Block::MB_HANGUP)
{
mb->release ();
break;
}
else
{
// Get header pointer, then move past header to payload.
DeviceCommandHeader *dch
= (DeviceCommandHeader*)mb->rd_ptr ();
mb->rd_ptr (sizeof (DeviceCommandHeader));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("Message for device #%d with ")
ACE_TEXT ("command payload of:\n%s"),
dch->deviceId_, mb->rd_ptr ()));
this->rep_.update_device (dch->deviceId_,
mb->rd_ptr ());
mb->release ();
}
}
ACE_Reactor::instance ()->end_reactor_event_loop ();
return 0;
}
// Listing 5
// Listing 4 code/ch12
ACE_Message_Block *
Message_Receiver::shut_down_message (void)
{
ACE_Message_Block *mb;
ACE_NEW_RETURN
(mb, ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP), 0);
return mb;
}
// Listing 4
int
Message_Receiver::read_header (DeviceCommandHeader *dch)
{
ssize_t result =
this->peer ().recv_n (dch, sizeof (DeviceCommandHeader));
if (result <= 0)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("Recieve Failure")),
-1);
return 0;
}
// Listing 3 code/ch12
int
Message_Receiver::copy_payload (ACE_Message_Block *mb,
int payload_length)
{
int result =
this->peer ().recv_n (mb->wr_ptr (), payload_length);
if (result <= 0)
{
mb->release ();
return result;
}
mb->wr_ptr (payload_length);
return 0;
}
// Listing 3
// Listing 2 code/ch12
int
Message_Receiver::handle_input (ACE_HANDLE)
{
DeviceCommandHeader dch;
if (this->read_header (&dch) < 0)
return -1;
if (dch.deviceId_ < 0)
{
// Handle shutdown.
this->handler_->putq (shut_down_message ());
return -1;
}
ACE_Message_Block *mb;
ACE_NEW_RETURN
(mb, ACE_Message_Block (dch.length_ + sizeof dch), -1);
// Copy the header.
mb->copy ((const char*)&dch, sizeof dch);
// Copy the payload.
if (this->copy_payload (mb, dch.length_) < 0)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("Recieve Failure")), -1);
// Pass it off to the handler thread.
this->handler_->putq (mb);
return 0;
}
// Listing 2
static void report_usage (int argc, ACE_TCHAR *argv[])
{
if (argc < 2)
{
ACE_DEBUG ((LM_ERROR, ACE_TEXT ("%s port\n"), argv[1]));
ACE_OS::exit (-1);
}
}
class Acceptor : public ACE_Acceptor<Message_Receiver, ACE_SOCK_ACCEPTOR>
{
public:
Acceptor(HA_CommandHandler *handler) : handler_(handler)
{ }
protected:
virtual int make_svc_handler (Message_Receiver *&mr)
{
ACE_NEW_RETURN (mr, Message_Receiver (handler_), -1);
return 0;
}
private:
HA_CommandHandler *handler_;
};
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
report_usage (argc, argv);
u_short port = ACE_OS::atoi (argv[1]);
HA_Device_Repository rep;
HA_CommandHandler handler (rep);
ACE_ASSERT(handler.activate()==0);
//start up the handler.
Acceptor acceptor (&handler);
ACE_INET_Addr addr (port);
if (acceptor.open (addr) == -1)
ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("Failed to open connection")), -1);
ACE_Reactor::run_event_loop ();
//run the reactive event loop
handler.wait ();
//reap the handler before exiting.
return 0;
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Acceptor<Message_Receiver, ACE_SOCK_ACCEPTOR>;
template class ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Acceptor<Message_Receiver, ACE_SOCK_ACCEPTOR>
#pragma instantiate ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -