📄 dce.c
字号:
} } else perror("shmctl");}#if defined(SHARED_KEYS)int SharedMemory::destroyAll(){ key_t maxKey = ShmKeyTableKey + 500; if (shmKeyTable) maxKey = shmKeyTable->shmNextKey + 100; for(key_t key = ShmKeyTableKey; key < maxKey; key++) { int id; if ((id = shmget(key, 0, SHM_R | SHM_W)) < 0) { if (errno != ENOENT) perror("shmget"); } else {#ifdef DEBUG cerr << "%% Removing shared memory segment " << id << endl;#endif if (shmctl(id, IPC_RMID, 0) < 0) perror("semctl"); } } if (shmKeyTable) shmKeyTable->shmNextKey = SharedMemoryBase; return 0;}key_t SharedMemory::newKey(){ if (!shmKeyTable) attachTable(); assert(shmKeyTable); key_t key = shmKeyTable->shmNextKey++;#ifdef DEBUG cerr << "Next shared memory key is " << key << endl;#endif return key;}#endif#if defined(PRIVATE_KEYS)key_t SharedMemory::newKey(){ return IPC_PRIVATE;}#endif#define RELEASE(sem) sems->release(1, sem)#define ACQUIRE(sem) sems->acquire(1, sem)IOBuffer::IOBuffer(int size) : size(size), sems(0), shmem(0){ semKey = Semaphore::newKey(); shmKey = SharedMemory::newKey();}IOBuffer::~IOBuffer(){ delete sems; delete shmem;}void IOBuffer::attach(){ // attach to semaphores int status; sems = new SemaphoreV(semKey, status, 3); assert(status >= 0); // attach to shared memory segment char *buf = 0; int created = 0; shmem = new SharedMemory(shmKey, size + 7 * sizeof(int), buf, created); assert(shmem && buf); comm = (CommBuffer *)buf; if (created) init();}void IOBuffer::init(){ assert(comm && sems); comm->size = shmem->getSize() - 7 * sizeof(int); assert(comm->size == size); comm->bytes = 0; comm->head = 0; comm->tail = 0; comm->atEOF = 0; comm->wReserve = 0; comm->rReserve = 0; sems->setValue(1, lock); sems->setValue(0, hasData); sems->setValue(1, hasSpace);}int IOBuffer::reserveW(int minUnits, int maxUnits, int bytes, int &n, char *&buf){ if (!minUnits) minUnits = 1; if (!maxUnits) maxUnits = 1; assert(comm && sems); assert(minUnits > 0 && maxUnits >= minUnits && bytes > 0); while(1) { ACQUIRE(hasSpace); // wait until space available ACQUIRE(lock); int space = comm->size - comm->bytes - comm->wReserve; if (!space) { // false alarm, no space? RELEASE(lock); continue; } int b = (maxUnits * bytes < space ? maxUnits * bytes : space); b = (b < comm->size - comm->tail ? b : comm->size - comm->tail); n = b / bytes; assert(n > 0); if (n < minUnits) { // too little space?#ifdef DEBUG cerr << "reserveW asking for " << minUnits << " units, got only " << n << endl;#endif RELEASE(lock); continue; } buf = &comm->buf[comm->tail]; b = n * bytes;#ifdef DEBUG cerr << "%% Reserved " << b << " bytes to buffer at " << comm->tail << ", now " << comm->wReserve << "/" << comm->bytes << " bytes" << endl;#endif comm->tail += b; if (comm->tail >= comm->size) comm->tail -= comm->size; assert(comm->tail >= 0 && comm->tail < comm->size); comm->wReserve += b; if (comm->bytes + comm->wReserve < comm->size) RELEASE(hasSpace); // still space left RELEASE(lock); break; } return 0;}int IOBuffer::reserveR(int minUnits, int maxUnits, int bytes, int &n, char *&buf){ if (!minUnits) minUnits = 1; if (!maxUnits) maxUnits = 1; assert(comm && sems); assert(minUnits > 0 && maxUnits >= minUnits && bytes > 0); ACQUIRE(lock); if (!comm->bytes && comm->atEOF) { n = 0; buf = 0; RELEASE(lock); return 0; } RELEASE(lock); while(1) { ACQUIRE(hasData); // wait until data available ACQUIRE(lock); int data = comm->bytes - comm->rReserve; if (!data) { // false alarm, no data? int eof = comm->atEOF; RELEASE(lock); if (eof) { n = 0; buf = 0; break; } continue; } int b = (maxUnits * bytes < data ? maxUnits * bytes : data); b = (b < comm->size - comm->head ? b : comm->size - comm->head); n = b / bytes; assert(n > 0); if (!comm->atEOF && n < minUnits) { // too little data?#ifdef DEBUG cerr << "reserveR asking for " << minUnits << " units, got only " << n << endl;#endif RELEASE(lock); continue; } buf = &comm->buf[comm->head]; b = n * bytes;#ifdef DEBUG cerr << "%% Reserved " << b << " bytes from buffer at " << comm->head << ", now " << comm->rReserve << "/" << comm->bytes << " bytes" << endl;#endif comm->head += b; if (comm->head >= comm->size) comm->head -= comm->size; assert(comm->head >= 0 && comm->head < comm->size); comm->rReserve += b; if (comm->bytes - comm->rReserve > 0) RELEASE(hasData); // still data left RELEASE(lock); break; } return 0;}int IOBuffer::releaseW(int origUnits, int units, int bytes){ assert(origUnits > 0 && units >= 0 && bytes > 0); ACQUIRE(lock); assert(comm->wReserve >= origUnits * bytes); comm->wReserve -= origUnits * bytes; // move bytes from reserved state comm->bytes += units * bytes; // .. to actual state#ifdef DEBUG cerr << "%% Released " << units * bytes << " of " << origUnits * bytes << " bytes as written" << endl;#endif RELEASE(lock); RELEASE(hasData); // notify reader that data is available return 0;}int IOBuffer::releaseR(int units, int bytes){ if (!units) return 0; assert(units > 0 && bytes > 0); ACQUIRE(lock); assert(comm->rReserve >= units * bytes); comm->rReserve -= units * bytes; // move bytes from reserved state comm->bytes -= units * bytes; // .. to actual state#ifdef DEBUG cerr << "%% Released " << units * bytes << " bytes as read" << endl;#endif RELEASE(lock); RELEASE(hasSpace); // notify writer that space exists return 0;}int IOBuffer::write(char *buf, int bytes){ assert(comm && sems); assert(comm->wReserve == 0); int left = bytes; while(left > 0) { ACQUIRE(hasSpace); // wait until space available ACQUIRE(lock); int space = comm->size - comm->bytes; if (!space) { // false alarm, no space? RELEASE(lock); continue; } int b = (left < space ? left : space);#ifdef DEBUG cerr << "%% Writing " << b << " bytes to buffer at " << comm->tail << ", now " << comm->bytes << " bytes" << endl;#endif int atend = (b < comm->size - comm->tail ? b : comm->size - comm->tail); int atbeg = b - atend; if (atend) { memcpy(&comm->buf[comm->tail], &buf[bytes - left], atend); comm->tail += atend; if (comm->tail >= comm->size) // wrap around the buffer? comm->tail -= comm->size; } if (atbeg) { memcpy(comm->buf, &buf[bytes - left + atend], atbeg); comm->tail = atbeg; } assert(comm->tail >= 0 && comm->tail < comm->size); left -= b; comm->bytes += b; if (comm->bytes < comm->size) RELEASE(hasSpace); // still space left RELEASE(lock); RELEASE(hasData); // notify reader that data is available } return bytes;}int IOBuffer::read(char *buf, int bytes){ assert(comm && sems); assert(comm->rReserve == 0); ACQUIRE(lock); if (!comm->bytes && comm->atEOF) { RELEASE(lock); return 0; } RELEASE(lock); int left = bytes; while(left > 0) { ACQUIRE(hasData); // wait until data available ACQUIRE(lock); int data = comm->bytes; if (!data) { // false alarm, no data? int eof = comm->atEOF; RELEASE(lock); if (eof) break; continue; } int b = (left < data ? left : data);#ifdef DEBUG cerr << "%% Reading " << b << " bytes from buffer at " << comm->head << ", now " << comm->bytes << " bytes" << endl;#endif int atend = (b < comm->size - comm->head ? b : comm->size - comm->head); int atbeg = b - atend; if (atend) { memcpy(&buf[bytes - left], &comm->buf[comm->head], atend); comm->head += atend; if (comm->head >= comm->size) // wrap around the buffer? comm->head -= comm->size; } if (atbeg) { memcpy(&buf[bytes - left + atend], comm->buf, atbeg); comm->head = atbeg; } assert(comm->head >= 0 && comm->head < comm->size); left -= b; comm->bytes -= b; if (comm->bytes > 0) RELEASE(hasData); // still data left RELEASE(lock); RELEASE(hasSpace); // notify writer that space exists } return bytes - left;}void IOBuffer::setEOF(){ ACQUIRE(lock); assert(comm->wReserve == 0); comm->atEOF = 1; RELEASE(lock); RELEASE(hasData);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -