portcoreinputunit.cpp

来自「一个语言识别引擎」· C++ 代码 · 共 280 行

CPP
280
字号
// -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-

/*
 * Copyright (C) 2006 Paul Fitzpatrick
 * CopyPolicy: Released under the terms of the GNU GPL v2.0.
 *
 */


#include <yarp/os/Time.h>
#include <yarp/PortCoreInputUnit.h>
#include <yarp/PortCommand.h>
#include <yarp/Logger.h>
#include <yarp/BufferedConnectionWriter.h>
#include <yarp/Name.h>


#define YMSG(x) ACE_OS::printf x;
#define YTRACE(x) YMSG(("at %s\n",x))


using namespace yarp;

bool PortCoreInputUnit::start() {

    if (ip!=NULL) {
        Route route = ip->getRoute();
        YARP_DEBUG(Logger::get(),String("starting output for ") + 
                   route.toString());
    }

    phase.wait();

    bool result = PortCoreUnit::start();
    if (result) {
        phase.wait();
        phase.post();
    } else {
        phase.post();
    }

    return result;
}


void PortCoreInputUnit::run() {
    running = true;
    phase.post();

    Route route;

    try {
        bool done = false;

        YARP_ASSERT(ip!=NULL);

        PortCommand cmd;
  
        if (autoHandshake) {
            ip->open(getName().c_str());
            route = ip->getRoute();
            if (Name(route.getFromName()).isRooted()) {
                YARP_INFO(Logger::get(),String("Receiving input from ") + 
                          route.getFromName() + " to " + route.getToName() + 
                          " using " +
                          route.getCarrierName());
            }
        } else {
            ip->open(""); // anonymous connection
            route = ip->getRoute();
        }

        if (closing) {
            done = true;
        }

        void *id = (void *)this;

        while (!done) {
            try {
                ConnectionReader& br = ip->beginRead();

                if (autoHandshake) {
                    cmd.readBlock(br);
                } else {
                    cmd = PortCommand('d',"");
                }

                if (closing||isDoomed()) {
                    done = true;
                    break;
                }
                char key = cmd.getKey();
                //ACE_OS::printf("Port command is [%c:%d/%s]\n",
                //	     (key>=32)?key:'?', key, cmd.getText().c_str());
	
                PortManager& man = getOwner();
                OutputStream *os = NULL;
                if (br.isTextMode()) {
                    os = &(ip->getOutputStream());
                }
	
                switch (key) {
                case '/':
                    YARP_DEBUG(Logger::get(),String("asking to add output to ")+
                               cmd.getText());
                    man.addOutput(cmd.getText(),id,os);
                    break;
                case '!':
                    man.removeOutput(cmd.getText().substring(1,-1),id,os);
                    break;
                case '~':
                    man.removeInput(cmd.getText().substring(1,-1),id,os);
                    break;
                case '*':
                    man.describe(id,os);
                    break;
                case 'd':
                    {
                        try {
                            String env = cmd.getText();
                            if (env.length()>1) {
                                //YARP_ERROR(Logger::get(),
                                //"***** received an envelope!");
                                man.setEnvelope(env.substr(2,env.length()));
                            }
                            if (localReader) {
                                localReader->read(br);
                            } else {
                                man.readBlock(br,id,os);
                            }
                        } catch (IOException e) {
                            YARP_DEBUG(Logger::get(),e.toString() + " <<< user level PortCoreInputUnit exception, passing on");
                            done = true;
                            throw e;
                        }
                    }
                    break;
                case 'q':
                    done = true;
                    break;
                case '?':
                case 'h':
                    if (os!=NULL) {
                        BufferedConnectionWriter bw(true);
                        bw.appendLine("This is a YARP port.  Here are the commands it responds to:");
                        bw.appendLine("*       Gives a description of this port");
                        bw.appendLine("d       Signals the beginning of input for the port's owner");
                        bw.appendLine("q       Disconnects");
                        bw.appendLine("/port   Requests to send output to /port");
                        bw.appendLine("!/port  Requests to stop sending output to /port");
                        bw.appendLine("~/port  Requests to stop receiving input from /port");
                        bw.appendLine("?       Gives this help");
                        bw.write(*os);
                    }
                    break;
                default:
                    if (os!=NULL) {
                        BufferedConnectionWriter bw(true);
                        bw.appendLine("Port command not understood.");
                        bw.appendLine("Type d to send data to the port's owner.");
                        bw.appendLine("Type ? for help.");
                        bw.write(*os);
                    }
                    break;
                }
                ip->endRead();
            } catch (IOException e) {
                YARP_DEBUG(Logger::get(),e.toString() + " <<< initial PortCoreInputUnit exception");
                if (!ip->checkStreams()) {
                    // pass it on
                    YARP_DEBUG(Logger::get(), "passing on exception");
                    throw e;
                } else {
                    // clear out any garbage
                    ip->resetStreams();
                }
            }
            if (closing||isDoomed()||(!ip->checkStreams())) {
                done = true;
                break;
            }
        }
    } catch (IOException e) {
        /* ok, ports die - it is their nature */
        YARP_DEBUG(Logger::get(),e.toString() + " <<< PortCoreInputUnit exception");
    }

  
    YARP_DEBUG(Logger::get(),"PortCoreInputUnit closing ip");
    ip->close();
    YARP_DEBUG(Logger::get(),"PortCoreInputUnit closed ip");

    if (autoHandshake) {
        if (Name(route.getFromName()).isRooted()) {
            YARP_INFO(Logger::get(),String("Removing input from ") + 
                      route.getFromName() + " to " + route.getToName());
		} else {
	        YARP_DEBUG(Logger::get(),"PortCoreInputUnit (unrooted) shutting down");
		}
    } else {
        YARP_DEBUG(Logger::get(),"PortCoreInputUnit shutting down");
    }

    if (localReader!=NULL) {
        delete localReader;
        localReader = NULL;
    }

    running = false;
    finished = true;

    // it would be nice to get my entry removed from the port immediately,
    // but it would be a bit dodgy to delete this object and join this
    // thread within and from themselves
}



void PortCoreInputUnit::runSimulation() {
    /*
    // simulation
    running = true;
    while (true) {
    ACE_OS::printf("tick\n");
    Time::delay(0.3);
    if (closing) {
    break;
    }
    }
    */

    ACE_OS::printf("stopping\n");

    running = false;
    finished = true;
}


void PortCoreInputUnit::closeMain() {
    YARP_DEBUG(Logger::get(),"PortCoreInputUnit closing");

    if (running) {
        // give a kick (unfortunately unavoidable)
        if (ip!=NULL) {
            YARP_DEBUG(Logger::get(),"PortCoreInputUnit interrupting");
            ip->interrupt();
            YARP_DEBUG(Logger::get(),"PortCoreInputUnit interrupted");
        }
        closing = true;
        YARP_DEBUG(Logger::get(),"PortCoreInputUnit joining");
        join();
        YARP_DEBUG(Logger::get(),"PortCoreInputUnit joined");
    }

    if (ip!=NULL) {
        try {
            ip->close();
        } catch (IOException e) { /*ok*/ }
        try {
            delete ip;
        } catch (IOException e) { /*ok*/ }
        ip = NULL;
    }
    running = false;
    closing = false;
    //finished = false;
    //setDoomed(false);
}


Route PortCoreInputUnit::getRoute() {
    if (ip!=NULL) {
        return ip->getRoute();
    }
    return PortCoreUnit::getRoute();
}


⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?