⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpcomm.cpp

📁 tinyos-2.x.rar
💻 CPP
📖 第 1 页 / 共 2 页
字号:
    {
        // clear write buffer
        writeBuffer.clear();
    }
    pthread_mutex_unlock( &clientInfo.countlock );
    stuffPipe();
    DEBUG("TCPComm::removeClient : unlock")
}

/* helper function to start server pthread */
void* checkClientsThread(void* ob)
{
    static_cast<TCPComm*>(ob)->connectClients();
    return NULL;
}

/*
 * Server loop: accepts new connections on serverFD, forever.
 *
 * A connection that passes versionCheck() is registered with addClient();
 * one that fails the check is closed again. When accept() itself fails,
 * cancel() is invoked.
 */
void TCPComm::connectClients()
{
    for (;;)
    {
        const int acceptedFD = accept(serverFD, NULL, NULL);
        // act on a cancellation request that arrived while accept() was blocked
        pthread_testcancel();

        if (acceptedFD < 0)
        {
            // accept failed: honour a pending cancel first, otherwise shut down
            pthread_testcancel();
            cancel();
            continue;
        }

        if (!versionCheck(acceptedFD))
        {
            // peer did not pass the version check - drop the connection
            close(acceptedFD);
            continue;
        }

        addClient(acceptedFD);
    }
}

/* helper function to start client reader pthread */
void* readClientsThread(void* ob)
{
    static_cast<TCPComm*>(ob)->readClients();
    return NULL;
}

/* reads from connected clients */
void TCPComm::readClients()
{
    FD_t clientFDs;
    while (true)
    {
        pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);
        pthread_mutex_lock( &clientInfo.countlock );
        while( clientInfo.count == 0 )
        {
            // do nothing when no client is connected...
            DEBUG("TCPComm::readClients : sleeping reader thread")
            pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );
        }
        // copy set in to temp set
        clientFDs = clientInfo.FDs;
        // removes the cleanup handler and executes it (unlock mutex)
        pthread_cleanup_pop(1);
        // check all fds (work with temp set)...
        fd_set rfds;
        FD_ZERO(&rfds);
        int maxFD = pipeReadFD;
        FD_SET(pipeReadFD, &rfds);
        set<int>::iterator it;
        for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
        {
            if (*it > maxFD)
            {
                maxFD = *it;
            }
            FD_SET(*it, &rfds);
        }
        if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )
        {
            //             run = false;
            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1);
        }
        else
        {
            if(FD_ISSET(pipeReadFD, &rfds)) {
                clearPipe();
            }
            for (it = clientFDs.begin(); it != clientFDs.end(); it++)
            {
                if (FD_ISSET(*it, &rfds))
                {
                    SFPacket packet;
                    if(readPacket(*it, packet)) {
                        // this call blocks until buffer is not full
                        readBuffer.enqueueBack(packet);
                        ++readPacketCount;
                    }
                    else {
                        DEBUG("TCPComm::readClients : removeClient")
                        removeClient(*it);
                    }
                }
            }
        }
    }
}

/* helper function to start client writer pthread */
void* writeClientsThread(void* ob)
{
    static_cast<TCPComm*>(ob)->writeClients();
    return NULL;
}

/* writes to connected clients */
void TCPComm::writeClients()
{
    FD_t clientFDs;
    while (true)
    {
        pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);
        pthread_mutex_lock( &clientInfo.countlock );
        while( clientInfo.count == 0 )
        {
            // do nothing when no client is connected...
            DEBUG("TCPComm::writeClients : sleeping writer thread")
            pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );
        }
        // removes the cleanup handler and executes it (unlock mutex)
        pthread_cleanup_pop(1); 

        // blocks until buffer is not empty
        SFPacket packet = writeBuffer.dequeue();
        pthread_testcancel();
        pthread_mutex_lock( &clientInfo.countlock );
        // copy client fd set into temp set
        clientFDs = clientInfo.FDs;
        pthread_mutex_unlock( &clientInfo.countlock );

        // check all fds (work with temp set)...
        set<int>::iterator it;
        // duplicate and send out packet to all connected clients
        for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
        {
            if (writePacket(*it, packet))
            {
                ++writtenPacketCount;
            }
            else
            {
                DEBUG("TCPComm::writeClients : removeClient")
                removeClient(*it);
            }
        }
    }
}

/*
 * Cancels all running threads (reader, writer, server).
 *
 * The behaviour depends on which thread calls it:
 *  - Called from the reader, writer or server thread: the caller detaches
 *    itself, cancels and joins the other two threads (only those whose
 *    *ThreadRunning flag is set), clears the running flags, signals
 *    control.cancel and then terminates itself with pthread_exit(NULL).
 *    In these cases the function does NOT return to its caller.
 *  - Called from any other thread: cancels and joins server, writer and
 *    reader (in that order, only those flagged as running), signals
 *    control.cancel and returns normally.
 *
 * NOTE(review): the running flags are read and written here without any
 * lock visible in this function - confirm callers cannot race on them.
 */
void TCPComm::cancel()
{
    pthread_t callingThread = pthread_self();
    if (pthread_equal(callingThread, readerThread))
    {
        DEBUG("TCPComm::cancel : by readerThread")
        // the caller exits below via pthread_exit; presumably detached
        // because nobody is expected to join it - TODO confirm
        pthread_detach(readerThread);
        if (writerThreadRunning)
        {
            pthread_cancel(writerThread);
            DEBUG("TCPComm::cancel : writerThread canceled, joining")
            pthread_join(writerThread, NULL);
            writerThreadRunning = false;
        }
        if (serverThreadRunning)
        {
            pthread_cancel(serverThread);
            DEBUG("TCPComm::cancel : serverThread canceled, joining")
            pthread_join(serverThread, NULL);
            serverThreadRunning = false;
        }
        readerThreadRunning = false;
	pthread_cond_signal(&control.cancel);
        // terminates the calling (reader) thread - nothing after this runs
        pthread_exit(NULL);
    }
    else if (pthread_equal(callingThread, writerThread))
    {
        DEBUG("TCPComm::cancel : by writerThread")
        // same scheme as above, with the writer as the calling thread
        pthread_detach(writerThread);
        if (readerThreadRunning)
        {
            pthread_cancel(readerThread);
            DEBUG("TCPComm::cancel : readerThread canceled, joining")
            pthread_join(readerThread, NULL);
            readerThreadRunning = false;
        }
        if (serverThreadRunning)
        {
            pthread_cancel(serverThread);
            DEBUG("TCPComm::cancel : serverThread canceled, joining")
            pthread_join(serverThread, NULL);
            serverThreadRunning = false;
        }
        writerThreadRunning = false;
	pthread_cond_signal(&control.cancel);
        // terminates the calling (writer) thread
        pthread_exit(NULL);
    }
    else if (pthread_equal(callingThread, serverThread))
    {
        DEBUG("TCPComm::cancel : by serverThread")
        // same scheme as above, with the server as the calling thread
        pthread_detach(serverThread);
        if (readerThreadRunning)
        {
            pthread_cancel(readerThread);
            DEBUG("TCPComm::cancel : readerThread canceled, joining")
	    pthread_join(readerThread, NULL);
            readerThreadRunning = false;
        }
        if (writerThreadRunning)
        {
            pthread_cancel(writerThread);
            DEBUG("TCPComm::cancel : writerThread canceled, joining")
            pthread_join(writerThread, NULL);
            writerThreadRunning = false;
        }
        serverThreadRunning = false;
	pthread_cond_signal(&control.cancel);
        // terminates the calling (server) thread
        pthread_exit(NULL);
    }
    else
    {
        // caller is none of the three worker threads: stop all of them
        // and return to the caller afterwards
        DEBUG("TCPComm::cancel : by other thread")
	if (serverThreadRunning)
        {
            pthread_cancel(serverThread);
            DEBUG("TCPComm::cancel : serverThread canceled, joining")
            pthread_join(serverThread, NULL);
            serverThreadRunning = false;
        }
 	if (writerThreadRunning)
        {
            pthread_cancel(writerThread);
            DEBUG("TCPComm::cancel : writerThread canceled, joining")
            pthread_join(writerThread, NULL);
            writerThreadRunning = false;
        }
        if (readerThreadRunning)
        {
            pthread_cancel(readerThread);
            DEBUG("TCPComm::cancel : readerThread canceled, joining")
            pthread_join(readerThread, NULL);
            readerThreadRunning = false;
        }
	pthread_cond_signal(&control.cancel);
    }
}

/*
 * Reports an error and shuts the threads down.
 *
 * msg    : description of the failed operation (may be NULL)
 * result : result code of the failed operation; only negative values are
 *          treated as errors
 *
 * When result is negative and no error has been reported before, a message
 * including the system error description for the current errno is appended
 * to errorMsg, the accumulated text is written to cerr, errorReported is
 * set, and cancel() is called. Note that cancel() does not return when
 * invoked from the reader, writer or server thread.
 *
 * Returns result unchanged.
 */
int TCPComm::reportError(const char *msg, int result)
{
    // Capture errno before doing anything else: the stream insertions below
    // may call library routines that overwrite it before strerror() reads it.
    const int savedErrno = errno;

    if ((result < 0) && (!errorReported))
    {
        // inserting a null char pointer into a stream is undefined behaviour
        const char* const text = (msg != NULL) ? msg : "(no message)";

        errorMsg << "error : SF-Server (TCPComm on port = " << port << ") : "
        << text << " ( result = " << result << " )" << endl
        << "error-description : " << strerror(savedErrno) << endl;

        cerr << errorMsg.str();
        errorReported = true;
        cancel();
    }
    return result;
}

/*
 * Writes a one-line status summary to os: the port, the current client
 * count and the numbers of packets read and written so far. The line is
 * terminated with endl.
 */
void TCPComm::reportStatus(ostream& os)
{
    os << "SF-Server ( TCPComm on port " << port << " )";
    os << " : clients = " << clientInfo.count;
    os << " , packets read = " << readPacketCount;
    os << " , packets written = " << writtenPacketCount;
    os << endl;
}

/*
 * Writes a single byte to pipeWriteFD.
 * Presumably this is the write end of the pipe whose read end
 * (pipeReadFD) readClients() includes in its select() set, so the byte
 * wakes a reader blocked in select() - TODO confirm where the pipe is set up.
 * A failed write is only reported through DEBUG.
 */
void TCPComm::stuffPipe()
{
    const char wakeByte = 'n';
    if (write(pipeWriteFD, &wakeByte, sizeof wakeByte) == 1)
    {
        return;
    }
    DEBUG("TCPComm::stuffPipe : lokal pipe is broken");
}

/*
 * Reads and discards bytes from pipeReadFD, one at a time, until read()
 * reports end-of-file or an error (return value <= 0).
 * NOTE(review): this only terminates promptly if pipeReadFD is
 * non-blocking - confirm the descriptor is configured that way where the
 * pipe is created.
 */
void TCPComm::clearPipe()
{
    char discarded;
    for (;;)
    {
        if (read(pipeReadFD, &discarded, 1) <= 0)
        {
            break;
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -