📄 externalsort.cc
字号:
// Tools Library//// Copyright (C) 2004 Navel Ltd.//// This library is free software; you can redistribute it and/or// modify it under the terms of the GNU Lesser General Public// License as published by the Free Software Foundation; either// version 2.1 of the License, or (at your option) any later version.//// This library is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU// Lesser General Public License for more details.//// You should have received a copy of the GNU Lesser General Public// License along with this library; if not, write to the Free Software// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA//// Contact information:// Mailing address:// Marios Hadjieleftheriou// University of California, Riverside// Department of Computer Science// Surge Building, Room 310// Riverside, CA 92521//// Email:// marioh@cs.ucr.edu
#include <stdio.h>
#include <unistd.h>
#include <Tools.h>
#include "ExternalSort.h"
using namespace std;
Tools::ExternalSort::PQEntry::PQEntry(
SmartPointer<ISerializable>& r,
IObjectComparator* pComp,
SmartPointer<TemporaryFile>& f)
: m_spRecord(r), m_pComp(pComp), m_spFile(f)
{
}
bool Tools::ExternalSort::PQEntry::ascendingComparator::operator()(SmartPointer<PQEntry> x, SmartPointer<PQEntry> y) const
{
if (x->m_pComp != 0)
{
int ret = x->m_pComp->compare(
dynamic_cast<IObject*>(x->m_spRecord.get()),
dynamic_cast<IObject*>(y->m_spRecord.get()));
if (ret == 1) return true;
return false;
}
else
{
IComparable* pX = dynamic_cast<IComparable*>(x->m_spRecord.get());
IComparable* pY = dynamic_cast<IComparable*>(y->m_spRecord.get());
if (pX == 0 || pY == 0)
throw IllegalArgumentException("Tools::ExternalSort::PQEntry::ascendingComparator: argument is not comparable.");
if (*pX > *pY) return true;
return false;
}
}
Tools::ExternalSort::ExternalSort(IObjectStream& source, unsigned long bufferSize)
: m_pExternalSource(&source),
m_cMaxBufferSize(bufferSize),
m_bFitsInBuffer(false),
m_cNumberOfSortedRecords(0),
m_pNextRecord(0),
m_pComp(0)
{
//m_spTemplateRecord = SmartPointer<IObject>();
//m_spSortedFile = SmartPointer<TemporaryFile>();
#ifndef NDEBUG
m_cReturnedRecords = 0;
#endif
mergeRuns();
}
Tools::ExternalSort::ExternalSort(IObjectStream& source, IObjectComparator& comp, unsigned long bufferSize)
: m_pExternalSource(&source),
m_cMaxBufferSize(bufferSize),
m_bFitsInBuffer(false),
m_cNumberOfSortedRecords(0),
m_pNextRecord(0),
m_pComp(&comp)
{
#ifndef NDEBUG
m_cReturnedRecords = 0;
#endif
mergeRuns();
}
Tools::ExternalSort::~ExternalSort()
{
}
void Tools::ExternalSort::initializeRuns(deque<SmartPointer<TemporaryFile> >& runs)
{
bool bEOF = false;
while (! bEOF)
{
while (m_buffer.size() < m_cMaxBufferSize)
{
IObject* o = m_pExternalSource->getNext();
if (o == 0)
{
bEOF = true;
break;
}
SmartPointer<ISerializable> spO(dynamic_cast<ISerializable*>(o));
if (spO.get() == 0) throw IllegalStateException("Tools::ExternalSort::initializeRuns: object is not serializable.");
m_cNumberOfSortedRecords++;
#ifndef NDEBUG
if (m_cNumberOfSortedRecords % 1000000 == 0)
std::cerr << "Tools::ExternalSort::initializeRuns: loaded " << m_cNumberOfSortedRecords << std::endl;
#endif
if (m_spTemplateRecord.get() == 0)
m_spTemplateRecord = SmartPointer<IObject>(o->clone());
SmartPointer<TemporaryFile> tf;
m_buffer.push(SmartPointer<PQEntry>(new PQEntry(spO, m_pComp, tf)));
}
if (bEOF && runs.size() == 0)
{
#ifndef NDEBUG
std::cerr << "Tools::ExternalSort: Dataset fits in main memory buffer." << std::endl;
#endif
m_bFitsInBuffer = true;
break;
}
else if (m_buffer.empty())
{
break;
}
SmartPointer<TemporaryFile> tf(new TemporaryFile());
while (! m_buffer.empty())
{
SmartPointer<PQEntry> pqe = m_buffer.top(); m_buffer.pop();
tf->storeNextObject(pqe->m_spRecord.get());
}
tf->rewindForReading();
runs.push_back(tf);
}
}
void Tools::ExternalSort::mergeRuns()
{
deque<SmartPointer<TemporaryFile> > newruns;
deque<SmartPointer<TemporaryFile> > runs;
initializeRuns(runs);
if (m_bFitsInBuffer)
{
if (! m_buffer.empty())
{
SmartPointer<PQEntry> pqe = m_buffer.top(); m_buffer.pop();
m_pNextRecord = dynamic_cast<ISerializable*>(dynamic_cast<IObject*>(pqe->m_spRecord.get())->clone());
}
}
else
{
while (runs.size() > 1)
{
SmartPointer<TemporaryFile> output(new TemporaryFile());
priority_queue<SmartPointer<PQEntry>, vector<SmartPointer<PQEntry> >, PQEntry::ascendingComparator> buffer;
unsigned long cRun = 0, cMaxRun = 0;
while (buffer.size() < m_cMaxBufferSize)
{
SmartPointer<ISerializable> spO(dynamic_cast<ISerializable*>(m_spTemplateRecord->clone()));
try
{
runs[cRun]->loadNextObject(spO.get());
buffer.push(SmartPointer<PQEntry>(new PQEntry(spO, m_pComp, runs[cRun])));
}
catch (EndOfStreamException& e)
{
// if there are no more records in the file, do nothing.
}
cMaxRun = std::max(cRun, cMaxRun);
cRun++;
if (cRun == runs.size()) cRun = 0;
}
while (! buffer.empty())
{
SmartPointer<PQEntry> pqe = buffer.top(); buffer.pop();
output->storeNextObject(pqe->m_spRecord.get());
try
{
// no one else is pointing to the same record (hopefully).
pqe->m_spFile->loadNextObject(pqe->m_spRecord.get());
buffer.push(pqe);
}
catch (EndOfStreamException& e)
{
// if there are no more records in the file, do nothing.
}
}
output->rewindForReading();
newruns.push_back(output);
for (cRun = 0; cRun <= cMaxRun; cRun++) runs.pop_front();
if (runs.empty())
{
runs.push_back(newruns.front());
newruns.pop_front();
}
}
runs[0]->rewindForReading();
m_spSortedFile = runs[0];
try
{
m_pNextRecord = dynamic_cast<ISerializable*>(m_spTemplateRecord->clone());
m_spSortedFile->loadNextObject(m_pNextRecord);
}
catch (EndOfStreamException& e)
{
delete m_pNextRecord;
m_pNextRecord = 0;
}
}
}
Tools::IObject* Tools::ExternalSort::getNext()
{
if (m_pNextRecord == 0)
{
#ifndef NDEBUG
assert(m_cReturnedRecords == m_cNumberOfSortedRecords);
#endif
return 0;
}
IObject* ret = dynamic_cast<IObject*>(m_pNextRecord);
if (m_bFitsInBuffer)
{
if (m_buffer.empty()) m_pNextRecord = 0;
else
{
SmartPointer<PQEntry> pqe = m_buffer.top(); m_buffer.pop();
m_pNextRecord = dynamic_cast<ISerializable*>(dynamic_cast<IObject*>(pqe->m_spRecord.get())->clone());
}
}
else
{
try
{
m_pNextRecord = dynamic_cast<ISerializable*>(m_spTemplateRecord->clone());
m_spSortedFile->loadNextObject(m_pNextRecord);
}
catch (EndOfStreamException& e)
{
delete m_pNextRecord;
m_pNextRecord = 0;
}
}
#ifndef NDEBUG
m_cReturnedRecords++;
#endif
return ret;
}
bool Tools::ExternalSort::hasNext()
{
if (m_pNextRecord == 0)
{
#ifndef NDEBUG
assert(m_cReturnedRecords == m_cNumberOfSortedRecords);
#endif
return false;
}
return true;
}
unsigned long Tools::ExternalSort::size() throw (NotSupportedException)
{
return m_cNumberOfSortedRecords;
}
bool Tools::ExternalSort::rewind() throw (NotSupportedException)
{
try
{
m_spSortedFile->rewindForReading();
return true;
}
catch (...)
{
return false;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -