⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_proactor2.cpp

📁 这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用于网络游戏医学图像网关的高qos要求.更详细的内容可阅读相应的材料
💻 CPP
📖 第 1 页 / 共 2 页
字号:
long    nIOCount ;
};

static char *data = "Welcome to Irfan World! Irfan RULES here !!\n";

Sender::Sender (void)
  :nIOCount ( 0 )
{
  // Moment of inspiration... :-)
  this->welcome_message_.init (data, ACE_OS::strlen (data));
}

Sender::~Sender (void)
{
  close ();
}

void Sender::close ()
{
  this->stream_.close ();
}

ACE_HANDLE Sender::handle (void) const
{
  return this->stream_.get_handle ();
}

void Sender::handle (ACE_HANDLE handle)
{
  this->stream_.set_handle (handle);
}

int Sender::open (const ACE_TCHAR *host, u_short port)
{
  // Initialize stuff
  // Connect to remote host
  ACE_INET_Addr address (port, host);
  ACE_SOCK_Connector connector;

  if (connector.connect (this->stream_,
                         address) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "%p\n",
                         "ACE_SOCK_Connector::connect"),
                        -1);
    }

  // Open ACE_Asynch_Write_Stream
  if (this->ws_.open (*this) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Write_Stream::open"),
                      -1);

  // Open ACE_Asynch_Read_Stream
  if (this->rs_.open (*this) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "%p\n",
                       "ACE_Asynch_Read_File::open"),
                      -1);

  // Start an asynchronous transmit file
  if ( this->initiate_write_stream () == -1)
    return -1;

  if ( duplex != 0 )
    {
      // Start an asynchronous read file
      if (this->initiate_read_stream () == -1)
        return -1;
    }

  return 0;
}

int Sender::initiate_write_stream (void)
{
  ACE_Guard<MyMutex> locker (m_Mtx) ;


  welcome_message_.rd_ptr( welcome_message_.base ());
  welcome_message_.wr_ptr( welcome_message_.base ());
  welcome_message_.wr_ptr (ACE_OS::strlen (data));

  if (this->ws_.write (welcome_message_,
                       welcome_message_.length ()
                       ) == -1)
    {
      ACE_ERROR_RETURN((LM_ERROR,
                        "%p\n",
                        "ACE_Asynch_Write_File::write"),
                       -1);
    }

  nIOCount++ ;
  return 0;
}

int Sender::initiate_read_stream (void)
{
  ACE_Guard<MyMutex> locker (m_Mtx) ;

  // Create a new <Message_Block>.  Note that this message block will
  // be used both to <read> data asynchronously from the socket and to
  // <write> data asynchronously to the file.
  ACE_DEBUG ((LM_DEBUG,
              "initiate_read_stream called\n"));


  ACE_Message_Block *mb = 0;
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (BUFSIZ + 1),
                  -1);

  // Inititiate read
  if (this->rs_.read (*mb, mb->size ()- 1) == -1)
    {
      mb->release () ;
      ACE_ERROR_RETURN ((LM_ERROR,
                         "%p\n",
                         "ACE_Asynch_Read_Stream::read"),
                        -1);
    }

  nIOCount++ ;
  return 0;
}


void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result
                                  &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_write_stream called\n"));

  // Reset pointers.
  result.message_block ().rd_ptr (result.message_block ().rd_ptr () -
                                  result.bytes_transferred ());


  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
              result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
              result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
              result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
              result.message_block ().rd_ptr ()));

  // Simplify just for Test
  if (result.success () && result.bytes_transferred () != 0)
    {
      if ( duplex != 0 )  // full duplex, continue write
        {
          initiate_write_stream () ;
        }
      else  // half-duplex   read reply, after read we will start
        // write
        {
          initiate_read_stream () ;
        }
    }

  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    nIOCount-- ;
  }
}

void
Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              "handle_read_stream called\n"));

  // Reset pointers.
  result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
    '\0';

  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
              ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
              result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
              result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, "********************\n"));
  ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
              result.message_block ().rd_ptr ()));

  result.message_block().release ();

  if ( result.success () && result.bytes_transferred () != 0)
    {
      // Successful read: write the data to the file asynchronously.
      // Note how we reuse the <ACE_Message_Block> for the writing.
      // Therefore, we do not delete this buffer because it is handled
      // in <handle_write_stream>.

      if ( duplex != 0 )  // full duplex, continue read
        {
          initiate_read_stream () ;
        }
      else  // half-duplex  writey, after write we will start read
        {
          initiate_write_stream () ;
        }
    }

  {
    ACE_Guard<MyMutex> locker (m_Mtx) ;
    nIOCount-- ;
  }
}

//--------------------------------------------------------------------------

static int
parse_args (int argc, ACE_TCHAR *argv[])
{
  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:p:d:h:"));
  int c;

  while ((c = get_opt ()) != EOF)
    switch (c)
      {
      case 'h':
        host = get_opt.opt_arg ();
        break;
      case 'n':
        nThreads = ACE_OS::atoi (get_opt.opt_arg ()) ;
        break;
      case 'p':
        port = ACE_OS::atoi (get_opt.opt_arg ());
        break;
      case 'd':
        duplex = ACE_OS::atoi (get_opt.opt_arg ());
        break;
      default:
        ACE_ERROR ((LM_ERROR, "%p.\n",
                    "usage :\n"
                    "-h <host> for Sender mode\n"
                    "-d <duplex mode 1-on/0-off>\n"
                    "-p <port to listen/connect>\n"
                    "-n <number threads for Proactor pool>\n"));
        return -1;
      }

  return 0;
}

int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  ACE_UNUSED_ARG (initial_read_size);

  if (parse_args (argc, argv) == -1)
    return -1;

#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)

  ACE_WIN32_Proactor *      pImpl = new ACE_WIN32_Proactor;

#elif defined (ACE_HAS_AIO_CALLS)

  //  ACE_POSIX_AIOCB_Proactor *  pImpl = new ACE_POSIX_AIOCB_Proactor;
  ACE_POSIX_SIG_Proactor *  pImpl = new ACE_POSIX_SIG_Proactor;
#endif

  ACE_Proactor Proactor ( pImpl ,1 );

  ACE_Proactor::instance( & Proactor );


  MyTask  Task1 ;

  if (Task1.activate (THR_NEW_LWP, nThreads ) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
    }

  Sender sender;
  ACE_Asynch_Acceptor<Receiver> acceptor;

  int Rc = -1 ;

  if ( host == NULL ) // Acceptor
    {
      // Simplify , initial read with  zero size
      Rc = acceptor.open (ACE_INET_Addr (port),0,1);

    }
  else
    {
      Rc = sender.open (host, port);
    }

  if ( Rc == 0 )
    {
      char c ;
      cout << "Press any key to stop and exit=>\n" << flush ;
      cin.clear ();
      cin >> c ;
    }

  ACE_Proactor::end_event_loop () ;

  if ( host != NULL ) // we are sender
    {
      sender.close () ; // disconnect to get reciever error !!!
    }


  ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();

  pTM->wait_task ( & Task1 ) ;

  ACE_Proactor::instance( ( ACE_Proactor* )NULL );

  return 0;
}
//--------------------------------------------------------------------
//
//--------------------------------------------------------------------
int DisableSignal ( int SigNum )
{

#ifndef ACE_WIN32
  sigset_t signal_set;
  if ( sigemptyset (&signal_set) == - 1 )
    {
      ACE_ERROR ((LM_ERROR,
                  "Error:(%P | %t):%p\n",
                  "sigemptyset failed"));
    }

  sigaddset (&signal_set, SigNum);

  //  Put the <signal_set>.
  if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "Error:(%P | %t):%p\n",
                  "pthread_sigmask failed"));
    }
#else
  ACE_UNUSED_ARG(SigNum);
#endif

  return  1;
}
//--------------------------------------------------------------------
// Get the <signal_set> back from the OS.
//--------------------------------------------------------------------

int PrintSigMask ()
{
#ifndef ACE_WIN32

  sigset_t  mask ;
  int member = 0;

  COUT ( "\n=============Signal Mask==========" )

    if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0)
      {
        ACE_ERROR ((LM_ERROR,
                    "Error:(%P | %t):%p\n",
                    "ACE_OS::pthread_sigmask failed"));
      }
    else  for (int i = 1 ; i < 1000; i++)
      {
        member = sigismember (&mask,i);

        COUT ( "\nSig " )
          COUT ( i )
          COUT ( " is " )
          COUT (member )

          if (member == -1)
            {
              break ;
            }
      }
#endif
  return 0;
}


#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Asynch_Acceptor<Receiver>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Asynch_Acceptor<Receiver>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -