port.cpp

来自「一个语言识别引擎」· C++ 代码 · 共 397 行

CPP
397
字号
// -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-

/*
 * Copyright (C) 2006 Paul Fitzpatrick
 * CopyPolicy: Released under the terms of the GNU GPL v2.0.
 *
 */


#include <yarp/os/Port.h>
#include <yarp/PortCore.h>
#include <yarp/Logger.h>
#include <yarp/NameClient.h>
#include <yarp/os/Contact.h>
#include <yarp/os/Network.h>
#include <yarp/os/Bottle.h>
#include <yarp/SemaphoreImpl.h>

using namespace yarp;
using namespace yarp::os;


class PortCoreAdapter : public PortCore {
private:
    Port& owner;
    SemaphoreImpl stateMutex;
    PortReader *readDelegate;
    PortWriter *writeDelegate;
    //PortReaderCreator *readCreatorDelegate;
    bool readResult, readActive, readBackground, willReply, closed, opened;
    SemaphoreImpl produce, consume;
    ReadableCreator *recReadCreator;
    int recWaitAfterSend;
public:
    PortCoreAdapter(Port& owner) : 
        owner(owner), stateMutex(1), readDelegate(NULL), writeDelegate(NULL),
        readResult(false),
        readActive(false),
        readBackground(false),
        willReply(false),
        closed(false),
        opened(false),
        produce(0), consume(0),
        recReadCreator(NULL),
        recWaitAfterSend(-1)
    {}

    void openable() {
        stateMutex.wait();
        closed = false;
        opened = true;
        stateMutex.post();
    }

    void finishReading() {
        if (!readBackground) {
            stateMutex.wait();
            closed = true;
            consume.post();
            consume.post();
            stateMutex.post();
        }
    }

    virtual bool read(ConnectionReader& reader) {
        // called by comms code
        if (!reader.isValid()) {
            // termination
            stateMutex.wait();
            if (readDelegate!=NULL) {
                readResult = readDelegate->read(reader);
            }
            stateMutex.post();
            produce.post();
            return false;
        } 

        // wait for happy consumer - don't want to miss a packet
        if (!readBackground) {
            consume.wait();
        }

        if (closed) {
            throw IOException("Port::read shutting down");
        }

        stateMutex.wait();
        readResult = false;
        if (readDelegate!=NULL) {
            readResult = readDelegate->read(reader);
        } else {
            // read and ignore
            YARP_DEBUG(Logger::get(),"data received in Port, no reader for it");
            Bottle b;
            b.read(reader);
        }
        if (!readBackground) {
            readDelegate = NULL;
            writeDelegate = NULL;
        }
        stateMutex.post();
        if (!readBackground) {
            produce.post();
        }
        if (readResult&&willReply) {
            consume.wait();
            if (closed) {
                throw IOException("Port::read shutting down");
            }
            stateMutex.wait();
            ConnectionWriter *writer = reader.getWriter();
            if (writer!=NULL) {
                readResult = writeDelegate->write(*writer);
            }
            stateMutex.post();
            produce.post();
        }
        return readResult;
    }

    bool read(PortReader& reader, bool willReply = false) {
        // called by user

        stateMutex.wait();
        readActive = true;
        readDelegate = &reader;
        writeDelegate = NULL;
        this->willReply = willReply;
        consume.post(); // happy consumer
        stateMutex.post();

        produce.wait();
        bool result = readResult;
        return result;
    }

    bool reply(PortWriter& writer) {
        writeDelegate = &writer;
        consume.post();
        produce.wait();
        bool result = readResult;
        return result;
    }

    /*
      Configuration of a port that should be remembered 
      between opens and closes
    */

    void configReader(PortReader& reader) {
        stateMutex.wait();
        readActive = true;
        readBackground = true;
        readDelegate = &reader;
        consume.post(); // just do this once
        stateMutex.post();
    }

    void configReadCreator(ReadableCreator& creator) {
        recReadCreator = &creator;
        setReadCreator(creator);
    }

    void configWaitAfterSend(bool waitAfterSend) {
        recWaitAfterSend = waitAfterSend?1:0;
        setWaitAfterSend(waitAfterSend);
    }

    PortReader *checkPortReader() {
        return readDelegate;
    }

    ReadableCreator *checkReadCreator() {
        return recReadCreator;
    }

    int checkWaitAfterSend() {
        return recWaitAfterSend;
    }


    bool isOpened() {
        return opened;
    }

    void setOpen(bool opened) {
        this->opened = opened;
    }
};

// implementation is a PortCoreAdapter
#define HELPER(x) (*((PortCoreAdapter*)(x)))


Port::Port() {
    implementation = new PortCoreAdapter(*this);
    YARP_ASSERT(implementation!=NULL);
}


bool Port::open(const char *name) {
    return open(Contact::byName(name));
}


bool Port::open(const Contact& contact, bool registerName) {

    // Allow for open() to be called safely many times on the same Port
    PortCoreAdapter *currentCore = &(HELPER(implementation));
    if (currentCore->isOpened()) {
        PortCoreAdapter *newCore = new PortCoreAdapter(*this);
        YARP_ASSERT(newCore!=NULL);
        // copy state that should survive in a new open()
        if (currentCore->checkPortReader()!=NULL) {
            newCore->configReader(*(currentCore->checkPortReader()));
        }
        if (currentCore->checkReadCreator()!=NULL) {
            newCore->configReadCreator(*(currentCore->checkReadCreator()));
        }
        if (currentCore->checkWaitAfterSend()>=0) {
            newCore->configWaitAfterSend(currentCore->checkWaitAfterSend());
        }
        close();
        delete ((PortCoreAdapter*)implementation);
        implementation = newCore;
    }

    PortCoreAdapter& core = HELPER(implementation);

    core.openable();

    bool success = true;
    Address caddress(contact.getHost().c_str(),
                     contact.getPort(),
                     contact.getCarrier().c_str(),
                     contact.getName().c_str());
    Address address = caddress;
    try {
        PortCoreAdapter& core = HELPER(implementation);
        core.setReadHandler(core);
        NameClient& nic = NameClient::getNameClient();
        if (registerName) {
            address = nic.registerName(contact.getName().c_str(),caddress);
        }
        success = address.isValid();

        if (success) {
            success = core.listen(address);
            if (success) {
                success = core.start();
            }
            YARP_INFO(Logger::get(),
                      String("Port ") +
                      address.getRegName() +
                      " listening at " +
                      address.toString());
        }
    } catch (IOException e) {
        success = false;
        //YARP_DEBUG(Logger::get(),e.toString() + " <<< Port::register failed");
        YARP_ERROR(Logger::get(),String("port ") + contact.getName().c_str() + String(" failed to open: ") + e.toString() + " (" + address.toString() + ")");
    }
    if (!success) {
        close();
    }
    return success;
}


void Port::close() {
    PortCoreAdapter& core = HELPER(implementation);
    core.finishReading();
    core.close();
    core.join();

    // In fact, open flag means "ever opened", so don't reset it
    // core.setOpened(false); 
}


Port::~Port() {
    if (implementation!=NULL) {
        close();
        delete ((PortCoreAdapter*)implementation);
        implementation = NULL;
    }
}


Contact Port::where() {
    PortCoreAdapter& core = HELPER(implementation);
    Address address = core.getAddress();
    return address.toContact();
}


bool Port::addOutput(const Contact& contact) {
    Contact me = where();
    return Network::connect(me.getName().c_str(),
                            contact.toString().c_str());
}


/**
 * write something to the port
 */
bool Port::write(PortWriter& writer) {
    PortCoreAdapter& core = HELPER(implementation);
    bool result = false;
    try {
        //WritableAdapter adapter(writer);
        core.send(writer);
        //writer.onCompletion();
        result = true;
    } catch (IOException e) {
        YARP_DEBUG(Logger::get(), e.toString() + " <<<< Port::write saw this");
        writer.onCompletion();
        // leave result false
    }
    return result;
}

/**
 * write something to the port
 */
bool Port::write(PortWriter& writer, PortReader& reader) const {
    PortCoreAdapter& core = HELPER(implementation);
    bool result = false;
    try {
        core.send(writer,&reader);
        result = true;
    } catch (IOException e) {
        YARP_DEBUG(Logger::get(), e.toString() + " <<<< Port::write saw this");
        writer.onCompletion();
        // leave result false
    }
    return result;
}

/**
 * read something from the port
 */
bool Port::read(PortReader& reader, bool willReply) {
    PortCoreAdapter& core = HELPER(implementation);
    return core.read(reader,willReply);
}



bool Port::reply(PortWriter& writer) {
    PortCoreAdapter& core = HELPER(implementation);
    return core.reply(writer);
}

/**
 * set an external writer for port data
 */
//void Port::setWriter(PortWriter& writer) {
//  YARP_ERROR(Logger::get(),"Port::setWriter not implemented");
//}

void Port::setReader(PortReader& reader) {
    PortCoreAdapter& core = HELPER(implementation);
    core.configReader(reader);
}

void Port::setReaderCreator(PortReaderCreator& creator) {
    PortCoreAdapter& core = HELPER(implementation);
    core.configReadCreator(creator);
}


void Port::enableBackgroundWrite(bool backgroundFlag) {
    PortCoreAdapter& core = HELPER(implementation);
    core.configWaitAfterSend(!backgroundFlag);
}


bool Port::isWriting() {
    PortCoreAdapter& core = HELPER(implementation);
    return core.isWriting();
}



bool Port::setEnvelope(PortWriter& envelope) {
    PortCoreAdapter& core = HELPER(implementation);
    return core.setEnvelope(envelope);
}


bool Port::getEnvelope(PortReader& envelope) {
    PortCoreAdapter& core = HELPER(implementation);
    return core.getEnvelope(envelope);
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?