⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 protocol.h

📁 一个语言识别引擎
💻 H
字号:
// -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-

/*
 * Copyright (C) 2006 Paul Fitzpatrick
 * CopyPolicy: Released under the terms of the GNU GPL v2.0.
 *
 */

#ifndef _YARP2_PROTOCOL_
#define _YARP2_PROTOCOL_

#include <yarp/Carrier.h>
#include <yarp/Logger.h>
#include <yarp/String.h>
#include <yarp/TwoWayStream.h>
#include <yarp/Carriers.h>
#include <yarp/BufferedConnectionWriter.h>
#include <yarp/StreamConnectionReader.h>
#include <yarp/ManagedBytes.h>
#include <yarp/NetType.h>
#include <yarp/ShiftStream.h>

namespace yarp {
    class Protocol;
}

/**
 * Connection Communication choreographer.  Handles a single YARP connection,
 * and insulates ports from the details of the particular Carrier in use.
 */
class yarp::Protocol : public OutputProtocol, public InputProtocol {
public:

    // everything could throw IOException

    /**
     * This becomes owner of shiftstream
     */
    Protocol(TwoWayStream *stream) : 
        log(Logger::get()), header(8), number(4), indexHeader(10) {
        shift.takeStream(stream);
        route = Route("null","null","tcp");
        delegate = NULL;
        messageLen = 0;
        pendingAck = false;
        writer = NULL;
        altReader = NULL;
    }

    virtual ~Protocol() {
        /*
          if (delegate!=NULL) {
          delegate->close();
          delete delegate;
          delegate = NULL;
          }
          shift.close();
        */
        closeHelper();
    }

    void setRoute(const Route& route) {
        this->route = route;
    }

    const Route& getRoute() {
        return route;
    }

    void setCarrier(const String& carrierName) {
        setRoute(getRoute().addCarrierName(carrierName));
        YARP_ASSERT(delegate==NULL);
        delegate = Carriers::chooseCarrier(carrierName);
        if (delegate==NULL) {
            throw new IOException("no such carrier");
        }
        delegate->prepareSend(*this);
    }

    void defaultExpectSenderSpecifier() {
        int len = 0;
        int r = NetType::readFull(is(),number.bytes());
        if (r!=number.length()) {
            throw new IOException("did not get sender name length");
        }
        len = NetType::netInt(number.bytes());
        if (len>1000) len = 1000;
        if (len<1) len = 1;
        // expect a string -- these days null terminated, but not in YARP1
        ManagedBytes b(len+1);
        r = NetType::readFull(is(),Bytes(b.get(),len));
        if (r!=len) {
            throw new IOException("did not get sender name");
        }
        // add null termination for YARP1
        b.get()[len] = '\0';
        String s = b.get();
        setRoute(getRoute().addFromName(s));
    }

    void sendHeader() {
        YARP_ASSERT(delegate!=NULL);
        delegate->sendHeader(*this);
    }

    void defaultSendHeader() {
        sendProtocolSpecifier();
        sendSenderSpecifier();
    }

    void expectHeader() {
        messageLen = 0;
        expectProtocolSpecifier();
        expectSenderSpecifier();
        YARP_ASSERT(delegate!=NULL);
        delegate->expectExtraHeader(*this);
    }

    void expectReplyToHeader() {
        YARP_ASSERT(delegate!=NULL);
        delegate->expectReplyToHeader(*this);
        //writer.reset(delegate->isTextMode());
    }

    void respondToHeader() {
        YARP_ASSERT(delegate!=NULL);
        delegate->respondToHeader(*this);
        os().flush();
    }

    int readYarpInt() {
        int len = NetType::readFull(is(),header.bytes());
        ACE_UNUSED_ARG(len);
        if (len!=header.length()) {
            throw IOException("data stream died");
        }
        return interpretYarpNumber(header.bytes());
    }

    void writeYarpInt(int n) {
        createYarpNumber(n,header.bytes());
        os().write(header.bytes());
    }

    static int interpretYarpNumber(const Bytes& b) {
        if (b.length()==8) {
            char *base = b.get();
            if (base[0]=='Y' && base[1]=='A' &&
                base[6]=='R' && base[7]=='P') {
                Bytes b2(b.get()+2,4);
                int x = NetType::netInt(b2);
                return x;
            }
        }
        return -1;
    }

    static void createYarpNumber(int x,const Bytes& header) {
        if (header.length()!=8) {
            throw IOException("wrong header length");
        }
        char *base = header.get();
        base[0] = 'Y';
        base[1] = 'A';
        base[6] = 'R';
        base[7] = 'P';
        Bytes code(base+2,4);
        NetType::netInt(x,code);
    }


    void sendIndex() {
        ACE_DEBUG((LM_DEBUG,"Protocol::sendIndex for %s", getRoute().toString().c_str()));
        YARP_ASSERT(delegate!=NULL);
        delegate->sendIndex(*this);
    }

    void defaultSendIndex();

    void sendContent() {
        YARP_ASSERT(writer!=NULL);
        writer->write(os());
        os().flush();
    }

    void expectIndex() {
        pendingAck = true;
        messageLen = 0;
        YARP_ASSERT(delegate!=NULL);
        getStreams().beginPacket();
        delegate->expectIndex(*this);
        reader.reset(is(),&getStreams(),getRoute(),
                     messageLen,delegate->isTextMode());
    }

    void defaultExpectIndex();

    void respondToIndex() {
    }

    void expectAck() {
        YARP_ASSERT(delegate!=NULL);
        if (delegate->requireAck()) {
            delegate->expectAck(*this);
        }
    }

    void defaultExpectAck() {
        YARP_ASSERT(delegate!=NULL);
        if (delegate->requireAck()) {
            int hdr = NetType::readFull(is(),header.bytes());
            if (hdr!=header.length()) {
                throw IOException("did not get acknowledgement header");
            }
            int len = interpretYarpNumber(header.bytes());
            if (len<0) {
                throw IOException("acknowledgement header is bad");
            }
            int len2 = NetType::readDiscard(is(),len);
            if (len!=len2) {
                throw IOException("did not get an acknowledgement of the promised length");
            }
        }
    }

    void sendAck() {
        pendingAck = false;
        YARP_ASSERT(delegate!=NULL);
        if (delegate->requireAck()) {
            delegate->sendAck(*this);
        }
        getStreams().endPacket();
    }

    void defaultSendAck();

    void interrupt() {
        try {
            if (pendingAck) {
                sendAck();
            }
            shift.getInputStream().interrupt();
        } catch (IOException e) {
            YARP_DEBUG(Logger::get(),
                       String("yarp::Protocol::interrupt exception: ") + 
                       e.toString());
        }
    }

    void close() {
        closeHelper();
    }

    void closeHelper() {
        //YARP_DEBUG(Logger::get(),"Protocol object closing");
        try {
            if (pendingAck) {
                sendAck();
            }
        } catch (IOException e) {
            // ok, comms shutting down
        }
        shift.close();
        if (delegate!=NULL) {
            delegate->close();
            delete delegate;
            delegate = NULL;
        }
        //YARP_DEBUG(Logger::get(),"Protocol object closed");
    }

    TwoWayStream& getStreams() {
        return shift;
    }

    void takeStreams(TwoWayStream *streams) {
        shift.takeStream(streams);
    }

    OutputStream& os() {
        return shift.getOutputStream();
    }

    InputStream& is() {
        return shift.getInputStream();
    }

    OutputStream& getOutputStream() {
        return os();
    }

    InputStream& getInputStream() {
        return is();
    }

    const Address& getRemoteAddress() {
        ACE_DEBUG((LM_ERROR,"Protocol::getRemoteAddress not yet implemented"));
        throw IOException("getRemoteAddress failed");
    }


    ///////////////////////////////////////////////////////////////////////
    // OutputProtocol view

    virtual void open(const Route& route) {
        setRoute(route);
        setCarrier(route.getCarrierName());
        sendHeader();
        expectReplyToHeader();
    }

    virtual void open(const String& name) {
        if (name=="") {
            setCarrier("text");
            setRoute(Route("no-name","no-name","no-carrier"));
        } else {
            expectHeader();
            respondToHeader();
            setRoute(getRoute().addToName(name));
        }
    }

    virtual bool isActive() {
        YARP_ASSERT(delegate!=NULL);
        return delegate->isActive();
    }

    virtual bool isTextMode() {
        YARP_ASSERT(delegate!=NULL);
        return delegate->isTextMode();
    }

    virtual bool isConnectionless() {
        YARP_ASSERT(delegate!=NULL);
        return delegate->isConnectionless();
    }

    virtual void write(SizedWriter& writer) {
        this->writer = &writer;
        if (isActive()) {
            YARP_ASSERT(delegate!=NULL);
            getStreams().beginPacket();
            delegate->write(*this,writer);
            getStreams().endPacket();
            PortReader *reply = writer.getReplyHandler();
            if (reply!=NULL) {
                if (!delegate->supportReply()) {
                    YARP_INFO(log,String("connection ") + getRoute().toString() + " does not support replies (try \"tcp\" or \"text_ack\")");
                }
                reader.reset(is(),&getStreams(), getRoute(),
                             messageLen,delegate->isTextMode());
                reply->read(reader);
            }
            expectAck(); //MOVE ack to after reply, if present
        }
        this->writer = NULL;
    }

    virtual OutputProtocol& getOutput() {
        return *this;
    }

    virtual InputProtocol& getInput() {
        return *this;
    }


    virtual ConnectionReader& beginRead() {
        expectIndex();
        respondToIndex();
        if (altReader!=NULL) {
            YARP_DEBUG(Logger::get(), "alternate reader in operation");
            return *altReader;
        }
        return reader;
    }

    virtual void endRead() {
        if (altReader!=NULL) {
            //altReader->release();
            sendAck();
        } else {
            reader.flushWriter();
            sendAck();  //MOVE ack to after reply, if present
        }
    }

    virtual bool checkStreams() {
        return shift.isOk();
    }

    virtual void resetStreams() {
        shift.reset();
    }


    void setReader(ConnectionReader *altReader) {
        this->altReader = altReader;
    }

private:

    void sendProtocolSpecifier() {
        YARP_ASSERT(delegate!=NULL);
        delegate->getHeader(header.bytes());
        os().write(header.bytes());
        os().flush();
    }

    void expectProtocolSpecifier() {
        int len = NetType::readFull(is(),header.bytes());
        ACE_UNUSED_ARG(len);
        //ACE_OS::printf("len is %d but header is %d\n", len, header.length());
        if (len==-1) {
            throw IOException("no connection");
        }
        if(len!=header.length()) {
            throw IOException("data stream died");
        }
        bool already = false;
        if (delegate!=NULL) {
            if (delegate->checkHeader(header.bytes())) {
                already = true;
            }
        }
        if (!already) {
            delegate = Carriers::chooseCarrier(header.bytes());
        }
        if (delegate==NULL) {
            throw IOException("unrecognized protocol");
        }
        setRoute(getRoute().addCarrierName(delegate->getName()));
        delegate->setParameters(header.bytes());
    }


    void sendSenderSpecifier() {
        const String& senderName = getRoute().getFromName();
        NetType::netInt(senderName.length()+1,number.bytes());
        os().write(number.bytes());
        Bytes b((char*)senderName.c_str(),senderName.length()+1);
        os().write(b);
        os().flush();
    }

    void expectSenderSpecifier() {
        YARP_ASSERT(delegate!=NULL);
        delegate->expectSenderSpecifier(*this);
        ACE_DEBUG((LM_DEBUG,"Sender name is %s",getRoute().getFromName().c_str()));
    }

    int messageLen;
    bool pendingAck;
    Logger& log;
    ManagedBytes header;
    ManagedBytes number;
    ManagedBytes indexHeader;
    ShiftStream shift;
    Carrier *delegate;
    Route route;
    //BufferedConnectionWriter writer;
    SizedWriter *writer;
    StreamConnectionReader reader;
    ConnectionReader *altReader;

};

#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -