⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_proactor2.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
📖 第 1 页 / 共 2 页
字号:
// test_proactor2.cpp,v 1.8 2003/11/10 01:48:03 dhinton Exp

// ============================================================================
//
// = LIBRARY
//    examples
//
// = FILENAME
//    test_proactor2.cpp
//
// = DESCRIPTION
//    Alexander Libman <Alibman@baltimore.com> modified
//    <test_proactor> and made this test. Instead of writing received
//    data to the file, the receiver sends them back to the
//    sender,i.e. ACE_Asynch_Write_File wf_  has been changed to
//    ACE_Asynch_Write_Stream wf_.
//
// = AUTHOR
//    Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman
//    <Alibman@baltimore.com>.
// ============================================================================

#include "ace/Signal.h"

#include "ace/Service_Config.h"
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Stream.h"
#include "ace/Message_Block.h"
#include "ace/Get_Opt.h"

// FUZZ: disable check_for_streams_include
#include "ace/streams.h"

#include "ace/Task.h"
#include "ace/OS_main.h"

ACE_RCSID(Proactor, test_proactor2, "test_proactor2.cpp,v 1.27 2000/03/07 17:15:56 schmidt Exp")

#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS)))
  // This only works on Win32 platforms and on Unix platforms supporting
  // POSIX aio calls.

#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)

#include "ace/WIN32_Proactor.h"

#elif defined (ACE_HAS_AIO_CALLS)

#include "ace/POSIX_Proactor.h"

#endif

  //  Some debug helper functions
  int DisableSignal ( int SigNum );
int PrintSigMask ();

#define  COUT(X)  cout << X ; cout.flush ();

// Host that we're connecting to.
static ACE_TCHAR *host = 0;

// duplex mode: ==0 half-duplex
//              !=0 full duplex
static int duplex = 0 ;

// number threads in the Proactor thread pool
static int nThreads = 1;

// Port that we're receiving connections on.
static u_short port = ACE_DEFAULT_SERVER_PORT;

// Size of each initial asynchronous <read> operation.
static int initial_read_size = BUFSIZ;


#define MyMutex ACE_Recursive_Thread_Mutex
//#define MyMutex ACE_Thread_Mutex
//#define MyMutex ACE_Null_Mutex

//--------------------------------------------------------------------------
// MyTask plays role for Proactor threads pool
//--------------------------------------------------------------------------
class MyTask: public ACE_Task<ACE_MT_SYNCH>
{

public:

  int svc (void) ;
};


int MyTask::svc (void )
{
  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));

  while ( ACE_Proactor::event_loop_done () == 0 )
    {
      ACE_Proactor::run_event_loop ();
    }

  ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
  return 0 ;
}

//-----------------------------------------------------------
//  Receiver
//-----------------------------------------------------------
class Receiver : public ACE_Service_Handler
{
public:

  Receiver (void);
  ~Receiver (void);

  virtual void open (ACE_HANDLE handle,
                     ACE_Message_Block &message_block);
  // This is called after the new connection has been accepted.

protected:
  // These methods are called by the framework

  virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
                                   &result);
  // This is called when asynchronous <read> operation from the socket
  // complete.

  virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
                                    &result);
  // This is called when an asynchronous <write> to the file
  // completes.

private:
  int  initiate_read_stream  (void);
  int  initiate_write_stream (ACE_Message_Block & mb, int nBytes );
  bool check_destroy () ;

  ACE_Asynch_Read_Stream rs_;
  ACE_Asynch_Write_Stream ws_;
  ACE_HANDLE handle_;
  MyMutex m_Mtx ;
  long    nIOCount ;
  static long nSessions ;
};


long Receiver::nSessions = 0 ;

Receiver::Receiver (void)
  : handle_ (ACE_INVALID_HANDLE),
    nIOCount ( 0 )
{
  ACE_Guard<MyMutex> locker (m_Mtx) ;
  nSessions ++ ;
  ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions ));
}

Receiver::~Receiver (void)
{
  ACE_Guard<MyMutex> locker (m_Mtx) ;
  nSessions -- ;
  ACE_OS::closesocket (this->handle_);
  ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions ));
}

//---------------------------------------------------------------------
//  return true if we alive, false  we commited suicide
//
//---------------------------------------------------------------------
bool Receiver::check_destroy ()
{
  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;

    if ( nIOCount > 0 )
      {
        return true ;
      }
  }

  delete this ;
  return false ;
}


void Receiver::open (ACE_HANDLE handle,
                     ACE_Message_Block &message_block)
{
  ACE_UNUSED_ARG (message_block);

  ACE_DEBUG ((LM_DEBUG,
              "%N:%l:Receiver::open called\n"));


  this->handle_ = handle;

  if (this->ws_.open (*this, this->handle_ ) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Write_Stream::open"));

    }
  else if (this->rs_.open (*this, this->handle_) == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  "%p\n",
                  "ACE_Asynch_Read_Stream::open"));
    }
  else
    {
      initiate_read_stream ();
    }


  check_destroy ();
}

int Receiver::initiate_read_stream (void)
{
  ACE_Guard<MyMutex> locker (m_Mtx) ;

  // Create a new <Message_Block>.  Note that this message block will
  // be used both to <read> data asynchronously from the socket and to
  // <write> data asynchronously to the file.
  ACE_DEBUG ((LM_DEBUG,
              "initiate_read_stream called\n"));


  ACE_Message_Block *mb = 0;
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (BUFSIZ + 1),
                  -1);

  // Inititiate read
  if (this->rs_.read (*mb, mb->size ()- 1) == -1)
    {
      mb->release () ;
      ACE_ERROR_RETURN ((LM_ERROR,
                         "%p\n",
                         "ACE_Asynch_Read_Stream::read"),
                        -1);
    }

  nIOCount++ ;
  return 0;
}

int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes )
{
  ACE_Guard<MyMutex> locker (m_Mtx) ;
  if (this->ws_.write (mb , nBytes ) == -1)
    {
      mb.release ();
      ACE_ERROR_RETURN((LM_ERROR,
                        "%p\n",
                        "ACE_Asynch_Write_File::write"),
                       -1);
    }

  nIOCount++ ;
  return 0;
}

void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_read_stream called\n"));

  // Reset pointers.
  result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
    '\0';

  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
              ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
              result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
              result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
              result.message_block ().rd_ptr ()));

  if ( result.success () && result.bytes_transferred () != 0)
    {
      // Successful read: write the data to the file asynchronously.
      // Note how we reuse the <ACE_Message_Block> for the writing.
      // Therefore, we do not delete this buffer because it is handled
      // in <handle_write_stream>.

      if(this->initiate_write_stream (result.message_block (),

                                      result.bytes_transferred () ) == 0 )
        {
          if ( duplex != 0 )
            {
              // Initiate new read from the stream.
              this->initiate_read_stream () ;
            }
        }
    }
  else
    {
      result.message_block ().release ();
      ACE_DEBUG ((LM_DEBUG,  "Receiver completed\n"));
    }

  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    nIOCount-- ;
  }
  check_destroy () ;
}

void
Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result
                               &result)
{
  ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));

  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
              result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
              result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
              result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));

  result.message_block ().release ();

  if (result.success ())
    {
      // This code is not robust enough to deal with short file writes
      // (which hardly ever happen) ;-)
      //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());

      if ( duplex == 0 )
        {
          initiate_read_stream () ;
        }
    }

  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    nIOCount-- ;
  }
  check_destroy () ;
}

//-------------------------------------------------------------------------
// Sender:  sends indefinetely welcome message
//  and  recieves it back
//------------------------------------------------------------------------
class Sender : public ACE_Handler
{
public:
  Sender (void);
  ~Sender (void);
  int open (const ACE_TCHAR *host, u_short port);
  void close ();
  ACE_HANDLE handle (void) const;
  void handle (ACE_HANDLE);

protected:
// These methods are called by the freamwork

virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result);
// This is called when asynchronous reads from the socket complete

virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result);
// This is called when asynchronous writes from the socket complete

private:

int initiate_read_stream (void);
int initiate_write_stream (void);

ACE_SOCK_Stream stream_;
// Network I/O handle

ACE_Asynch_Write_Stream ws_;
// ws (write stream): for writing to the socket

ACE_Asynch_Read_Stream rs_;
// rs (read file): for reading from the socket

ACE_Message_Block welcome_message_;
// Welcome message

MyMutex m_Mtx ;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -