⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tdataascii.c

📁 数据挖掘经典的hierarchical clustering algorithm
💻 C
📖 第 1 页 / 共 2 页
字号:
/*
   Deliver the next batch of records of request `req` into `buf`.

   On success `startRid` receives the id of the first record delivered,
   `numRecs` the number of records delivered and `dataSize` the number
   of bytes they occupy (numRecs * _recSize). The request's nextId is
   advanced past the delivered records, and its iohandle is set to -1
   once nextId has moved past endId.

   Returns false (after having set numRecs to the buffer capacity) when
   the request has no records left, true otherwise.
*/
Boolean TDataAscii::GetRecs(TDHandle req, void *buf, int bufSize,
                            RecId &startRid, int &numRecs, int &dataSize)
{
  DOASSERT(req, "Invalid request handle");

#if DEBUGLVL >= 3
  printf("TDataAscii::GetRecs: handle %d, buf = 0x%p\n", req->iohandle, buf);
#endif

  DOASSERT(req->iohandle >= 0, "I/O request not initialized properly");

  /* Number of whole records the caller's buffer can hold */
  numRecs = bufSize / _recSize;
  DOASSERT(numRecs > 0, "Not enough record buffer space");

  /* Nothing left in the requested range */
  if (req->endId < req->nextId)
    return false;

  /* Never deliver more than what remains of the requested range */
  const int remaining = req->endId - req->nextId + 1;
  if (numRecs > remaining)
    numRecs = remaining;

  /* Handle 0 takes the ReadRec path, any other handle ReadRecAsync */
  if (req->iohandle != 0)
    ReadRecAsync(req, req->nextId, numRecs, buf);
  else
    ReadRec(req->nextId, numRecs, buf);

  startRid = req->nextId;
  dataSize = numRecs * _recSize;
  _bytesFetched += dataSize;
  req->nextId += numRecs;

  /* Mark the request as finished once the whole range was delivered */
  if (req->nextId > req->endId)
    req->iohandle = -1;

  return true;
}

/*
   Finish request `req`: hand back the chunk still cached on the
   request (if any), drain the remaining chunks from _data when the
   request's iohandle is positive, and delete the request object.
*/
void TDataAscii::DoneGetRecs(TDHandle req)
{
  DOASSERT(req, "Invalid request handle");

  /* Give back the chunk of memory cached from the pipe. */
  if (req->relcb && req->lastOrigChunk)
    req->relcb->ReleaseMemory(MemMgr::Buffer, req->lastOrigChunk, 1);

  if (req->iohandle > 0) {
    /*
       Drain data from the pipe. Ideally the DataSource at the other
       end of the pipe would also be told to stop, but that cannot be
       done yet.
    */
    for (;;) {
      char *drained;
      streampos_t pos;
      iosize_t count;
      const int rc = _data->Consume(drained, pos, count);
      DOASSERT(rc >= 0, "Cannot consume data");

      /* A non-positive byte count ends the drain loop */
      if (!(count > 0))
        break;

      /*
         Hand the chunk back so the buffer manager (or whoever
         receives the call below) can reuse it.
      */
      if (req->relcb)
        req->relcb->ReleaseMemory(MemMgr::Buffer, drained, 1);
    }
  }

  delete req;
}

/*
   Point `indices` at a one-element array holding `id`.
   The array is a function-local static, so its content is overwritten
   by the next call.
*/
void TDataAscii::GetIndex(RecId id, int *&indices)
{
  static int slot[1];
  *slot = id;
  indices = &slot[0];
}

/*
   Return the modification time reported by _data, or -1 when
   CheckFileStatus() fails.
*/
int TDataAscii::GetModTime()
{
  if (CheckFileStatus())
    return _data->GetModTime();

  return -1;
}

/*
   Build "<work dir>/<name without path>" in a buffer allocated with
   new[] and return it. This function does not free the buffer.
   The `type` argument is not used.
*/
char *TDataAscii::MakeIndexFileName(char *name, char *type)
{
  char *base = StripPath(name);

  /* work directory + '/' ... */
  int total = strlen(Init::WorkDir()) + 1;
  /* ... + base name + terminating NUL */
  total += strlen(base) + 1;

  char *path = new char [total];
  sprintf(path, "%s/%s", Init::WorkDir(), base);
  return path;
}

/*
   Set up the index file name and bring the record index up to date.

   Buffer-backed data (isBuf) is indexed directly with BuildIndex().
   Otherwise the index is first initialized from the index file; on
   success the loaded totals are remembered as the initial state and
   indexing continues from there, on failure the index is rebuilt from
   scratch.
*/
void TDataAscii::Initialize()
{
  _indexFileName = MakeIndexFileName(_name, _type);

  if (!CheckFileStatus())
    return;

  if (_data->isBuf()) {
    BuildIndex();
    return;
  }

  if (_indexP->Initialize(_indexFileName, _data, this, _lastPos,
                          _totalRecs).IsComplete()) {
    /* Remember what was loaded so Checkpoint() can detect changes */
    _initTotalRecs = _totalRecs;
    _initLastPos = _lastPos;

    /* continue to build index */
    BuildIndex();
  } else {
    /* recover from error by building index from scratch */
    RebuildIndex();
  }
}

/*
   Write the record index to the index file if it changed since
   Initialize()/RebuildIndex() recorded the initial state.

   Nothing is written for buffer-backed data or when the file status
   check fails. After a checkpoint attempt, whether or not it
   completed, _currPos is re-read from _data->Tell().
*/
void TDataAscii::Checkpoint()
{
  if (!CheckFileStatus()) {
    printf("Cannot checkpoint %s\n", _name);
    return;
  }

  if (_data->isBuf())
    return;

  printf("Checkpointing %s: %ld total records, %ld new\n", _name,
         _totalRecs, _totalRecs - _initTotalRecs);

  /* Checkpoint only when position or record count has moved */
  if (_lastPos != _initLastPos || _totalRecs != _initTotalRecs) {
    /*
       The completion status is not acted upon: success and failure
       both just resynchronize _currPos below.
    */
    (void) _indexP->Checkpoint(_indexFileName, _data, this, _lastPos,
                               _totalRecs).IsComplete();

    _currPos = _data->Tell();
  }
}


/*
   Rebuild the index and invalidate the base-class TData state, but
   only while _data reports that it is OK.
*/
void TDataAscii::InvalidateTData()
{
  if (!_data->IsOk())
    return;

  RebuildIndex();
  TData::InvalidateTData();
}


/* Build index for the file. This code should work when file size
   is extended dynamically. Before calling this function, position
   should be at the last place where file was scanned.

   Scanning resumes at _lastPos minus the length of the incomplete
   line seen at the end of the previous scan. Every complete line
   (terminated by '\n') that Decode() accepts gets an index entry at
   its starting offset; rejected and incomplete lines are skipped.
   On return _lastPos holds the position reported by _data->Tell().
   If the initial seek fails the error is reported and nothing else
   is changed. */

void TDataAscii::BuildIndex()
{
  char buf[LINESIZE];
  int oldTotal = _totalRecs;

  _currPos = _lastPos - _lastIncompleteLen;

  /* First go to last valid position of file */
  if (_data->Seek(_currPos, SEEK_SET) < 0) {
    reportErrSys("fseek");
    return;
  }

  /*
     Scratch space Decode() writes the decoded record into. Allocated
     on the heap because the size is only known at run time: a
     variable-length array is not standard C++ and would put an
     arbitrarily large record on the stack.
  */
  char *recBuf = new char [_recSize];

  _lastIncompleteLen = 0;

  while(1) {

    int len = 0;

    if (_data->Fgets(buf, LINESIZE) == NULL)
      break;

    len = strlen(buf);

    if (len > 0 && buf[len - 1] == '\n') {
      /* Complete line: strip the newline before decoding */
      buf[len - 1] = 0;
      if (Decode(recBuf, _currPos, buf)) {
        _indexP->Set(_totalRecs++, _currPos);
      } else {
#if DEBUGLVL >= 7
        printf("Ignoring invalid record: \"%s\"\n", buf);
#endif
      }
      _lastIncompleteLen = 0;
    } else {
#if DEBUGLVL >= 7
      printf("Ignoring incomplete record: \"%s\"\n", buf);
#endif
      /* Remember its length so the next scan re-reads this line */
      _lastIncompleteLen = len;
    }

    _currPos += len;
  }

  delete [] recBuf;

  /*
     Last position is > current position because TapeDrive advances
     bufferOffset to the next block, past the EOF, when tape file ends.
  */
  _lastPos = _data->Tell();
  DOASSERT(_lastPos >= _currPos, "Incorrect file position");

#if DEBUGLVL >= 3
  printf("Index for %s: %ld total records, %ld new\n", _name,
         _totalRecs, _totalRecs - oldTotal);
#endif

  if (_totalRecs <= 0)
      fprintf(stderr, "No valid records for data stream %s\n"
              "    (check schema/data correspondence)\n", _name);
}

/* Rebuild index: discard the current index, reset all record counts
   and file positions to zero, then scan the data again from the
   start with BuildIndex(). */

void TDataAscii::RebuildIndex()
{
#if DEBUGLVL >= 3
  printf("Rebuilding index...\n");
#endif

  InvalidateIndex();
  _indexP->Clear();

  /* Forget everything learned from previous scans */
  _totalRecs = 0;
  _initTotalRecs = 0;
  _lastPos = 0;
  _initLastPos = 0;
  _lastIncompleteLen = 0;

  BuildIndex();
}

/*
   Read and decode `numRecs` consecutive records starting with record
   `id`, storing them back to back (_recSize bytes apart) in `buf`.

   For each record the file is positioned at the offset stored in the
   index (a seek is issued only when _currPos is not already there),
   one line is read, its trailing newline is stripped, and the line is
   passed to Decode(). _currPos is advanced by the length of the line
   read.

   Asserts on seek or read failure, on a line that does not end in a
   newline, and on a line that Decode() rejects. Returns TD_OK.
*/
TD_Status TDataAscii::ReadRec(RecId id, int numRecs, void *buf)
{
#if DEBUGLVL >= 3
  printf("TDataAscii::ReadRec %ld,%d,0x%p\n", id, numRecs, buf);
#endif

  char line[LINESIZE];

  char *ptr = (char *)buf;

  for(int i = 0; i < numRecs; i++) {

    int len;
    /* Seek only when not already positioned at the record's offset */
    if (_currPos != (long) _indexP->Get(id + i)) {
      if (_data->Seek(_indexP->Get(id + i), SEEK_SET) < 0) {
        /* Same reporting routine as every other system-call failure
           in this file (was a bare perror). */
        reportErrSys("fseek");
        DOASSERT(0, "Cannot perform file seek");
      }
      _currPos = _indexP->Get(id + i);
    }
    if (_data->Fgets(line, LINESIZE) == NULL) {
      reportErrSys("fgets");
      DOASSERT(0, "Cannot read from file");
    }
    len = strlen(line);

    if (len > 0 ) {
      /* A missing newline means the line did not fit in LINESIZE */
      DOASSERT(line[len - 1] == '\n', "Data record too long");
      line[len - 1] = '\0';
    }

    /* Indexed records were accepted by Decode() when the index was
       built, so a rejection here means index and data disagree. */
    Boolean valid = Decode(ptr, _currPos, line);
    DOASSERT(valid, "Inconsistent validity flag");
    ptr += _recSize;

    _currPos += len;
  }

  return TD_OK;
}

/*
   Decode `numRecs` records for request `req` from chunks obtained
   through _data->Consume(), storing them back to back (_recSize bytes
   apart) in `buf`.

   A chunk that a previous call left partly unused is cached on the
   request (lastChunk / lastOrigChunk / lastChunkBytes) and is used
   before a new chunk is consumed. A record that straddles two chunks
   is reassembled in a local buffer of LINESIZE bytes. Lines rejected
   by Decode() are skipped and do not count towards numRecs. Chunks
   that are fully used are handed back through req->relcb.

   Asserts when a record or record fragment does not fit in LINESIZE
   bytes (including the terminating NUL), when two consecutive chunks
   contain no newline, and when fewer than numRecs records could be
   produced. Returns TD_OK.

   `id` is only used for debug output.
*/
TD_Status TDataAscii::ReadRecAsync(TDataRequest *req, RecId id,
                                   int numRecs, void *buf)
{
#if DEBUGLVL >= 3
  printf("TDataAscii::ReadRecAsync %ld,%d,0x%p\n", id, numRecs, buf);
#endif

  int recs = 0;
  char *ptr = (char *)buf;
  char line[LINESIZE];
  int partialRecSize = 0;

  while (recs < numRecs) {
    /* Take chunk from cached values if present */
    char *chunk = req->lastChunk;
    char *origChunk = req->lastOrigChunk;
    iosize_t bytes = req->lastChunkBytes;

    /* No chunk in cache, get next chunk from data source */
    if (!chunk) {
        streampos_t offset;
        int status = _data->Consume(chunk, offset, bytes);
        DOASSERT(status >= 0, "Cannot consume data");
        DOASSERT((off_t)offset == _currPos, "Invalid data chunk consumed");
        _currPos += bytes;
        origChunk = chunk;
    } else {
        /* Cached chunk is now owned by this call; clear the cache */
        req->lastChunk = req->lastOrigChunk = NULL;
        req->lastChunkBytes = 0;
    }

    DOASSERT(chunk && origChunk, "Inconsistent state");

    /* No more data available */
    if (bytes <= 0)
        break;

    while (recs < numRecs && bytes > 0) {
      char *eol = (char *)memchr(chunk, '\n', bytes);
      if (!eol) {
        DOASSERT(partialRecSize == 0, "Two consecutive partial records");
        /* The fragment plus its terminating NUL must fit in line[];
           without this check the copy below overruns the buffer. */
        DOASSERT((long)bytes < (long)LINESIZE, "Data record too long");
        /* Store fraction of record for next loop */
        memcpy(line, chunk, bytes);
        line[bytes] = 0;
        partialRecSize = bytes;
#if DEBUGLVL >= 3
        printf("Caching remainder of chunk (%d bytes): \"%s\"\n",
               partialRecSize, line);
#endif
        break;
      }

      /*
         Append record to existing record from previous iteration of
         outer loop if there is a fragment of it left. Terminating
         newline is first replaced with a null, then appended, and
         then the newline is put back.
      */

      char *record = chunk;
      int recSize = eol - record;
      int fullRecSize = recSize;

      char oldch = *eol;
      *eol = 0;

      if (partialRecSize > 0) {
          fullRecSize += partialRecSize;
          /* Fragment + rest of record + NUL must fit in line[];
             without this check the copy below overruns the buffer. */
          DOASSERT(fullRecSize < (int)LINESIZE, "Data record too long");
          memcpy(&line[partialRecSize], record, recSize + 1);
#if DEBUGLVL >= 5
          printf("Got %d-byte record (%d partial): \"%s\"\n", fullRecSize,
                 partialRecSize, line);
#endif
          partialRecSize = 0;
          record = line;
      } else {
#if DEBUGLVL >= 5
          printf("Got %d-byte full record: \"%s\"\n", fullRecSize, record);
#endif
      }

      /* Only records accepted by Decode() occupy a slot in buf */
      Boolean valid = Decode(ptr, _currPos, record);
      if (valid) {
        ptr += _recSize;
        recs++;
      }

      /* Restore the newline and step past this record in the chunk */
      *eol = oldch;
      chunk = eol + 1;
      bytes -= recSize + 1;
    }

    if (recs == numRecs && bytes > 0) {
      /* Save unused piece of chunk for next call to this function */
      req->lastChunk = chunk;
      req->lastOrigChunk = origChunk;
      req->lastChunkBytes = bytes;
#if DEBUGLVL >= 3
      printf("Saving %ld bytes of chunk 0x%p for next function call\n",
             bytes, origChunk);
#endif
    } else {
      /*
         Release chunk so buffer manager (or whoever gets the following
         call) can make use of it.
      */
      if (req->relcb)
          req->relcb->ReleaseMemory(MemMgr::Buffer, origChunk, 1);
    }
  }

  if (recs != numRecs)
    fprintf(stderr, "Data source produced %d records, not %d\n",
            recs, numRecs);
  DOASSERT(recs == numRecs, "Incomplete data transfer");


  return TD_OK;
}

/*
   Append the NUL-terminated text in `buf` to the data file and account
   for `numRecs` new records. Writing to tape is not supported.

   The record count is increased by numRecs and the index entry of the
   last new record is set to the previous end position; afterwards
   _lastPos and _currPos are set to the position reported by
   _data->Tell(). Asserts if the append is short.

   NOTE(review): only one index entry is set regardless of numRecs, and
   startRid is not used -- confirm callers always pass numRecs == 1.
*/
void TDataAscii::WriteRecs(RecId startRid, int numRecs, void *buf)
{
  DOASSERT(!_data->isTape(), "Writing to tape not supported yet");

  _totalRecs += numRecs;
  _indexP->Set(_totalRecs - 1, _lastPos);

  const int nbytes = strlen((char *)buf);
  if (_data->append(buf, nbytes) != nbytes) {
    reportErrSys("append");
    DOASSERT(0, "Cannot append to file");
  }

  /* Both positions now sit at the position reported after the append */
  _currPos = _lastPos = _data->Tell();
}

/*
   Append the NUL-terminated text in `line` to the data file without
   touching the record count or the index. Writing to tape is not
   supported. Afterwards _lastPos and _currPos are set to the position
   reported by _data->Tell(). Asserts if the append is short.
*/
void TDataAscii::WriteLine(void *line)
{
  DOASSERT(!_data->isTape(), "Writing to tape not supported yet");

  const int nbytes = strlen((char *)line);
  if (_data->append(line, nbytes) != nbytes) {
    reportErrSys("append");
    DOASSERT(0, "Cannot append to file");
  }

  _currPos = _lastPos = _data->Tell();
}

/*
   Checkpoint the index and, for tape-backed data, print the data
   source's statistics.
*/
void TDataAscii::Cleanup()
{
  Checkpoint();

  if (!_data->isTape())
    return;

  _data->printStats();
}

/*
   Print the index entry of every record to stdout, separated by
   spaces, with a line break after every 11th entry and a final line
   break at the end.
*/
void TDataAscii::PrintIndices()
{
  int onLine = 0;
  long rec = 0;

  while (rec < _totalRecs) {
    printf("%ld ", _indexP->Get(rec));

    /* onLine counts the entries already on the line before this one */
    if (onLine == 10) {
      printf("\n");
      onLine = 0;
    } else {
      onLine++;
    }

    rec++;
  }

  printf("\n");
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -