📄 tcpcomm.cpp
字号:
{
// clear write buffer
writeBuffer.clear();
}
pthread_mutex_unlock( &clientInfo.countlock );
stuffPipe();
DEBUG("TCPComm::removeClient : unlock")
}
/* helper function to start server pthread */
void* checkClientsThread(void* ob)
{
static_cast<TCPComm*>(ob)->connectClients();
return NULL;
}
/*
 * Accept loop run by the server thread.
 *
 * Blocks in accept() on serverFD. Every accepted connection is passed to
 * versionCheck(); on success the descriptor is handed to addClient(),
 * otherwise it is closed again.
 *
 * A failed accept() is only treated as fatal (cancel() is called) when the
 * error is not transient. EINTR (interrupted by a signal) and ECONNABORTED
 * (the peer went away before the connection was accepted) do not say anything
 * about the state of the listening socket, so the loop simply retries instead
 * of tearing down the whole server.
 *
 * The function never returns normally; the loop is left only through thread
 * cancellation or through cancel().
 */
void TCPComm::connectClients()
{
    while (true)
    {
        int clientFD = accept(serverFD, NULL, NULL);
        // remember why accept() failed before any other call can touch errno
        const int acceptError = errno;
        pthread_testcancel();
        if (clientFD >= 0)
        {
            if (versionCheck(clientFD))
            {
                addClient(clientFD);
            }
            else
            {
                close(clientFD);
            }
        }
        else if ((acceptError == EINTR) || (acceptError == ECONNABORTED))
        {
            // transient failure : the listening socket is still usable, try again
        }
        else
        {
            pthread_testcancel();
            cancel();
        }
    }
}
/* helper function to start client reader pthread */
void* readClientsThread(void* ob)
{
static_cast<TCPComm*>(ob)->readClients();
return NULL;
}
/*
 * Main loop of the reader thread.
 *
 * Each iteration:
 *   1. waits on clientInfo.wakeup (under clientInfo.countlock) until at least
 *      one client is connected, then takes a private copy of the client
 *      descriptor set so the lock is not held during I/O,
 *   2. select()s for readability on all copied descriptors plus pipeReadFD,
 *   3. calls clearPipe() if the pipe became readable,
 *   4. reads one packet from every readable client; a packet that was read
 *      successfully is appended to readBuffer and counted, a failed read
 *      removes the client.
 *
 * A select() that fails with EINTR (interrupted by a signal) is retried with
 * a freshly built descriptor set instead of being reported as a fatal error;
 * any other select() failure is still passed to reportError().
 *
 * The function never returns normally.
 */
void TCPComm::readClients()
{
    FD_t clientFDs;
    while (true)
    {
        // the cleanup handler releases countlock if the thread is cancelled
        // while it is blocked in pthread_cond_wait()
        pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);
        pthread_mutex_lock( &clientInfo.countlock );
        while( clientInfo.count == 0 )
        {
            // do nothing when no client is connected...
            DEBUG("TCPComm::readClients : sleeping reader thread")
            pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );
        }
        // copy set in to temp set
        clientFDs = clientInfo.FDs;
        // removes the cleanup handler and executes it (unlock mutex)
        pthread_cleanup_pop(1);

        // check all fds (work with temp set)...
        fd_set rfds;
        FD_ZERO(&rfds);
        int maxFD = pipeReadFD;
        FD_SET(pipeReadFD, &rfds);
        set<int>::iterator it;
        for( it = clientFDs.begin(); it != clientFDs.end(); ++it )
        {
            if (*it > maxFD)
            {
                maxFD = *it;
            }
            FD_SET(*it, &rfds);
        }

        if (select(maxFD + 1, &rfds, NULL, NULL, NULL) < 0 )
        {
            if (errno == EINTR)
            {
                // a signal interrupted the wait : rfds is not meaningful,
                // rebuild the set and wait again
                continue;
            }
            reportError("TCPComm::readClients : select(maxFD+1, &rfds, NULL, NULL, NULL)", -1);
            continue;
        }

        if(FD_ISSET(pipeReadFD, &rfds))
        {
            clearPipe();
        }
        for (it = clientFDs.begin(); it != clientFDs.end(); ++it)
        {
            if (FD_ISSET(*it, &rfds))
            {
                SFPacket packet;
                if(readPacket(*it, packet))
                {
                    // this call blocks until buffer is not full
                    readBuffer.enqueueBack(packet);
                    ++readPacketCount;
                }
                else
                {
                    // iterating over the private copy, so removing the
                    // client does not disturb this loop
                    DEBUG("TCPComm::readClients : removeClient")
                    removeClient(*it);
                }
            }
        }
    }
}
/* helper function to start client writer pthread */
void* writeClientsThread(void* ob)
{
static_cast<TCPComm*>(ob)->writeClients();
return NULL;
}
/* writes to connected clients */
void TCPComm::writeClients()
{
FD_t clientFDs;
while (true)
{
pthread_cleanup_push((void(*)(void*)) pthread_mutex_unlock, (void *) &clientInfo.countlock);
pthread_mutex_lock( &clientInfo.countlock );
while( clientInfo.count == 0 )
{
// do nothing when no client is connected...
DEBUG("TCPComm::writeClients : sleeping writer thread")
pthread_cond_wait( &clientInfo.wakeup, &clientInfo.countlock );
}
// removes the cleanup handler and executes it (unlock mutex)
pthread_cleanup_pop(1);
// blocks until buffer is not empty
SFPacket packet = writeBuffer.dequeue();
pthread_testcancel();
pthread_mutex_lock( &clientInfo.countlock );
// copy client fd set into temp set
clientFDs = clientInfo.FDs;
pthread_mutex_unlock( &clientInfo.countlock );
// check all fds (work with temp set)...
set<int>::iterator it;
// duplicate and send out packet to all connected clients
for( it = clientFDs.begin(); it != clientFDs.end(); it++ )
{
if (writePacket(*it, packet))
{
++writtenPacketCount;
}
else
{
DEBUG("TCPComm::writeClients : removeClient")
removeClient(*it);
}
}
}
}
/*
 * Cancels all running threads (reader, writer, server).
 *
 * The behaviour depends on which thread makes the call:
 *  - reader, writer or server thread: the caller detaches itself, cancels and
 *    joins the other two threads (only those whose *ThreadRunning flag is
 *    set), clears all three flags, signals control.cancel and then ends
 *    itself with pthread_exit(). In these cases the function does not return.
 *  - any other thread: cancels and joins server, writer and reader thread
 *    (in that order, only those flagged as running), signals control.cancel
 *    and returns to the caller.
 *
 * NOTE(review): the waiter on control.cancel is not visible in this part of
 * the file; presumably it is the owner waiting for the shutdown to finish.
 * NOTE(review): the *ThreadRunning flags are read and written here without
 * any visible locking - confirm that concurrent calls to cancel() cannot
 * happen or are otherwise protected.
 */
void TCPComm::cancel()
{
// identify the caller : a thread cannot join itself, so each worker thread
// needs its own shutdown path
pthread_t callingThread = pthread_self();
if (pthread_equal(callingThread, readerThread))
{
DEBUG("TCPComm::cancel : by readerThread")
// this branch never joins the calling thread, so it detaches itself
pthread_detach(readerThread);
if (writerThreadRunning)
{
pthread_cancel(writerThread);
DEBUG("TCPComm::cancel : writerThread canceled, joining")
pthread_join(writerThread, NULL);
writerThreadRunning = false;
}
if (serverThreadRunning)
{
pthread_cancel(serverThread);
DEBUG("TCPComm::cancel : serverThread canceled, joining")
pthread_join(serverThread, NULL);
serverThreadRunning = false;
}
readerThreadRunning = false;
pthread_cond_signal(&control.cancel);
// terminate the calling (reader) thread; nothing after this line runs
pthread_exit(NULL);
}
else if (pthread_equal(callingThread, writerThread))
{
DEBUG("TCPComm::cancel : by writerThread")
// this branch never joins the calling thread, so it detaches itself
pthread_detach(writerThread);
if (readerThreadRunning)
{
pthread_cancel(readerThread);
DEBUG("TCPComm::cancel : readerThread canceled, joining")
pthread_join(readerThread, NULL);
readerThreadRunning = false;
}
if (serverThreadRunning)
{
pthread_cancel(serverThread);
DEBUG("TCPComm::cancel : serverThread canceled, joining")
pthread_join(serverThread, NULL);
serverThreadRunning = false;
}
writerThreadRunning = false;
pthread_cond_signal(&control.cancel);
// terminate the calling (writer) thread; nothing after this line runs
pthread_exit(NULL);
}
else if (pthread_equal(callingThread, serverThread))
{
DEBUG("TCPComm::cancel : by serverThread")
// this branch never joins the calling thread, so it detaches itself
pthread_detach(serverThread);
if (readerThreadRunning)
{
pthread_cancel(readerThread);
DEBUG("TCPComm::cancel : readerThread canceled, joining")
pthread_join(readerThread, NULL);
readerThreadRunning = false;
}
if (writerThreadRunning)
{
pthread_cancel(writerThread);
DEBUG("TCPComm::cancel : writerThread canceled, joining")
pthread_join(writerThread, NULL);
writerThreadRunning = false;
}
serverThreadRunning = false;
pthread_cond_signal(&control.cancel);
// terminate the calling (server) thread; nothing after this line runs
pthread_exit(NULL);
}
else
{
// called from a thread that is none of the three workers : all of them can
// be cancelled and joined, and the caller keeps running afterwards
DEBUG("TCPComm::cancel : by other thread")
if (serverThreadRunning)
{
pthread_cancel(serverThread);
DEBUG("TCPComm::cancel : serverThread canceled, joining")
pthread_join(serverThread, NULL);
serverThreadRunning = false;
}
if (writerThreadRunning)
{
pthread_cancel(writerThread);
DEBUG("TCPComm::cancel : writerThread canceled, joining")
pthread_join(writerThread, NULL);
writerThreadRunning = false;
}
if (readerThreadRunning)
{
pthread_cancel(readerThread);
DEBUG("TCPComm::cancel : readerThread canceled, joining")
pthread_join(readerThread, NULL);
readerThreadRunning = false;
}
pthread_cond_signal(&control.cancel);
}
}
/*
 * Reports a failed operation once and shuts the communication down.
 *
 * msg    : description of the operation that produced the result
 * result : return code of that operation; only negative values are errors
 *
 * For the first negative result the message, the result and the text for
 * the current errno are appended to errorMsg and written to cerr,
 * errorReported is set and cancel() is called. Non-negative results, and
 * any result after an error has already been reported, are passed through
 * untouched.
 *
 * Returns result unchanged. Note that cancel() ends the calling thread when
 * it is one of the worker threads, so the call may not return in that case.
 */
int TCPComm::reportError(const char *msg, int result)
{
    // nothing to do for successful results or once an error was reported
    if ((result >= 0) || errorReported)
    {
        return result;
    }

    errorMsg << "error : SF-Server (TCPComm on port = " << port << ") : "
             << msg << " ( result = " << result << " )" << endl
             << "error-description : " << strerror(errno) << endl;
    cerr << errorMsg.str();
    errorReported = true;
    cancel();
    return result;
}
/*
 * Writes a one-line status summary to os : the port, the current number of
 * clients and the numbers of packets read and written so far. The line is
 * terminated with endl.
 */
void TCPComm::reportStatus(ostream& os)
{
    os << "SF-Server ( TCPComm on port " << port << " )";
    os << " : clients = " << clientInfo.count;
    os << " , packets read = " << readPacketCount;
    os << " , packets written = " << writtenPacketCount << endl;
}
/*
 * Writes a single byte into the internal pipe (pipeWriteFD). The reader
 * thread includes the read end of this pipe in its select() set, so the
 * byte makes a pending select() return.
 * A write that does not transfer exactly one byte is only logged via DEBUG.
 */
void TCPComm::stuffPipe()
{
    const char wakeupByte = 'n';
    const ssize_t written = write(pipeWriteFD, &wakeupByte, 1);
    if (written != 1)
    {
        DEBUG("TCPComm::stuffPipe : lokal pipe is broken");
    }
}
/*
 * Drains the internal pipe : reads from pipeReadFD one byte at a time and
 * discards the data until read() no longer returns a positive count.
 * NOTE(review): this only comes back promptly on an empty pipe if
 * pipeReadFD is non-blocking - confirm where the pipe is created.
 */
void TCPComm::clearPipe()
{
    char discarded;
    ssize_t got;
    do
    {
        got = read(pipeReadFD, &discarded, 1);
    }
    while (got > 0);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -