portcore.cpp

来自「一个语言识别引擎」· C++ 代码 · 共 914 行 · 第 1/2 页

CPP
914
字号
// -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-

/*
 * Copyright (C) 2006 Paul Fitzpatrick
 * CopyPolicy: Released under the terms of the GNU GPL v2.0.
 *
 */

#include <yarp/InputProtocol.h>
#include <yarp/Logger.h>
#include <yarp/PortCore.h>
#include <yarp/BufferedConnectionWriter.h>
#include <yarp/NameClient.h>
#include <yarp/PortCoreInputUnit.h>
#include <yarp/PortCoreOutputUnit.h>
#include <yarp/StreamConnectionReader.h>
#include <yarp/Name.h>

#include <yarp/Companion.h>
#include <yarp/os/Network.h>
#include <yarp/os/Bottle.h>

#include <ace/OS_NS_stdio.h>


//#define YMSG(x) ACE_OS::printf x;
//#define YTRACE(x) YMSG(("at %s\n",x))

#define YMSG(x)
#define YTRACE(x) 

using namespace yarp;
using namespace yarp::os;

/*
  Phases:
  dormant
  listening
  running
*/

PortCore::~PortCore() {
    closeMain();
}


bool PortCore::listen(const Address& address) {
    bool success = false;

    YTRACE("PortCore::listen");

    if (!address.isValid()) {
        YARP_ERROR(log, "Port does not have a valid address");
        return false;
    }

    YARP_ASSERT(address.isValid());

    // try to enter listening phase
    stateMutex.wait();
    YARP_ASSERT(listening==false);
    YARP_ASSERT(running==false);
    YARP_ASSERT(closing==false);
    YARP_ASSERT(finished==false);
    YARP_ASSERT(face==NULL);
    this->address = address;
    setName(address.getRegName());

    try {
        face = Carriers::listen(address);
        if (face==NULL) {
            throw IOException("no carrier");
        }
    } catch (IOException e) {
        //YMSG(("listen failed: %s\n",e.toString().c_str()));
        if (face!=NULL) {
            face->close();
            delete face;
        }
        stateMutex.post();
        throw e;
    }
    if (face!=NULL) {
        listening = true;
        success = true;
    }

    if (success) {
        log.setPrefix(address.getRegName().c_str());
    }

    stateMutex.post();

    // we have either entered listening phase (face=valid, listening=true)
    // or remained in dormant phase

    return success;
}


void PortCore::setReadHandler(Readable& reader) {
    YARP_ASSERT(running==false);
    YARP_ASSERT(this->reader==NULL);
    this->reader = &reader;
}

void PortCore::setReadCreator(ReadableCreator& creator) {
    YARP_ASSERT(running==false);
    YARP_ASSERT(this->readableCreator==NULL);
    this->readableCreator = &creator;
}



void PortCore::run() {
    YTRACE("PortCore::run");

    // enter running phase
    YARP_ASSERT(listening==true);
    YARP_ASSERT(running==false);
    YARP_ASSERT(closing==false);
    YARP_ASSERT(finished==false);
    YARP_ASSERT(starting==true); // can only run if called from start
    running = true;
    starting = false;
    stateMutex.post();

    YTRACE("PortCore::run running");

    // main loop
    bool shouldStop = false;
    while (!shouldStop) {

        // block and wait for an event
        InputProtocol *ip = NULL;
        try {
            ip = face->read();
            YARP_DEBUG(log,"PortCore got something");
        } catch (IOException e) {
            YMSG(("read failed: %s\n",e.toString().c_str()));
        }

        // got an event, but before processing it, we check whether
        // we should shut down
        stateMutex.wait();
        shouldStop |= closing;
        events++;
        //YMSG(("*** event count boost to %d\n", events));
        stateMutex.post();

        if (!shouldStop) {
            // process event
            //YMSG(("PortCore::run got something, but no processing yet\n"));
            addInput(ip);
            ip = NULL;
        }

        // the event normally gets handed off.  If it remains, delete it.
        if (ip!=NULL) {
            try {
                ip->close();
                delete ip;
            } catch (IOException e) {
                YMSG(("input protocol close failed: %s\n",e.toString().c_str()));
            }
            ip = NULL;
        }
        reapUnits();
        stateMutex.wait();
        for (int i=0; i<connectionListeners; i++) {
            connectionChange.post();
        }
        connectionListeners = 0;
        stateMutex.post();
    }

    YTRACE("PortCore::run closing");

    // closing phase
    stateMutex.wait();
    for (int i=0; i<connectionListeners; i++) {
        connectionChange.post();
    }
    connectionListeners = 0;
    finished = true;
    stateMutex.post();
}


void PortCore::close() {
    closeMain();
}


bool PortCore::start() {
    YTRACE("PortCore::start");

    stateMutex.wait();
    YARP_ASSERT(listening==true);
    YARP_ASSERT(running==false);
    YARP_ASSERT(starting==false);
    YARP_ASSERT(finished==false);
    YARP_ASSERT(closing==false);
    starting = true;
    bool started = ThreadImpl::start();
    if (!started) {
        // run() won't be happening
        stateMutex.post();
    } else {
        // wait for run() to change state
        stateMutex.wait();
        YARP_ASSERT(running==true);
        stateMutex.post();
    }
    return started;
}



void PortCore::closeMain() {
    YTRACE("PortCore::closeMain");


    // Politely pre-disconnect inputs
    finishing = true;
    bool done = false;
    String prevName = "";
    while (!done) {
        done = true;
        String removeName = "";
        stateMutex.wait();
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                if (unit->isInput()) {
                    if (!unit->isDoomed()) {
                        Route r = unit->getRoute();
                        String s = r.getFromName();
                        if (s.length()>=1) {
                            if (s[0]=='/') {
                                if (s!=getName()) {
                                    if (s!=prevName) {
                                        removeName = s;
                                        done = false;
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        stateMutex.post();
        if (!done) {
            yarp::os::Network::disconnect(removeName.c_str(),
                                          getName().c_str(),
										  true);
            prevName = removeName;
        }
    }

	// politely remove all outputs
	done = false;
    while (!done) {
        done = true;
        Route removeRoute;
        stateMutex.wait();
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                if (unit->isOutput()&&!unit->isFinished()) {
					removeRoute = unit->getRoute();
					if (removeRoute.getFromName()==getName()) {
						done = false;
						break;
					}
                }
            }
        }
        stateMutex.post();
        if (!done) {
            // stray debugging message
			//printf("SHOULD remove %s for %s\n", 
            //  removeRoute.toString().c_str(),
            //	getName().c_str());
			removeUnit(removeRoute,true);
        }
    }

    stateMutex.wait();
    bool stopRunning = running;
    stateMutex.post();

    if (stopRunning) {
        // we need to stop the thread
        stateMutex.wait();
        closing = true;
        stateMutex.post();
        try {
            // wake it up
            OutputProtocol *op = face->write(address);
            if (op!=NULL) {
                op->close();
                delete op;
            }
        } catch (IOException e) {
            // no problem
        }
        join();

        // should be finished
        stateMutex.wait();
        YARP_ASSERT(finished==true);
        stateMutex.post();
    
        // should down units - this is the only time it is valid to do this
        closeUnits();

        stateMutex.wait();
        finished = false;
        closing = false;
        running = false;
        stateMutex.post();

        String name = getName();
        if (name!=String("")) {
            NameClient::getNameClient().unregisterName(name);
        }
    }

    // there should be no other threads at this point
    // can stop listening

    if (listening) {
        YARP_ASSERT(face!=NULL);
        try {
            face->close();
            delete face;
        } catch (IOException e) {
            YMSG(("face close failed: %s\n",e.toString().c_str()));
        }
        face = NULL;
        listening = false;
    }

    // Check if someone is waiting for input.  If so, wake them up
    if (reader!=NULL) {
        // send empty data out
        YARP_DEBUG(log,"sending end-of-port message to listener");
        StreamConnectionReader sbr;
        reader->read(sbr);
        reader = NULL;
    }

    // fresh as a daisy
    YARP_ASSERT(listening==false);
    YARP_ASSERT(running==false);
    YARP_ASSERT(starting==false);
    YARP_ASSERT(closing==false);
    YARP_ASSERT(finished==false);
    YARP_ASSERT(face==NULL);
}


int PortCore::getEventCount() {
    stateMutex.wait();
    int ct = events;
    stateMutex.post();
    return ct;
}


void PortCore::closeUnits() {
    stateMutex.wait();
    YARP_ASSERT(finished==true); // this is the only valid phase for this
    stateMutex.post();

    // in the "finished" phase, nobody else touches the units,
    // so we can go ahead and shut them down and delete them

    for (unsigned int i=0; i<units.size(); i++) {
        PortCoreUnit *unit = units[i];
        if (unit!=NULL) {
            YARP_DEBUG(log,"closing a unit");
            unit->close();
            YARP_DEBUG(log,"joining a unit");
            unit->join();
            delete unit;
            YARP_DEBUG(log,"deleting a unit");
            units[i] = NULL;
        }
    }
    units.clear();
    //YMSG(("closeUnits: there are now %d units\n", units.size()));
}

void PortCore::reapUnits() {
    stateMutex.wait();
    if (!finished) {
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                if (unit->isDoomed()&&!unit->isFinished()) {	
                    YARP_DEBUG(log,"REAPING a unit");
                    //printf("Reaping...%s\n", unit->getRoute().toString().c_str());
                    unit->close();
                    YARP_DEBUG(log,"closed REAPING a unit");
                    unit->join();
                    //printf("done Reaping...%s\n", unit->getRoute().toString().c_str());
                    YARP_DEBUG(log,"done REAPING a unit");
                }
            }
        }
    }
    stateMutex.post();
    cleanUnits();
}

void PortCore::cleanUnits() {
    YARP_DEBUG(log,"CLEANING scan");
    stateMutex.wait();
    if (!finished) {
    
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                YARP_DEBUG(log,String("checking ") + unit->getRoute().toString());
                if (unit->isFinished()) {
                    YARP_DEBUG(log,"CLEANING a unit");
                    try {
                        unit->close();
                        unit->join();
                    } catch (IOException e) {
                        YARP_DEBUG(log,e.toString() + " <<< cleanUnits error");
                    }
                    delete unit;
                    units[i] = NULL;
                    YARP_DEBUG(log,"done CLEANING a unit");
                }
            }
        }
        unsigned int rem = 0;
        for (unsigned int i2=0; i2<units.size(); i2++) {
            if (units[i2]!=NULL) {
                if (rem<i2) {
                    units[rem] = units[i2];
                    units[i2] = NULL;
                }
                rem++;
            }
        }
        for (unsigned int i3=0; i3<units.size()-rem; i3++) {
            units.pop_back();
        }
        //YMSG(("cleanUnits: there are now %d units\n", units.size()));
    }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?