⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 memmgr.c

📁 数据挖掘经典的hierarchial clustering algorithm
💻 C
📖 第 1 页 / 共 2 页
字号:
        printf("Now %d buffer pages remain in use\n", _count->buffer);
#endif
    } else {
        _count->cache -= pages;
#if DEBUGLVL >= 3
        printf("Now %d cache pages remain in use\n", _count->cache);
#endif
    }

#if DEBUGLVL >= 5
    printf("Memory allocation table now:\n");
    Dump();
#endif

    DOASSERT(_count->cache >= 0 && _count->buffer >= 0, "Inconsistent state");

    // If someone is waiting for a free page, signal it
    if (_count->free == pages)
        ReleaseFree();

    ReleaseMutex();

    return 0;
}

int MemMgr::Convert(char *page, PageType oldType, PageType &newType)
{
    AcquireMutex();

    page = page;

    if (oldType != newType) {
        if (oldType == Cache) {
            _count->cache--;
            _count->buffer++;
        } else {
            _count->cache++;
            _count->buffer--;
        }

#if DEBUGLVL >= 3
        printf("Memory manager now has %d cache pages, %d buffer pages\n",
               _count->cache, _count->buffer);
#endif
    }

    ReleaseMutex();

    return 0;
}

void MemMgr::Dump()
{
    printf("Free %d, buffer %d (%d max), cache %d (%d max), entries %d\n",
           _count->free, _count->buffer, _count->maxBuffer,
           _count->cache, _count->maxCache, _count->entries);
    for(int i = 0; i < (_tableSize > 10 ? 10 : _tableSize); i++) {
        printf("memory[%d] = 0x%p, %d pages\n", i, _freePage[i],
               _freePageCount[i]);
    }
}

DataPipe::DataPipe(int maxSize, int &status)
{
    status = Initialize(maxSize);
}

int DataPipe::Initialize(int maxSize)
{
#if DEBUGLVL >= 1
    printf("Creating new data pipe, maximum size %d\n", maxSize);
#endif

    if (maxSize < 2) {
        maxSize = 2;
        fprintf(stderr, "Adjusting maximum size to %d\n", maxSize);
    }

    int status;
    _sem = new SemaphoreV(Semaphore::newKey(), status, 1);
    if (!_sem || status < 0) {
      fprintf(stderr, "Cannot create semaphore\n");
      return -1;
    }
    _sem->setValue(1);

    _free = new SemaphoreV(Semaphore::newKey(), status, 1);
    if (!_free || status < 0) {
      fprintf(stderr, "Cannot create semaphore\n");
      return -1;
    }
    _free->setValue(0);

    _data = new SemaphoreV(Semaphore::newKey(), status, 1);
    if (!_data || status < 0) {
      fprintf(stderr, "Cannot create semaphore\n");
      return -1;
    }
    _data->setValue(0);

    int size = maxSize * (sizeof(char *) + sizeof(streampos_t)
                          + sizeof(iosize_t)) + sizeof(CountStruct);

    key_t _shmKey = SharedMemory::newKey();
    int created = 0;
    char *buf;
    _shm = new SharedMemory(_shmKey, size, buf, created);
    if (!_shm) {
      fprintf(stderr, "Cannot create shared memory\n");
      return -1;
    }
    if (!created)
        printf("Warning: pre-existing shared memory initialized\n");
#if DEBUGLVL >= 1
    printf("Created a %d-byte shared memory segment (key %d) at 0x%p\n",
           size, _shmKey, buf);
#endif

    if (!buf) {
      fprintf(stderr, "Failed to get shared memory\n");
      return -1;
    }

    _chunk = (char **)buf;
    _offset = (streampos_t *)(_chunk + maxSize);
    _bytes = (iosize_t *)(_offset + maxSize);
    _count = (CountStruct *)(_bytes + maxSize);

    for(int i = 0; i < maxSize; i++) {
        _chunk[i] = 0;
        _offset[i] = 0;
        _bytes[i] = 0;
    }

    _count->head = 0;
    _count->tail = 0;
    _count->free = maxSize;
    _count->size = maxSize;
    _count->maxSize = maxSize;

    return 0;
}

DataPipe::~DataPipe()
{
    delete _shm;

    _data->destroy();
    delete _data;

    _free->destroy();
    delete _free;

    _sem->destroy();
    delete _sem;
}

int DataPipe::Produce(char *buf, streampos_t offset, iosize_t bytes)
{
    AcquireMutex();

    // Free count may be negative if pipe size was recently reduced.

    while (_count->free <= 0) {
#if DEBUGLVL >= 3
        printf("Pipe 0x%p has to wait for consumer's free space (%d)\n",
               this, _count->free);
#endif
        ReleaseMutex();
        AcquireFree();
        AcquireMutex();
    }

    DOASSERT(!_chunk[_count->head], "Inconsistent state");

    _chunk[_count->head] = buf;
    _offset[_count->head] = offset;
    _bytes[_count->head] = bytes;

    _count->head = (_count->head + 1) % _count->maxSize;
    _count->free--;

#if DEBUGLVL >= 3
    printf("Pipe 0x%p has %d free space left\n", this, _count->free);
#endif

    if (_count->free == _count->size - 1) {
#if DEBUGLVL >= 3
        printf("Pipe 0x%p signaling consumer about new data (%d,%d)\n",
               this, _count->free, _count->size);
#endif
        ReleaseData();
    }

    ReleaseMutex();

    return 0;
}

int DataPipe::Consume(char *&buf, streampos_t &offset, iosize_t &bytes)
{
    AcquireMutex();

    while (_count->free == _count->size) {
#if DEBUGLVL >= 3
        printf("Pipe 0x%p has to wait for producer's data (%d,%d)\n",
               this, _count->free, _count->size);
#endif
        ReleaseMutex();
        AcquireData();
        AcquireMutex();
    }

    buf = _chunk[_count->tail];
    offset = _offset[_count->tail];
    bytes = _bytes[_count->tail];

    _chunk[_count->tail] = 0;
    _offset[_count->tail] = 0;
    _bytes[_count->tail] = 0;
    _count->tail = (_count->tail + 1) % _count->maxSize;
    _count->free++;

#if DEBUGLVL >= 3
    printf("Pipe 0x%p has %d free space left\n", this, _count->free);
#endif

    if (_count->free == 1) {
#if DEBUGLVL >= 3
        printf("Pipe 0x%p signaling producer about free space\n", this);
#endif
        ReleaseFree();
    }

    ReleaseMutex();

    return 0;
}

int DataPipe::SetSize(int size)
{
    AcquireMutex();

    if (size < 2) {
        size = 2;
        fprintf(stderr, "Adjusting pipe size to %d\n", size);
    }

    if (size >= _count->maxSize) {
        fprintf(stderr, "Invalid pipe size %d reduced to %d\n",
                size, _count->maxSize);
        size = _count->maxSize;
    }

    // The following adjustment to free count may leave if negative.
    // That's fine, and just means that Produce() will not be able
    // to append data to the pipe until that many chunks have been
    // consumed from the pipe.

    _count->free += size - _count->size;
    _count->size = size;

    ReleaseMutex();

    return size;
}

MultiPipe::MultiPipe(int &status) : _numPipes(0), _hint(0)
{
    _data = new SemaphoreV(Semaphore::newKey(), status, 1);
    if (!_data || status < 0) {
      fprintf(stderr, "Cannot create semaphore\n");
      status = -1;
      return;
    }
    _data->setValue(0);
}

MultiPipe::~MultiPipe()
{
    _data->destroy();
    delete _data;
}

int MultiPipe::AddPipe(DataPipe *pipe)
{
    // See if pipe already in multipipe
    for(int i = 0; i < _numPipes; i++) {
        if (_pipes[i] == pipe)
            return -1;
    }

    if (_numPipes >= _maxPipes)
      return -1;

    _pipes[_numPipes++] = pipe;

#if DEBUGLVL >= 3
    printf("Added pipe 0x%p to multipipe 0x%p\n", pipe, this);
#endif

    return 0;
}

int MultiPipe::RemovePipe(DataPipe *pipe)
{
    for(int i = 0; i < _numPipes; i++) {
        if (_pipes[i] == pipe) {
            for(int j = i; j < _numPipes - 1; j++)
                _pipes[j] = _pipes[j + 1];
            _numPipes--;
            if (_hint >= _numPipes)
                _hint = 0;
#if DEBUGLVL >= 3
            printf("Removed pipe 0x%p from multipipe 0x%p\n", pipe, this);
#endif
            return 0;
        }
    }

    return -1;
}

int MultiPipe::Consume(char *&buf, streampos_t &offset,
                       iosize_t &bytes, DataPipe *&pipe)
{
    for(int j = 0; j < _numPipes; j++) {
        pipe = _pipes[_hint];
        if (pipe->NumData() > 0) {
#if DEBUGLVL >= 3
            printf("Consuming data from pipe 0x%p of multipipe 0x%p\n",
                   pipe, this);
#endif
            return pipe->Consume(buf, offset, bytes);
        }
        _hint++;
    }

    // Block on first pipe but change this later once a smarter
    // synchronization mechanism is designed.

    pipe = _pipes[_hint];
#if DEBUGLVL >= 3
    printf("Blocking and consuming data from pipe 0x%p of multipipe 0x%p\n",
           pipe, this);
#endif
    return pipe->Consume(buf, offset, bytes);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -