📄 tcpcomm.cpp
字号:
/*
* Copyright (c) 2007, Technische Universitaet Berlin
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* - Neither the name of the Technische Universitaet Berlin nor the names
* of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* @author Philipp Huppertz <huppertz@tkn.tu-berlin.de>
*/
#include "sharedinfo.h"
#include "tcpcomm.h"
#include "sfpacket.h"
#include "stdio.h"
#include <iostream>
#include <set>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
using namespace std;
/* Forward declarations of the pthread helper functions.
 * The TCPComm constructor passes each of them to pthread_create() as a
 * thread start routine, with the TCPComm instance as the argument. */
void* checkClientsThread(void*);
void* readClientsThread(void*);
void* writeClientsThread(void*);
/*
 * Opens the TCP server port for listening and starts the worker threads.
 *
 * pPort        - TCP port the server socket is bound to
 * pReadBuffer  - packet buffer stored by reference in readBuffer
 * pWriteBuffer - packet buffer stored by reference in writeBuffer
 * pControl     - shared control information stored by reference in control
 *
 * Every setup step is skipped once errorReported is set (reportError is
 * defined elsewhere; presumably it sets that flag when a call fails), so a
 * failure leaves the object constructed but without running threads.
 */
TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
{
    // init values
    writerThreadRunning = false;
    readerThreadRunning = false;
    serverThreadRunning = false;
    clientInfo.count = 0;
    clientInfo.FDs.clear();
    readPacketCount = 0;
    writtenPacketCount = 0;
    port = pPort;
    // Mark all descriptors invalid up front: if a setup step below fails, the
    // remaining steps are skipped and the destructor must not call close() on
    // uninitialised values (which could close unrelated descriptors).
    serverFD = -1;
    pipeWriteFD = -1;
    pipeReadFD = -1;
    pthread_mutex_init(&clientInfo.sleeplock, NULL);
    pthread_mutex_init(&clientInfo.countlock, NULL);
    pthread_cond_init(&clientInfo.wakeup, NULL);

    struct sockaddr_in me;
    int opt;
    int rxBuf = 1024; // receive buffer size requested via SO_RCVBUF

    /* create pipe to inform client reader of new clients */
    if (!errorReported) {
        // Pre-set to -1 so a failed pipe() never hands garbage to the members.
        int pipeFDPair[2] = { -1, -1 };
        reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair));
        pipeWriteFD = pipeFDPair[1];
        pipeReadFD = pipeFDPair[0];
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);",
                    fcntl(pipeReadFD, F_SETFL, O_NONBLOCK));
    }

    /* create server socket where clients connect */
    if (!errorReported) {
        serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)",
                               socket(AF_INET, SOCK_STREAM, 0));
    }
    // Only family and port are set; the address stays zeroed by the memset.
    memset(&me, 0, sizeof me);
    me.sin_family = AF_INET;
    me.sin_port = htons(port);
    opt = 1;
    if (!errorReported) {
        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))",
                    setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))",
                    setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)",
                    bind(serverFD, (struct sockaddr *)&me, sizeof me));
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : listen(serverFD, 5)",
                    listen(serverFD, 5));
    }

    // start thread for server socket (adding and removing clients)
    if (!errorReported)
    {
        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)",
                        pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) {
            serverThreadRunning = true;
        }
        // start thread for reading from client connections
        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)",
                        pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) {
            readerThreadRunning = true;
        }
        // start thread for writing to client connections
        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)",
                        pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) {
            writerThreadRunning = true;
        }
    }
}
/*
 * Shuts the communicator down: calls cancel() (defined elsewhere; presumably
 * it stops the worker threads), closes the server socket, every client
 * descriptor and both pipe ends, then destroys the synchronisation objects.
 */
TCPComm::~TCPComm()
{
    cancel();

    // descriptors: listening socket first, then all connected clients
    close(serverFD);
    for (set<int>::iterator client = clientInfo.FDs.begin();
         client != clientInfo.FDs.end();
         client++)
    {
        close(*client);
    }

    // notification pipe
    close(pipeWriteFD);
    close(pipeReadFD);

    // locks and condition variable created in the constructor
    pthread_mutex_destroy(&clientInfo.sleeplock);
    pthread_mutex_destroy(&clientInfo.countlock);
    pthread_cond_destroy(&clientInfo.wakeup);
}
int TCPComm::getPort()
{
return port;
}
/*
 * Reads one length-prefixed packet from the descriptor pFD into pPacket.
 *
 * Wire format as consumed here: one length byte followed by that many
 * payload bytes.
 *
 * Returns true when the length byte and the complete payload were read and
 * pPacket.setPayload() accepted them. Returns false when a read comes up
 * short, when the announced length exceeds SFPacket::getMaxPayloadLength(),
 * or when setPayload() rejects the data.
 */
bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
{
    // A one-byte length prefix can announce at most 255 payload bytes, so a
    // fixed 256-byte buffer always suffices.
    char payload[256];
    // Unsigned on purpose: with a plain (possibly signed) char, lengths of
    // 128..255 would turn negative, slip past the maximum check below and be
    // handed to readFD as a negative byte count.
    unsigned char length = 0;
    int err = 0;

    if (readFD(pFD, reinterpret_cast<char*>(&length), 1, &err) != 1)
    {
        return false;
    }

    const int wanted = static_cast<int>(length);
    if (wanted > SFPacket::getMaxPayloadLength())
    {
        return false;
    }
    if (readFD(pFD, payload, wanted, &err) != wanted)
    {
        return false;
    }

    if (pPacket.setPayload(payload, length))
    {
        return true;
    }
    return false;
}
/*
 * Sends exactly count bytes from buffer over the socket fd, calling send()
 * repeatedly until everything has been handed over.
 *
 * Returns the number of bytes sent (count on success, 0 when count <= 0).
 * When send() fails, stores errno in *err and returns -1.
 */
int TCPComm::writeFD(int fd, const char *buffer, int count, int *err)
{
    // MSG_NOSIGNAL is not passed on Apple platforms.
#ifdef __APPLE__
    const int sendFlags = 0;
#else
    const int sendFlags = MSG_NOSIGNAL;
#endif

    int sentTotal = 0;
    while (sentTotal < count)
    {
        const int sentNow = send(fd, buffer + sentTotal, count - sentTotal, sendFlags);
        if (sentNow == -1)
        {
            *err = errno;
            return -1;
        }
        sentTotal += sentNow;
    }
    return sentTotal;
}
/*
 * Writes the TCP representation of pPacket to the descriptor pFD.
 * Returns true only if writeFD reports that all getTcpLength() bytes were written.
 */
bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
{
    int sendError;
    const int expected = pPacket.getTcpLength();
    const int written = writeFD(pFD, pPacket.getTcpPayload(), expected, &sendError);
    return written == expected;
}
/*
 * Checks for the correct version of the SF protocol.
 *
 * Sends the two bytes 'U' and ' ' to the client, reads two bytes back and
 * requires the first to be 'U'. The lower of the client's version byte and
 * our own is taken as the agreed version; only version ' ' is accepted.
 *
 * Returns true when the handshake succeeds, false on any I/O shortfall,
 * a wrong marker byte or an unsupported version.
 */
bool TCPComm::versionCheck(int clientFD)
{
    const char ourVersion = ' ';
    char greeting[2] = { 'U', ourVersion };
    char reply[2];
    int ioError = 0;

    /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
    const bool exchanged = writeFD(clientFD, greeting, 2, &ioError) == 2
                        && readFD(clientFD, reply, 2, &ioError) == 2;
    if (!exchanged || reply[0] != 'U')
    {
        return false;
    }

    // settle on the lower of the two advertised versions
    const int agreed = (ourVersion < reply[1]) ? ourVersion : reply[1];

    switch (agreed)
    {
        case ' ':
            return true;
        /* Add other cases here for later protocol versions */
        default:
            return false;
    }
}
/*
 * Adds a client descriptor to the client list.
 *
 * Under clientInfo.countlock the client counter is incremented and the
 * descriptor is inserted into clientInfo.FDs. If the list was empty before,
 * all threads waiting on clientInfo.wakeup are woken. After the lock is
 * released, stuffPipe() is called (defined elsewhere; presumably it writes
 * to the pipe created in the constructor to notify the client reader).
 */
void TCPComm::addClient(int clientFD)
{
    DEBUG("TCPComm::addClient : lock")
    pthread_testcancel();
    pthread_mutex_lock( &clientInfo.countlock );

    // remember whether this is the first client before touching the counter
    const bool wasEmpty = (clientInfo.count == 0);

    ++clientInfo.count;
    clientInfo.FDs.insert(clientFD);

    if (wasEmpty)
    {
        pthread_cond_broadcast( &clientInfo.wakeup );
    }

    pthread_mutex_unlock( &clientInfo.countlock );
    stuffPipe();
    DEBUG("TCPComm::addClient : unlock")
}
void TCPComm::removeClient(int clientFD)
{
DEBUG("TCPComm::removeClient : lock")
pthread_testcancel();
pthread_mutex_lock( &clientInfo.countlock );
if (clientInfo.count > 0)
{
clientInfo.FDs.erase(clientFD);
if (close(clientFD) != 0)
{
DEBUG("TCPComm::removeClient : error closing fd " << clientFD)
}
else
{
--clientInfo.count;
}
}
if (clientInfo.count == 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -