📄 tcpcomm.cpp
字号:
/* * Copyright (c) 2007, Technische Universitaet Berlin * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * - Neither the name of the Technische Universitaet Berlin nor the names * of its contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *//** * @author Philipp Huppertz <huppertz@tkn.tu-berlin.de> */#include "sharedinfo.h"#include "tcpcomm.h"#include "sfpacket.h"#include "stdio.h"#include <iostream>#include <set>#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <netdb.h>#include <errno.h>#include <fcntl.h>#include <pthread.h>using namespace std;/* forward declarations of pthrad helper functions*/void* checkClientsThread(void*);void* readClientsThread(void*);void* writeClientsThread(void*);/* opens tcp server port for listening and start threads*/TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl){ // init values writerThreadRunning = false; readerThreadRunning = false; serverThreadRunning = false; clientInfo.count = 0; clientInfo.FDs.clear(); readPacketCount = 0; writtenPacketCount = 0; port = pPort; pthread_mutex_init(&clientInfo.sleeplock, NULL); pthread_mutex_init(&clientInfo.countlock, NULL); pthread_cond_init(&clientInfo.wakeup, NULL); struct sockaddr_in me; int opt; int rxBuf = 1024; /* create pipe to inform client reader of new clients */ if (!errorReported) { int pipeFDPair[2]; reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair)); pipeWriteFD = pipeFDPair[1]; pipeReadFD = pipeFDPair[0]; } if (!errorReported) { reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);", fcntl(pipeReadFD, F_SETFL, O_NONBLOCK)); } /* create server socket where clients connect */ if (!errorReported) { serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)", socket(AF_INET, SOCK_STREAM, 0)); } memset(&me, 0, sizeof me); me.sin_family = AF_INET; me.sin_port = htons(port); opt = 1; if (!errorReported) { reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))", setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))); } if (!errorReported) { reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))", setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))); } if (!errorReported) { reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)", bind(serverFD, (struct sockaddr *)&me, sizeof me)); } if (!errorReported) { reportError("TCPComm::TCPComm : listen(serverFD, 5)", listen(serverFD, 5)); } // start thread for server socket (adding and removing clients) if (!errorReported) { if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)", pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) { serverThreadRunning = true; } // start thread for reading from client connections if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)", pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) { readerThreadRunning = true; } // start thread for writing to client connections if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)", pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) { writerThreadRunning = true; } }}TCPComm::~TCPComm(){ cancel(); close(serverFD); set<int>::iterator it; for( it = clientInfo.FDs.begin(); it != clientInfo.FDs.end(); it++ ) { close(*it); } close(pipeWriteFD); close(pipeReadFD); pthread_mutex_destroy(&clientInfo.sleeplock); pthread_mutex_destroy(&clientInfo.countlock); pthread_cond_destroy(&clientInfo.wakeup);}int TCPComm::getPort(){ return port;}/* reads packet */bool TCPComm::readPacket(int pFD, SFPacket &pPacket){ char l; char* buffer[SFPacket::getMaxPayloadLength()]; int err; if (readFD(pFD, &l, 1, &err) != 1) { return false; } if (l > SFPacket::getMaxPayloadLength()) { return false; } if (readFD(pFD, (char*) buffer, static_cast<int>(l), &err) != l) { return false; } if (pPacket.setPayload((char*)buffer ,l)) { return true; } else { return false; }}int TCPComm::writeFD(int fd, const char *buffer, int count, int *err){ int actual = 0; while (count > 0) {#ifdef __APPLE__ int n = send(fd, buffer, count, 0);#else int n = send(fd, buffer, count, MSG_NOSIGNAL);#endif if (n == -1) { *err = errno; return -1; } count -= n; actual += n; buffer += n; } return actual;}/* writes packet */bool TCPComm::writePacket(int pFD, SFPacket &pPacket){ int len = pPacket.getTcpLength(); int err; return (writeFD(pFD, pPacket.getTcpPayload(), len, &err) == len);}/* checks for correct version of SF protocol */bool TCPComm::versionCheck(int clientFD){ char check[2], us[2]; int version; int err = 0; /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */ us[0] = 'U'; us[1] = ' '; if (writeFD(clientFD, us, 2, &err) != 2) { return false; } if (readFD(clientFD, check, 2, &err) != 2) { return false; } if (check[0] != 'U') { return false; } version = check[1]; if (us[1] < version) { version = us[1]; } /* Add other cases here for later protocol versions */ switch (version) { case ' ': break; default: return false; } return true;}/* adds a client to the client list and wakes up all threads */void TCPComm::addClient(int clientFD){ DEBUG("TCPComm::addClient : lock") pthread_testcancel(); pthread_mutex_lock( &clientInfo.countlock ); bool wakeupClientThreads = false; if (clientInfo.count == 0) { wakeupClientThreads = true; } ++clientInfo.count; clientInfo.FDs.insert(clientFD); if (wakeupClientThreads) { pthread_cond_broadcast( &clientInfo.wakeup ); } pthread_mutex_unlock( &clientInfo.countlock ); stuffPipe(); DEBUG("TCPComm::addClient : unlock")}void TCPComm::removeClient(int clientFD){ DEBUG("TCPComm::removeClient : lock") pthread_testcancel(); pthread_mutex_lock( &clientInfo.countlock ); if (clientInfo.count > 0) { clientInfo.FDs.erase(clientFD); if (close(clientFD) != 0) { DEBUG("TCPComm::removeClient : error closing fd " << clientFD) } else { --clientInfo.count; } } if (clientInfo.count == 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -