⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 htbp_channel.cpp

📁 ACE自适配通信环境(ADAPTIVE Communication Environment)是可以自由使用、开放源码的面向对象(OO)框架(Framework)
💻 CPP
字号:
/* -*- C++ -*- *///=============================================================================/** *  @file    HTBP_Channel.cpp * *  HTBP_Channel.cpp,v 1.4 2005/11/27 12:11:35 ossama Exp * *  @author Phil Mesnier, Priyanka Gontla *///=============================================================================#include "HTBP_Channel.h"#if !defined (__ACE_INLINE__)#include "HTBP_Channel.inl"#endif#include "HTBP_Session.h"#include "HTBP_Filter_Factory.h"#include "ace/Message_Block.h"#include "ace/Reactor.h"ACE_BEGIN_VERSIONED_NAMESPACE_DECL// Initialization and termination methods./// Constructor.ACE::HTBP::Channel::Channel (ACE::HTBP::Session *s)  : filter_ (0),    session_ (s),    ace_stream_ (),    notifier_ (0),    leftovers_ (1000),    data_len_ (0),    data_consumed_ (0),    state_ (Init),    error_buffer_ (0){  ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));  this->filter_ = get_filter ();  this->request_count_ = static_cast<unsigned long> (ACE_OS::time());}/// Constructor, takes ownership of the supplied streamACE::HTBP::Channel::Channel (ACE_SOCK_Stream &s)  : filter_ (0),    session_ (0),    ace_stream_ (s.get_handle()),    notifier_ (0),    leftovers_ (1000),    data_len_ (0),    data_consumed_ (0),    state_ (Init),    error_buffer_ (0){  filter_ = get_filter ();  this->request_count_ = static_cast<unsigned long> (ACE_OS::time());}ACE::HTBP::Channel::Channel (ACE_HANDLE h)  : filter_ (0),    session_ (0),    ace_stream_ (h),    notifier_ (0),    leftovers_ (1000),    data_len_ (0),    data_consumed_ (0),    state_ (Init),    error_buffer_ (0){  filter_ = get_filter ();  this->request_count_ = static_cast<unsigned long> (ACE_OS::time());}/// Destructor.ACE::HTBP::Channel::~Channel (void){  delete this->filter_;  delete this->notifier_;}  /// Dump the state of an object.voidACE::HTBP::Channel::dump (void) const{}unsigned longACE::HTBP::Channel::request_count (void){  return this->request_count_++;}voidACE::HTBP::Channel::register_notifier (ACE_Reactor *r){  if (r == 0)    return;  if (this->notifier_ == 0)    {      ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));    }  else    {      if (notifier_->get_handle() == ACE_INVALID_HANDLE)        {          delete this->notifier_;          ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));        }    }  r->register_handler(notifier_,ACE_Event_Handler::READ_MASK);}ACE::HTBP::Notifier *ACE::HTBP::Channel::notifier (void){  return this->notifier_;}ACE_HANDLEACE::HTBP::Channel::get_handle (void) const{  return this->ace_stream_.get_handle ();}voidACE::HTBP::Channel::data_consumed (size_t n){  this->data_consumed_ += n;  if (this->data_consumed_ == this->data_len_)    {      this->filter_->recv_data_trailer(this);      this->filter_->send_ack(this);    }}intACE::HTBP::Channel::load_buffer (void){  this->leftovers_.crunch();  if (this->state() == Detached ||      this->state() == Ack_Sent)    {      this->data_len_ = 0;      this->data_consumed_ = 0;    }  ssize_t nread = 0;  errno = 0;#if 0  if (this->session_ &&      (this->session_->sock_flags() & ACE_NONBLOCK == ACE_NONBLOCK))#endif    {      nread =        ACE::handle_read_ready (this->ace_stream().get_handle(),                                &ACE_Time_Value::zero);      if (nread == -1 && errno == ETIME)        errno = EWOULDBLOCK;    }  if (nread != -1)    nread = this->ace_stream().recv (this->leftovers_.wr_ptr(),                                     this->leftovers_.space());  if (nread < 1)    {      if (nread == 0 || (errno != EWOULDBLOCK && errno != EAGAIN))        {          this->state_ = Closed;#if 0          ACE_ERROR ((LM_ERROR,                      "load_buffer[%d] %p\n",                      this->ace_stream_.get_handle(),"recv"));#endif        }      return nread;    }  this->leftovers_.wr_ptr(nread);  *this->leftovers_.wr_ptr() = '\0';#if 0  ACE_DEBUG ((LM_DEBUG,"load_buffer[%d] received %d \n",              this->ace_stream_.get_handle(),leftovers_.length()));  ACE_HEX_DUMP ((LM_DEBUG,leftovers_.rd_ptr(),leftovers_.length()));#endif  return nread;}intACE::HTBP::Channel::flush_buffer (void){  if (this->session_)    return this->session_->flush_outbound_queue();  return 0;}intACE::HTBP::Channel::send_ack (void){  return this->filter_->send_ack(this);}intACE::HTBP::Channel::recv_ack (void){  if (load_buffer() == -1)    return -1;  return this->filter_->recv_ack(this);}voidACE::HTBP::Channel::state (ACE::HTBP::Channel::State s){  if (s == Detached)    {      this->session_->detach(this);      this->session_ = 0;    }  this->state_ = s;}intACE::HTBP::Channel::consume_error (void){  if (error_buffer_ == 0)    {      ACE_NEW_RETURN (error_buffer_,                      ACE_Message_Block (this->data_len_ + 1),                      0);    }  ssize_t result = 0;  size_t n = error_buffer_->size();  char *buf = error_buffer_->wr_ptr();  if (this->leftovers_.length() > 0)    {      result = ACE_MIN (n,this->leftovers_.length());      ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);      this->leftovers_.rd_ptr(result);      buf += result;    }  if (result < (ssize_t)n &&      result < (ssize_t)data_len_)    {      n -= result;      result += this->ace_stream_.recv(buf, n);    }  if (result > 0)    {      this->error_buffer_->wr_ptr(result);      this->data_consumed_ += result;      if (this->data_consumed_ == this->data_len_)        {          *this->error_buffer_->wr_ptr() = '\0';          ACE_DEBUG ((LM_DEBUG,"Received entire error buffer: \n%s\n",                      this->error_buffer_->rd_ptr()));          delete error_buffer_;          error_buffer_ = 0;          return 1;        }    }  return 0;}//---------------------------------------------------------------------------// = I/O functions./// The ACE::HTBP::Channel is a sibling of the ACE_SOCK_IO class, rather than a/// decendant. This is due to the requirement to wrap all messages with/// an HTTP request or reply wrapper, and to send application data in only/// one direction on one stream.intACE::HTBP::Channel::pre_recv(void){  if (this->state_ == Init ||      this->state_ == Detached ||      this->state_ == Header_Pending ||      this->state_ == Ack_Sent)    {      if (this->load_buffer() == -1 && this->leftovers_.length() == 0)        {          if (errno != EWOULDBLOCK)            this->state_ = Closed;          ACE_DEBUG ((LM_DEBUG,"pre_recv returning -1, state = %d\n",state_));          return -1;        }      if (this->filter_->recv_data_header(this) == -1)        ACE_DEBUG ((LM_DEBUG,"recv_data_header failed, %p\n","pre_recv"));    }  switch (this->state_)    {    case Data_Queued:    case Ack_Sent:    case Ready:      return 0;    case Header_Pending:      errno = EWOULDBLOCK;      return -1;    default:      ACE_DEBUG ((LM_DEBUG,"channel[%d] state = %d, %p\n",                  this->get_handle(),                  this->state_,"pre_recv"));    }  return -1;}/// Recv an <n> byte buffer from the connected socket.ssize_tACE::HTBP::Channel::recv (void *buf,                 size_t n,                 int flags,                 const ACE_Time_Value *timeout){  ssize_t result = 0;  if (this->pre_recv() == -1 && this->leftovers_.length() == 0)    return -1;  if (this->leftovers_.length() > 0)    {      result = ACE_MIN (n,this->leftovers_.length());      ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);      this->leftovers_.rd_ptr(result);      buf = (char *)buf + result;    }  if (result < (ssize_t)n &&      result < (ssize_t)data_len_)    {      n -= result;      result += this->ace_stream_.recv(buf, n, flags, timeout);    }  if (result > 0)    data_consumed((size_t)result);  return result;}  /// Recv an <n> byte buffer from the connected socket.ssize_tACE::HTBP::Channel::recv (void *buf,                 size_t n,                 const ACE_Time_Value *timeout){  ssize_t result = 0;  if (this->pre_recv() == -1)    return -1;  result = 0;  if (this->leftovers_.length() > 0)    {      result = ACE_MIN (n,this->leftovers_.length());      ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);      this->leftovers_.rd_ptr(result);      buf = (char *)buf + result;    }  if ((size_t)result < n && (size_t)result < this->data_len())    {      n -= result;      result += this->ace_stream_.recv(buf, n, timeout);    }  if (result > 0)    this->data_consumed((size_t)result);  return result;}  /// Recv an <iovec> of size <n> from the connected socket.ssize_tACE::HTBP::Channel::recvv (iovec iov[],                  int iovcnt,                  const ACE_Time_Value *timeout){  ssize_t result = 0;  if (this->pre_recv() == -1)    return -1;  if (this->leftovers_.length())    {      int ndx = 0;      iovec *iov2 = new iovec[iovcnt];      for (int i = 0; i < iovcnt; i++)        {          size_t n = ACE_MIN ((size_t) iov[i].iov_len ,                              (size_t) this->leftovers_.length());          if (n > 0)            {              ACE_OS::memcpy (iov[i].iov_base,this->leftovers_.rd_ptr(), n);              this->leftovers_.rd_ptr(n);              result += n;            }          if (n < (size_t) iov[i].iov_len)            {              iov2[ndx].iov_len = iov[i].iov_len - n;              iov2[ndx].iov_base = (char *)iov[i].iov_base + n;              ndx++;            }        }      if (ndx > 0)        result += this->ace_stream_.recvv(iov2,ndx,timeout);      delete [] iov2;    }  else    result = this->ace_stream_.recvv(iov,iovcnt,timeout);  if (result > 0)    this->data_consumed((size_t)result);  return result;}ssize_tACE::HTBP::Channel::recvv (iovec *io_vec,                  const ACE_Time_Value *timeout){  ssize_t result = 0;  if (this->pre_recv() == -1)    return -1;  ACE_DEBUG ((LM_DEBUG,"recvv, leftover len = %d\n",              this->leftovers_.length()));  if (this->leftovers_.length())    {      io_vec->iov_base = 0;      io_vec->iov_len = 0;      ACE_NEW_RETURN (io_vec->iov_base,                      char[this->leftovers_.length()],-1);      io_vec->iov_len = this->leftovers_.length();      ACE_OS::memcpy (io_vec->iov_base,                      this->leftovers_.rd_ptr(),                      io_vec->iov_len);      result = io_vec->iov_len;      this->leftovers_.length(0);    }  else    result = this->ace_stream_.recvv(io_vec,timeout);  if (result > 0)    this->data_consumed((size_t)result);  return result;}ssize_tACE::HTBP::Channel::send (const void *buf,                 size_t n,                 int flags,                 const ACE_Time_Value *timeout){  ssize_t result = 0;  if (this->filter_->send_data_header(n,this) == -1)    return -1;  result = this->ace_stream_.send(buf,n,flags,timeout);  if (result == -1)    return -1;  if (this->filter_->send_data_trailer(this) == -1)    return -1;  return result;}ssize_tACE::HTBP::Channel::send (const void *buf,                  size_t n,                  const ACE_Time_Value *timeout){  ssize_t result = 0;  if (this->filter_ == 0)    ACE_ERROR_RETURN ((LM_DEBUG, "ACE::HTBP::Channel::send: filter is null\n"),-1);  if (this->filter_->send_data_header(n,this) == -1)    return -1;  result = this->ace_stream_.send (buf,n,timeout);  if (result == -1)    return -1;  if (this->filter_->send_data_trailer(this) == -1)    return -1;  return result;}ssize_tACE::HTBP::Channel::sendv (const iovec iov[],                  int iovcnt,                  const ACE_Time_Value *timeout){  if (this->ace_stream_.get_handle() == ACE_INVALID_HANDLE)    this->session_->inbound();  ssize_t result = 0;  size_t n = 0;  for (int i = 0; i < iovcnt; n += iov[i++].iov_len);  if (this->filter_->send_data_header(n,this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,"sendv, %p\n","send_data_header"),-1);  result = this->ace_stream_.sendv (iov,iovcnt,timeout);  if (result == -1)    ACE_ERROR_RETURN ((LM_ERROR,"sendv, %p\n","ace_stream_.sendv"),-1);  if (this->filter_->send_data_trailer(this) == -1)    ACE_ERROR_RETURN ((LM_ERROR,"sendv, %p\n","send_data_trailer\n"),-1);  return result;}intACE::HTBP::Channel::enable (int value) const{  this->ace_stream_.enable(value);  return 0; //this->ace_stream_.enable(value);}intACE::HTBP::Channel::disable (int value) const{  this->ace_stream_.disable(value);  return 0;//this->ace_stream_.disable(value);}ACE::HTBP::Filter *ACE::HTBP::Channel::get_filter (){  ACE::HTBP::Filter_Factory *factory = 0;  // @todo Should I be throwing an exception here if  // memory is not allocated right ?  ACE_NEW_RETURN (factory,                  ACE::HTBP::Filter_Factory,                  0);  int inside = (this->session_ != 0);  return factory->get_filter (inside);}ACE_END_VERSIONED_NAMESPACE_DECL

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -