📄 network.cpp
字号:
// network.cpp - written and placed in the public domain by Wei Dai
#include "pch.h"
#include "network.h"
#include "wait.h"
#define CRYPTOPP_TRACE_NETWORK 0
NAMESPACE_BEGIN(CryptoPP)
#ifdef HIGHRES_TIMER_AVAILABLE
// Returns how many more bytes may be transferred right now without exceeding
// m_maxBytesPerSecond: the limit minus the byte counts of all operations still
// held in m_ops, saturating at zero.
// When no limit is configured (m_maxBytesPerSecond == 0), ULONG_MAX is returned.
// NOTE(review): the return type is lword but the "unlimited" value is ULONG_MAX;
// on platforms where unsigned long is narrower than lword these differ - confirm
// this is intended before changing it.
lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
{
	if (!m_maxBytesPerSecond)
		return ULONG_MAX;

	// Called only for its side effect of dropping expired entries from m_ops;
	// the returned timestamp is not needed here.
	GetCurTimeAndCleanUp();

	// Sum the bytes of every operation still inside the accounting window.
	lword total = 0;
	for (OpQueue::size_type i = 0; i != m_ops.size(); ++i)
		total += m_ops[i].second;

	return SaturatingSubtract(m_maxBytesPerSecond, total);
}
// Returns the delay, in m_timer units, until the next transfer is permitted.
// The result is 0 when no limit is configured or when the permitted time has
// already been reached (the subtraction saturates).
double LimitedBandwidth::TimeToNextTransceive()
{
	// Without a bandwidth limit there is never anything to wait for.
	if (m_maxBytesPerSecond == 0)
		return 0;

	// A stored value of 0 means "not computed yet" (NoteTransceive resets it),
	// so compute it on demand.
	if (m_nextTransceiveTime == 0)
		ComputeNextTransceiveTime();

	const double now = m_timer.ElapsedTimeAsDouble();
	return SaturatingSubtract(m_nextTransceiveTime, now);
}
// Records that `size` bytes were just transferred so that later limit
// computations account for them. Does nothing when no limit is configured.
void LimitedBandwidth::NoteTransceive(lword size)
{
	if (!m_maxBytesPerSecond)
		return;

	// Prune expired entries, then append a (timestamp, byte count) record.
	const double now = GetCurTimeAndCleanUp();
	m_ops.push_back(std::make_pair(now, size));

	// Reset the cached value; TimeToNextTransceive() recomputes it when it is 0.
	m_nextTransceiveTime = 0;
}
void LimitedBandwidth::ComputeNextTransceiveTime()
{
double curTime = GetCurTimeAndCleanUp();
lword total = 0;
for (unsigned int i=0; i!=m_ops.size(); ++i)
total += m_ops[i].second;
m_nextTransceiveTime =
(total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
}
// Returns the current m_timer reading and, as a side effect, removes from the
// front of m_ops every entry whose timestamp is more than 1000 timer units old.
// Returns 0 without touching m_ops when no limit is configured.
double LimitedBandwidth::GetCurTimeAndCleanUp()
{
	if (m_maxBytesPerSecond == 0)
		return 0;

	const double now = m_timer.ElapsedTimeAsDouble();

	// Entries are appended in time order by NoteTransceive(), so pruning can
	// stop at the first entry that is still inside the window.
	while (!m_ops.empty())
	{
		if (!(m_ops.front().first + 1000 < now))
			break;
		m_ops.pop_front();
	}

	return now;
}
// If a transfer is not permitted yet, schedules an event on `container` for the
// delay reported by TimeToNextTransceive(). Adds nothing when that delay is 0.
void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
{
	const double delay = TimeToNextTransceive();
	if (delay == 0)
		return;

	container.ScheduleEvent(delay, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
}
// *************************************************************
// Pumps data through DoPump() while honoring the configured bandwidth limit.
//
// byteCount  - in: maximum number of bytes to pump. On the rate-limited path it
//              is reset to 0 and then increased by the value each DoPump() call
//              leaves in its by-reference count argument.
// maxTime    - time budget; INFINITE_TIME means no deadline. It is compared
//              against a Timer constructed with Timer::MILLISECONDS, so it is
//              presumably in milliseconds.
// blockingOutput, checkDelimiter, delimiter - forwarded unchanged to DoPump().
//
// Returns the first nonzero value returned by DoPump(), otherwise 0.
// Side effects: m_doPumpBlocked records whether the last DoPump() call returned
// nonzero; m_blockedBySpeedLimit is set to true when the loop stops because the
// wait for the next permitted transfer is longer than the remaining time.
size_t NonblockingSource::GeneralPump2(
lword& byteCount, bool blockingOutput,
unsigned long maxTime, bool checkDelimiter, byte delimiter)
{
m_blockedBySpeedLimit = false;
// No bandwidth limit configured: a single direct DoPump() call is enough.
if (!GetMaxBytesPerSecond())
{
size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
m_doPumpBlocked = (ret != 0);
return ret;
}
bool forever = (maxTime == INFINITE_TIME);
unsigned long timeToGo = maxTime;
Timer timer(Timer::MILLISECONDS, forever);
// Remember the requested maximum; byteCount now becomes the running total.
lword maxSize = byteCount;
byteCount = 0;
timer.StartTimer();
while (true)
{
// This round's chunk: the smaller of the current allowance and the bytes still wanted.
lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
// DoPump() is also called with a zero-sized chunk when the previous call
// returned nonzero (presumably so it can finish output that was blocked).
if (curMaxSize || m_doPumpBlocked)
{
if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
m_doPumpBlocked = (ret != 0);
// curMaxSize is passed by reference; after the call it is treated as the
// amount actually transferred and charged against the bandwidth limit.
if (curMaxSize)
{
NoteTransceive(curMaxSize);
byteCount += curMaxSize;
}
if (ret)
return ret;
}
// Stop once the requested amount has been pumped.
// NOTE(review): the "unlimited" sentinel tested here is ULONG_MAX, while
// PumpMessages2() requests LWORD_MAX; these differ where unsigned long is
// narrower than lword - confirm which one is intended.
if (maxSize != ULONG_MAX && byteCount >= maxSize)
break;
// Stop when the time budget is used up.
if (!forever)
{
timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
if (!timeToGo)
break;
}
// Give up, and flag the speed limit as the cause, if the next permitted
// transfer lies beyond the remaining time.
double waitTime = TimeToNextTransceive();
if (!forever && waitTime > timeToGo)
{
m_blockedBySpeedLimit = true;
break;
}
// Otherwise wait until the bandwidth limiter permits another transfer.
WaitObjectContainer container;
LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
container.Wait((unsigned long)waitTime);
}
return 0;
}
size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
{
if (messageCount == 0)
return 0;
messageCount = 0;
lword byteCount;
do {
byteCount = LWORD_MAX;
RETURN_IF_NONZERO(Pump2(byteCount, blocking));
} while(byteCount == LWORD_MAX);
if (!m_messageEndSent && SourceExhausted())
{
RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
m_messageEndSent = true;
messageCount = 1;
}
return 0;
}
// Flushes buffered data through DoFlush() while honoring the configured
// bandwidth limit, until the buffer holds at most targetSize bytes, the time
// budget runs out, or the bandwidth limit prevents further progress in time.
//
// maxTime    - time budget; INFINITE_TIME means no deadline. It is compared
//              against a Timer constructed with Timer::MILLISECONDS, so it is
//              presumably in milliseconds.
// targetSize - buffer size at or below which flushing stops.
//
// Returns 0 if there was nothing to do, DoFlush()'s result when no bandwidth
// limit is configured, otherwise the sum of the values returned by DoFlush().
// Side effect: m_blockedBySpeedLimit is set to true when the loop stops because
// the wait for the next permitted transfer is longer than the remaining time.
lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
{
m_blockedBySpeedLimit = false;
size_t curBufSize = GetCurrentBufferSize();
// Already at or below the target: done, unless targetSize is 0 and
// EofPending() reports true, in which case flushing still proceeds.
if (curBufSize <= targetSize && (targetSize || !EofPending()))
return 0;
// No bandwidth limit configured: delegate directly.
if (!GetMaxBytesPerSecond())
return DoFlush(maxTime, targetSize);
bool forever = (maxTime == INFINITE_TIME);
unsigned long timeToGo = maxTime;
Timer timer(Timer::MILLISECONDS, forever);
lword totalFlushed = 0;
timer.StartTimer();
while (true)
{
// This round's amount: the smaller of the excess over the target and the current allowance.
size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
// DoFlush() is also called with nothing to flush while EofPending() is true.
if (flushSize || EofPending())
{
if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
// Ask DoFlush() to bring the buffer down to curBufSize - flushSize; its
// return value is treated as the number of bytes actually flushed.
size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
if (ret)
{
NoteTransceive(ret);
curBufSize -= ret;
totalFlushed += ret;
}
}
// Same completion test as on entry.
if (curBufSize <= targetSize && (targetSize || !EofPending()))
break;
// Stop when the time budget is used up.
if (!forever)
{
timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
if (!timeToGo)
break;
}
// Give up, and flag the speed limit as the cause, if the next permitted
// transfer lies beyond the remaining time.
double waitTime = TimeToNextTransceive();
if (!forever && waitTime > timeToGo)
{
m_blockedBySpeedLimit = true;
break;
}
// Otherwise wait until the bandwidth limiter permits another transfer.
WaitObjectContainer container;
LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
container.Wait((unsigned long)waitTime);
}
return totalFlushed;
}
// Attempts a flush via TimedFlush(): with no deadline when `blocking` is true,
// with a zero time budget otherwise.
// Returns true only if `hardFlush` was requested and, after the attempt, data
// is still buffered or EofPending() reports true; returns false otherwise.
bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
{
	const unsigned long maxTime = blocking ? INFINITE_TIME : 0;
	TimedFlush(maxTime);

	if (!hardFlush)
		return false;
	if (GetCurrentBufferSize() != 0)
		return true;
	return EofPending();
}
// *************************************************************
// Constructs a source attached to `attachment`. m_buf is constructed with the
// value 16*1024 (presumably its size in bytes), both state flags start out
// false, and the data range [m_dataBegin, m_dataEnd) starts out empty.
NetworkSource::NetworkSource(BufferedTransformation *attachment)
	: NonblockingSource(attachment),
	  m_buf(16 * 1024),
	  m_waitingForResult(false),
	  m_outputBlocked(false),
	  m_dataBegin(0),
	  m_dataEnd(0)
{
}
// Returns the sum of the maximum wait-object counts reported by the bandwidth
// limiter, the receiver and the attached transformation.
unsigned int NetworkSource::GetMaxWaitObjectCount() const
{
	unsigned int count = LimitedBandwidth::GetMaxWaitObjectCount();
	count += GetReceiver().GetMaxWaitObjectCount();
	count += AttachedTransformation()->GetMaxWaitObjectCount();
	return count;
}
// Registers with `container` whatever this source currently has to wait for:
//  - the bandwidth limiter, if the last operation was blocked by the speed limit;
//  - otherwise, unless output is blocked, either "no wait" (data is already
//    buffered) or the receiver (no data buffered);
//  - and, in every case, the attached transformation's own wait objects.
void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
{
	if (BlockedBySpeedLimit())
	{
		LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
	}
	else if (m_outputBlocked)
	{
		// Output is blocked: only the attachment below is waited on.
	}
	else if (m_dataBegin != m_dataEnd)
	{
		container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
	}
	else
	{
		AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
	}

	AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
}
size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
{
NetworkReceiver &receiver = AccessReceiver();
lword maxSize = byteCount;
byteCount = 0;
bool forever = maxTime == INFINITE_TIME;
Timer timer(Timer::MILLISECONDS, forever);
BufferedTransformation *t = AttachedTransformation();
if (m_outputBlocked)
goto DoOutput;
while (true)
{
if (m_dataBegin == m_dataEnd)
{
if (receiver.EofReceived())
break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -