portcore.cpp

来自「一个语言识别引擎」· C++ 代码 · 共 914 行 · 第 1/2 页

CPP
914
字号
    stateMutex.post();
    YARP_DEBUG(log,"CLEANING scan done");
}


// only called by manager, in running phase
// Wrap an incoming connection in a PortCoreInputUnit, start it, and
// append it to the unit list.
//
// ip: protocol object for the new connection; must not be NULL
//     (enforced with YARP_ASSERT).
//
// Everything after the argument check runs between stateMutex.wait()
// and stateMutex.post().
void PortCore::addInput(InputProtocol *ip) {
    YARP_ASSERT(ip!=NULL);
    stateMutex.wait();
    PortCoreUnit *unit = new PortCoreInputUnit(*this,ip,autoHandshake);
    YARP_ASSERT(unit!=NULL);
    unit->start();

    units.push_back(unit);
    // size() may be wider than int (e.g. size_t); cast so the argument
    // really matches the %d conversion in the variadic message.
    YMSG(("there are now %d units\n", (int)units.size()));
    stateMutex.post();
}


// Wrap an outgoing connection in a PortCoreOutputUnit, start it, and
// append it to the unit list.
//
// op: protocol object for the new connection; must not be NULL
//     (enforced with YARP_ASSERT).
//
// Nothing is created once `finished` is set. The check and the
// registration both happen while stateMutex is held.
void PortCore::addOutput(OutputProtocol *op) {
    YARP_ASSERT(op!=NULL);

    stateMutex.wait();
    if (finished) {
        // port is shutting down: release the lock and do nothing else
        stateMutex.post();
        return;
    }

    PortCoreUnit *outputUnit = new PortCoreOutputUnit(*this,op);
    YARP_ASSERT(outputUnit!=NULL);
    outputUnit->start();
    units.push_back(outputUnit);

    stateMutex.post();
}


bool PortCore::isUnit(const Route& route) {
    // not mutexed
    bool needReap = false;
    if (!finished) {
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                Route alt = unit->getRoute();
                String wild = "*";
                bool ok = true;
                if (route.getFromName()!=wild) {
                    ok = ok && (route.getFromName()==alt.getFromName());
                }
                if (route.getToName()!=wild) {
                    ok = ok && (route.getToName()==alt.getToName());
                }
                if (route.getCarrierName()!=wild) {
                    ok = ok && (route.getCarrierName()==alt.getCarrierName());
                }
	
                if (ok) {
                    needReap = true;
                    break;
                }
            }
        }
    }
    //printf("Reporting %s as %d\n", route.toString().c_str(), needReap);
    return needReap;
}


// Mark every unit whose route matches `route` as doomed and, if any was
// marked, poke the port so that the doomed units get dealt with.
//
// route: pattern to match; a from-name, to-name or carrier name equal
//        to "*" matches any value in that field.
// synch: when true, do not return until isUnit(route) reports that no
//        matching unit is left.
//
// Returns true when at least one unit was marked. Nothing is marked
// once `finished` is set.
bool PortCore::removeUnit(const Route& route, bool synch) {
    // a request to remove a unit
    // this is the trickiest case, since any thread could here
    // affect any other thread

    // how about waking up the manager to do this?
    stateMutex.wait();
    bool needReap = false;
    if (!finished) {
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                Route alt = unit->getRoute();
                String wild = "*";
                bool ok = true;
                // every non-wildcard field of the pattern must agree exactly
                if (route.getFromName()!=wild) {
                    ok = ok && (route.getFromName()==alt.getFromName());
                }
                if (route.getToName()!=wild) {
                    ok = ok && (route.getToName()==alt.getToName());
                }
                if (route.getCarrierName()!=wild) {
                    ok = ok && (route.getCarrierName()==alt.getCarrierName());
                }
	
                if (ok) {
                    YARP_DEBUG(log, 
                               String("removing unit ") + alt.toString());
					unit->setDoomed();
                    needReap = true;
                    // a specific destination stops at the first match;
                    // a wildcard destination keeps scanning all units
                    if (route.getToName()!="*") {
						// not needed any more
                        //Companion::disconnectInput(alt.getToName().c_str(),
                        //                         alt.getFromName().c_str(),
                        //                       true);
						break;
                    }
                }
            }
        }
    }
    stateMutex.post();
    YARP_DEBUG(log,"should I reap?");
    if (needReap) {
        YARP_DEBUG(log,"reaping...");
        // death will happen in due course; we can speed it up a bit
        // by waking up the grim reaper
        // NOTE(review): IOException is caught by value below; catching by
        // reference would avoid copying/slicing the exception object.
        try {
	        YARP_DEBUG(log,"reaping - send message...");
            // open a connection to `address` and close it again right away;
            // only the act of connecting matters here
            OutputProtocol *op = face->write(address);
            if (op!=NULL) {
                op->close();
                delete op;
            }
	        YARP_DEBUG(log,"reaping - sent message...");

			if (synch) {
   	            YARP_DEBUG(log,"reaping - synch...");
				// wait until disconnection process is complete
				bool cont = false;
				do {
					//printf("Waiting for close to finish...\n");
					// re-check under the lock; isUnit itself takes no lock
					stateMutex.wait();
					cont = isUnit(route);
					if (cont) {
						// register as a waiter before releasing the lock
						// (presumably read by whoever posts
						// connectionChange - confirm)
						connectionListeners++;
					}
					stateMutex.post();
					if (cont) {
						// block with stateMutex released
						connectionChange.wait();
					}
				} while (cont);
				//printf("Waited for close to finish...\n");
			}

		} catch (IOException e) {
			YARP_DEBUG(log,"could not write to self");
            // no problem
        }
    }
    return needReap;
}




////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//
// PortManager interface
//


void PortCore::addOutput(const String& dest, void *id, OutputStream *os) {
    BufferedConnectionWriter bw(true);

    Address parts = Name(dest).toAddress();
    Address address = NameClient::getNameClient().queryName(parts.getRegName());
    if (address.isValid()) {
        // as a courtesy, remove any existing connections between 
        // source and destination
        removeUnit(Route(getName(),address.getRegName(),"*"),true);

        OutputProtocol *op = NULL;
        try {
            op = Carriers::connect(address);
            if (op!=NULL) {
                op->open(Route(getName(),address.getRegName(),
                               parts.hasCarrierName()?parts.getCarrierName():"tcp"));
            }
        } catch (IOException e) { 
            YARP_DEBUG(log,e.toString() + " <<< open route error");            
            if (op!=NULL) {
                delete op;
                op = NULL;
            }
        }
        if (op!=NULL) {
            addOutput(op);
            bw.appendLine(String("Added output connection from ") + getName() + " to " + dest);
        } else {
            bw.appendLine(String("Cannot connect to ") + dest);
        }
    } else {
        bw.appendLine(String("Do not know how to connect to ") + dest);
    }

    if(os!=NULL) {
        bw.write(*os);
    }
    cleanUnits();
}

void PortCore::removeOutput(const String& dest, void *id, OutputStream *os) {
    BufferedConnectionWriter bw(true);
    if (removeUnit(Route("*",dest,"*"),true)) {
        bw.appendLine(String("Removed connection from ") + getName() +
                      " to " + dest);
    } else {
        bw.appendLine(String("Could not find an outgoing connection to ") +
                      dest);
    }
    if(os!=NULL) {
        bw.write(*os);
    }
    cleanUnits();
}

// Remove the incoming connection(s) from `dest` and report the outcome.
//
// dest: from-name of the route to remove; to-name and carrier are
//       wildcards.
// id:   not used by this function.
// os:   if non-NULL, the one-line status message is written to it.
//
// The removal is synchronous (removeUnit is called with synch=true),
// and cleanUnits() is called before returning.
void PortCore::removeInput(const String& dest, void *id, OutputStream *os) {
    BufferedConnectionWriter bw(true);

    bool removed = removeUnit(Route(dest,"*","*"),true);
    if (!removed) {
        bw.appendLine(String("Could not find an incoming connection from ") +
                      dest);
    } else {
        bw.appendLine(String("Removing connection from ") + dest + " to " +
                      getName());
    }

    if (os!=NULL) {
        bw.write(*os);
    }
    cleanUnits();
}

// Produce a textual summary of this port: its name and address, then
// one line per live output connection, then one line per live input
// connection.
//
// id: not used by this function.
// os: if non-NULL, the summary is written to it.
//
// cleanUnits() is called first; the unit list is then walked while
// stateMutex is held, and the write to `os` happens after the mutex has
// been released. Units reporting isFinished() are left out.
void PortCore::describe(void *id, OutputStream *os) {
    cleanUnits();

    BufferedConnectionWriter bw(true);

    stateMutex.wait();

    bw.appendLine(String("This is ") + address.getRegName() + " at " + 
                  address.toString());

    // first pass: outgoing connections
    int outputCount = 0;
    for (unsigned int k=0; k<units.size(); k++) {
        PortCoreUnit *unit = units[k];
        if (unit==NULL) {
            continue;
        }
        if (!unit->isOutput() || unit->isFinished()) {
            continue;
        }
        Route route = unit->getRoute();
        String msg = "There is an output connection from " + 
            route.getFromName() +
            " to " + route.getToName() + " using " + 
            route.getCarrierName();
        bw.appendLine(msg);
        outputCount++;
    }
    if (outputCount<1) {
        bw.appendLine("There are no outgoing connections");
    }

    // second pass: incoming connections
    int inputCount = 0;
    for (unsigned int k2=0; k2<units.size(); k2++) {
        PortCoreUnit *unit = units[k2];
        if (unit==NULL) {
            continue;
        }
        if (!unit->isInput() || unit->isFinished()) {
            continue;
        }
        Route route = unit->getRoute();
        String msg = "There is an input connection from " + 
            route.getFromName() +
            " to " + route.getToName() + " using " + 
            route.getCarrierName();
        bw.appendLine(msg);
        inputCount++;
    }
    if (inputCount<1) {
        bw.appendLine("There are no incoming connections");
    }

    stateMutex.post();

    if (os!=NULL) {
        bw.write(*os);
    }
}

// Hand incoming data to the registered reader, or consume and drop it
// when no reader is registered.
//
// reader: connection to read the incoming data from.
// id, os: not used by this function.
//
// Per the original author's notes: this runs in the context of one of
// the input threads, so contact with the PortCore is kept minimal, and
// reading this->reader without locking is considered safe because it
// is constant over the lifetime of the input threads.
void PortCore::readBlock(ConnectionReader& reader, void *id, OutputStream *os) {
    if (this->reader==NULL) {
        // nobody to deliver to: read the message into a scratch Bottle
        // and ignore it
        YARP_DEBUG(Logger::get(),"data received in PortCore, no reader for it");
        Bottle discard;
        discard.read(reader);
        return;
    }

    this->reader->read(reader);
}


// Send `writer` through every live output unit, one after another.
//
// writer: content to send; its address is stored in the packet that
//         tracks this send.
// reader: passed through unchanged to each unit's send().
//
// The current envelope is copied before locking and passed to each
// unit. The whole operation runs with stateMutex held, so the port is
// blocked for its duration; how long that is depends on waitAfterSend
// and waitBeforeSend. Nothing is sent once `finished` is set.
//
// Packet bookkeeping (always under packetMutex): the packet count is
// incremented once per unit before that unit's send(); any tracker a
// unit hands back is decremented and checked immediately; after the
// loop the packet is decremented once more and checked.
void PortCore::send(Writable& writer, Readable *reader) {

    String envelopeString = envelope;
    //envelope = ""; // let user control wiping

    // pass the data to all output units.
    // for efficiency, it should be converted to block form first.
    // some ports may want text-mode, some may want binary, so there
    // may need to be two caches.

    // for now, just doing a sequential send with no caching.
    YMSG(("------- send in real\n"));

    stateMutex.wait();

    YMSG(("------- send in\n"));
    // The whole darned port is blocked on this operation.
    // How long the operation lasts will depend on these flags:
    //   waitAfterSend and waitBeforeSend,
    // set by setWaitAfterSend() and setWaitBeforeSend()
    if (!finished) {
        packetMutex.wait();
        PortCorePacket *packet = packets.getFreePacket();
        // validate the packet before it is dereferenced, not after
        YARP_ASSERT(packet!=NULL);
        packet->setContent(&writer);
        packetMutex.post();
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                if (unit->isOutput() && !unit->isFinished()) {
                    YMSG(("------- -- inc\n"));
                    packet->inc();
                    YMSG(("------- -- presend\n"));
                    void *out = unit->send(writer,reader,(void *)packet,
                                           envelopeString,
                                           waitAfterSend,waitBeforeSend);
                    YMSG(("------- -- send\n"));
                    if (out!=NULL) {
                        // the unit returned a tracker: release its count
                        // right away
                        packetMutex.wait();
                        ((PortCorePacket *)out)->dec();
                        packets.checkPacket((PortCorePacket *)out);
                        packetMutex.post();
                    }
                    YMSG(("------- -- dec\n"));
                }
            }
        }
        YMSG(("------- pack check\n"));
        // final decrement for this call (presumably balancing the count
        // the packet starts with - confirm against PortCorePacket)
        packetMutex.wait();
        packet->dec();
        packets.checkPacket(packet);
        packetMutex.post();
        YMSG(("------- packed\n"));
    }
    //writer.onCompletion();
    YMSG(("------- send out\n"));
    //packetMutex.post();
    stateMutex.post();
    YMSG(("------- send out real\n"));

}



// Report whether any live output unit is currently busy.
//
// Returns true if at least one unit that is an output and not finished
// reports isBusy(); false otherwise, and always false once `finished`
// is set. The scan runs with stateMutex held.
//
// The scan deliberately does not stop at the first busy unit: isBusy()
// is queried on every live output unit. Tracker clean-up that used to
// happen here for idle units is, per the original note, now done by
// callback.
bool PortCore::isWriting() {
    stateMutex.wait();

    bool busy = false;
    if (!finished) {
        for (unsigned int n=0; n<units.size(); n++) {
            PortCoreUnit *unit = units[n];
            if (unit==NULL) {
                continue;
            }
            if (!unit->isOutput() || unit->isFinished()) {
                continue;
            }
            if (unit->isBusy()) {
                busy = true;
            }
        }
    }

    stateMutex.post();

    return busy;
}



void PortCore::notifyCompletion(void *tracker) {
    YMSG(("starting notifyCompletion\n"));
    packetMutex.wait();
    if (tracker!=NULL) {
        ((PortCorePacket *)tracker)->dec();
        packets.checkPacket((PortCorePacket *)tracker);
    }
    packetMutex.post();
    YMSG(("stopping notifyCompletion\n"));
}


// Set the envelope from a Writable object.
//
// envelope: object whose written form becomes the envelope text.
//
// The object is written into a BufferedConnectionWriter constructed
// with `true` (presumably text mode - confirm), and the resulting
// string is passed to setEnvelope(const String&).
//
// Returns the result of the write; the stored envelope is left
// untouched when the write fails.
bool PortCore::setEnvelope(Writable& envelope) {
    BufferedConnectionWriter textBuffer(true);
    if (!envelope.write(textBuffer)) {
        return false;
    }
    setEnvelope(textBuffer.toString());
    return true;
}


// Store `envelope` as the port's envelope text, cut off at the first
// control character.
//
// envelope: new envelope text. Everything from the first byte with a
//           value below 32 (e.g. '\r' or '\n') onwards is dropped.
//
// The stored result is logged with YARP_DEBUG.
void PortCore::setEnvelope(const String& envelope) {
    this->envelope = envelope;
    for (unsigned int i=0; i<envelope.length(); i++) {
        // Compare as unsigned char: where plain char is signed, bytes
        // >= 0x80 (e.g. UTF-8 text) are negative and would otherwise be
        // mistaken for control characters, truncating the envelope on
        // some platforms but not others.
        if ((unsigned char)this->envelope[i]<32) {
            this->envelope = this->envelope.substr(0,i);
            break;
        }
    }
    YARP_DEBUG(log,String("set envelope to ") + this->envelope);
}

// Return a copy of the envelope text currently stored on the port.
String PortCore::getEnvelope() {
    return this->envelope;
}

// Fill a Readable object from the stored envelope text.
//
// envelope: object to populate; it reads from a stream holding the
//           stored envelope followed by "\r\n".
//
// Returns whatever envelope.read() returns. The stored envelope is not
// cleared; wiping it is left to the caller.
bool PortCore::getEnvelope(Readable& envelope) {
    Route emptyRoute;

    // feed the stored text, terminated as a line, into an in-memory stream
    StringInputStream textSource;
    textSource.add(this->envelope.c_str());
    textSource.add("\r\n");

    StreamConnectionReader lineReader;
    lineReader.reset(textSource,NULL,emptyRoute,0,true);

    return envelope.read(lineReader);
}



⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?