⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datasource.c

📁 数据挖掘经典的hierarchical clustering algorithm
💻 C
📖 第 1 页 / 共 2 页
字号:
#else
    if (pipe(_replyFd) < 0) {
        perror("pipe");
        return -1;
    }
#endif

    _totalCount = 0;
    _readBytes = 0;
    _writeBytes = 0;
    _seekBytes = 0;

    _handle = 1;

    _dpipe = new DataPipe(10, status);
    if (!_dpipe || status < 0) {
        fprintf(stderr, "Cannot create data pipe\n");
        return -1;
    }

    DO_DEBUG(printf("Creating data source process/thread...\n"));

#ifdef DS_PROCESS
    fflush(stdout);
    _child = fork();
    if (_child < 0) {
        perror("fork");
        return -1;
    }

    if (!_child) {
        (void)ProcessReq(this);
        close(_reqFd[0]);
        close(_reqFd[1]);
#ifndef SOLARIS
        close(_replyFd[0]);
        close(_replyFd[1]);
#endif
        exit(1);
    }
#endif

#ifdef DS_THREAD
    if (pthread_create(&_child, 0, ProcessReq, this)) {
        perror("pthread_create");
        return -1;
    }
#endif

    DO_DEBUG(printf("Created data source process/thread %ld\n", (long)_child));

    return 0;
}

/*============================================================================*/

//
// Terminate data source process/thread.
//
// Writes a TerminateReq to the request pipe and, if that succeeds, waits
// for the child to finish (waitpid under DS_PROCESS, pthread_join under
// DS_THREAD).  The pipe descriptors, the data pipe and the mutex are
// released whether or not the request could be written.
//
// Returns 0 in all cases, including when no child exists any more.
//

int DataSource::TerminateProc()
{
    if (_child < 0) {
        fprintf(stderr, "Child process/thread no longer exists\n");
        return 0;
    }

    DO_DEBUG(printf("Terminating child process/thread %ld...\n",
                    (long)_child));

    Request req = { TerminateReq, 0, 0 };
    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {
        perror("write");
    } else {
#ifdef DS_PROCESS
        // Wait for our own child only: a plain wait() could reap a child
        // that belongs to some other data source.  Retry when the call is
        // interrupted by a signal.
        int status;
        pid_t child;
        do {
            child = waitpid(_child, &status, 0);
        } while (child < 0 && errno == EINTR);

        // ECHILD means the child is already gone, which is not an error
        // here.  (The previous loop never left on ECHILD and spun forever.)
        if (child < 0 && errno != ECHILD)
            perror("waitpid");
#endif
#ifdef DS_THREAD
        (void)pthread_join(_child, 0);
#endif
    }

    close(_reqFd[0]);
    close(_reqFd[1]);
#ifndef SOLARIS
    close(_replyFd[0]);
    close(_replyFd[1]);
#endif

    delete _dpipe;

    _mutex->destroy();
    delete _mutex;

    // Mark the child as gone so that a repeated call takes the early return
    _child = -1;

    DO_DEBUG(printf("Child process/thread terminated\n"));

    return 0;
}

/*============================================================================*/

//
// Submit read request to data source process.
//
// Writes a ReadReq for `bytes` bytes starting at `offset` to the request
// pipe and then reads the acknowledging Reply from the reply pipe.
// AcquireMutex()/ReleaseMutex() bracket the whole exchange.
//
// Returns the handle carried in the reply, or -1 if the request could not
// be written or the reply could not be read in full.
//

int DataSource::ReadProc(streampos_t offset, iosize_t bytes)
{
    AcquireMutex();

    DO_DEBUG(printf("Submitting read request %llu:%lu to data source 0x%p\n",
                    offset, bytes, this));

    Request req = { ReadReq, offset, bytes };
    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {
        perror("write");
        ReleaseMutex();
        return -1;
    }

    Reply reply;
    if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) {
        // This is a failed read of the reply, so label it as such
        perror("read");
        ReleaseMutex();
        return -1;
    }

    ReleaseMutex();

    return reply.handle;
}

/*============================================================================*/

//
// Submit write request to data source process.
//
// Writes a WriteReq for `bytes` bytes starting at `offset` to the request
// pipe and then reads the acknowledging Reply from the reply pipe.
// AcquireMutex()/ReleaseMutex() bracket the whole exchange.
//
// Returns the handle carried in the reply, or -1 if the request could not
// be written or the reply could not be read in full.
//

int DataSource::WriteProc(streampos_t offset, iosize_t bytes)
{
    AcquireMutex();

    DO_DEBUG(printf("Submitting write request %llu:%lu to data source 0x%p\n",
                    offset, bytes, this));

    Request req = { WriteReq, offset, bytes };
    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {
        perror("write");
        ReleaseMutex();
        return -1;
    }

    Reply reply;
    if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) {
        // This is a failed read of the reply, so label it as such
        perror("read");
        ReleaseMutex();
        return -1;
    }

    ReleaseMutex();

    return reply.handle;
}

/*============================================================================*/

//
// Jacket routine for interfacing ProcessReq() to POSIX threads.
//

void *DataSource::ProcessReq(void *arg)
{
    DataSource &me = *(DataSource *)arg;
    return me.ProcessReq();
}

/*============================================================================*/

//
// Read requests from pipe and execute them.
//

void *DataSource::ProcessReq()
{
#if defined(DS_PROCESS) && defined(DEBUG) && 0
    char fname[64];
    sprintf(fname, "source.fd.%d", _reqFd[0]);
    printf("Source 0x%p log file is %s\n", this, fname);
    FILE *out = freopen(fname, "w", stdout);
    if (!out)
        perror("freopen");
#endif

    while (1) {
        Request req;
        int status = readn(_reqFd[0], (char *)&req, sizeof req);
        if (status < (int)sizeof req) {
            perror("read");
            break;
        }

        if (req.type == TerminateReq) {
            DO_DEBUG(printf("\nSource 0x%p received quit command\n", this));
            printf("Data source: %llu requests, read: %.2f MB, write: %.2f MB, seek: %.2f MB\n",
                   _totalCount, _readBytes / 1048576.0,
                   _writeBytes / 1048576.0, _seekBytes / 1048576.0);
            break;
        }

        DO_DEBUG(printf("\nSource 0x%p received request: %d, %llu, %lu\n",
                        this, req.type, req.offset, req.bytes));

        _totalCount++;

        Reply reply = { _handle };
        _handle++;

        status = writen(_replyFd[1], (char *)&reply, sizeof reply);
        if (status < (int)sizeof reply) {
            perror("write");
            break;
        }

        DO_DEBUG(printf("Source 0x%p request acknowledged\n", this));

        if (req.type == ReadReq)
            ReadStream(req.offset, req.bytes);
        else
            WriteStream(req.offset, req.bytes);

        DO_DEBUG(printf("Source 0x%p request completed\n", this));
    }

    DO_DEBUG(printf("Source 0x%p terminates\n", this));

    return 0;
}

/*============================================================================*/

//
// Read data from device and pipe it to the consumer.
//
// Seeks to `offset`, then repeatedly allocates a buffer page, fills it
// with Fread() and hands it over with Produce().  With `totbytes` > 0 at
// most that many bytes are read; with `totbytes` == 0 reading goes on
// until a read returns less than a full page.  A final Produce() with a
// null page and zero length is issued afterwards (presumably the
// end-of-stream marker for the consumer -- confirm against Consume()).
//
// Updates the _seekBytes and _readBytes statistics.
//

void DataSource::ReadStream(streampos_t offset, iosize_t totbytes)
{
    DO_DEBUG(printf("Reading bytes %llu:%lu from data source 0x%p\n",
                    offset, totbytes, this));

    streampos_t curpos = Tell();
    int status = Seek(offset, SEEK_SET);
    DOASSERT(status >= 0, "Cannot seek data source");

    // Be careful not to overflow/underflow (unsigned numbers)
    _seekBytes += (curpos > offset ? curpos - offset : offset - curpos);

    iosize_t bytes = 0;

    while (!totbytes || bytes < totbytes) {
        char *page;
        status = MemMgr::Instance()->Allocate(MemMgr::Buffer, page);
        DOASSERT(status >= 0 && page, "Failed to allocate buffer space\n");
        int pageSize = MemMgr::Instance()->PageSize();

        // Ask for a full page unless fewer bytes remain to be read
        iosize_t reqsize = pageSize;
        if (totbytes > 0 && totbytes - bytes < reqsize)
            reqsize = totbytes - bytes;
        DO_DEBUG(printf("Data source 0x%p reads %lu bytes\n", this, reqsize));
        size_t b = Fread(page, 1, reqsize);
        // size_t does not match %u on LP64 targets; print it as unsigned long
        DO_DEBUG(printf("Data source 0x%p produces bytes %llu:%lu\n",
                        this, offset, (unsigned long)b));
        status = Produce(page, offset, b);
        DOASSERT(status >= 0, "Cannot produce data");
        offset += b;
        bytes += b;
        _readBytes += b;

        // Anything less than a full page ends the transfer
        if ((int)b < pageSize)
            break;
    }

    status = Produce(0, offset, 0);
    DOASSERT(status >= 0, "Cannot produce data");
}

/*============================================================================*/

//
// Consume data from pipe and write it to device.
//
// Seeks to `offset`, then repeatedly takes a buffer page from Consume(),
// writes it with Fwrite() and returns the page to the memory manager.
// With `totbytes` > 0 the loop ends once that many bytes were written;
// it also ends on the first short write.  Each consumed chunk must start
// exactly at the current write offset.
//
// Updates the _seekBytes and _writeBytes statistics.
//

void DataSource::WriteStream(streampos_t offset, iosize_t totbytes)
{
    DO_DEBUG(printf("Writing bytes %llu:%lu to data source 0x%p\n",
                    offset, totbytes, this));

    streampos_t curpos = Tell();
    int status = Seek(offset, SEEK_SET);
    DOASSERT(status >= 0, "Cannot seek data source");

    // Be careful not to overflow/underflow (unsigned numbers)
    _seekBytes += (curpos > offset ? curpos - offset : offset - curpos);

    iosize_t bytes = 0;

    while (!totbytes || bytes < totbytes) {
        char *page;
        streampos_t off;
        iosize_t b;
        status = Consume(page, off, b);
        DOASSERT(status >= 0, "Cannot consume data");
        DO_DEBUG(printf("Data source 0x%p consumes bytes %llu:%lu\n",
                        this, offset, b));
        DOASSERT(off == offset, "Invalid data chunk consumed");

        // NOTE(review): an empty chunk is skipped rather than treated as
        // end of stream, so with totbytes == 0 only a short write ends
        // this loop -- confirm that this is the intended protocol.
        if (b <= 0)
            continue;

        size_t s = Fwrite(page, 1, b);

        // Hand the page back before looking at the result of the write, so
        // that a short write does not leave the buffer allocated.
        status = MemMgr::Instance()->Deallocate(MemMgr::Buffer, page);
        DOASSERT(status >= 0 && page, "Failed to deallocate buffer space\n");

        if (s != (size_t)b)
            break;

        offset += b;
        bytes += b;
        _writeBytes += b;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -