⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpcomm.cpp

📁 tinyos2.0版本驱动
💻 CPP
📖 第 1 页 / 共 2 页
字号:
    {        // clear write buffer        writeBuffer.clear();    }    pthread_mutex_unlock( &clientInfo.countlock );    stuffPipe();    DEBUG("TCPComm::removeClient : unlock")}/* helper function to start server pthread */void* checkClientsThread(void* ob){    static_cast<TCPComm*>(ob)->connectClients();    return NULL;}/* checks for new connected clients */void TCPComm::connectClients(){    while (true)    {        int clientFD = accept(serverFD, NULL, NULL);	pthread_testcancel();        if (clientFD >= 0)        {            if (versionCheck(clientFD))            {                addClient(clientFD);            }            else            {                close(clientFD);            }        }        else        {            pthread_testcancel();            cancel();        }    }}/* helper function to start client reader pthread */void* readClientsThread(void* ob){    static_cast<TCPComm*>(ob)->readClients();    return NULL;}/* reads from connected clients */void TCPComm::readClients(){    FD_t clientFDs;    while (true)    {        pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);        pthread_mutex_lock( &clientInfo.countlock );        while( clientInfo.count == 0 )        {            // do nothing when no client is connected...            DEBUG("TCPComm::readClients : sleeping reader thread")            pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );        }        // copy set in to temp set        clientFDs = clientInfo.FDs;        // removes the cleanup handler and executes it (unlock mutex)        pthread_cleanup_pop(1);        // check all fds (work with temp set)...        fd_set rfds;        FD_ZERO(&rfds);        int maxFD = pipeReadFD;        FD_SET(pipeReadFD, &rfds);        set<int>::iterator it;        for( it = clientFDs.begin(); it != clientFDs.end(); it++ )        {            if (*it > maxFD)            {                maxFD = *it;            }            FD_SET(*it, &rfds);        }        if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )        {            //             run = false;            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1);        }        else        {            if(FD_ISSET(pipeReadFD, &rfds)) {                clearPipe();            }            for (it = clientFDs.begin(); it != clientFDs.end(); it++)            {                if (FD_ISSET(*it, &rfds))                {                    SFPacket packet;                    if(readPacket(*it, packet)) {                        // this call blocks until buffer is not full                        readBuffer.enqueueBack(packet);                        ++readPacketCount;                    }                    else {                        DEBUG("TCPComm::readClients : removeClient")                        removeClient(*it);                    }                }            }        }    }}/* helper function to start client writer pthread */void* writeClientsThread(void* ob){    static_cast<TCPComm*>(ob)->writeClients();    return NULL;}/* writes to connected clients */void TCPComm::writeClients(){    FD_t clientFDs;    while (true)    {        pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);        pthread_mutex_lock( &clientInfo.countlock );        while( clientInfo.count == 0 )        {            // do nothing when no client is connected...            DEBUG("TCPComm::writeClients : sleeping writer thread")            pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );        }        // removes the cleanup handler and executes it (unlock mutex)        pthread_cleanup_pop(1);         // blocks until buffer is not empty        SFPacket packet = writeBuffer.dequeue();        pthread_testcancel();        pthread_mutex_lock( &clientInfo.countlock );        // copy client fd set into temp set        clientFDs = clientInfo.FDs;        pthread_mutex_unlock( &clientInfo.countlock );        // check all fds (work with temp set)...        set<int>::iterator it;        // duplicate and send out packet to all connected clients        for( it = clientFDs.begin(); it != clientFDs.end(); it++ )        {            if (writePacket(*it, packet))            {                ++writtenPacketCount;            }            else            {                DEBUG("TCPComm::writeClients : removeClient")                removeClient(*it);            }        }    }}/* cancels all running threads */void TCPComm::cancel(){    pthread_t callingThread = pthread_self();    if (pthread_equal(callingThread, readerThread))    {        DEBUG("TCPComm::cancel : by readerThread")        pthread_detach(readerThread);        if (writerThreadRunning)        {            pthread_cancel(writerThread);            DEBUG("TCPComm::cancel : writerThread canceled, joining")            pthread_join(writerThread, NULL);            writerThreadRunning = false;        }        if (serverThreadRunning)        {            pthread_cancel(serverThread);            DEBUG("TCPComm::cancel : serverThread canceled, joining")            pthread_join(serverThread, NULL);            serverThreadRunning = false;        }        readerThreadRunning = false;	pthread_cond_signal(&control.cancel);        pthread_exit(NULL);    }    else if (pthread_equal(callingThread, writerThread))    {        DEBUG("TCPComm::cancel : by writerThread")        pthread_detach(writerThread);        if (readerThreadRunning)        {            pthread_cancel(readerThread);            DEBUG("TCPComm::cancel : readerThread canceled, joining")            pthread_join(readerThread, NULL);            readerThreadRunning = false;        }        if (serverThreadRunning)        {            pthread_cancel(serverThread);            DEBUG("TCPComm::cancel : serverThread canceled, joining")            pthread_join(serverThread, NULL);            serverThreadRunning = false;        }        writerThreadRunning = false;	pthread_cond_signal(&control.cancel);        pthread_exit(NULL);    }    else if (pthread_equal(callingThread, serverThread))    {        DEBUG("TCPComm::cancel : by serverThread")        pthread_detach(serverThread);        if (readerThreadRunning)        {            pthread_cancel(readerThread);            DEBUG("TCPComm::cancel : readerThread canceled, joining")	    pthread_join(readerThread, NULL);            readerThreadRunning = false;        }        if (writerThreadRunning)        {            pthread_cancel(writerThread);            DEBUG("TCPComm::cancel : writerThread canceled, joining")            pthread_join(writerThread, NULL);            writerThreadRunning = false;        }        serverThreadRunning = false;	pthread_cond_signal(&control.cancel);        pthread_exit(NULL);    }    else    {        DEBUG("TCPComm::cancel : by other thread")	if (serverThreadRunning)        {            pthread_cancel(serverThread);            DEBUG("TCPComm::cancel : serverThread canceled, joining")            pthread_join(serverThread, NULL);            serverThreadRunning = false;        } 	if (writerThreadRunning)        {            pthread_cancel(writerThread);            DEBUG("TCPComm::cancel : writerThread canceled, joining")            pthread_join(writerThread, NULL);            writerThreadRunning = false;        }        if (readerThreadRunning)        {            pthread_cancel(readerThread);            DEBUG("TCPComm::cancel : readerThread canceled, joining")            pthread_join(readerThread, NULL);            readerThreadRunning = false;        }	pthread_cond_signal(&control.cancel);    }}/* reports error */int TCPComm::reportError(const char *msg, int result){    if ((result < 0) && (!errorReported))    {        errorMsg << "error : SF-Server (TCPComm on port = " << port << ") : "        << msg << " ( result = " << result << " )" << endl        << "error-description : " << strerror(errno) << endl;        cerr << errorMsg.str();        errorReported = true;        cancel();    }    return result;}/* prints out status */void TCPComm::reportStatus(ostream& os){    os << "SF-Server ( TCPComm on port " << port << " )"    << " : clients = " << clientInfo.count    << " , packets read = " << readPacketCount    << " , packets written = " << writtenPacketCount << endl;}void TCPComm::stuffPipe() {    char info = 'n';    write(pipeWriteFD, &info, 1);}void TCPComm::clearPipe() {    char buf;    while(read(pipeReadFD, &buf, 1) > 0) {        ;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -