📄 memmgr.c
字号:
printf("Now %d buffer pages remain in use\n", _count->buffer);#endif } else { _count->cache -= pages;#if DEBUGLVL >= 3 printf("Now %d cache pages remain in use\n", _count->cache);#endif }#if DEBUGLVL >= 5 printf("Memory allocation table now:\n"); Dump();#endif DOASSERT(_count->cache >= 0 && _count->buffer >= 0, "Inconsistent state"); // If someone is waiting for a free page, signal it if (_count->free == pages) ReleaseFree(); ReleaseMutex(); return 0;}int MemMgr::Convert(char *page, PageType oldType, PageType &newType){ AcquireMutex(); page = page; if (oldType != newType) { if (oldType == Cache) { _count->cache--; _count->buffer++; } else { _count->cache++; _count->buffer--; }#if DEBUGLVL >= 3 printf("Memory manager now has %d cache pages, %d buffer pages\n", _count->cache, _count->buffer);#endif } ReleaseMutex(); return 0;}void MemMgr::Dump(){ printf("Free %d, buffer %d (%d max), cache %d (%d max), entries %d\n", _count->free, _count->buffer, _count->maxBuffer, _count->cache, _count->maxCache, _count->entries); for(int i = 0; i < (_tableSize > 10 ? 10 : _tableSize); i++) { printf("memory[%d] = 0x%p, %d pages\n", i, _freePage[i], _freePageCount[i]); }}DataPipe::DataPipe(int maxSize, int &status){ status = Initialize(maxSize);}int DataPipe::Initialize(int maxSize){#if DEBUGLVL >= 1 printf("Creating new data pipe, maximum size %d\n", maxSize);#endif if (maxSize < 2) { maxSize = 2; fprintf(stderr, "Adjusting maximum size to %d\n", maxSize); } int status; _sem = new SemaphoreV(Semaphore::newKey(), status, 1); if (!_sem || status < 0) { fprintf(stderr, "Cannot create semaphore\n"); return -1; } _sem->setValue(1); _free = new SemaphoreV(Semaphore::newKey(), status, 1); if (!_free || status < 0) { fprintf(stderr, "Cannot create semaphore\n"); return -1; } _free->setValue(0); _data = new SemaphoreV(Semaphore::newKey(), status, 1); if (!_data || status < 0) { fprintf(stderr, "Cannot create semaphore\n"); return -1; } _data->setValue(0); int size = maxSize * (sizeof(char *) + sizeof(streampos_t) + sizeof(iosize_t)) + sizeof(CountStruct); key_t _shmKey = SharedMemory::newKey(); int created = 0; char *buf; _shm = new SharedMemory(_shmKey, size, buf, created); if (!_shm) { fprintf(stderr, "Cannot create shared memory\n"); return -1; } if (!created) printf("Warning: pre-existing shared memory initialized\n");#if DEBUGLVL >= 1 printf("Created a %d-byte shared memory segment (key %d) at 0x%p\n", size, _shmKey, buf);#endif if (!buf) { fprintf(stderr, "Failed to get shared memory\n"); return -1; } _chunk = (char **)buf; _offset = (streampos_t *)(_chunk + maxSize); _bytes = (iosize_t *)(_offset + maxSize); _count = (CountStruct *)(_bytes + maxSize); for(int i = 0; i < maxSize; i++) { _chunk[i] = 0; _offset[i] = 0; _bytes[i] = 0; } _count->head = 0; _count->tail = 0; _count->free = maxSize; _count->size = maxSize; _count->maxSize = maxSize; return 0;}DataPipe::~DataPipe(){ delete _shm; _data->destroy(); delete _data; _free->destroy(); delete _free; _sem->destroy(); delete _sem;}int DataPipe::Produce(char *buf, streampos_t offset, iosize_t bytes){ AcquireMutex(); // Free count may be negative if pipe size was recently reduced. while (_count->free <= 0) {#if DEBUGLVL >= 3 printf("Pipe 0x%p has to wait for consumer's free space (%d)\n", this, _count->free);#endif ReleaseMutex(); AcquireFree(); AcquireMutex(); } DOASSERT(!_chunk[_count->head], "Inconsistent state"); _chunk[_count->head] = buf; _offset[_count->head] = offset; _bytes[_count->head] = bytes; _count->head = (_count->head + 1) % _count->maxSize; _count->free--;#if DEBUGLVL >= 3 printf("Pipe 0x%p has %d free space left\n", this, _count->free);#endif if (_count->free == _count->size - 1) {#if DEBUGLVL >= 3 printf("Pipe 0x%p signaling consumer about new data (%d,%d)\n", this, _count->free, _count->size);#endif ReleaseData(); } ReleaseMutex(); return 0;}int DataPipe::Consume(char *&buf, streampos_t &offset, iosize_t &bytes){ AcquireMutex(); while (_count->free == _count->size) {#if DEBUGLVL >= 3 printf("Pipe 0x%p has to wait for producer's data (%d,%d)\n", this, _count->free, _count->size);#endif ReleaseMutex(); AcquireData(); AcquireMutex(); } buf = _chunk[_count->tail]; offset = _offset[_count->tail]; bytes = _bytes[_count->tail]; _chunk[_count->tail] = 0; _offset[_count->tail] = 0; _bytes[_count->tail] = 0; _count->tail = (_count->tail + 1) % _count->maxSize; _count->free++;#if DEBUGLVL >= 3 printf("Pipe 0x%p has %d free space left\n", this, _count->free);#endif if (_count->free == 1) {#if DEBUGLVL >= 3 printf("Pipe 0x%p signaling producer about free space\n", this);#endif ReleaseFree(); } ReleaseMutex(); return 0;}int DataPipe::SetSize(int size){ AcquireMutex(); if (size < 2) { size = 2; fprintf(stderr, "Adjusting pipe size to %d\n", size); } if (size >= _count->maxSize) { fprintf(stderr, "Invalid pipe size %d reduced to %d\n", size, _count->maxSize); size = _count->maxSize; } // The following adjustment to free count may leave if negative. // That's fine, and just means that Produce() will not be able // to append data to the pipe until that many chunks have been // consumed from the pipe. _count->free += size - _count->size; _count->size = size; ReleaseMutex(); return size;}MultiPipe::MultiPipe(int &status) : _numPipes(0), _hint(0){ _data = new SemaphoreV(Semaphore::newKey(), status, 1); if (!_data || status < 0) { fprintf(stderr, "Cannot create semaphore\n"); status = -1; return; } _data->setValue(0);}MultiPipe::~MultiPipe(){ _data->destroy(); delete _data;}int MultiPipe::AddPipe(DataPipe *pipe){ // See if pipe already in multipipe for(int i = 0; i < _numPipes; i++) { if (_pipes[i] == pipe) return -1; } if (_numPipes >= _maxPipes) return -1; _pipes[_numPipes++] = pipe;#if DEBUGLVL >= 3 printf("Added pipe 0x%p to multipipe 0x%p\n", pipe, this);#endif return 0;}int MultiPipe::RemovePipe(DataPipe *pipe){ for(int i = 0; i < _numPipes; i++) { if (_pipes[i] == pipe) { for(int j = i; j < _numPipes - 1; j++) _pipes[j] = _pipes[j + 1]; _numPipes--; if (_hint >= _numPipes) _hint = 0;#if DEBUGLVL >= 3 printf("Removed pipe 0x%p from multipipe 0x%p\n", pipe, this);#endif return 0; } } return -1;}int MultiPipe::Consume(char *&buf, streampos_t &offset, iosize_t &bytes, DataPipe *&pipe){ for(int j = 0; j < _numPipes; j++) { pipe = _pipes[_hint]; if (pipe->NumData() > 0) {#if DEBUGLVL >= 3 printf("Consuming data from pipe 0x%p of multipipe 0x%p\n", pipe, this);#endif return pipe->Consume(buf, offset, bytes); } _hint++; } // Block on first pipe but change this later once a smarter // synchronization mechanism is designed. pipe = _pipes[_hint];#if DEBUGLVL >= 3 printf("Blocking and consuming data from pipe 0x%p of multipipe 0x%p\n", pipe, this);#endif return pipe->Consume(buf, offset, bytes);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -