test_udp_proactor.cpp
来自「ace开发环境 用来开发网络程序 其运用了设计模式、多平台、C++等多种知识」· C++ 代码 · 共 436 行
CPP
436 行
// $Id: test_udp_proactor.cpp 78962 2007-07-20 03:27:14Z sowayaa $// ============================================================================//// = LIBRARY// examples//// = FILENAME// test_udp_proactor.cpp//// = DESCRIPTION// This program illustrates how the <ACE_Proactor> can be used to// implement an application that does asynchronous operations using// datagrams.//// = AUTHOR// Irfan Pyarali <irfan@cs.wustl.edu> and// Roger Tragin <r.tragin@computer.org>//// ============================================================================#include "ace/OS_NS_string.h"#include "ace/OS_main.h"#include "ace/Proactor.h"#include "ace/Asynch_IO.h"#include "ace/INET_Addr.h"#include "ace/SOCK_Dgram.h"#include "ace/Message_Block.h"#include "ace/Get_Opt.h"#include "ace/Log_Msg.h"ACE_RCSID(Proactor, test_udp_proactor, "test_proactor.cpp,v 1.29 2001/02/02 23:41:16 shuston Exp")#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) // This only works on asynch I/O-capable platforms.// Host that we're connecting to.static ACE_TCHAR *host = 0;// Port that we're receiving connections on.static u_short port = ACE_DEFAULT_SERVER_PORT;// Keep track of when we're done.static int done = 0;class Receiver : public ACE_Service_Handler{ // = TITLE // This class will receive data from // the network connection and dump it to a file.public: // = Initialization and termination. Receiver (void); ~Receiver (void); int open_addr (const ACE_INET_Addr &localAddr);protected: // These methods are called by the framework /// This method will be called when an asynchronous read completes on /// a UDP socket. virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);private: ACE_SOCK_Dgram sock_dgram_; ACE_Asynch_Read_Dgram rd_; // rd (read dgram): for reading from a UDP socket. const char* completion_key_; const char* act_;};Receiver::Receiver (void) : completion_key_ ("Receiver Completion Key"), act_ ("Receiver ACT"){}Receiver::~Receiver (void){ sock_dgram_.close ();}intReceiver::open_addr (const ACE_INET_Addr &localAddr){ ACE_DEBUG ((LM_DEBUG, "%N:%l:Receiver::open_addr called\n")); // Create a local UDP socket to receive datagrams. if (this->sock_dgram_.open (localAddr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_SOCK_Dgram::open"), -1); // Initialize the asynchronous read. if (this->rd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Dgram::open"), -1); // Create a buffer to read into. We are using scatter/gather to // read the message header and message body into 2 buffers // create a message block to read the message header ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1); // the next line sets the size of the header, even though we // allocated a the message block of 1k, by setting the size to 20 // bytes then the first 20 bytes of the reveived datagram will be // put into this message block. msg->size (20); // size of header to read is 20 bytes // create a message block to read the message body ACE_Message_Block* body = 0; ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1); // The message body will not exceed 1024 bytes, at least not in this test. // set body as the cont of msg. This associates the 2 message // blocks so that a read will fill the first block (which is the // header) up to size (), and use the cont () block for the rest of // the data. You can chain up to IOV_MAX message block using this // method. msg->cont (body); // ok lets do the asynch read size_t number_of_bytes_recvd = 0; int res = rd_.recv (msg, number_of_bytes_recvd, 0, PF_INET, this->act_); switch (res) { case 0: // this is a good error. The proactor will call our handler when the // read has completed. break; case 1: // actually read something, we will handle it in the handler callback ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes recieved immediately", number_of_bytes_recvd)); ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: // Something else went wrong. ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Dgram::recv")); // the handler will not get called in this case so lets clean up our msg msg->release (); break; default: // Something undocumented really went wrong. ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Read_Dgram::recv")); msg->release (); break; } return res;}voidReceiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_read_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_INET_Addr peerAddr; result.remote_address (peerAddr); ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n", "peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); if (result.success () && result.bytes_transferred () != 0) { // loop through our message block and print out the contents for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ()) { // use msg->length () to get the number of bytes written to the message // block. ACE_DEBUG ((LM_DEBUG, "Buf=[size=<%d>", msg->length ())); for (u_long i = 0; i < msg->length (); ++i) ACE_DEBUG ((LM_DEBUG, "%c", (msg->rd_ptr ())[i])); ACE_DEBUG ((LM_DEBUG, "]\n")); } } ACE_DEBUG ((LM_DEBUG, "Receiver completed\n")); // No need for this message block anymore. result.message_block ()->release (); // Note that we are done with the test. done++;}class Sender : public ACE_Handler{ // = TITLE // The class will be created by <main>.public: Sender (void); ~Sender (void); //FUZZ: disable check_for_lack_ACE_OS int open (const ACE_TCHAR *host, u_short port); //FUZZ: enable check_for_lack_ACE_OSprotected: // These methods are called by the freamwork virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result); // This is called when asynchronous writes from the dgram socket // completeprivate: ACE_SOCK_Dgram sock_dgram_; // Network I/O handle ACE_Asynch_Write_Dgram wd_; // wd (write dgram): for writing to the socket const char* completion_key_; const char* act_;};Sender::Sender (void) : completion_key_ ("Sender completion key"), act_ ("Sender ACT"){}Sender::~Sender (void){ this->sock_dgram_.close ();}intSender::open (const ACE_TCHAR *host, u_short port){ // Initialize stuff if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_SOCK_Dgram::open"), -1); // Initialize the asynchronous read. if (this->wd_.open (*this, this->sock_dgram_.get_handle (), this->completion_key_, ACE_Proactor::instance ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Dgram::open"), -1); // We are using scatter/gather to send the message header and // message body using 2 buffers // create a message block for the message header ACE_Message_Block* msg = 0; ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1); const char raw_msg [] = "To be or not to be."; // Copy buf into the Message_Block and update the wr_ptr (). msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1); // create a message block for the message body ACE_Message_Block* body = 0; ACE_NEW_RETURN (body, ACE_Message_Block (100), -1); ACE_OS::memset (body->wr_ptr (), 'X', 100); body->wr_ptr (100); // always remember to update the wr_ptr () // set body as the cont of msg. This associates the 2 message blocks so // that a send will send the first block (which is the header) up to // length (), and use the cont () to get the next block to send. You can // chain up to IOV_MAX message block using this method. msg->cont (body); // do the asynch send size_t number_of_bytes_sent = 0; ACE_INET_Addr serverAddr (port, host); int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_); switch (res) { case 0: // this is a good error. The proactor will call our handler when the // send has completed. break; case 1: // actually sent something, we will handle it in the handler callback ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes sent immediately", number_of_bytes_sent)); ACE_DEBUG ((LM_DEBUG, "********************\n")); res = 0; break; case -1: // Something else went wrong. ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Dgram::recv")); // the handler will not get called in this case so lets clean up our msg msg->release (); break; default: // Something undocumented really went wrong. ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Dgram::recv")); msg->release (); break; } return res;}voidSender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result){ ACE_DEBUG ((LM_DEBUG, "handle_write_dgram called\n")); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "Sender completed\n")); // No need for this message block anymore. result.message_block ()->release (); // Note that we are done with the test. done++;}static intparse_args (int argc, ACE_TCHAR *argv[]){ ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:p:")); int c; while ((c = get_opt ()) != EOF) switch (c) { case 'h': host = get_opt.opt_arg (); break; case 'p': port = ACE_OS::atoi (get_opt.opt_arg ()); break; default: ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "usage :\n" "-h <host>\n"), -1); } return 0;}intACE_TMAIN (int argc, ACE_TCHAR *argv[]){ if (parse_args (argc, argv) == -1) return -1; Sender sender; Receiver receiver; // If passive side if (host == 0) { if (receiver.open_addr (ACE_INET_Addr (port)) == -1) return -1; } // If active side else if (sender.open (host, port) == -1) return -1; for (int success = 1; success > 0 && !done; ) // Dispatch events via Proactor singleton. success = ACE_Proactor::instance ()->handle_events (); return 0;}#else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS*/intACE_TMAIN (int, ACE_TCHAR *[]){ ACE_DEBUG ((LM_DEBUG, "This example does not work on this platform.\n")); return 1;}#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?