📄 htbp_channel.cpp
字号:
/* -*- C++ -*- */
//=============================================================================
/**
* @file HTBP_Channel.cpp
*
* $Id: HTBP_Channel.cpp 82543 2008-08-06 18:21:48Z parsons $
*
* @author Phil Mesnier, Priyanka Gontla
*/
//=============================================================================
#include "HTBP_Channel.h"
#if !defined (__ACE_INLINE__)
#include "HTBP_Channel.inl"
#endif
#include "HTBP_Session.h"
#include "HTBP_Filter_Factory.h"
#include "ace/Message_Block.h"
#include "ace/Reactor.h"
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
// Initialization and termination methods.
/// Constructor.
ACE::HTBP::Channel::Channel (ACE::HTBP::Session *s)
: filter_ (0),
session_ (s),
ace_stream_ (),
notifier_ (0),
leftovers_ (1001),
data_len_ (0),
data_consumed_ (0),
state_ (Init),
error_buffer_ (0)
{
ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));
this->filter_ = get_filter ();
this->request_count_ = static_cast<unsigned long> (ACE_OS::time());
}
/// Constructor, takes ownership of the supplied stream
ACE::HTBP::Channel::Channel (ACE_SOCK_Stream &s)
: filter_ (0),
session_ (0),
ace_stream_ (s.get_handle()),
notifier_ (0),
leftovers_ (1001),
data_len_ (0),
data_consumed_ (0),
state_ (Init),
error_buffer_ (0)
{
filter_ = get_filter ();
this->request_count_ = static_cast<unsigned long> (ACE_OS::time());
}
ACE::HTBP::Channel::Channel (ACE_HANDLE h)
: filter_ (0),
session_ (0),
ace_stream_ (h),
notifier_ (0),
leftovers_ (1001),
data_len_ (0),
data_consumed_ (0),
state_ (Init),
error_buffer_ (0)
{
filter_ = get_filter ();
this->request_count_ = static_cast<unsigned long> (ACE_OS::time());
}
/// Destructor.
ACE::HTBP::Channel::~Channel (void)
{
delete this->notifier_;
delete this->filter_;
}
/// Dump the state of an object.
void
ACE::HTBP::Channel::dump (void) const
{
}
unsigned long
ACE::HTBP::Channel::request_count (void)
{
return this->request_count_++;
}
void
ACE::HTBP::Channel::register_notifier (ACE_Reactor *r)
{
if (r == 0)
return;
if (this->notifier_ == 0)
{
ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));
}
else
{
if (notifier_->get_handle() == ACE_INVALID_HANDLE)
{
delete this->notifier_;
ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this));
}
}
r->register_handler(notifier_,ACE_Event_Handler::READ_MASK);
}
ACE::HTBP::Notifier *
ACE::HTBP::Channel::notifier (void)
{
return this->notifier_;
}
ACE_HANDLE
ACE::HTBP::Channel::get_handle (void) const
{
return this->ace_stream_.get_handle ();
}
void
ACE::HTBP::Channel::data_consumed (size_t n)
{
this->data_consumed_ += n;
if (this->data_consumed_ == this->data_len_)
{
this->filter_->recv_data_trailer(this);
this->filter_->send_ack(this);
}
}
int
ACE::HTBP::Channel::load_buffer (void)
{
this->leftovers_.crunch();
if (this->state() == Detached ||
this->state() == Ack_Sent)
{
this->data_len_ = 0;
this->data_consumed_ = 0;
}
ssize_t nread = 0;
errno = 0;
#if 0
if (this->session_ &&
(this->session_->sock_flags() & ACE_NONBLOCK == ACE_NONBLOCK))
#endif
{
nread =
ACE::handle_read_ready (this->ace_stream().get_handle(),
&ACE_Time_Value::zero);
if (nread == -1 && errno == ETIME)
errno = EWOULDBLOCK;
}
if (nread != -1)
nread = this->ace_stream().recv (this->leftovers_.wr_ptr(),
this->leftovers_.space()-1);
if (nread < 1)
{
if (nread == 0 || (errno != EWOULDBLOCK && errno != EAGAIN))
{
this->state_ = Closed;
#if 0
ACE_ERROR ((LM_ERROR,
"load_buffer[%d] %p\n",
this->ace_stream_.get_handle(),"recv"));
#endif
}
return nread;
}
this->leftovers_.wr_ptr(nread);
*this->leftovers_.wr_ptr() = '\0';
return nread;
}
int
ACE::HTBP::Channel::flush_buffer (void)
{
if (this->session_)
return this->session_->flush_outbound_queue();
return 0;
}
int
ACE::HTBP::Channel::send_ack (void)
{
return this->filter_->send_ack(this);
}
int
ACE::HTBP::Channel::recv_ack (void)
{
if (load_buffer() == -1)
return -1;
return this->filter_->recv_ack(this);
}
void
ACE::HTBP::Channel::state (ACE::HTBP::Channel::State s)
{
if (s == Detached)
{
this->session_->detach(this);
this->session_ = 0;
}
this->state_ = s;
}
int
ACE::HTBP::Channel::consume_error (void)
{
if (error_buffer_ == 0)
{
ACE_NEW_RETURN (error_buffer_,
ACE_Message_Block (this->data_len_ + 1),
0);
}
ssize_t result = 0;
size_t n = error_buffer_->size();
char *buf = error_buffer_->wr_ptr();
if (this->leftovers_.length() > 0)
{
result = ACE_MIN (n,this->leftovers_.length());
ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);
this->leftovers_.rd_ptr(result);
buf += result;
}
if (result < (ssize_t)n &&
result < (ssize_t)data_len_)
{
n -= result;
result += this->ace_stream_.recv(buf, n);
}
if (result > 0)
{
this->error_buffer_->wr_ptr(result);
this->data_consumed_ += result;
if (this->data_consumed_ == this->data_len_)
{
*this->error_buffer_->wr_ptr() = '\0';
if (ACE::debug())
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("ACE::HTBP::Channel::consume_error ")
ACE_TEXT("Received entire error buffer: \n%s\n"),
this->error_buffer_->rd_ptr()));
delete error_buffer_;
error_buffer_ = 0;
return 1;
}
}
return 0;
}
//---------------------------------------------------------------------------
// = I/O functions.
/// The ACE::HTBP::Channel is a sibling of the ACE_SOCK_IO class, rather than a
/// decendant. This is due to the requirement to wrap all messages with
/// an HTTP request or reply wrapper, and to send application data in only
/// one direction on one stream.
int
ACE::HTBP::Channel::pre_recv(void)
{
if (ACE::debug())
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
ACE_TEXT ("in initial state = %d\n"),state_));
if (this->state_ == Init ||
this->state_ == Detached ||
this->state_ == Header_Pending ||
this->state_ == Ack_Sent)
{
if (this->load_buffer() == -1 && this->leftovers_.length() == 0)
{
if (errno != EWOULDBLOCK)
this->state_ = Closed;
if (ACE::debug())
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
ACE_TEXT ("pre_recv returning -1, state = %d, %p\n"),
state_, ACE_TEXT("load_buffer()")));
return -1;
}
if (this->filter_->recv_data_header(this) == -1)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
ACE_TEXT ("recv_data_header failed, %p\n"),
ACE_TEXT("pre_recv")));
}
switch (this->state_)
{
case Data_Queued:
case Ack_Sent:
case Ready:
return 0;
case Header_Pending:
errno = EWOULDBLOCK;
return -1;
default:
if (ACE::debug())
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ")
ACE_TEXT("channel[%d] state = %d, %p\n"),
this->get_handle(),
this->state_,
ACE_TEXT("pre_recv")));
}
return -1;
}
/// Recv an <n> byte buffer from the connected socket.
ssize_t
ACE::HTBP::Channel::recv (void *buf,
size_t n,
int flags,
const ACE_Time_Value *timeout)
{
ssize_t result = 0;
if (this->pre_recv() == -1 && this->leftovers_.length() == 0)
return -1;
if (this->leftovers_.length() > 0)
{
result = ACE_MIN (n,this->leftovers_.length());
ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);
this->leftovers_.rd_ptr(result);
buf = (char *)buf + result;
}
if (result < (ssize_t)n &&
result < (ssize_t)data_len_)
{
n -= result;
result += this->ace_stream_.recv(buf, n, flags, timeout);
}
if (result > 0)
data_consumed((size_t)result);
return result;
}
/// Recv an <n> byte buffer from the connected socket.
ssize_t
ACE::HTBP::Channel::recv (void *buf,
size_t n,
const ACE_Time_Value *timeout)
{
ssize_t result = 0;
if (this->pre_recv() == -1)
return -1;
result = 0;
if (this->leftovers_.length() > 0)
{
result = ACE_MIN (n,this->leftovers_.length());
ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result);
this->leftovers_.rd_ptr(result);
buf = (char *)buf + result;
}
if ((size_t)result < n && (size_t)result < this->data_len())
{
n -= result;
result += this->ace_stream_.recv(buf, n, timeout);
}
if (result > 0)
this->data_consumed((size_t)result);
return result;
}
/// Recv an <iovec> of size <n> from the connected socket.
ssize_t
ACE::HTBP::Channel::recvv (iovec iov[],
int iovcnt,
const ACE_Time_Value *timeout)
{
ssize_t result = 0;
if (this->pre_recv() == -1)
return -1;
if (this->leftovers_.length())
{
int ndx = 0;
iovec *iov2 = new iovec[iovcnt];
for (int i = 0; i < iovcnt; i++)
{
size_t n = ACE_MIN ((size_t) iov[i].iov_len ,
(size_t) this->leftovers_.length());
if (n > 0)
{
ACE_OS::memcpy (iov[i].iov_base,this->leftovers_.rd_ptr(), n);
this->leftovers_.rd_ptr(n);
result += n;
}
if (n < (size_t) iov[i].iov_len)
{
iov2[ndx].iov_len = iov[i].iov_len - n;
iov2[ndx].iov_base = (char *)iov[i].iov_base + n;
ndx++;
}
}
if (ndx > 0)
result += this->ace_stream_.recvv(iov2,ndx,timeout);
delete [] iov2;
}
else
result = this->ace_stream_.recvv(iov,iovcnt,timeout);
if (result > 0)
this->data_consumed((size_t)result);
return result;
}
ssize_t
ACE::HTBP::Channel::recvv (iovec *io_vec,
const ACE_Time_Value *timeout)
{
ssize_t result = 0;
if (this->pre_recv() == -1)
return -1;
if (ACE::debug())
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("ACE::HTBP::Channel::recvv ")
ACE_TEXT("recvv, leftover len = %d\n"),
this->leftovers_.length()));
if (this->leftovers_.length())
{
io_vec->iov_base = 0;
io_vec->iov_len = 0;
ACE_NEW_RETURN (io_vec->iov_base,
char[this->leftovers_.length()],-1);
io_vec->iov_len = this->leftovers_.length();
ACE_OS::memcpy (io_vec->iov_base,
this->leftovers_.rd_ptr(),
io_vec->iov_len);
result = io_vec->iov_len;
this->leftovers_.length(0);
}
else
result = this->ace_stream_.recvv(io_vec,timeout);
if (result > 0)
this->data_consumed((size_t)result);
return result;
}
ssize_t
ACE::HTBP::Channel::send (const void *buf,
size_t n,
int flags,
const ACE_Time_Value *timeout)
{
ssize_t result = 0;
if (this->filter_->send_data_header(n,this) == -1)
return -1;
result = this->ace_stream_.send(buf,n,flags,timeout);
if (result == -1)
return -1;
if (this->filter_->send_data_trailer(this) == -1)
return -1;
return result;
}
ssize_t
ACE::HTBP::Channel::send (const void *buf,
size_t n,
const ACE_Time_Value *timeout)
{
ssize_t result = 0;
if (this->filter_ == 0)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::send: filter ")
ACE_TEXT ("is null\n")),-1);
if (this->filter_->send_data_header(n,this) == -1)
return -1;
result = this->ace_stream_.send (buf,n,timeout);
if (result == -1)
return -1;
if (this->filter_->send_data_trailer(this) == -1)
return -1;
return result;
}
ssize_t
ACE::HTBP::Channel::sendv (const iovec iov[],
int iovcnt,
const ACE_Time_Value *timeout)
{
if (this->ace_stream_.get_handle() == ACE_INVALID_HANDLE)
this->session_->inbound();
ssize_t result = 0;
size_t n = 0;
for (int i = 0; i < iovcnt; n += iov[i++].iov_len)
{
// No action.
}
if (this->filter_->send_data_header(n,this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
ACE_TEXT("%p\n"),
ACE_TEXT("send_data_header")),-1);
result = this->ace_stream_.sendv (iov,iovcnt,timeout);
if (result == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
ACE_TEXT("%p\n"),
ACE_TEXT("ace_stream_.sendv")),-1);
if (this->filter_->send_data_trailer(this) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ")
ACE_TEXT("%p\n"),
ACE_TEXT("send_data_trailer\n")),-1);
return result;
}
int
ACE::HTBP::Channel::enable (int value) const
{
this->ace_stream_.enable(value);
return 0; //this->ace_stream_.enable(value);
}
int
ACE::HTBP::Channel::disable (int value) const
{
this->ace_stream_.disable(value);
return 0;//this->ace_stream_.disable(value);
}
ACE::HTBP::Filter *
ACE::HTBP::Channel::get_filter ()
{
ACE::HTBP::Filter_Factory *factory = 0;
// @todo Should I be throwing an exception here if
// memory is not allocated right ?
ACE_NEW_RETURN (factory,
ACE::HTBP::Filter_Factory,
0);
int inside = (this->session_ != 0);
return factory->get_filter (inside);
}
ACE_END_VERSIONED_NAMESPACE_DECL
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -