📄 connectionbase.cxx
字号:
#if defined(HAVE_CONFIG_H)
#include "resip/stack/config.hxx"
#endif
#include "rutil/Logger.hxx"
#include "resip/stack/ConnectionBase.hxx"
#include "resip/stack/SipMessage.hxx"
#include "resip/stack/Security.hxx"
#include "resip/stack/TlsConnection.hxx"
#include "rutil/WinLeakCheck.hxx"
#ifdef USE_SIGCOMP
#include <osc/Stack.h>
#include <osc/TcpStream.h>
#include <osc/SigcompMessage.h>
#include <osc/StateChanges.h>
#endif
using namespace resip;
#define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT
char
ConnectionBase::connectionStates[ConnectionBase::MAX][32] = { "NewMessage", "ReadingHeaders", "PartialBody" };
ConnectionBase::ConnectionBase()
: mSendPos(0),
mWho(),
mFailureReason(TransportFailure::None),
mCompression(Compression::Disabled),
#ifdef USE_SIGCOMP
mSigcompStack(0),
mSigcompFramer(0),
#endif
mSendingTransmissionFormat(Unknown),
mReceivingTransmissionFormat(Unknown),
mMessage(0),
mBuffer(0),
mBufferPos(0),
mBufferSize(0),
mLastUsed(0),
mConnState(NewMessage)
{
DebugLog (<< "ConnectionBase::ConnectionBase, no params: " << this);
}
ConnectionBase::ConnectionBase(const Tuple& who, Compression &compression)
: mSendPos(0),
mWho(who),
mFailureReason(TransportFailure::None),
mCompression(compression),
#ifdef USE_SIGCOMP
mSigcompStack(0),
mSigcompFramer(0),
#endif
mSendingTransmissionFormat(Unknown),
mReceivingTransmissionFormat(Unknown),
mMessage(0),
mBuffer(0),
mBufferPos(0),
mBufferSize(0),
mLastUsed(Timer::getTimeMs()),
mConnState(NewMessage)
{
DebugLog (<< "ConnectionBase::ConnectionBase, who: " << mWho << " " << this);
#ifdef USE_SIGCOMP
if (mCompression.isEnabled())
{
DebugLog (<< "Compression enabled for connection: " << this);
mSigcompStack = new osc::Stack(mCompression.getStateHandler());
mCompression.addCompressorsToStack(mSigcompStack);
}
else
{
DebugLog (<< "Compression disabled for connection: " << this);
}
#else
DebugLog (<< "No compression library available: " << this);
#endif
}
ConnectionBase::~ConnectionBase()
{
if (mWho.transport)
{
mWho.transport->connectionTerminated(getId());
}
while (!mOutstandingSends.empty())
{
SendData* sendData = mOutstandingSends.front();
mWho.transport->fail(sendData->transactionId, mFailureReason);
delete sendData;
mOutstandingSends.pop_front();
}
DebugLog (<< "ConnectionBase::~ConnectionBase " << this);
delete [] mBuffer;
delete mMessage;
#ifdef USE_SIGCOMP
delete mSigcompStack;
#endif
}
ConnectionId
ConnectionBase::getId() const
{
return mWho.connectionId;
}
void
ConnectionBase::preparseNewBytes(int bytesRead, Fifo<TransactionMessage>& fifo)
{
assert(mWho.transport);
DebugLog(<< "In State: " << connectionStates[mConnState]);
//getConnectionManager().touch(this); -- !dcm!
start: // If there is an overhang come back here, effectively recursing
switch(mConnState)
{
case NewMessage:
{
if (strncmp(mBuffer + mBufferPos, Symbols::CRLFCRLF, 4) == 0)
{
StackLog(<<"Throwing away incoming firewall keep-alive");
mBufferPos += 4;
bytesRead -= 4;
if (bytesRead)
{
goto start;
}
else
{
delete [] mBuffer;
mBuffer = 0;
return;
}
}
assert(mWho.transport);
mMessage = new SipMessage(mWho.transport);
DebugLog(<< "ConnectionBase::process setting source " << mWho);
mMessage->setSource(mWho);
mMessage->setTlsDomain(mWho.transport->tlsDomain());
// Set TlsPeerName if message is from TlsConnection
TlsConnection *tlsConnection = dynamic_cast<TlsConnection *>(this);
if(tlsConnection)
{
mMessage->setTlsPeerNames(tlsConnection->getPeerNames());
}
mMsgHeaderScanner.prepareForMessage(mMessage);
// Fall through to the next case.
}
case ReadingHeaders:
{
unsigned int chunkLength = mBufferPos + bytesRead;
char *unprocessedCharPtr;
MsgHeaderScanner::ScanChunkResult scanChunkResult =
mMsgHeaderScanner.scanChunk(mBuffer,
chunkLength,
&unprocessedCharPtr);
if (scanChunkResult == MsgHeaderScanner::scrError)
{
//.jacob. Not a terribly informative warning.
WarningLog(<< "Discarding preparse!");
delete [] mBuffer;
mBuffer = 0;
delete mMessage;
mMessage = 0;
//.jacob. Shouldn't the state also be set here?
delete this;
return;
}
mMessage->addBuffer(mBuffer);
unsigned int numUnprocessedChars =
(mBuffer + chunkLength) - unprocessedCharPtr;
if (scanChunkResult == MsgHeaderScanner::scrNextChunk)
{
// Message header is incomplete...
if (numUnprocessedChars == 0)
{
// ...but the chunk is completely processed.
//.jacob. I've discarded the "assigned" concept.
//DebugLog(<< "Data assigned, not fragmented, not complete");
mBuffer = MsgHeaderScanner::allocateBuffer(ChunkSize);
mBufferPos = 0;
mBufferSize = ChunkSize;
}
else
{
// ...but some of the chunk must be shifted into the next one.
size_t size = numUnprocessedChars*3/2;
if (size < ConnectionBase::ChunkSize)
{
size = ConnectionBase::ChunkSize;
}
char* newBuffer = MsgHeaderScanner::allocateBuffer(size);
memcpy(newBuffer, unprocessedCharPtr, numUnprocessedChars);
mBuffer = newBuffer;
mBufferPos = numUnprocessedChars;
mBufferSize = size;
}
mConnState = ReadingHeaders;
}
else
{
size_t contentLength = 0;
try
{
// The message header is complete.
contentLength=mMessage->header(h_ContentLength).value();
}
catch(resip::ParseBuffer::Exception& e)
{
WarningLog(<<"Malformed Content-Length in connection-based transport"
". Not much we can do to fix this. " << e);
// !bwc! Bad Content-Length. We are hosed.
delete mMessage;
mMessage = 0;
mBuffer = 0;
// !bwc! mMessage just took ownership of mBuffer, so we don't
// delete it here. We do zero it though, for completeness.
//.jacob. Shouldn't the state also be set here?
delete this;
return;
}
if (numUnprocessedChars < contentLength)
{
// The message body is incomplete.
DebugLog(<< "partial body received");
char* newBuffer = MsgHeaderScanner::allocateBuffer(contentLength);
memcpy(newBuffer, unprocessedCharPtr, numUnprocessedChars);
mBufferPos = numUnprocessedChars;
mBufferSize = contentLength;
mBuffer = newBuffer;
mConnState = PartialBody;
}
else
{
// The message body is complete.
mMessage->setBody(unprocessedCharPtr, contentLength);
if (!transport()->basicCheck(*mMessage))
{
delete mMessage;
mMessage = 0;
}
else
{
Transport::stampReceived(mMessage);
DebugLog(<< "##Connection: " << *this << " received: " << *mMessage);
fifo.add(mMessage);
mMessage = 0;
}
int overHang = numUnprocessedChars - contentLength;
mConnState = NewMessage;
mBuffer = 0;
if (overHang > 0)
{
// The next message has been partially read.
size_t size = overHang*3/2;
if (size < ConnectionBase::ChunkSize)
{
size = ConnectionBase::ChunkSize;
}
char* newBuffer = MsgHeaderScanner::allocateBuffer(size);
memcpy(newBuffer,
unprocessedCharPtr + contentLength,
overHang);
mBuffer = newBuffer;
mBufferPos = 0;
mBufferSize = size;
DebugLog (<< "Extra bytes after message: " << overHang);
DebugLog (<< Data(mBuffer, overHang));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -