📄 tdataascii.c
字号:
/*
   Deliver the next batch of records of request req into buf.

   req      - request handle; must be non-null with iohandle >= 0
   buf      - destination buffer for the decoded records
   bufSize  - size of buf in bytes; must hold at least one record
   startRid - out: id of the first record delivered
   numRecs  - out: number of records delivered
   dataSize - out: numRecs * _recSize

   Returns false (after numRecs has already been set to the buffer
   capacity) when req->nextId is past req->endId; otherwise reads a
   batch, advances req->nextId and returns true.
*/
Boolean TDataAscii::GetRecs(TDHandle req, void *buf, int bufSize,
                            RecId &startRid, int &numRecs, int &dataSize)
{
  DOASSERT(req, "Invalid request handle");

#if DEBUGLVL >= 3
  printf("TDataAscii::GetRecs: handle %d, buf = 0x%p\n", req->iohandle, buf);
#endif

  DOASSERT(req->iohandle >= 0, "I/O request not initialized properly");

  /* Begin with as many whole records as the caller's buffer can hold. */
  numRecs = bufSize / _recSize;
  DOASSERT(numRecs > 0, "Not enough record buffer space");

  /* The requested range has already been fully delivered. */
  if (req->nextId > req->endId) {
    return false;
  }

  /* Do not deliver more records than remain in the requested range. */
  int remaining = req->endId - req->nextId + 1;
  if (numRecs > remaining) {
    numRecs = remaining;
  }

  /* Handle 0 is served by ReadRec, every other handle by ReadRecAsync. */
  if (req->iohandle != 0) {
    ReadRecAsync(req, req->nextId, numRecs, buf);
  } else {
    ReadRec(req->nextId, numRecs, buf);
  }

  startRid = req->nextId;
  dataSize = numRecs * _recSize;
  req->nextId += numRecs;
  _bytesFetched += dataSize;

  /* Once the range is exhausted the handle is set to -1. */
  if (req->nextId > req->endId) {
    req->iohandle = -1;
  }

  return true;
}
/*
   Finish request req: hand back any chunk still cached on the request,
   drain what is left in the pipe when the handle is > 0, and delete
   the request object.
*/
void TDataAscii::DoneGetRecs(TDHandle req)
{
  DOASSERT(req, "Invalid request handle");

  /* Release chunk of memory cached from pipe. */
  if (req->relcb && req->lastOrigChunk) {
    req->relcb->ReleaseMemory(MemMgr::Buffer, req->lastOrigChunk, 1);
  }

  if (req->iohandle > 0) {
    /*
       Flush data from pipe. We would also like to tell the DataSource
       (which is at the other end of the pipe) to stop, but we can't
       do that yet.
    */
    for (;;) {
      char *data;
      streampos_t pos;
      iosize_t count;

      int rc = _data->Consume(data, pos, count);
      DOASSERT(rc >= 0, "Cannot consume data");

      /* A non-positive byte count ends the drain loop. */
      if (count <= 0) {
        break;
      }

      /*
         Release chunk so buffer manager (or whoever gets the following
         call) can make use of it.
      */
      if (req->relcb) {
        req->relcb->ReleaseMemory(MemMgr::Buffer, data, 1);
      }
    }
  }

  delete req;
}
/*
   Return the index tuple for record id. The tuple has a single
   element, the record id itself, kept in function-static storage;
   indices is pointed at that storage, so each call overwrites the
   value seen through pointers returned by earlier calls.
*/
void TDataAscii::GetIndex(RecId id, int *&indices)
{
  static int slot[1];

  *slot = id;
  indices = &slot[0];
}
/*
   Return the modification time reported by the underlying data
   source, or -1 when CheckFileStatus() fails.
*/
int TDataAscii::GetModTime()
{
  if (CheckFileStatus()) {
    return _data->GetModTime();
  }

  return -1;
}
/*
   Build the index file name "<work dir>/<name without path>" in a
   buffer allocated with new[] and return it. The type argument is
   not used.
*/
char *TDataAscii::MakeIndexFileName(char *name, char *type)
{
  char *baseName = StripPath(name);

  /* work directory + '/' + base name + terminating null */
  int pathLen = strlen(Init::WorkDir()) + 1 + strlen(baseName) + 1;

  char *path = new char [pathLen];
  sprintf(path, "%s/%s", Init::WorkDir(), baseName);

  return path;
}
/*
   Set up the record index. The index file name is computed first.
   Nothing else happens if CheckFileStatus() fails. Buffer-backed
   data is indexed directly with BuildIndex(). Otherwise the index
   object is initialized from the index file: on success indexing
   continues from the restored position, on failure the index is
   rebuilt from scratch.
*/
void TDataAscii::Initialize()
{
  _indexFileName = MakeIndexFileName(_name, _type);

  if (!CheckFileStatus()) {
    return;
  }

  if (_data->isBuf()) {
    BuildIndex();
    return;
  }

  if (_indexP->Initialize(_indexFileName, _data, this, _lastPos,
                          _totalRecs).IsComplete()) {
    /* Remember the restored state, then continue to build the index. */
    _initTotalRecs = _totalRecs;
    _initLastPos = _lastPos;
    BuildIndex();
  } else {
    /* Recover from error by building index from scratch. */
    RebuildIndex();
  }
}
/*
   Checkpoint the index to the index file. Nothing is written when
   CheckFileStatus() fails, when the data is buffer-backed, or when
   neither the record count nor the last position changed since the
   initial values were recorded. After a checkpoint attempt _currPos
   is re-read from the data source whether or not the attempt
   completed; an incomplete checkpoint is not reported here.
*/
void TDataAscii::Checkpoint()
{
  if (!CheckFileStatus()) {
    printf("Cannot checkpoint %s\n", _name);
    return;
  }

  if (_data->isBuf()) {
    return;
  }

  printf("Checkpointing %s: %ld total records, %ld new\n", _name,
         _totalRecs, _totalRecs - _initTotalRecs);

  /* No need to checkpoint when nothing has changed. */
  if (_totalRecs == _initTotalRecs && _lastPos == _initLastPos) {
    return;
  }

  /* The completion status is evaluated but deliberately not acted upon. */
  (void)_indexP->Checkpoint(_indexFileName, _data, this, _lastPos,
                            _totalRecs).IsComplete();

  _currPos = _data->Tell();
}
/*
   Rebuild the index and invalidate the base TData state, but only
   while the underlying data source reports that it is ok.
*/
void TDataAscii::InvalidateTData()
{
  if (!_data->IsOk()) {
    return;
  }

  RebuildIndex();
  TData::InvalidateTData();
}
/* Build index for the file. This code should work when file size
is extended dynamically. Before calling this function, position
should be at the last place where file was scanned. */
void TDataAscii::BuildIndex()
{
char buf[LINESIZE];
char recBuf[_recSize];
int oldTotal = _totalRecs;
_currPos = _lastPos - _lastIncompleteLen;
/* First go to last valid position of file */
if (_data->Seek(_currPos, SEEK_SET) < 0) {
reportErrSys("fseek");
return;
}
_lastIncompleteLen = 0;
while(1) {
int len = 0;
if (_data->Fgets(buf, LINESIZE) == NULL)
break;
len = strlen(buf);
if (len > 0 && buf[len - 1] == '\n') {
buf[len - 1] = 0;
if (Decode(recBuf, _currPos, buf)) {
_indexP->Set(_totalRecs++, _currPos);
} else {
#if DEBUGLVL >= 7
printf("Ignoring invalid record: \"%s\"\n", buf);
#endif
}
_lastIncompleteLen = 0;
} else {
#if DEBUGLVL >= 7
printf("Ignoring incomplete record: \"%s\"\n", buf);
#endif
_lastIncompleteLen = len;
}
_currPos += len;
}
/*
Last position is > current position because TapeDrive advances
bufferOffset to the next block, past the EOF, when tape file ends.
*/
_lastPos = _data->Tell();
DOASSERT(_lastPos >= _currPos, "Incorrect file position");
#if DEBUGLVL >= 3
printf("Index for %s: %ld total records, %ld new\n", _name,
_totalRecs, _totalRecs - oldTotal);
#endif
if (_totalRecs <= 0)
fprintf(stderr, "No valid records for data stream %s\n"
" (check schema/data correspondence)\n", _name);
}
/*
   Rebuild index: invalidate and clear the current index, reset the
   record counts, file positions and incomplete-line length to zero,
   then index the data again from the beginning.
*/
void TDataAscii::RebuildIndex()
{
#if DEBUGLVL >= 3
  printf("Rebuilding index...\n");
#endif

  InvalidateIndex();
  _indexP->Clear();

  _totalRecs = 0;
  _initTotalRecs = 0;

  _lastPos = 0;
  _initLastPos = 0;

  _lastIncompleteLen = 0;

  BuildIndex();
}
/*
   Read numRecs consecutive records starting at record id and decode
   them into buf, one _recSize-byte slot per record.

   For every record the file is repositioned to the indexed offset
   unless _currPos already equals it, one line is read, its trailing
   newline is replaced by a null, and the line is decoded. Seek or
   read failures, a line without a trailing newline, and a line that
   does not decode all trip a DOASSERT. Returns TD_OK.
*/
TD_Status TDataAscii::ReadRec(RecId id, int numRecs, void *buf)
{
#if DEBUGLVL >= 3
  printf("TDataAscii::ReadRec %ld,%d,0x%p\n", id, numRecs, buf);
#endif

  char text[LINESIZE];
  char *out = (char *)buf;

  for (int n = 0; n < numRecs; n++, out += _recSize) {
    /* Seek only when the current position is not the record's offset. */
    if ((long) _indexP->Get(id + n) != _currPos) {
      if (_data->Seek(_indexP->Get(id + n), SEEK_SET) < 0) {
        perror("fseek");
        DOASSERT(0, "Cannot perform file seek");
      }
      _currPos = _indexP->Get(id + n);
    }

    if (_data->Fgets(text, LINESIZE) == NULL) {
      reportErrSys("fgets");
      DOASSERT(0, "Cannot read from file");
    }

    int textLen = strlen(text);
    if (textLen > 0) {
      /* A missing newline means the line did not fit in the buffer. */
      DOASSERT(text[textLen - 1] == '\n', "Data record too long");
      text[textLen - 1] = '\0';
    }

    Boolean decoded = Decode(out, _currPos, text);
    DOASSERT(decoded, "Inconsistent validity flag");

    /* Account for the whole line, newline included. */
    _currPos += textLen;
  }

  return TD_OK;
}
/*
   Decode numRecs records into buf from chunks of text obtained with
   _data->Consume(), one _recSize-byte slot per valid record.

   req     - request; its lastChunk / lastOrigChunk / lastChunkBytes
             fields carry the unused tail of a chunk from one call to
             the next, and relcb (if set) is used to release chunks
   id      - first record id; only used in debug output here
   numRecs - number of valid records that must be produced
   buf     - destination buffer for the decoded records

   Records are newline-terminated. A record split across two chunks
   is assembled in a local LINESIZE-byte buffer. Lines that Decode()
   rejects are skipped and do not count towards numRecs. Producing
   fewer than numRecs records trips a DOASSERT. Returns TD_OK.

   A record fragment or assembled record that does not fit in the
   LINESIZE-byte buffer now trips a DOASSERT ("Data record too long",
   the same message ReadRec uses) instead of writing past the end of
   the buffer.
*/
TD_Status TDataAscii::ReadRecAsync(TDataRequest *req, RecId id,
                                   int numRecs, void *buf)
{
#if DEBUGLVL >= 3
  printf("TDataAscii::ReadRecAsync %ld,%d,0x%p\n", id, numRecs, buf);
#endif

  int recs = 0;
  char *ptr = (char *)buf;
  char line[LINESIZE];
  int partialRecSize = 0;

  while (recs < numRecs) {
    /* Take chunk from cached values if present */
    char *chunk = req->lastChunk;
    char *origChunk = req->lastOrigChunk;
    iosize_t bytes = req->lastChunkBytes;

    /* No chunk in cache, get next chunk from data source */
    if (!chunk) {
      streampos_t offset;
      int status = _data->Consume(chunk, offset, bytes);
      DOASSERT(status >= 0, "Cannot consume data");
      DOASSERT((off_t)offset == _currPos, "Invalid data chunk consumed");
      _currPos += bytes;
      origChunk = chunk;
    } else {
      /* The cached chunk is now owned by this call; clear the cache. */
      req->lastChunk = req->lastOrigChunk = NULL;
      req->lastChunkBytes = 0;
    }

    DOASSERT(chunk && origChunk, "Inconsistent state");

    /* A non-positive byte count means no more data is available. */
    if (bytes <= 0)
      break;

    while (recs < numRecs && bytes > 0) {
      char *eol = (char *)memchr(chunk, '\n', bytes);
      if (!eol) {
        DOASSERT(partialRecSize == 0, "Two consecutive partial records");
        /* The fragment plus its terminating null must fit in line[]. */
        DOASSERT((long)bytes < (long)LINESIZE, "Data record too long");
        /* Store fraction of record for next loop */
        memcpy(line, chunk, bytes);
        line[bytes] = 0;
        partialRecSize = bytes;
#if DEBUGLVL >= 3
        printf("Caching remainder of chunk (%d bytes): \"%s\"\n",
               partialRecSize, line);
#endif
        break;
      }

      /*
         Append record to existing record from previous iteration of
         outer loop if there is a fragment of it left. Terminating
         newline is first replaced with a null, then appended, and
         then the newline is put back.
      */

      char *record = chunk;
      int recSize = eol - record;
      int fullRecSize = recSize;

      char oldch = *eol;
      *eol = 0;

      if (partialRecSize > 0) {
        fullRecSize += partialRecSize;
        /*
           recSize + 1 bytes (record tail plus null) are written at
           offset partialRecSize, so the sum must stay below LINESIZE.
        */
        DOASSERT((long)partialRecSize + (long)recSize < (long)LINESIZE,
                 "Data record too long");
        memcpy(&line[partialRecSize], record, recSize + 1);
#if DEBUGLVL >= 5
        printf("Got %d-byte record (%d partial): \"%s\"\n", fullRecSize,
               partialRecSize, line);
#endif
        partialRecSize = 0;
        record = line;
      } else {
#if DEBUGLVL >= 5
        printf("Got %d-byte full record: \"%s\"\n", fullRecSize, record);
#endif
      }

      /* Only records accepted by Decode() consume an output slot. */
      Boolean valid = Decode(ptr, _currPos, record);
      if (valid) {
        ptr += _recSize;
        recs++;
      }

      /* Restore the newline and step past this record in the chunk. */
      *eol = oldch;
      chunk = eol + 1;
      bytes -= recSize + 1;
    }

    if (recs == numRecs && bytes > 0) {
      /* Save unused piece of chunk for next call to this function */
      req->lastChunk = chunk;
      req->lastOrigChunk = origChunk;
      req->lastChunkBytes = bytes;
#if DEBUGLVL >= 3
      printf("Saving %ld bytes of chunk 0x%p for next function call\n",
             bytes, origChunk);
#endif
    } else {
      /*
         Release chunk so buffer manager (or whoever gets the following
         call) can make use of it.
      */
      if (req->relcb)
        req->relcb->ReleaseMemory(MemMgr::Buffer, origChunk, 1);
    }
  }

  if (recs != numRecs)
    fprintf(stderr, "Data source produced %d records, not %d\n",
            recs, numRecs);
  DOASSERT(recs == numRecs, "Incomplete data transfer");

  return TD_OK;
}
/*
   Append the null-terminated text in buf to the data source and
   account for numRecs new records. Only one index entry is written:
   the entry of the new last record is set to the position the file
   had before the append. startRid is not used. Writing to tape is
   rejected by assertion, as is a short append.
*/
void TDataAscii::WriteRecs(RecId startRid, int numRecs, void *buf)
{
  DOASSERT(!_data->isTape(), "Writing to tape not supported yet");

  _totalRecs += numRecs;
  _indexP->Set(_totalRecs - 1, _lastPos);

  int nbytes = strlen((char *)buf);
  int written = _data->append(buf, nbytes);
  if (written != nbytes) {
    reportErrSys("append");
    DOASSERT(0, "Cannot append to file");
  }

  /* Both positions now refer to the new end of the data. */
  _currPos = _lastPos = _data->Tell();
}
/*
   Append the null-terminated string line to the data source and move
   _lastPos and _currPos to the position reported afterwards. No
   record count or index entry is updated here. Writing to tape is
   rejected by assertion, as is a short append.
*/
void TDataAscii::WriteLine(void *line)
{
  DOASSERT(!_data->isTape(), "Writing to tape not supported yet");

  int nbytes = strlen((char *)line);
  int written = _data->append(line, nbytes);
  if (written != nbytes) {
    reportErrSys("append");
    DOASSERT(0, "Cannot append to file");
  }

  _currPos = _lastPos = _data->Tell();
}
/*
   Checkpoint the index and, for tape-backed data, print the data
   source's statistics.
*/
void TDataAscii::Cleanup()
{
  Checkpoint();

  if (!_data->isTape()) {
    return;
  }

  _data->printStats();
}
/*
   Print every index entry to stdout, each followed by a space, with
   a line break after every eleventh entry and a final newline.
*/
void TDataAscii::PrintIndices()
{
  int onLine = 0;

  for (long rec = 0; rec < _totalRecs; rec++) {
    printf("%ld ", _indexP->Get(rec));

    /* onLine counts 0..10; the entry that finds it at 10 ends the line. */
    if (onLine == 10) {
      printf("\n");
      onLine = 0;
    } else {
      onLine++;
    }
  }

  printf("\n");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -