📄 memmgr.c
字号:
printf("Now %d buffer pages remain in use\n", _count->buffer);
#endif
} else {
_count->cache -= pages;
#if DEBUGLVL >= 3
printf("Now %d cache pages remain in use\n", _count->cache);
#endif
}
#if DEBUGLVL >= 5
printf("Memory allocation table now:\n");
Dump();
#endif
DOASSERT(_count->cache >= 0 && _count->buffer >= 0, "Inconsistent state");
// If someone is waiting for a free page, signal it
if (_count->free == pages)
ReleaseFree();
ReleaseMutex();
return 0;
}
// Moves one page between the cache and buffer tallies when its type changes.
//
// page     not consulted; only the counters are touched
// oldType  type the page currently has
// newType  type the page is being converted to (read only here)
//
// The counters are adjusted between AcquireMutex() and ReleaseMutex().
// Always returns 0.
int MemMgr::Convert(char *page, PageType oldType, PageType &newType)
{
    AcquireMutex();

    // The page address plays no part in the bookkeeping.
    (void)page;

    if (oldType != newType) {
        // Any old type other than Cache is accounted for as a buffer page.
        const int cacheDelta = (oldType == Cache) ? -1 : 1;
        _count->cache += cacheDelta;
        _count->buffer -= cacheDelta;
#if DEBUGLVL >= 3
        printf("Memory manager now has %d cache pages, %d buffer pages\n",
               _count->cache, _count->buffer);
#endif
    }

    ReleaseMutex();
    return 0;
}
// Writes the counters in _count to stdout, followed by at most the first
// ten entries of _freePage / _freePageCount.
void MemMgr::Dump()
{
    printf("Free %d, buffer %d (%d max), cache %d (%d max), entries %d\n",
           _count->free, _count->buffer, _count->maxBuffer,
           _count->cache, _count->maxCache, _count->entries);

    // Stop at the end of the table or after ten entries, whichever is first.
    int slot = 0;
    while (slot < _tableSize && slot < 10) {
        printf("memory[%d] = 0x%p, %d pages\n", slot, _freePage[slot],
               _freePageCount[slot]);
        slot++;
    }
}
// Constructs a pipe with room for up to maxSize entries.  All set-up work is
// delegated to Initialize(); its return value is reported through status.
DataPipe::DataPipe(int maxSize, int &status)
{
    const int result = Initialize(maxSize);
    status = result;
}
// Sets up the pipe: creates the three semaphores (_sem, _free, _data),
// creates a shared memory object, and lays out the bookkeeping arrays inside
// the buffer it hands back.
//
// maxSize  requested number of slots; values below 2 are raised to 2.
//
// Returns 0 on success, -1 if a semaphore or the shared memory could not be
// obtained.  On failure the members created so far are left in place and
// the remaining ones are null, so the destructor can clean up safely.
int DataPipe::Initialize(int maxSize)
{
    // Put every owned pointer into a known state first.  The constructor
    // does not initialize them, and each early return below would otherwise
    // leave the destructor dereferencing indeterminate pointers.
    _sem = 0;
    _free = 0;
    _data = 0;
    _shm = 0;

#if DEBUGLVL >= 1
    printf("Creating new data pipe, maximum size %d\n", maxSize);
#endif

    if (maxSize < 2) {
        maxSize = 2;
        fprintf(stderr, "Adjusting maximum size to %d\n", maxSize);
    }

    // Initialized so the checks below never read an indeterminate value,
    // even if the SemaphoreV constructor does not assign it.
    int status = 0;

    // _sem starts at 1.
    _sem = new SemaphoreV(Semaphore::newKey(), status, 1);
    if (!_sem || status < 0) {
        fprintf(stderr, "Cannot create semaphore\n");
        return -1;
    }
    _sem->setValue(1);

    // _free starts at 0.
    _free = new SemaphoreV(Semaphore::newKey(), status, 1);
    if (!_free || status < 0) {
        fprintf(stderr, "Cannot create semaphore\n");
        return -1;
    }
    _free->setValue(0);

    // _data starts at 0.
    _data = new SemaphoreV(Semaphore::newKey(), status, 1);
    if (!_data || status < 0) {
        fprintf(stderr, "Cannot create semaphore\n");
        return -1;
    }
    _data->setValue(0);

    // Segment layout:
    //   [maxSize chunk pointers][maxSize offsets][maxSize byte counts]
    //   [CountStruct]
    int size = maxSize * (sizeof(char *) + sizeof(streampos_t)
                          + sizeof(iosize_t)) + sizeof(CountStruct);

    key_t shmKey = SharedMemory::newKey();
    int created = 0;
    // Null until SharedMemory supplies an address, so the !buf test below
    // is meaningful even when SharedMemory leaves it untouched.
    char *buf = 0;
    _shm = new SharedMemory(shmKey, size, buf, created);
    if (!_shm) {
        fprintf(stderr, "Cannot create shared memory\n");
        return -1;
    }
    if (!created)
        printf("Warning: pre-existing shared memory initialized\n");
#if DEBUGLVL >= 1
    printf("Created a %d-byte shared memory segment (key %d) at 0x%p\n",
           size, shmKey, buf);
#endif
    if (!buf) {
        fprintf(stderr, "Failed to get shared memory\n");
        return -1;
    }

    // Carve the buffer into the three parallel arrays plus the counters.
    _chunk = (char **)buf;
    _offset = (streampos_t *)(_chunk + maxSize);
    _bytes = (iosize_t *)(_offset + maxSize);
    _count = (CountStruct *)(_bytes + maxSize);

    // Every slot starts out empty.
    for(int i = 0; i < maxSize; i++) {
        _chunk[i] = 0;
        _offset[i] = 0;
        _bytes[i] = 0;
    }

    // Empty pipe: head and tail coincide and every slot is free.
    _count->head = 0;
    _count->tail = 0;
    _count->free = maxSize;
    _count->size = maxSize;
    _count->maxSize = maxSize;

    return 0;
}

// Deletes the shared memory object, then destroys and deletes each
// semaphore.  Members that were never created (still null after a failed
// Initialize()) are skipped instead of being dereferenced.
DataPipe::~DataPipe()
{
    delete _shm;                // deleting a null pointer is a no-op

    if (_data) {
        _data->destroy();
        delete _data;
    }
    if (_free) {
        _free->destroy();
        delete _free;
    }
    if (_sem) {
        _sem->destroy();
        delete _sem;
    }
}
// Stores one entry (buffer pointer, stream offset, byte count) in the slot
// at the head of the pipe, waiting on AcquireFree() for as long as the free
// count is not positive.  Calls ReleaseData() when the entry just added is
// the only one in the pipe.  Always returns 0.
int DataPipe::Produce(char *buf, streampos_t offset, iosize_t bytes)
{
    AcquireMutex();

    // Free count may be negative if pipe size was recently reduced.
    for (;;) {
        if (_count->free > 0)
            break;
#if DEBUGLVL >= 3
        printf("Pipe 0x%p has to wait for consumer's free space (%d)\n",
               this, _count->free);
#endif
        // Let go of the mutex while waiting, then re-check under it.
        ReleaseMutex();
        AcquireFree();
        AcquireMutex();
    }

    // The head slot must be empty before it is written.
    DOASSERT(!_chunk[_count->head], "Inconsistent state");

    const int slot = _count->head;
    _chunk[slot] = buf;
    _offset[slot] = offset;
    _bytes[slot] = bytes;

    // Advance the head circularly and account for the slot just used.
    _count->head = (slot + 1) % _count->maxSize;
    _count->free--;

#if DEBUGLVL >= 3
    printf("Pipe 0x%p has %d free space left\n", this, _count->free);
#endif

    // Exactly one entry in the pipe: announce the new data.
    if (_count->free == _count->size - 1) {
#if DEBUGLVL >= 3
        printf("Pipe 0x%p signaling consumer about new data (%d,%d)\n",
               this, _count->free, _count->size);
#endif
        ReleaseData();
    }

    ReleaseMutex();
    return 0;
}
// Takes the entry at the tail of the pipe and returns it through buf,
// offset and bytes, waiting on AcquireData() for as long as the free count
// equals the pipe size (nothing stored).  The vacated slot is zeroed.
// Calls ReleaseFree() when the free count becomes exactly 1.
// Always returns 0.
int DataPipe::Consume(char *&buf, streampos_t &offset, iosize_t &bytes)
{
    AcquireMutex();

    for (;;) {
        if (_count->free != _count->size)
            break;
#if DEBUGLVL >= 3
        printf("Pipe 0x%p has to wait for producer's data (%d,%d)\n",
               this, _count->free, _count->size);
#endif
        // Let go of the mutex while waiting, then re-check under it.
        ReleaseMutex();
        AcquireData();
        AcquireMutex();
    }

    const int slot = _count->tail;

    // Hand the stored entry to the caller ...
    buf = _chunk[slot];
    offset = _offset[slot];
    bytes = _bytes[slot];

    // ... and mark the slot as empty again.
    _chunk[slot] = 0;
    _offset[slot] = 0;
    _bytes[slot] = 0;

    // Advance the tail circularly and account for the freed slot.
    _count->tail = (slot + 1) % _count->maxSize;
    _count->free++;

#if DEBUGLVL >= 3
    printf("Pipe 0x%p has %d free space left\n", this, _count->free);
#endif

    // The free count has just reached 1: announce the free space.
    if (_count->free == 1) {
#if DEBUGLVL >= 3
        printf("Pipe 0x%p signaling producer about free space\n", this);
#endif
        ReleaseFree();
    }

    ReleaseMutex();
    return 0;
}
// Changes the number of slots the pipe may use.
//
// size  requested size; raised to 2 if smaller, and lowered to
//       _count->maxSize if larger.
//
// Returns the size that was actually applied.
int DataPipe::SetSize(int size)
{
    AcquireMutex();

    if (size < 2) {
        size = 2;
        fprintf(stderr, "Adjusting pipe size to %d\n", size);
    }

    // A request equal to the maximum is valid; only a larger one is
    // reported and reduced.
    if (size > _count->maxSize) {
        fprintf(stderr, "Invalid pipe size %d reduced to %d\n",
                size, _count->maxSize);
        size = _count->maxSize;
    }

    // The following adjustment to the free count may leave it negative.
    // That's fine, and just means that Produce() will not be able
    // to append data to the pipe until that many chunks have been
    // consumed from the pipe.
    // NOTE(review): growing the pipe can raise the free count above zero
    // without any ReleaseFree() call -- confirm that a producer already
    // waiting in Produce() is still woken up in that case.
    _count->free += size - _count->size;
    _count->size = size;

    ReleaseMutex();
    return size;
}
// Creates an empty multipipe (no pipes, hint at 0) together with its _data
// semaphore, whose value is set to 0.  If the semaphore cannot be created,
// an error is printed and status is set to -1; otherwise status keeps
// whatever the SemaphoreV constructor stored in it.
MultiPipe::MultiPipe(int &status) : _numPipes(0), _hint(0)
{
    _data = new SemaphoreV(Semaphore::newKey(), status, 1);

    const bool created = (_data != 0) && (status >= 0);
    if (created) {
        _data->setValue(0);
        return;
    }

    fprintf(stderr, "Cannot create semaphore\n");
    status = -1;
}
// Tears down the multipipe's _data semaphore: destroy() is called on it
// first, then the object is deleted.  The pipes stored in _pipes are not
// touched here.
MultiPipe::~MultiPipe()
{
_data->destroy();
delete _data;
}
// Appends pipe to the multipipe.
//
// Returns 0 on success, or -1 if the pipe is already a member or the
// multipipe already holds _maxPipes pipes.
int MultiPipe::AddPipe(DataPipe *pipe)
{
    // Refuse a pipe that is already registered.
    int idx = 0;
    while (idx < _numPipes) {
        if (_pipes[idx] == pipe)
            return -1;
        idx++;
    }

    // Refuse when there is no room left.
    if (_numPipes >= _maxPipes)
        return -1;

    _pipes[_numPipes] = pipe;
    _numPipes++;

#if DEBUGLVL >= 3
    printf("Added pipe 0x%p to multipipe 0x%p\n", pipe, this);
#endif

    return 0;
}
// Removes pipe from the multipipe, closing the gap it leaves behind.
//
// Returns 0 if the pipe was found and removed, -1 if it is not a member.
int MultiPipe::RemovePipe(DataPipe *pipe)
{
    // Locate the pipe.
    int pos = 0;
    while (pos < _numPipes && _pipes[pos] != pipe)
        pos++;

    if (pos >= _numPipes)
        return -1;

    // Shift every later entry one slot towards the front.
    for (int next = pos + 1; next < _numPipes; next++)
        _pipes[next - 1] = _pipes[next];
    _numPipes--;

    // Keep the hint inside the shortened list.
    if (_hint >= _numPipes)
        _hint = 0;

#if DEBUGLVL >= 3
    printf("Removed pipe 0x%p from multipipe 0x%p\n", pipe, this);
#endif

    return 0;
}
// Consumes one entry from a member pipe.
//
// Starting at _hint, the member pipes are scanned once in circular order;
// the first one whose NumData() is positive is consumed from.  If none has
// data, the call falls through to Consume() on the pipe at _hint (the pipe
// the scan started from).
//
// buf, offset, bytes  receive the consumed entry
// pipe                receives the pipe the entry came from (null if the
//                     multipipe has no pipes)
//
// Returns the result of DataPipe::Consume(), or -1 if the multipipe has no
// pipes.
int MultiPipe::Consume(char *&buf, streampos_t &offset,
                       iosize_t &bytes, DataPipe *&pipe)
{
    // With no pipes there is nothing to index or to wait on.
    if (_numPipes <= 0) {
        pipe = 0;
        return -1;
    }

    for(int j = 0; j < _numPipes; j++) {
        pipe = _pipes[_hint];
        if (pipe->NumData() > 0) {
#if DEBUGLVL >= 3
            printf("Consuming data from pipe 0x%p of multipipe 0x%p\n",
                   pipe, this);
#endif
            return pipe->Consume(buf, offset, bytes);
        }
        // Wrap around so the hint always stays a valid index into _pipes.
        _hint = (_hint + 1) % _numPipes;
    }

    // Block on first pipe but change this later once a smarter
    // synchronization mechanism is designed.
    // After a full circle the hint is back on the pipe the scan began with.
    pipe = _pipes[_hint];
#if DEBUGLVL >= 3
    printf("Blocking and consuming data from pipe 0x%p of multipipe 0x%p\n",
           pipe, this);
#endif
    return pipe->Consume(buf, offset, bytes);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -