📄 datasource.c
字号:
#else
if (pipe(_replyFd) < 0) {
perror("pipe");
return -1;
}
#endif
_totalCount = 0;
_readBytes = 0;
_writeBytes = 0;
_seekBytes = 0;
_handle = 1;
_dpipe = new DataPipe(10, status);
if (!_dpipe || status < 0) {
fprintf(stderr, "Cannot create data pipe\n");
return -1;
}
DO_DEBUG(printf("Creating data source process/thread...\n"));
#ifdef DS_PROCESS
fflush(stdout);
_child = fork();
if (_child < 0) {
perror("fork");
return -1;
}
if (!_child) {
(void)ProcessReq(this);
close(_reqFd[0]);
close(_reqFd[1]);
#ifndef SOLARIS
close(_replyFd[0]);
close(_replyFd[1]);
#endif
exit(1);
}
#endif
#ifdef DS_THREAD
if (pthread_create(&_child, 0, ProcessReq, this)) {
perror("pthread_create");
return -1;
}
#endif
DO_DEBUG(printf("Created data source process/thread %ld\n", (long)_child));
return 0;
}
/*============================================================================*/
//
// Terminate data source process
//
// Sends a TerminateReq over the request pipe, waits for the child to finish
// (waitpid() in the DS_PROCESS build, pthread_join() in the DS_THREAD build),
// then closes the pipe descriptors and frees the data pipe and the mutex.
// If the terminate request cannot be written, the wait/join step is skipped
// but the cleanup is still performed.
//
// Returns 0 in all cases, including when the child was already terminated
// (_child < 0, the value this function stores once it has finished).
int DataSource::TerminateProc()
{
    if (_child < 0) {
        fprintf(stderr, "Child process/thread no longer exists\n");
        return 0;
    }

    DO_DEBUG(printf("Terminating child process/thread %ld...\n",
                    (long)_child));

    Request req = { TerminateReq, 0, 0 };
    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {
        perror("write");
    } else {
#ifdef DS_PROCESS
        // Wait for *our* child only; a plain wait() could reap an unrelated
        // child of this process and return before the data source exited.
        while (1) {
            int status;
            pid_t child = waitpid(_child, &status, 0);
            if (child >= 0)
                break;
            if (errno == EINTR)
                continue;               // interrupted by a signal: retry
            // ECHILD means the child has already been reaped elsewhere;
            // that is not an error, but we must stop waiting in either case
            // (retrying on ECHILD would loop forever).
            if (errno != ECHILD)
                perror("waitpid");
            break;
        }
#endif
#ifdef DS_THREAD
        (void)pthread_join(_child, 0);
#endif
    }

    close(_reqFd[0]);
    close(_reqFd[1]);
#ifndef SOLARIS
    close(_replyFd[0]);
    close(_replyFd[1]);
#endif

    delete _dpipe;
    _mutex->destroy();
    delete _mutex;

    // Mark the child as gone so that a second call is a no-op.
    _child = -1;

    DO_DEBUG(printf("Child process/thread terminated\n"));

    return 0;
}
/*============================================================================*/
//
// Submit read request to data source process
//
// Writes a ReadReq for the given offset/byte count to the request pipe and
// then reads the matching Reply from the reply pipe. The whole round trip is
// performed between AcquireMutex() and ReleaseMutex().
//
// Returns the handle carried in the reply, or -1 if the request could not be
// written or the reply could not be read in full.
int DataSource::ReadProc(streampos_t offset, iosize_t bytes)
{
    AcquireMutex();

    DO_DEBUG(printf("Submitting read request %llu:%lu to data source 0x%p\n",
                    offset, bytes, this));

    Request req = { ReadReq, offset, bytes };
    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {
        perror("write");
        ReleaseMutex();
        return -1;
    }

    Reply reply;
    if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) {
        // This is the read side of the exchange; report it as such.
        perror("read");
        ReleaseMutex();
        return -1;
    }

    ReleaseMutex();

    return reply.handle;
}
/*============================================================================*/
//
// Submit write request to data source process
//
// Writes a WriteReq for the given offset/byte count to the request pipe and
// then reads the matching Reply from the reply pipe. The whole round trip is
// performed between AcquireMutex() and ReleaseMutex().
//
// Returns the handle carried in the reply, or -1 if the request could not be
// written or the reply could not be read in full.
int DataSource::WriteProc(streampos_t offset, iosize_t bytes)
{
    AcquireMutex();

    DO_DEBUG(printf("Submitting write request %llu:%lu to data source 0x%p\n",
                    offset, bytes, this));

    Request req = { WriteReq, offset, bytes };
    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {
        perror("write");
        ReleaseMutex();
        return -1;
    }

    Reply reply;
    if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) {
        // This is the read side of the exchange; report it as such.
        perror("read");
        ReleaseMutex();
        return -1;
    }

    ReleaseMutex();

    return reply.handle;
}
/*============================================================================*/
//
// Jacket routine for interfacing ProcessReq() to POSIX threads.
//
void *DataSource::ProcessReq(void *arg)
{
DataSource &me = *(DataSource *)arg;
return me.ProcessReq();
}
/*============================================================================*/
//
// Read requests from pipe and execute them.
//
void *DataSource::ProcessReq()
{
#if defined(DS_PROCESS) && defined(DEBUG) && 0
char fname[64];
sprintf(fname, "source.fd.%d", _reqFd[0]);
printf("Source 0x%p log file is %s\n", this, fname);
FILE *out = freopen(fname, "w", stdout);
if (!out)
perror("freopen");
#endif
while (1) {
Request req;
int status = readn(_reqFd[0], (char *)&req, sizeof req);
if (status < (int)sizeof req) {
perror("read");
break;
}
if (req.type == TerminateReq) {
DO_DEBUG(printf("\nSource 0x%p received quit command\n", this));
printf("Data source: %llu requests, read: %.2f MB, write: %.2f MB, seek: %.2f MB\n",
_totalCount, _readBytes / 1048576.0,
_writeBytes / 1048576.0, _seekBytes / 1048576.0);
break;
}
DO_DEBUG(printf("\nSource 0x%p received request: %d, %llu, %lu\n",
this, req.type, req.offset, req.bytes));
_totalCount++;
Reply reply = { _handle };
_handle++;
status = writen(_replyFd[1], (char *)&reply, sizeof reply);
if (status < (int)sizeof reply) {
perror("write");
break;
}
DO_DEBUG(printf("Source 0x%p request acknowledged\n", this));
if (req.type == ReadReq)
ReadStream(req.offset, req.bytes);
else
WriteStream(req.offset, req.bytes);
DO_DEBUG(printf("Source 0x%p request completed\n", this));
}
DO_DEBUG(printf("Source 0x%p terminates\n", this));
return 0;
}
/*============================================================================*/
//
// Read data from device and pipe it to the consumer.
//
// Seeks to `offset`, then repeatedly allocates a buffer page from MemMgr,
// fills it with Fread() and passes it on with Produce().
//
//   offset   - position to seek to before reading; advanced by the number of
//              bytes read as chunks are produced.
//   totbytes - number of bytes to read; 0 means keep reading until Fread()
//              returns less than a full page.
//
// The loop also stops early on any read shorter than a page. After the loop
// a final Produce(0, offset, 0) is issued (presumably the end-of-data marker
// for the consumer -- confirm against DataPipe/Produce).
// Failures of Seek, Allocate or Produce are reported through DOASSERT.
void DataSource::ReadStream(streampos_t offset, iosize_t totbytes)
{
    DO_DEBUG(printf("Reading bytes %llu:%lu from data source 0x%p\n",
                    offset, totbytes, this));

    streampos_t curpos = Tell();
    int status = Seek(offset, SEEK_SET);
    DOASSERT(status >= 0, "Cannot seek data source");
    // Be careful not to overflow/underflow (unsigned numbers)
    _seekBytes += (curpos > offset ? curpos - offset : offset - curpos);

    iosize_t bytes = 0;

    while (!totbytes || bytes < totbytes) {
        // Start from a null page so the assertion below never inspects an
        // indeterminate pointer.
        char *page = 0;
        status = MemMgr::Instance()->Allocate(MemMgr::Buffer, page);
        DOASSERT(status >= 0 && page, "Failed to allocate buffer space\n");

        int pageSize = MemMgr::Instance()->PageSize();

        // Read a full page unless fewer bytes remain in the request.
        iosize_t reqsize = pageSize;
        if (totbytes > 0 && totbytes - bytes < reqsize)
            reqsize = totbytes - bytes;

        DO_DEBUG(printf("Data source 0x%p reads %lu bytes\n", this, reqsize));

        size_t b = Fread(page, 1, reqsize);

        // b is a size_t: print it as unsigned long rather than with %u,
        // which is undefined where size_t is wider than unsigned int.
        DO_DEBUG(printf("Data source 0x%p produces bytes %llu:%lu\n",
                        this, offset, (unsigned long)b));

        status = Produce(page, offset, b);
        DOASSERT(status >= 0, "Cannot produce data");

        offset += b;
        bytes += b;
        _readBytes += b;

        // A read shorter than a page ends the stream.
        if ((int)b < pageSize)
            break;
    }

    status = Produce(0, offset, 0);
    DOASSERT(status >= 0, "Cannot produce data");
}
/*============================================================================*/
//
// Consume data from pipe and write it to device.
//
// Seeks to `offset`, then repeatedly takes a chunk from Consume(), writes it
// with Fwrite() and returns its buffer page to MemMgr.
//
//   offset   - position to seek to before writing; every consumed chunk must
//              carry exactly the current offset (checked with DOASSERT).
//   totbytes - number of bytes to write; 0 means the loop is not bounded by
//              a byte count and ends only on a short write.
//
// Failures of Seek, Consume or Deallocate are reported through DOASSERT.
// A short write ends the loop silently; the bytes of that chunk are not
// added to _writeBytes.
void DataSource::WriteStream(streampos_t offset, iosize_t totbytes)
{
    DO_DEBUG(printf("Writing bytes %llu:%lu to data source 0x%p\n",
                    offset, totbytes, this));

    streampos_t curpos = Tell();
    int status = Seek(offset, SEEK_SET);
    DOASSERT(status >= 0, "Cannot seek data source");
    // Be careful not to overflow/underflow (unsigned numbers)
    _seekBytes += (curpos > offset ? curpos - offset : offset - curpos);

    iosize_t bytes = 0;

    while (!totbytes || bytes < totbytes) {
        char *page = 0;
        streampos_t off = 0;
        iosize_t b = 0;

        status = Consume(page, off, b);
        DOASSERT(status >= 0, "Cannot consume data");
        DO_DEBUG(printf("Data source 0x%p consumes bytes %llu:%lu\n",
                        this, offset, b));
        DOASSERT(off == offset, "Invalid data chunk consumed");

        // NOTE(review): an empty chunk is skipped and the loop asks for the
        // next one. If a zero-length chunk is the producer's end-of-data
        // marker, this never terminates when totbytes is 0 -- confirm the
        // intended protocol before changing it.
        if (b <= 0)
            continue;

        size_t s = Fwrite(page, 1, b);

        // Hand the page back before looking at the write result, so that a
        // short write does not leak the buffer.
        status = MemMgr::Instance()->Deallocate(MemMgr::Buffer, page);
        DOASSERT(status >= 0 && page, "Failed to deallocate buffer space\n");

        if (s != (size_t)b)
            break;

        offset += b;
        bytes += b;
        _writeBytes += b;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -