📄 datasource.c
字号:
_replyFd[1] = _reqFd[0];#else if (pipe(_replyFd) < 0) { perror("pipe"); return -1; }#endif _totalCount = 0; _readBytes = 0; _writeBytes = 0; _seekBytes = 0; _handle = 1; _dpipe = new DataPipe(10, status); if (!_dpipe || status < 0) { fprintf(stderr, "Cannot create data pipe\n"); return -1; } DO_DEBUG(printf("Creating data source process/thread...\n"));#ifdef DS_PROCESS fflush(stdout); _child = fork(); if (_child < 0) { perror("fork"); return -1; } if (!_child) { (void)ProcessReq(this); close(_reqFd[0]); close(_reqFd[1]);#ifndef SOLARIS close(_replyFd[0]); close(_replyFd[1]);#endif exit(1); }#endif#ifdef DS_THREAD if (pthread_create(&_child, 0, ProcessReq, this)) { perror("pthread_create"); return -1; }#endif DO_DEBUG(printf("Created data source process/thread %ld\n", (long)_child)); return 0;}/*============================================================================*///// Terminate data source process//int DataSource::TerminateProc(){ if (_child < 0) { fprintf(stderr, "Child process/thread no longer exists\n"); return 0; } DO_DEBUG(printf("Terminating child process/thread %ld...\n", (long)_child)); Request req = { TerminateReq, 0, 0 }; if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) { perror("write"); } else {#ifdef DS_PROCESS while(1) { int status; pid_t child = wait(&status); if (child < 0) { if (errno == EINTR) continue; if (errno != ECHILD) { perror("wait"); break; } } else break; }#endif#ifdef DS_THREAD (void)pthread_join(_child, 0);#endif } close(_reqFd[0]); close(_reqFd[1]);#ifndef SOLARIS close(_replyFd[0]); close(_replyFd[1]);#endif delete _dpipe; _mutex->destroy(); delete _mutex; _child = -1; DO_DEBUG(printf("Child process/thread terminated\n")); return 0;} /*============================================================================*///// Submit read request to data source process//int DataSource::ReadProc(streampos_t offset, iosize_t bytes){ AcquireMutex(); DO_DEBUG(printf("Submitting read request %llu:%lu to data source 0x%p\n", offset, bytes, this)); Request req = { ReadReq, offset, bytes }; if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) { perror("write"); ReleaseMutex(); return -1; } Reply reply; if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) { perror("write"); ReleaseMutex(); return -1; } ReleaseMutex(); return reply.handle;}/*============================================================================*///// Submit write request to data source process//int DataSource::WriteProc(streampos_t offset, iosize_t bytes){ AcquireMutex(); DO_DEBUG(printf("Submitting write request %llu:%lu to data source 0x%p\n", offset, bytes, this)); Request req = { WriteReq, offset, bytes }; if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) { perror("write"); ReleaseMutex(); return -1; } Reply reply; if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) { perror("write"); ReleaseMutex(); return -1; } ReleaseMutex(); return reply.handle;}/*============================================================================*///// Jacket routine for interfacing ProcessReq() to POSIX threads.//void *DataSource::ProcessReq(void *arg){ DataSource &me = *(DataSource *)arg; return me.ProcessReq();}/*============================================================================*///// Read requests from pipe and execute them.//void *DataSource::ProcessReq(){#if defined(DS_PROCESS) && defined(DEBUG) && 0 char fname[64]; sprintf(fname, "source.fd.%d", _reqFd[0]); printf("Source 0x%p log file is %s\n", this, fname); FILE *out = freopen(fname, "w", stdout); if (!out) perror("freopen");#endif while (1) { Request req; int status = readn(_reqFd[0], (char *)&req, sizeof req); if (status < (int)sizeof req) { perror("read"); break; } if (req.type == TerminateReq) { DO_DEBUG(printf("\nSource 0x%p received quit command\n", this)); printf("Data source: %llu requests, read: %.2f MB, write: %.2f MB, seek: %.2f MB\n", _totalCount, _readBytes / 1048576.0, _writeBytes / 1048576.0, _seekBytes / 1048576.0); break; } DO_DEBUG(printf("\nSource 0x%p received request: %d, %llu, %lu\n", this, req.type, req.offset, req.bytes)); _totalCount++; Reply reply = { _handle }; _handle++; status = writen(_replyFd[1], (char *)&reply, sizeof reply); if (status < (int)sizeof reply) { perror("write"); break; } DO_DEBUG(printf("Source 0x%p request acknowledged\n", this)); if (req.type == ReadReq) ReadStream(req.offset, req.bytes); else WriteStream(req.offset, req.bytes); DO_DEBUG(printf("Source 0x%p request completed\n", this)); } DO_DEBUG(printf("Source 0x%p terminates\n", this)); return 0;}/*============================================================================*///// Read data from device and pipe it to the consumer.//void DataSource::ReadStream(streampos_t offset, iosize_t totbytes){ DO_DEBUG(printf("Reading bytes %llu:%lu from data source 0x%p\n", offset, totbytes, this)); streampos_t curpos = Tell(); int status = Seek(offset, SEEK_SET); DOASSERT(status >= 0, "Cannot seek data source"); // Be careful not to overflow/underflow (unsigned numbers) _seekBytes += (curpos > offset ? curpos - offset : offset - curpos); iosize_t bytes = 0; while (!totbytes || bytes < totbytes) { char *page; status = MemMgr::Instance()->Allocate(MemMgr::Buffer, page); DOASSERT(status >= 0 && page, "Failed to allocate buffer space\n"); int pageSize = MemMgr::Instance()->PageSize(); iosize_t reqsize = pageSize; if (totbytes > 0 && totbytes - bytes < reqsize) reqsize = totbytes - bytes; DO_DEBUG(printf("Data source 0x%p reads %lu bytes\n", this, reqsize)); size_t b = Fread(page, 1, reqsize); DO_DEBUG(printf("Data source 0x%p produces bytes %llu:%u\n", this, offset, b)); status = Produce(page, offset, b); DOASSERT(status >= 0, "Cannot produce data"); offset += b; bytes += b; _readBytes += b; if ((int)b < pageSize) break; } status = Produce(0, offset, 0); DOASSERT(status >= 0, "Cannot produce data");}/*============================================================================*///// Consume data from pipe and write it to device.//void DataSource::WriteStream(streampos_t offset, iosize_t totbytes){ DO_DEBUG(printf("Writing bytes %llu:%lu to data source 0x%p\n", offset, totbytes, this)); streampos_t curpos = Tell(); int status = Seek(offset, SEEK_SET); DOASSERT(status >= 0, "Cannot seek data source"); // Be careful not to overflow/underflow (unsigned numbers) _seekBytes += (curpos > offset ? curpos - offset : offset - curpos); iosize_t bytes = 0; while (!totbytes || bytes < totbytes) { char *page; streampos_t off; iosize_t b; int status = Consume(page, off, b); DOASSERT(status >= 0, "Cannot consume data"); DO_DEBUG(printf("Data source 0x%p consumes bytes %llu:%lu\n", this, offset, b)); DOASSERT(off == offset, "Invalid data chunk consumed"); if (b <= 0) continue; size_t s = Fwrite(page, 1, b); if (s != (size_t)b) break; status = MemMgr::Instance()->Deallocate(MemMgr::Buffer, page); DOASSERT(status >= 0 && page, "Failed to deallocate buffer space\n"); offset += b; bytes += b; _writeBytes += b; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -