📄 dce.c
字号:
}
} else
perror("shmctl");
}
#if defined(SHARED_KEYS)
// Remove every shared memory segment whose key lies in the range scanned
// below, then reset the key counter.
//
// The scan starts at ShmKeyTableKey. If the key table is attached, it ends
// 100 keys past the table's next unallocated key; otherwise a fixed window
// of 500 keys is scanned.
//
// Failures are reported on stderr via perror() and do not stop the scan.
// Always returns 0.
int SharedMemory::destroyAll()
{
    key_t maxKey = ShmKeyTableKey + 500;
    if (shmKeyTable)
        maxKey = shmKeyTable->shmNextKey + 100;

    for (key_t key = ShmKeyTableKey; key < maxKey; key++) {
        // Size 0 without IPC_CREAT: only look up an existing segment.
        const int id = shmget(key, 0, SHM_R | SHM_W);
        if (id < 0) {
            // ENOENT just means no segment exists for this key; anything
            // else is worth reporting.
            if (errno != ENOENT)
                perror("shmget");
            continue;
        }
#ifdef DEBUG
        cerr << "%% Removing shared memory segment " << id << endl;
#endif
        // Report the failing call under its real name (this used to say
        // "semctl", which pointed at the semaphore code instead).
        if (shmctl(id, IPC_RMID, 0) < 0)
            perror("shmctl");
    }

    // Start handing out keys from the base again.
    if (shmKeyTable)
        shmKeyTable->shmNextKey = SharedMemoryBase;
    return 0;
}
// Hand out the next unused shared memory key from the key table, attaching
// the table first if this process has not done so yet.
key_t SharedMemory::newKey()
{
    if (!shmKeyTable)
        attachTable();
    assert(shmKeyTable);

    // Take the current counter value, then bump the counter for the next
    // caller.
    const key_t allocated = shmKeyTable->shmNextKey;
    ++shmKeyTable->shmNextKey;
#ifdef DEBUG
    cerr << "Next shared memory key is " << allocated << endl;
#endif
    return allocated;
}
#endif
#if defined(PRIVATE_KEYS)
// PRIVATE_KEYS variant: no key table is consulted; every caller gets
// IPC_PRIVATE as its key.
key_t SharedMemory::newKey()
{
return IPC_PRIVATE;
}
#endif
// Shorthand for operations on the semaphore set held in the `sems` member,
// so these macros only work inside IOBuffer member functions. `sem` selects
// the semaphore (lock, hasData or hasSpace in this file).
// NOTE(review): the leading argument 1 is presumably the count to
// release/acquire -- confirm against the Semaphore class.
#define RELEASE(sem) sems->release(1, sem)
#define ACQUIRE(sem) sems->acquire(1, sem)
// Create a buffer descriptor for a ring buffer with `size` bytes of payload.
//
// Only the semaphore and shared memory keys are allocated here. Nothing is
// attached until attach() is called, so sems, shmem and comm all start out
// null.
IOBuffer::IOBuffer(int size) :
    size(size), sems(0), shmem(0)
{
    // comm is set by attach(). Clear it explicitly so that the
    // assert(comm && sems) guards in the I/O methods reliably catch use
    // before attach() instead of reading an indeterminate pointer.
    comm = 0;

    semKey = Semaphore::newKey();
    shmKey = SharedMemory::newKey();
}
// Delete the semaphore and shared memory wrapper objects created by
// attach(). Both pointers are 0 if attach() was never called, in which case
// the deletes are no-ops.
// NOTE(review): whether this also removes the underlying IPC objects depends
// on the SemaphoreV / SharedMemory destructors, which are not visible here.
IOBuffer::~IOBuffer()
{
delete sems;
delete shmem;
}
// Attach this descriptor to its semaphore set and shared memory segment,
// using the keys allocated in the constructor. init() is called when the
// SharedMemory constructor sets its last argument to non-zero.
void IOBuffer::attach()
{
    // Semaphore set with 3 semaphores (presumably one each for lock,
    // hasData and hasSpace).
    int semStatus;
    sems = new SemaphoreV(semKey, semStatus, 3);
    assert(semStatus >= 0);

    // Shared memory segment: the payload plus 7 ints of extra space
    // (presumably the CommBuffer bookkeeping fields that precede the data).
    char *base = 0;
    int isNew = 0;
    shmem = new SharedMemory(shmKey, size + 7 * sizeof(int), base, isNew);
    assert(shmem && base);
    comm = (CommBuffer *)base;

    if (isNew)
        init();
}
// Put the shared CommBuffer and the three semaphores into the
// "empty buffer, not at EOF" state. Requires attach() to have set comm
// and sems.
void IOBuffer::init()
{
    assert(comm && sems);

    // Payload capacity is the segment size minus the 7-int header; it must
    // match the size this descriptor was constructed with.
    comm->size = shmem->getSize() - 7 * sizeof(int);
    assert(comm->size == size);

    // Empty ring: nothing stored, both cursors at the start.
    comm->bytes = 0;
    comm->head = 0;
    comm->tail = 0;

    // No end-of-file mark and no outstanding reservations.
    comm->atEOF = 0;
    comm->wReserve = 0;
    comm->rReserve = 0;

    // Lock is free, there is no data to read, there is space to write.
    sems->setValue(1, lock);
    sems->setValue(0, hasData);
    sems->setValue(1, hasSpace);
}
// Reserve a contiguous region of the ring buffer for a writer to fill in
// place.
//
// minUnits, maxUnits: smallest and largest number of units wanted; a value
//                     of 0 for either is treated as 1.
// bytes:              size of one unit in bytes (must be > 0).
// n (out):            number of units actually reserved.
// buf (out):          address inside comm->buf where the region starts.
//
// Loops until at least minUnits units can be reserved, then returns 0.
// The region never wraps around the end of the buffer: it is capped at the
// distance from comm->tail to the end. The tail is advanced immediately and
// the reserved bytes are counted in comm->wReserve until releaseW() moves
// them out of the reserved state.
int IOBuffer::reserveW(int minUnits, int maxUnits, int bytes,
int &n, char *&buf)
{
if (!minUnits)
minUnits = 1;
if (!maxUnits)
maxUnits = 1;
assert(comm && sems);
assert(minUnits > 0 && maxUnits >= minUnits && bytes > 0);
while(1) {
ACQUIRE(hasSpace); // wait until space available
ACQUIRE(lock);
// free space = capacity minus stored bytes minus bytes already reserved
// by other pending writes
int space = comm->size - comm->bytes - comm->wReserve;
if (!space) { // false alarm, no space?
RELEASE(lock);
continue;
}
// take at most maxUnits worth of bytes, limited by the free space ...
int b = (maxUnits * bytes < space ? maxUnits * bytes : space);
// ... and by the contiguous run up to the end of the buffer
b = (b < comm->size - comm->tail ? b : comm->size - comm->tail);
n = b / bytes;
// NOTE(review): n is 0 whenever b < bytes (e.g. fewer than one unit left
// before the end of the buffer); in builds with asserts enabled this
// aborts before the retry path below can run -- confirm callers rule
// that case out.
assert(n > 0);
if (n < minUnits) { // too little space?
#ifdef DEBUG
cerr << "reserveW asking for " << minUnits << " units, got only "
<< n << endl;
#endif
// hasSpace is not given back on this path, so the next iteration waits
// for another RELEASE(hasSpace)
RELEASE(lock);
continue;
}
buf = &comm->buf[comm->tail];
// round down to a whole number of units
b = n * bytes;
#ifdef DEBUG
// printed before wReserve is updated, so the reported wReserve does not
// yet include this reservation
cerr << "%% Reserved " << b << " bytes to buffer at "
<< comm->tail << ", now " << comm->wReserve << "/"
<< comm->bytes << " bytes" << endl;
#endif
comm->tail += b;
if (comm->tail >= comm->size)
comm->tail -= comm->size;
assert(comm->tail >= 0 && comm->tail < comm->size);
comm->wReserve += b;
if (comm->bytes + comm->wReserve < comm->size)
RELEASE(hasSpace); // still space left
RELEASE(lock);
break;
}
return 0;
}
// Reserve a contiguous region of stored data for a reader to consume in
// place.
//
// minUnits, maxUnits: smallest and largest number of units wanted; a value
//                     of 0 for either is treated as 1.
// bytes:              size of one unit in bytes (must be > 0).
// n (out):            number of units reserved; 0 at end-of-file.
// buf (out):          address inside comm->buf where the region starts;
//                     0 at end-of-file.
//
// Loops until at least minUnits units are available, or until the buffer is
// at EOF (in which case fewer than minUnits units may be returned). Always
// returns 0. The region never wraps around the end of the buffer. The head
// is advanced immediately and the reserved bytes are counted in
// comm->rReserve until releaseR() moves them out of the reserved state.
int IOBuffer::reserveR(int minUnits, int maxUnits, int bytes,
int &n, char *&buf)
{
if (!minUnits)
minUnits = 1;
if (!maxUnits)
maxUnits = 1;
assert(comm && sems);
assert(minUnits > 0 && maxUnits >= minUnits && bytes > 0);
// fast path: buffer is empty and EOF has been set, so do not wait on
// hasData at all
ACQUIRE(lock);
if (!comm->bytes && comm->atEOF) {
n = 0;
buf = 0;
RELEASE(lock);
return 0;
}
RELEASE(lock);
while(1) {
ACQUIRE(hasData); // wait until data available
ACQUIRE(lock);
// readable data = stored bytes minus bytes already reserved by other
// pending reads
int data = comm->bytes - comm->rReserve;
if (!data) { // false alarm, no data?
// read the EOF flag while the lock is still held
int eof = comm->atEOF;
RELEASE(lock);
if (eof) {
n = 0;
buf = 0;
break;
}
continue;
}
// take at most maxUnits worth of bytes, limited by the readable data ...
int b = (maxUnits * bytes < data ? maxUnits * bytes : data);
// ... and by the contiguous run up to the end of the buffer
b = (b < comm->size - comm->head ? b : comm->size - comm->head);
n = b / bytes;
// NOTE(review): n is 0 whenever b < bytes (e.g. a partial unit left at
// EOF or before the end of the buffer); in builds with asserts enabled
// this aborts -- confirm callers rule that case out.
assert(n > 0);
// once EOF is set, a short reservation is accepted instead of retrying
if (!comm->atEOF && n < minUnits) { // too little data?
#ifdef DEBUG
cerr << "reserveR asking for " << minUnits << " units, got only "
<< n << endl;
#endif
// hasData is not given back on this path, so the next iteration waits
// for another RELEASE(hasData)
RELEASE(lock);
continue;
}
buf = &comm->buf[comm->head];
// round down to a whole number of units
b = n * bytes;
#ifdef DEBUG
// printed before rReserve is updated, so the reported rReserve does not
// yet include this reservation
cerr << "%% Reserved " << b << " bytes from buffer at "
<< comm->head << ", now " << comm->rReserve << "/"
<< comm->bytes << " bytes" << endl;
#endif
comm->head += b;
if (comm->head >= comm->size)
comm->head -= comm->size;
assert(comm->head >= 0 && comm->head < comm->size);
comm->rReserve += b;
if (comm->bytes - comm->rReserve > 0)
RELEASE(hasData); // still data left
RELEASE(lock);
break;
}
return 0;
}
// Finish a write reservation made with reserveW().
//
// origUnits: number of units that were reserved (must be > 0).
// units:     number of units actually written (must be >= 0).
// bytes:     size of one unit in bytes (must be > 0).
//
// The whole reservation is dropped from comm->wReserve, but only the units
// actually written are added to the stored byte count. Always returns 0.
int IOBuffer::releaseW(int origUnits, int units, int bytes)
{
    assert(origUnits > 0 && units >= 0 && bytes > 0);

    const int reservedBytes = origUnits * bytes;
    const int writtenBytes = units * bytes;

    ACQUIRE(lock);
    assert(comm->wReserve >= reservedBytes);
    comm->wReserve -= reservedBytes;    // no longer reserved ...
    comm->bytes += writtenBytes;        // ... now readable data
#ifdef DEBUG
    cerr << "%% Released " << writtenBytes << " of "
         << reservedBytes << " bytes as written" << endl;
#endif
    RELEASE(lock);

    // Wake a reader waiting for data.
    RELEASE(hasData);
    return 0;
}
// Finish a read reservation made with reserveR().
//
// units: number of units consumed; 0 makes the call a no-op.
// bytes: size of one unit in bytes (must be > 0).
//
// The consumed bytes are removed both from comm->rReserve and from the
// stored byte count. Always returns 0.
int IOBuffer::releaseR(int units, int bytes)
{
    // Nothing was reserved (e.g. reserveR() hit EOF): nothing to give back.
    if (units == 0)
        return 0;
    assert(units > 0 && bytes > 0);

    const int consumedBytes = units * bytes;

    ACQUIRE(lock);
    assert(comm->rReserve >= consumedBytes);
    comm->rReserve -= consumedBytes;    // no longer reserved ...
    comm->bytes -= consumedBytes;       // ... and no longer stored
#ifdef DEBUG
    cerr << "%% Released " << consumedBytes << " bytes as read" << endl;
#endif
    RELEASE(lock);

    // Wake a writer waiting for space.
    RELEASE(hasSpace);
    return 0;
}
// Copy `bytes` bytes from `buf` into the ring buffer, waiting for space as
// often as needed until everything has been stored. Must not be mixed with
// an outstanding reserveW() reservation. Returns `bytes`.
int IOBuffer::write(char *buf, int bytes)
{
    assert(comm && sems);
    assert(comm->wReserve == 0);

    int remaining = bytes;
    while (remaining > 0) {
        ACQUIRE(hasSpace);              // block until a reader frees space
        ACQUIRE(lock);

        const int space = comm->size - comm->bytes;
        if (space == 0) {               // woken up, but the buffer is full
            RELEASE(lock);
            continue;
        }

        // Store as much of the remainder as currently fits.
        const int chunk = (remaining < space ? remaining : space);
#ifdef DEBUG
        cerr << "%% Writing " << chunk << " bytes to buffer at "
             << comm->tail << ", now " << comm->bytes << " bytes" << endl;
#endif
        // The chunk may straddle the end of the ring: `first` bytes go at
        // the tail, the remaining `second` bytes at the start.
        const char *src = &buf[bytes - remaining];
        const int first = (chunk < comm->size - comm->tail
                           ? chunk : comm->size - comm->tail);
        const int second = chunk - first;

        if (first) {
            memcpy(&comm->buf[comm->tail], src, first);
            comm->tail += first;
            if (comm->tail >= comm->size)   // landed exactly on the end
                comm->tail -= comm->size;
        }
        if (second) {
            memcpy(comm->buf, src + first, second);
            comm->tail = second;
        }
        assert(comm->tail >= 0 && comm->tail < comm->size);

        remaining -= chunk;
        comm->bytes += chunk;

        if (comm->bytes < comm->size)
            RELEASE(hasSpace);          // room left for further writes
        RELEASE(lock);
        RELEASE(hasData);               // wake a reader waiting for data
    }
    return bytes;
}
// Copy up to `bytes` bytes from the ring buffer into `buf`, waiting for
// data as often as needed. Stops early only when the buffer is empty and
// EOF has been set. Must not be mixed with an outstanding reserveR()
// reservation. Returns the number of bytes copied (0 at EOF).
int IOBuffer::read(char *buf, int bytes)
{
    assert(comm && sems);
    assert(comm->rReserve == 0);

    // Empty and already at EOF: report it without waiting on hasData.
    ACQUIRE(lock);
    const bool drained = (!comm->bytes && comm->atEOF);
    RELEASE(lock);
    if (drained)
        return 0;

    int remaining = bytes;
    while (remaining > 0) {
        ACQUIRE(hasData);               // block until a writer adds data
        ACQUIRE(lock);

        const int avail = comm->bytes;
        if (avail == 0) {               // woken up, but nothing to read
            const int eof = comm->atEOF;
            RELEASE(lock);
            if (eof)
                break;
            continue;
        }

        // Take as much of the request as is currently stored.
        const int chunk = (remaining < avail ? remaining : avail);
#ifdef DEBUG
        cerr << "%% Reading " << chunk << " bytes from buffer at "
             << comm->head << ", now " << comm->bytes << " bytes" << endl;
#endif
        // The chunk may straddle the end of the ring: `first` bytes come
        // from the head, the remaining `second` bytes from the start.
        char *dst = &buf[bytes - remaining];
        const int first = (chunk < comm->size - comm->head
                           ? chunk : comm->size - comm->head);
        const int second = chunk - first;

        if (first) {
            memcpy(dst, &comm->buf[comm->head], first);
            comm->head += first;
            if (comm->head >= comm->size)   // landed exactly on the end
                comm->head -= comm->size;
        }
        if (second) {
            memcpy(dst + first, comm->buf, second);
            comm->head = second;
        }
        assert(comm->head >= 0 && comm->head < comm->size);

        remaining -= chunk;
        comm->bytes -= chunk;

        if (comm->bytes > 0)
            RELEASE(hasData);           // more data left for further reads
        RELEASE(lock);
        RELEASE(hasSpace);              // wake a writer waiting for space
    }
    return bytes - remaining;
}
// Mark the buffer as having reached end-of-file. Asserts that no write
// reservation is outstanding.
void IOBuffer::setEOF()
{
ACQUIRE(lock);
assert(comm->wReserve == 0);
comm->atEOF = 1;
RELEASE(lock);
// hasData is released even though no data was added: a reader waiting in
// ACQUIRE(hasData) then wakes up, finds no data and sees atEOF
RELEASE(hasData);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -