📄 tdatabinary.c
字号:
/*
 * Begin a sequential fetch of the records lowId..highId (both inclusive).
 *
 * Asserts that both ids are below _totalRecs and that lowId <= highId.
 * The returned request is heap-allocated; it is consumed by GetRecs() and
 * released by DoneGetRecs(). The callback is stored in the request but is
 * not invoked here; asyncAllowed is not used by this implementation.
 */
TData::TDHandle TDataBinary::InitGetRecs(RecId lowId, RecId highId,
                                         Boolean asyncAllowed,
                                         ReleaseMemoryCallback *callback)
{
  DOASSERT(lowId <= highId && (long)highId < _totalRecs
           && (long)lowId < _totalRecs, "Invalid record parameters");

  TDataRequest *request = new TDataRequest;
  DOASSERT(request, "Out of memory");

  request->relcb = callback;
  request->endId = highId;
  request->nextId = lowId;

  return request;
}
/*
 * Fetch the next batch of records for a request created by InitGetRecs().
 *
 * buf/bufSize  destination buffer and its size in bytes
 * startRid     set to the id of the first record returned
 * numRecs      set to the number of records returned (it is also written,
 *              with the buffer capacity, when the function returns false)
 * dataSize     set to numRecs * _recSize
 *
 * Returns false once every record of the request has been delivered,
 * true otherwise. The status returned by ReadRec() is not inspected.
 */
Boolean TDataBinary::GetRecs(TDHandle req, void *buf, int bufSize,
                             RecId &startRid, int &numRecs, int &dataSize)
{
  DOASSERT(req, "Invalid request handle");

#if defined(DEBUG)
  printf("TDataBinary::GetRecs buf = 0x%p\n", buf);
#endif

  // How many whole records fit in the caller's buffer.
  numRecs = bufSize / _recSize;
  DOASSERT(numRecs > 0, "Not enough record buffer space");

  if (req->nextId > req->endId) {
    return false;
  }

  // Never hand back more than what is left in the requested range.
  int remaining = req->endId - req->nextId + 1;
  numRecs = (remaining < numRecs) ? remaining : numRecs;

  const RecId firstId = req->nextId;
  ReadRec(firstId, numRecs, buf);

  startRid = firstId;
  dataSize = numRecs * _recSize;

  req->nextId += numRecs;
  _bytesFetched += dataSize;

  return true;
}
/*
 * Finish a fetch: release the request object that InitGetRecs() allocated.
 * The handle must not be used after this call.
 */
void TDataBinary::DoneGetRecs(TDHandle req)
{
  DOASSERT(req, "Invalid request handle");

  delete req;
}
/*
 * Return the index of a record as a one-element array holding the record id.
 *
 * The array is a function-local static, so the pointer handed back through
 * `indices` stays valid after the call but its content is overwritten by the
 * next call to GetIndex().
 */
void TDataBinary::GetIndex(RecId id, int *&indices)
{
  static int slot[1];

  indices = slot;
  indices[0] = id;
}
/*
 * Return the modification time reported by the data source, or -1 when
 * CheckFileStatus() fails.
 */
int TDataBinary::GetModTime()
{
  return CheckFileStatus() ? _data->GetModTime() : -1;
}
/*
 * Build the path of the index file: "<work dir>/<name without its path>".
 *
 * The result is allocated with new[] and returned to the caller, which
 * becomes responsible for it. The `type` argument is not used.
 */
char *TDataBinary::MakeIndexFileName(char *name, char *type)
{
  char *baseName = StripPath(name);

  // work dir + '/' + base name + terminating NUL
  size_t dirLen = strlen(Init::WorkDir());
  size_t baseLen = strlen(baseName);
  int pathLen = dirLen + 1 + baseLen + 1;

  char *path = new char[pathLen];
  sprintf(path, "%s/%s", Init::WorkDir(), baseName);

  return path;
}
/*
 * Set up the record index for this data source.
 *
 * - The index file name is always computed first.
 * - If the file status check fails, nothing else is done.
 * - Buffer-backed sources are indexed directly with BuildIndex().
 * - Otherwise the saved index is loaded; on success the loaded totals are
 *   remembered (for Checkpoint()) and BuildIndex() continues from there,
 *   on failure the index is rebuilt from scratch.
 */
void TDataBinary::Initialize()
{
  _indexFileName = MakeIndexFileName(_name, _type);

  if (!CheckFileStatus()) {
    return;
  }

  if (_data->isBuf()) {
    BuildIndex();
    return;
  }

  if (_indexP->Initialize(_indexFileName, _data, this, _lastPos,
                          _totalRecs).IsComplete()) {
    _initTotalRecs = _totalRecs;
    _initLastPos = _lastPos;

    /* pick up any records not covered by the loaded index */
    BuildIndex();
  } else {
    /* loading failed: recover by building the index from scratch */
#if defined(DEBUG)
    printf("Rebuilding index...\n");
#endif
    RebuildIndex();
  }
}
/*
 * Save the index to the index file if it changed.
 *
 * Nothing is written when the file status check fails (a message is
 * printed), when the source is a buffer (the index is just rebuilt in
 * memory), or when _lastPos and _totalRecs still equal the values recorded
 * in _initLastPos and _initTotalRecs.
 *
 * NOTE(review): the outcome of the index checkpoint is queried but not
 * acted upon; success and failure both just resynchronize _currPos.
 */
void TDataBinary::Checkpoint()
{
  if (!CheckFileStatus()) {
    printf("Cannot checkpoint %s\n", _name);
    return;
  }

  if (_data->isBuf()) {
    BuildIndex();
    return;
  }

  printf("Checkpointing %s: %ld total records, %ld new\n", _name,
         _totalRecs, _totalRecs - _initTotalRecs);

  if (_lastPos == _initLastPos && _totalRecs == _initTotalRecs) {
    /* no need to checkpoint */
    return;
  }

  (void)_indexP->Checkpoint(_indexFileName, _data, this, _lastPos,
                            _totalRecs).IsComplete();

  // Re-read the stream position after the checkpoint call.
  _currPos = _data->Tell();
}
/* Build index for the file. This code should work when file size
is extended dynamically. Before calling this function, position
should be at the last place where file was scanned. */
void TDataBinary::BuildIndex()
{
char physRec[_physRecSize];
char recBuf[_recSize];
int oldTotal = _totalRecs;
_currPos = _lastPos - _lastIncompleteLen;
// First go to last valid position of file
if (_data->Seek(_currPos, SEEK_SET) < 0) {
reportErrSys("fseek");
return;
}
_lastIncompleteLen = 0;
while(1) {
int len = 0;
len = _data->Fread(physRec, 1, _physRecSize);
if (!len)
break;
DOASSERT(len >= 0, "Cannot read data stream");
if (len == _physRecSize) {
if (Decode(recBuf, _currPos / _physRecSize, physRec)) {
_indexP->Set(_totalRecs++, _currPos);
} else {
#if defined(DEBUG)
printf("Ignoring invalid or non-matching record\n");
#endif
}
_lastIncompleteLen = 0;
} else {
#if defined(DEBUG)
printf("Ignoring incomplete record (%d bytes)\n", len);
#endif
_lastIncompleteLen = len;
}
_currPos += len;
}
// last position is > current position because TapeDrive advances
// bufferOffset to the next block, past the EOF, when tape file
// ends
_lastPos = _data->Tell();
DOASSERT(_lastPos >= _currPos, "Incorrect file position");
#ifdef DEBUG
printf("Index for %s: %ld total records, %ld new\n", _name,
_totalRecs, _totalRecs - oldTotal);
#endif
if (_totalRecs <= 0)
fprintf(stderr, "No valid records for data stream %s\n"
" (check schema/data correspondence)\n", _name);
}
/*
 * Rebuild the index from scratch: invalidate and clear the current index,
 * reset all record counts and file positions to zero, then rescan the data
 * from the beginning with BuildIndex().
 */
void TDataBinary::RebuildIndex()
{
  InvalidateIndex();
  _indexP->Clear();

  _totalRecs = 0;
  _initTotalRecs = 0;

  _lastPos = 0;
  _initLastPos = 0;

  _lastIncompleteLen = 0;

  BuildIndex();
}
/*
 * Read numRecs consecutive records, starting with record `id`, into buf.
 *
 * Each record is located through the index, read as one physical record
 * and decoded in place; successive records are stored _recSize bytes apart
 * in buf. A seek or read failure, or a record that fails to decode, trips
 * an assertion. Returns TD_OK.
 */
TD_Status TDataBinary::ReadRec(RecId id, int numRecs, void *buf)
{
#if defined(DEBUG)
  printf("TDataBinary::ReadRec %ld,%d,0x%p\n", id, numRecs, buf);
#endif

  char *dest = (char *)buf;
  int done = 0;

  while (done < numRecs) {
    long offset = _indexP->Get(id + done);

    // Skip the seek only when the stream is already positioned on the
    // record. For a tape source we _always_ seek, even if we think we are
    // at the right place; this was copied from the previously-existing
    // code (RKW 5/21/96).
    if (_data->isTape() || _currPos != offset) {
      if (_data->Seek(offset, SEEK_SET) < 0) {
        reportErrSys("fseek");
        DOASSERT(0, "Cannot perform file seek");
      }
      _currPos = offset;
    }

    if (_data->Fread(dest, _physRecSize, 1) != 1) {
      reportErrSys("fread");
      DOASSERT(0, "Cannot read from file");
    }

    // Decode in place: source and destination are the same buffer slot.
    Boolean valid = Decode(dest, _currPos / _physRecSize, dest);
    DOASSERT(valid, "Inconsistent validity flag");

    _currPos += _physRecSize;
    dest += _recSize;
    done++;
  }

  return TD_OK;
}
/*
 * Append numRecs physical records from buf to the data source and add an
 * index entry for each of them.
 *
 * startRid  not used; records are always appended after the existing ones
 * numRecs   number of records in buf
 * buf       numRecs * _physRecSize bytes of record data
 *
 * Writing to a tape source is not supported and trips an assertion, as
 * does a short or failed append.
 *
 * Every appended record gets its own index entry at its own offset.
 * Previously only a single entry (for the last new record) was written,
 * and it pointed at the start of the appended data, which is only correct
 * for numRecs == 1. With numRecs == 0 no index entry is touched any more.
 */
void TDataBinary::WriteRecs(RecId startRid, int numRecs, void *buf)
{
  DOASSERT(!_data->isTape(), "Writing to tape not supported yet");

  // Assumes, as the single-record code did, that the appended data starts
  // at _lastPos; record i of the batch then starts i physical records later.
  for (int i = 0; i < numRecs; i++) {
    _indexP->Set(_totalRecs++, _lastPos + (long)i * _physRecSize);
  }

  int len = numRecs * _physRecSize;
  if (_data->append(buf, len) != len) {
    reportErrSys("tapewrite");
    DOASSERT(0, "Cannot append to file");
  }

  // Both the scan position and the current position now sit at the new end.
  _lastPos = _data->Tell();
  _currPos = _lastPos;
}
/*
 * Append a single record to the data source; a thin wrapper around
 * WriteRecs() with a record count of one and a start id of 0.
 */
void TDataBinary::WriteLine(void *rec)
{
  const int recordCount = 1;

  WriteRecs(0, recordCount, rec);
}
/*
 * Final housekeeping: checkpoint the index, then print the data source's
 * statistics if it is a tape.
 */
void TDataBinary::Cleanup()
{
  Checkpoint();

  if (!_data->isTape()) {
    return;
  }

  _data->printStats();
}
/*
 * Print the index entry of every record to stdout, separated by spaces.
 * A line break is emitted after each group of eleven entries, and once more
 * at the very end.
 */
void TDataBinary::PrintIndices()
{
  int onThisLine = 0;

  for (long rec = 0; rec < _totalRecs; rec++) {
    printf("%ld ", _indexP->Get(rec));

    if (onThisLine == 10) {
      // Eleventh entry of the line was just printed: wrap and restart.
      printf("\n");
      onThisLine = 0;
    } else {
      onThisLine++;
    }
  }

  printf("\n");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -