⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datasource.c

📁 Solaris环境下的数据挖掘算法:birch聚类算法。该算法适用于对大量数据的挖掘。
💻 C
📖 第 1 页 / 共 2 页
字号:
    _replyFd[1] = _reqFd[0];#else    if (pipe(_replyFd) < 0) {        perror("pipe");        return -1;    }#endif    _totalCount = 0;    _readBytes = 0;    _writeBytes = 0;    _seekBytes = 0;    _handle = 1;    _dpipe = new DataPipe(10, status);    if (!_dpipe || status < 0) {        fprintf(stderr, "Cannot create data pipe\n");        return -1;    }    DO_DEBUG(printf("Creating data source process/thread...\n"));#ifdef DS_PROCESS    fflush(stdout);    _child = fork();    if (_child < 0) {        perror("fork");        return -1;    }    if (!_child) {        (void)ProcessReq(this);        close(_reqFd[0]);        close(_reqFd[1]);#ifndef SOLARIS        close(_replyFd[0]);        close(_replyFd[1]);#endif        exit(1);    }#endif#ifdef DS_THREAD    if (pthread_create(&_child, 0, ProcessReq, this)) {        perror("pthread_create");        return -1;    }#endif    DO_DEBUG(printf("Created data source process/thread %ld\n", (long)_child));    return 0;}/*============================================================================*///// Terminate data source process//int DataSource::TerminateProc(){    if (_child < 0) {      fprintf(stderr, "Child process/thread no longer exists\n");      return 0;    }    DO_DEBUG(printf("Terminating child process/thread %ld...\n",                    (long)_child));    Request req = { TerminateReq, 0, 0 };    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {        perror("write");    } else {#ifdef DS_PROCESS        while(1) {            int status;            pid_t child = wait(&status);            if (child < 0) {                if (errno == EINTR)                    continue;                if (errno != ECHILD) {                    perror("wait");                    break;                }            } else                break;        }#endif#ifdef DS_THREAD        (void)pthread_join(_child, 0);#endif    }    close(_reqFd[0]);    close(_reqFd[1]);#ifndef SOLARIS    close(_replyFd[0]);    close(_replyFd[1]);#endif    delete _dpipe;    _mutex->destroy();    delete _mutex;    _child = -1;    DO_DEBUG(printf("Child process/thread terminated\n"));    return 0;}    /*============================================================================*///// Submit read request to data source process//int DataSource::ReadProc(streampos_t offset, iosize_t bytes){    AcquireMutex();    DO_DEBUG(printf("Submitting read request %llu:%lu to data source 0x%p\n",                    offset, bytes, this));    Request req = { ReadReq, offset, bytes };    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {        perror("write");        ReleaseMutex();        return -1;    }    Reply reply;    if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) {        perror("write");        ReleaseMutex();        return -1;    }    ReleaseMutex();    return reply.handle;}/*============================================================================*///// Submit write request to data source process//int DataSource::WriteProc(streampos_t offset, iosize_t bytes){    AcquireMutex();    DO_DEBUG(printf("Submitting write request %llu:%lu to data source 0x%p\n",                    offset, bytes, this));    Request req = { WriteReq, offset, bytes };    if (writen(_reqFd[1], (char *)&req, sizeof req) < (int)sizeof req) {        perror("write");        ReleaseMutex();        return -1;    }    Reply reply;    if (readn(_replyFd[0], (char *)&reply, sizeof reply) < (int)sizeof reply) {        perror("write");        ReleaseMutex();        return -1;    }    ReleaseMutex();    return reply.handle;}/*============================================================================*///// Jacket routine for interfacing ProcessReq() to POSIX threads.//void *DataSource::ProcessReq(void *arg){    DataSource &me = *(DataSource *)arg;    return me.ProcessReq();}/*============================================================================*///// Read requests from pipe and execute them.//void *DataSource::ProcessReq(){#if defined(DS_PROCESS) && defined(DEBUG) && 0    char fname[64];    sprintf(fname, "source.fd.%d", _reqFd[0]);    printf("Source 0x%p log file is %s\n", this, fname);    FILE *out = freopen(fname, "w", stdout);    if (!out)        perror("freopen");#endif    while (1) {        Request req;        int status = readn(_reqFd[0], (char *)&req, sizeof req);        if (status < (int)sizeof req) {            perror("read");            break;        }        if (req.type == TerminateReq) {            DO_DEBUG(printf("\nSource 0x%p received quit command\n", this));            printf("Data source: %llu requests, read: %.2f MB, write: %.2f MB, seek: %.2f MB\n",                   _totalCount, _readBytes / 1048576.0,                   _writeBytes / 1048576.0, _seekBytes / 1048576.0);            break;        }        DO_DEBUG(printf("\nSource 0x%p received request: %d, %llu, %lu\n",                        this, req.type, req.offset, req.bytes));        _totalCount++;        Reply reply = { _handle };        _handle++;        status = writen(_replyFd[1], (char *)&reply, sizeof reply);        if (status < (int)sizeof reply) {            perror("write");            break;        }        DO_DEBUG(printf("Source 0x%p request acknowledged\n", this));        if (req.type == ReadReq)            ReadStream(req.offset, req.bytes);        else            WriteStream(req.offset, req.bytes);        DO_DEBUG(printf("Source 0x%p request completed\n", this));    }    DO_DEBUG(printf("Source 0x%p terminates\n", this));    return 0;}/*============================================================================*///// Read data from device and pipe it to the consumer.//void DataSource::ReadStream(streampos_t offset, iosize_t totbytes){    DO_DEBUG(printf("Reading bytes %llu:%lu from data source 0x%p\n",                    offset, totbytes, this));    streampos_t curpos = Tell();    int status = Seek(offset, SEEK_SET);    DOASSERT(status >= 0, "Cannot seek data source");    // Be careful not to overflow/underflow (unsigned numbers)    _seekBytes += (curpos > offset ? curpos - offset : offset - curpos);    iosize_t bytes = 0;    while (!totbytes || bytes < totbytes) {        char *page;        status = MemMgr::Instance()->Allocate(MemMgr::Buffer, page);        DOASSERT(status >= 0 && page, "Failed to allocate buffer space\n");        int pageSize = MemMgr::Instance()->PageSize();        iosize_t reqsize = pageSize;        if (totbytes > 0 && totbytes - bytes < reqsize)            reqsize = totbytes - bytes;        DO_DEBUG(printf("Data source 0x%p reads %lu bytes\n", this, reqsize));        size_t b = Fread(page, 1, reqsize);        DO_DEBUG(printf("Data source 0x%p produces bytes %llu:%u\n",                        this, offset, b));        status = Produce(page, offset, b);        DOASSERT(status >= 0, "Cannot produce data");        offset += b;        bytes += b;        _readBytes += b;        if ((int)b < pageSize)            break;    }    status = Produce(0, offset, 0);    DOASSERT(status >= 0, "Cannot produce data");}/*============================================================================*///// Consume data from pipe and write it to device.//void DataSource::WriteStream(streampos_t offset, iosize_t totbytes){    DO_DEBUG(printf("Writing bytes %llu:%lu to data source 0x%p\n",                    offset, totbytes, this));    streampos_t curpos = Tell();    int status = Seek(offset, SEEK_SET);    DOASSERT(status >= 0, "Cannot seek data source");    // Be careful not to overflow/underflow (unsigned numbers)    _seekBytes += (curpos > offset ? curpos - offset : offset - curpos);    iosize_t bytes = 0;    while (!totbytes || bytes < totbytes) {        char *page;        streampos_t off;        iosize_t b;        int status = Consume(page, off, b);        DOASSERT(status >= 0, "Cannot consume data");        DO_DEBUG(printf("Data source 0x%p consumes bytes %llu:%lu\n",                        this, offset, b));        DOASSERT(off == offset, "Invalid data chunk consumed");        if (b <= 0)            continue;        size_t s = Fwrite(page, 1, b);        if (s != (size_t)b)            break;        status = MemMgr::Instance()->Deallocate(MemMgr::Buffer, page);        DOASSERT(status >= 0 && page, "Failed to deallocate buffer space\n");        offset += b;        bytes += b;        _writeBytes += b;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -