dgramtwowaystream.cpp

来自「一个语言识别引擎」· C++ 代码 · 共 427 行

CPP
427
字号
// -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-

/*
 * Copyright (C) 2006 Paul Fitzpatrick
 * CopyPolicy: Released under the terms of the GNU GPL v2.0.
 *
 */


#include <yarp/DgramTwoWayStream.h>

#include <yarp/Logger.h>
#include <yarp/os/Time.h>
#include <yarp/NetType.h>

#include <ace/SOCK_Dgram_Mcast.h>

using namespace yarp;

#define CRC_SIZE 8
#define READ_SIZE (120000-CRC_SIZE)
//#define WRITE_SIZE (65500-CRC_SIZE)
#define WRITE_SIZE (60000-CRC_SIZE)
//#define WRITE_SIZE (1000-CRC_SIZE)



static bool checkCrc(char *buf, int length, int crcLength, int pct,
                     int *store_altPct = NULL) {
    unsigned long alt = NetType::getCrc(buf+crcLength,length-crcLength);
    Bytes b(buf,4);
    Bytes b2(buf+4,4);
    unsigned long curr = (unsigned long)NetType::netInt(b);
    int altPct = NetType::netInt(b2);
    bool ok = (alt == curr && pct==altPct);
    if (!ok) {
        if (alt!=curr) {
            YARP_DEBUG(Logger::get(), "crc mismatch");
        }
        if (pct!=altPct) {
            YARP_DEBUG(Logger::get(), "packet code broken");
        }
    }
    //YARP_ERROR(Logger::get(), String("check crc read as ") + NetType::toString(curr));
    //YARP_ERROR(Logger::get(), String("check crc count is ") + NetType::toString(altPct));
    //YARP_ERROR(Logger::get(), String("check local count ") + NetType::toString(pct));
    //YARP_ERROR(Logger::get(), String("check remote count ") + NetType::toString(altPct));
    if (store_altPct!=NULL) {
        *store_altPct = altPct;
    }

    return ok;
}


static void addCrc(char *buf, int length, int crcLength, int pct) {
    unsigned long alt = NetType::getCrc(buf+crcLength,length-crcLength);
    Bytes b(buf,4);
    Bytes b2(buf+4,4);
    NetType::netInt((NetType::NetInt32)alt,b);
    NetType::netInt((NetType::NetInt32)pct,b2);
    //YARP_ERROR(Logger::get(), String("msg len ") + NetType::toString(length));
    //YARP_ERROR(Logger::get(), String("crc set to ") + NetType::toString(alt));
    //YARP_ERROR(Logger::get(), String("crc ct to ") + NetType::toString(pct));
}


void DgramTwoWayStream::open(const Address& remote) {
    Address local;
    ACE_INET_Addr anywhere((u_short)0, (ACE_UINT32)INADDR_ANY);
    local = Address(anywhere.get_host_addr(),
                    anywhere.get_port_number());
    open(local,remote);
}

void DgramTwoWayStream::open(const Address& local, const Address& remote) {

    localAddress = local;
    remoteAddress = remote;

    localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()),(ACE_UINT32)INADDR_ANY);
    if (remote.isValid()) {
        remoteHandle.set(remoteAddress.getPort(),remoteAddress.getName().c_str());
    }
    dgram = new ACE_SOCK_Dgram;
    YARP_ASSERT(dgram!=NULL);

    int result = dgram->open(localHandle,
                             ACE_PROTOCOL_FAMILY_INET,
                             0,
                             1);
    if (result!=0) {
        throw IOException("could not open datagram socket");
    }

    configureSystemBuffers();

    dgram->get_local_addr(localHandle);
    YARP_DEBUG(Logger::get(),String("starting DGRAM entity on port number ") + NetType::toString(localHandle.get_port_number()));
    localAddress = Address("127.0.0.1",
                           localHandle.get_port_number());
    YARP_DEBUG(Logger::get(),String("Update: DGRAM from ") + 
               localAddress.toString() + 
               " to " + remote.toString());

    allocate();
}

void DgramTwoWayStream::allocate() {
    readBuffer.allocate(READ_SIZE+CRC_SIZE);
    writeBuffer.allocate(WRITE_SIZE+CRC_SIZE);
    readAt = 0;
    readAvail = 0;
    writeAvail = CRC_SIZE;
    happy = true;
    pct = 0;
}


void DgramTwoWayStream::configureSystemBuffers() {
    // ask for more buffer space for udp/mcast
    int window_size_desired = 600000;
    int window_size = window_size_desired;
    int result = dgram->set_option(SOL_SOCKET, SO_RCVBUF,
                                   (char *) &window_size, sizeof(window_size));
    window_size = 0;
    int len = 4;
    int result2 = dgram->get_option(SOL_SOCKET, SO_RCVBUF,
                                    (char *) &window_size, &len);
    if (result!=0||result2!=0||window_size<window_size_desired) {
        // give a warning if we get CRC problems
        bufferAlertNeeded = true;
        bufferAlerted = false;
    }
}


void DgramTwoWayStream::join(const Address& group, bool sender) {

    if (sender) {
        // just use udp as normal
        open(group);
        return;
    }

    ACE_SOCK_Dgram_Mcast *dmcast = new ACE_SOCK_Dgram_Mcast;

    //possible flags: ((ACE_SOCK_Dgram_Mcast::options)(ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL | ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES));

    dgram = dmcast;
    YARP_ASSERT(dgram!=NULL);

    YARP_DEBUG(Logger::get(),String("subscribing to mcast address ") + 
               group.toString());
    ACE_INET_Addr addr(group.getPort(),group.getName().c_str());

    int result = dmcast->join(addr,1);

    configureSystemBuffers();

    if (result!=0) {
        throw IOException("cannot connect to multi-cast address");
    }
    localAddress = group;
    remoteAddress = group;
    localHandle.set(localAddress.getPort(),localAddress.getName().c_str());
    remoteHandle.set(remoteAddress.getPort(),remoteAddress.getName().c_str());

    allocate();
}

DgramTwoWayStream::~DgramTwoWayStream() {
    close();
}

void DgramTwoWayStream::interrupt() {
    bool handsOff = false;
    if (!closed) {
        closed = true;
        if (reader) {
            YARP_DEBUG(Logger::get(),"dgram interrupt");
            try {
                DgramTwoWayStream tmp;
                tmp.open(Address(localAddress.getName(),0),localAddress);
                ManagedBytes empty(10);
                for (int i=0; i<empty.length(); i++) {
                    empty.get()[i] = 0;
                }

                // don't want this message getting into a valid packet
                tmp.pct = -1;

                handsOff = true; // from this moment on, object may have
                                 // been deleted
                tmp.write(empty.bytes());
                tmp.flush();
                tmp.close();
            } catch (IOException e) {
                YARP_DEBUG(Logger::get(),e.toString() + " <<< closer dgram exception");
            }
            YARP_DEBUG(Logger::get(),"finished dgram interrupt");
        }
    }
    if (!handsOff) {
        happy = false;
    }
}

void DgramTwoWayStream::close() {
    if (dgram!=NULL) {
        interrupt();
        if (dgram!=NULL) {
            dgram->close();
            delete dgram;
            dgram = NULL;
        }
    }
    happy = false;
}

int DgramTwoWayStream::read(const Bytes& b) {
    reader = true;
    bool done = false;
    
    while (!done) {

        if (closed) { 
            happy = false;
            return -1; 
        }
        
        // if nothing is available, try to grab stuff
        if (readAvail==0) {
            readAt = 0;
            ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
            YARP_ASSERT(dgram!=NULL);
            //YARP_DEBUG(Logger::get(),"DGRAM Waiting for something!");
            int result =
                dgram->recv(readBuffer.get(),readBuffer.length(),dummy);
            YARP_DEBUG(Logger::get(),
                       String("DGRAM Got ") + NetType::toString(result) +
                       " bytes");
            /*
              // this message isn't needed anymore
            if (result>WRITE_SIZE*1.25) {
                YARP_ERROR(Logger::get(),
                           String("Got big datagram: ")+NetType::toString(result)+
                           " bytes");
            }
            */
            if (closed||result<0) {
                happy = false;
                return result;
            }
            readAvail = result;
            
            // deal with CRC
            int altPct = 0;
            bool crcOk = checkCrc(readBuffer.get(),readAvail,CRC_SIZE,pct,
                                  &altPct);
            if (altPct!=-1) {
                pct++;
                if (!crcOk) {
                    YARP_ERROR(Logger::get(),
                               "*** Multicast/UDP packet dropped - checksum error ***");
                    if (bufferAlertNeeded&&!bufferAlerted) {
                        YARP_INFO(Logger::get(),
                                  "The UDP/MCAST system buffer limit on your system is low.");
                        YARP_INFO(Logger::get(),
                                  "You may get packet loss under heavy conditions.");
#ifdef YARP2_LINUX
                        YARP_INFO(Logger::get(),
                                  "To change the buffer limit on linux: sysctl -w net.core.rmem_max=8388608");
                        YARP_INFO(Logger::get(),
                                  "(Might be something like: sudo /sbin/sysctl -w net.core.rmem_max=8388608)");
#else
                        YARP_INFO(Logger::get(),
                                  "To change the limit use: systcl for Linux/FreeBSD, ndd for Solaris, no for AIX");
#endif
                        bufferAlerted = true;
                    }
                    //readAt = 0;
                    //readAvail = 0;
                    reset();
                    throw IOException("CRC failure");
                    return -1;
                } else {
                    readAt += CRC_SIZE;
                    readAvail -= CRC_SIZE;
                }
                done = true;
            } else {
                // this is just a housekeeping message, ignore it
                readAvail = 0;
            }
        }

        // if stuff is available, take it
        if (readAvail>0) {
            int take = readAvail;
            if (take>b.length()) {
                take = b.length();
            }
            ACE_OS::memcpy(b.get(),readBuffer.get()+readAt,take);
            readAt += take;
            readAvail -= take;
            return take;
        }
    }

    return 0;
}

void DgramTwoWayStream::write(const Bytes& b) {
    //YARP_DEBUG(Logger::get(),"DGRAM prep writing");
    //ACE_OS::printf("DGRAM write %d bytes\n",b.length());

    if (reader) {
        return;
    }

    Bytes local = b;
    while (local.length()>0) {
        //YARP_DEBUG(Logger::get(),"DGRAM prep writing");
        int rem = local.length();
        int space = writeBuffer.length()-writeAvail;
        bool shouldFlush = false;
        if (rem>=space) {
            rem = space;
            shouldFlush = true;
        }
        memcpy(writeBuffer.get()+writeAvail, local.get(), rem);
        writeAvail+=rem;
        local = Bytes(local.get()+rem,local.length()-rem);
        if (shouldFlush) {
            flush();
        }
    }
}


void DgramTwoWayStream::flush() {
    // should set CRC
    if (writeAvail<=CRC_SIZE) {
        return;
    }
    addCrc(writeBuffer.get(),writeAvail,CRC_SIZE,pct);
    pct++;

    while (writeAvail>0) {
        int writeAt = 0;
        YARP_ASSERT(dgram!=NULL);
        int len = 0;

        len = dgram->send(writeBuffer.get()+writeAt,writeAvail-writeAt,
                          remoteHandle);
        YARP_DEBUG(Logger::get(),
                   String("DGRAM wrote ") +
                   NetType::toString(len) + " bytes");
        if (len>WRITE_SIZE*0.75) {
            YARP_DEBUG(Logger::get(),
                       "long dgrams might need a little time");

            // Under heavy loads, packets could get dropped
            // 640x480x3 images correspond to about 15 datagrams
            // so there's not much time possible between them
            // looked at iperf, it just does a busy-waiting delay
            // there's an implementation below, but commented out -
            // better solution was to increase recv buffer size

            double first = yarp::os::Time::now();
            double now = first;
            int ct = 0;
            do {
                //printf("Busy wait... %d\n", ct);
                yarp::os::Time::delay(0);
                now = yarp::os::Time::now();
                ct++;
            } while (now-first<0.001);
        }

        if (len<0) {
            happy = false;
            throw IOException("DGRAM failed to write");
        }
        writeAt += len;
        writeAvail -= len;

        if (writeAvail!=0) {
            // well, we have a problem
            // checksums will cause dumping
            YARP_DEBUG(Logger::get(), "dgram/mcast send behaving badly");
        }
    }
    // finally: writeAvail should be 0

    // make space for CRC
    writeAvail = CRC_SIZE;
}


bool DgramTwoWayStream::isOk() {
    return happy;
}


void DgramTwoWayStream::reset() {
    readAt = 0;
    readAvail = 0;
    writeAvail = CRC_SIZE;
    pct = 0;
}


void DgramTwoWayStream::beginPacket() {
    //YARP_ERROR(Logger::get(),String("Packet begins: ")+(reader?"reader":"writer"));
    pct = 0;
}

void DgramTwoWayStream::endPacket() {
    //YARP_ERROR(Logger::get(),String("Packet ends: ")+(reader?"reader":"writer"));
    if (!reader) {
        pct = 0;
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?