⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpcomm.cpp

📁 tinyos-2.x.rar
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/*
 * Copyright (c) 2007, Technische Universitaet Berlin
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without 
 * modification, are permitted provided that the following conditions 
 * are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright 
 *   notice, this list of conditions and the following disclaimer in the 
 *   documentation and/or other materials provided with the distribution.
 * - Neither the name of the Technische Universitaet Berlin nor the names 
 *   of its contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 
 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/**
 * @author Philipp Huppertz <huppertz@tkn.tu-berlin.de>
 */

#include "sharedinfo.h"
#include "tcpcomm.h"
#include "sfpacket.h"
#include "stdio.h"

#include <iostream>
#include <set>

#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>



using namespace std;

/* forward declarations of pthrad helper functions*/
void* checkClientsThread(void*);
void* readClientsThread(void*);
void* writeClientsThread(void*);

/* opens tcp server port for listening and start threads*/
TCPComm::TCPComm(int pPort, PacketBuffer &pReadBuffer, PacketBuffer &pWriteBuffer, sharedControlInfo_t& pControl) : readBuffer(pReadBuffer), writeBuffer(pWriteBuffer), errorReported(false), errorMsg(""), control(pControl)
{   
    // init values
    writerThreadRunning = false;
    readerThreadRunning = false;
    serverThreadRunning = false;
    clientInfo.count = 0;
    clientInfo.FDs.clear();
    readPacketCount = 0;
    writtenPacketCount = 0;
    port = pPort;
    
    pthread_mutex_init(&clientInfo.sleeplock, NULL);
    pthread_mutex_init(&clientInfo.countlock, NULL);
    pthread_cond_init(&clientInfo.wakeup, NULL);

    struct sockaddr_in me;
    int opt;
    int rxBuf = 1024;

    /* create pipe to inform client reader of new clients */
    if (!errorReported) {
        int pipeFDPair[2];
        reportError("TCPComm::TCPComm : pipe(pipeFDPair)", pipe(pipeFDPair));
        pipeWriteFD = pipeFDPair[1];
        pipeReadFD = pipeFDPair[0];
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : fcntl(pipeReadFD, F_SETFL, O_NONBLOCK);",
                    fcntl(pipeReadFD, F_SETFL, O_NONBLOCK));
    }
    /* create server socket where clients connect */
    if (!errorReported) {
        serverFD = reportError("TCPComm::TCPComm : socket(AF_INET, SOCK_STREAM, 0)",
                               socket(AF_INET, SOCK_STREAM, 0));
    }
    memset(&me, 0, sizeof me);
    me.sin_family = AF_INET;
    me.sin_port = htons(port);
    
    opt = 1;
    if (!errorReported) {
        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt))",
                    setsockopt(serverFD, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)));
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf))",
                    setsockopt(serverFD, SOL_SOCKET, SO_RCVBUF, (char *)&rxBuf, sizeof(rxBuf)));
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : bind(serverFD, (struct sockaddr *)&me, sizeof me)",
                    bind(serverFD, (struct sockaddr *)&me, sizeof me));
    }
    if (!errorReported) {
        reportError("TCPComm::TCPComm : listen(serverFD, 5)",
                    listen(serverFD, 5));
    }

    // start thread for server socket (adding and removing clients)
    if (!errorReported)
    {
        if (reportError("TCPComm::TCPComm : pthread_create( &serverThread, NULL, checkClientsThread, this)",
                        pthread_create( &serverThread, NULL, checkClientsThread, this)) == 0) {
            serverThreadRunning = true;
        }
        // start thread for reading from client connections
        if (reportError("TCPComm::TCPComm : pthread_create( &readerThread, NULL, readClientsThread, this)",
                        pthread_create( &readerThread, NULL, readClientsThread, this)) == 0) {
            readerThreadRunning = true;
        }
        // start thread for writing to client connections
        if (reportError("TCPComm::TCPComm : pthread_create( &writerThread, NULL, writeClientsThread, this)",
                        pthread_create( &writerThread, NULL, writeClientsThread, this)) == 0) {
            writerThreadRunning = true;
        }
    }
}


TCPComm::~TCPComm()
{
    cancel();

    close(serverFD);
    set<int>::iterator it;
    for( it = clientInfo.FDs.begin(); it != clientInfo.FDs.end(); it++ )
    {
        close(*it);
    }
    close(pipeWriteFD);
    close(pipeReadFD);
    pthread_mutex_destroy(&clientInfo.sleeplock);
    pthread_mutex_destroy(&clientInfo.countlock);
    pthread_cond_destroy(&clientInfo.wakeup);
}

int TCPComm::getPort()
{
    return port;
}

/* reads packet */
bool TCPComm::readPacket(int pFD, SFPacket &pPacket)
{
    char l;
    char* buffer[SFPacket::getMaxPayloadLength()];
    int err;
    
    if (readFD(pFD, &l, 1, &err) != 1)
    {
        return false;
    }
    if (l > SFPacket::getMaxPayloadLength())
    {
        return false;
    }
    if (readFD(pFD, (char*) buffer, static_cast<int>(l), &err) != l)
    {
        return false;
    }
    if (pPacket.setPayload((char*)buffer ,l))
    {
        return true;
    }
    else
    {
        return false;
    }
}

int TCPComm::writeFD(int fd, const char *buffer, int count, int *err)
{
    int actual = 0;
    while (count > 0)
    {
#ifdef __APPLE__
        int n = send(fd, buffer, count, 0);
#else
        int n = send(fd, buffer, count, MSG_NOSIGNAL);
#endif
        if (n == -1) {
            *err = errno;
            return -1;
        }
        count -= n;
        actual += n;
        buffer += n;
    }
    return actual;
}

/* writes packet */
bool TCPComm::writePacket(int pFD, SFPacket &pPacket)
{
    int len = pPacket.getTcpLength();
    int err;
    return (writeFD(pFD, pPacket.getTcpPayload(), len, &err) == len);
}

/* checks for correct version of SF protocol */
bool TCPComm::versionCheck(int clientFD)
{
    char check[2], us[2];
    int version;
    int err = 0;
    /* Indicate version and check if a TinyOS 2.0 serial forwarder on the other end */
    us[0] = 'U';
    us[1] = ' ';
    
    if (writeFD(clientFD, us, 2, &err) != 2)
    {
        return false;
    }
    if (readFD(clientFD, check, 2, &err) != 2)
    {
        return false;
    }
    if (check[0] != 'U')
    {
        return false;
    }

    version = check[1];
    if (us[1] < version)
    {
        version = us[1];
    }
    /* Add other cases here for later protocol versions */
    switch (version)
    {
    case ' ':
        break;
    default:
        return false;
    }

    return true;
}

/* adds a client to the client list and wakes up all threads */
void TCPComm::addClient(int clientFD)
{
    DEBUG("TCPComm::addClient : lock")
    pthread_testcancel();
    pthread_mutex_lock( &clientInfo.countlock );
    bool wakeupClientThreads = false;
    if (clientInfo.count == 0)
    {
        wakeupClientThreads = true;
    }
    ++clientInfo.count;
    clientInfo.FDs.insert(clientFD);
    if (wakeupClientThreads)
    {
        pthread_cond_broadcast( &clientInfo.wakeup );
    }
    pthread_mutex_unlock( &clientInfo.countlock );
    stuffPipe();
    DEBUG("TCPComm::addClient : unlock")
}

void TCPComm::removeClient(int clientFD)
{
    DEBUG("TCPComm::removeClient : lock")
    pthread_testcancel();
    pthread_mutex_lock( &clientInfo.countlock );
    if (clientInfo.count > 0)
    {
        clientInfo.FDs.erase(clientFD);
        if (close(clientFD) != 0)
        {
            DEBUG("TCPComm::removeClient : error closing fd " << clientFD)
        }
        else
        {
            --clientInfo.count;
        }
    }
    if (clientInfo.count == 0)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -