📄 tcpcomm.cpp
字号:
{ // clear write buffer writeBuffer.clear(); } pthread_mutex_unlock( &clientInfo.countlock ); stuffPipe(); DEBUG("TCPComm::removeClient : unlock")}/* helper function to start server pthread */void* checkClientsThread(void* ob){ static_cast<TCPComm*>(ob)->connectClients(); return NULL;}/* checks for new connected clients */void TCPComm::connectClients(){ while (true) { int clientFD = accept(serverFD, NULL, NULL); pthread_testcancel(); if (clientFD >= 0) { if (versionCheck(clientFD)) { addClient(clientFD); } else { close(clientFD); } } else { pthread_testcancel(); cancel(); } }}/* helper function to start client reader pthread */void* readClientsThread(void* ob){ static_cast<TCPComm*>(ob)->readClients(); return NULL;}/* reads from connected clients */void TCPComm::readClients(){ FD_t clientFDs; while (true) { pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock); pthread_mutex_lock( &clientInfo.countlock ); while( clientInfo.count == 0 ) { // do nothing when no client is connected... DEBUG("TCPComm::readClients : sleeping reader thread") pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock ); } // copy set in to temp set clientFDs = clientInfo.FDs; // removes the cleanup handler and executes it (unlock mutex) pthread_cleanup_pop(1); // check all fds (work with temp set)... fd_set rfds; FD_ZERO(&rfds); int maxFD = pipeReadFD; FD_SET(pipeReadFD, &rfds); set<int>::iterator it; for( it = clientFDs.begin(); it != clientFDs.end(); it++ ) { if (*it > maxFD) { maxFD = *it; } FD_SET(*it, &rfds); } if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 ) { // run = false; reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1); } else { if(FD_ISSET(pipeReadFD, &rfds)) { clearPipe(); } for (it = clientFDs.begin(); it != clientFDs.end(); it++) { if (FD_ISSET(*it, &rfds)) { SFPacket packet; if(readPacket(*it, packet)) { // this call blocks until buffer is not full readBuffer.enqueueBack(packet); ++readPacketCount; } else { DEBUG("TCPComm::readClients : removeClient") removeClient(*it); } } } } }}/* helper function to start client writer pthread */void* writeClientsThread(void* ob){ static_cast<TCPComm*>(ob)->writeClients(); return NULL;}/* writes to connected clients */void TCPComm::writeClients(){ FD_t clientFDs; while (true) { pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock); pthread_mutex_lock( &clientInfo.countlock ); while( clientInfo.count == 0 ) { // do nothing when no client is connected... DEBUG("TCPComm::writeClients : sleeping writer thread") pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock ); } // removes the cleanup handler and executes it (unlock mutex) pthread_cleanup_pop(1); // blocks until buffer is not empty SFPacket packet = writeBuffer.dequeue(); pthread_testcancel(); pthread_mutex_lock( &clientInfo.countlock ); // copy client fd set into temp set clientFDs = clientInfo.FDs; pthread_mutex_unlock( &clientInfo.countlock ); // check all fds (work with temp set)... set<int>::iterator it; // duplicate and send out packet to all connected clients for( it = clientFDs.begin(); it != clientFDs.end(); it++ ) { if (writePacket(*it, packet)) { ++writtenPacketCount; } else { DEBUG("TCPComm::writeClients : removeClient") removeClient(*it); } } }}/* cancels all running threads */void TCPComm::cancel(){ pthread_t callingThread = pthread_self(); if (pthread_equal(callingThread, readerThread)) { DEBUG("TCPComm::cancel : by readerThread") pthread_detach(readerThread); if (writerThreadRunning) { pthread_cancel(writerThread); DEBUG("TCPComm::cancel : writerThread canceled, joining") pthread_join(writerThread, NULL); writerThreadRunning = false; } if (serverThreadRunning) { pthread_cancel(serverThread); DEBUG("TCPComm::cancel : serverThread canceled, joining") pthread_join(serverThread, NULL); serverThreadRunning = false; } readerThreadRunning = false; pthread_cond_signal(&control.cancel); pthread_exit(NULL); } else if (pthread_equal(callingThread, writerThread)) { DEBUG("TCPComm::cancel : by writerThread") pthread_detach(writerThread); if (readerThreadRunning) { pthread_cancel(readerThread); DEBUG("TCPComm::cancel : readerThread canceled, joining") pthread_join(readerThread, NULL); readerThreadRunning = false; } if (serverThreadRunning) { pthread_cancel(serverThread); DEBUG("TCPComm::cancel : serverThread canceled, joining") pthread_join(serverThread, NULL); serverThreadRunning = false; } writerThreadRunning = false; pthread_cond_signal(&control.cancel); pthread_exit(NULL); } else if (pthread_equal(callingThread, serverThread)) { DEBUG("TCPComm::cancel : by serverThread") pthread_detach(serverThread); if (readerThreadRunning) { pthread_cancel(readerThread); DEBUG("TCPComm::cancel : readerThread canceled, joining") pthread_join(readerThread, NULL); readerThreadRunning = false; } if (writerThreadRunning) { pthread_cancel(writerThread); DEBUG("TCPComm::cancel : writerThread canceled, joining") pthread_join(writerThread, NULL); writerThreadRunning = false; } serverThreadRunning = false; pthread_cond_signal(&control.cancel); pthread_exit(NULL); } else { DEBUG("TCPComm::cancel : by other thread") if (serverThreadRunning) { pthread_cancel(serverThread); DEBUG("TCPComm::cancel : serverThread canceled, joining") pthread_join(serverThread, NULL); serverThreadRunning = false; } if (writerThreadRunning) { pthread_cancel(writerThread); DEBUG("TCPComm::cancel : writerThread canceled, joining") pthread_join(writerThread, NULL); writerThreadRunning = false; } if (readerThreadRunning) { pthread_cancel(readerThread); DEBUG("TCPComm::cancel : readerThread canceled, joining") pthread_join(readerThread, NULL); readerThreadRunning = false; } pthread_cond_signal(&control.cancel); }}/* reports error */int TCPComm::reportError(const char *msg, int result){ if ((result < 0) && (!errorReported)) { errorMsg << "error : SF-Server (TCPComm on port = " << port << ") : " << msg << " ( result = " << result << " )" << endl << "error-description : " << strerror(errno) << endl; cerr << errorMsg.str(); errorReported = true; cancel(); } return result;}/* prints out status */void TCPComm::reportStatus(ostream& os){ os << "SF-Server ( TCPComm on port " << port << " )" << " : clients = " << clientInfo.count << " , packets read = " << readPacketCount << " , packets written = " << writtenPacketCount << endl;}void TCPComm::stuffPipe() { char info = 'n'; write(pipeWriteFD, &info, 1);}void TCPComm::clearPipe() { char buf; while(read(pipeReadFD, &buf, 1) > 0) { ; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -