// portcore.cpp
// From "a language recognition engine" · C++ source · 914 lines in total · page 1 of 2
stateMutex.post();
YARP_DEBUG(log,"CLEANING scan done");
}
// Wrap an incoming connection in a PortCoreInputUnit, start it, and
// append it to the list of units.
//
// only called by manager, in running phase
//
// ip: protocol object for the new incoming connection; must not be NULL.
void PortCore::addInput(InputProtocol *ip) {
    YARP_ASSERT(ip!=NULL);
    stateMutex.wait();
    PortCoreUnit *unit = new PortCoreInputUnit(*this,ip,autoHandshake);
    YARP_ASSERT(unit!=NULL);
    unit->start();
    units.push_back(unit);
    // size() yields an unsigned size type; convert explicitly so the
    // argument really matches the %d conversion.
    YMSG(("there are now %d units\n", static_cast<int>(units.size())));
    stateMutex.post();
}
// Wrap an outgoing connection in a PortCoreOutputUnit, start it, and
// append it to the list of units.  Nothing is added once the port is
// marked as finished.
//
// op: protocol object for the new outgoing connection; must not be NULL.
// NOTE(review): when the port is finished, op is neither stored nor
// released here - confirm who owns it in that case.
void PortCore::addOutput(OutputProtocol *op) {
    YARP_ASSERT(op!=NULL);
    stateMutex.wait();
    const bool accepting = !finished;
    if (accepting) {
        PortCoreUnit *outputUnit = new PortCoreOutputUnit(*this,op);
        YARP_ASSERT(outputUnit!=NULL);
        outputUnit->start();
        units.push_back(outputUnit);
    }
    stateMutex.post();
}
bool PortCore::isUnit(const Route& route) {
// not mutexed
bool needReap = false;
if (!finished) {
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
Route alt = unit->getRoute();
String wild = "*";
bool ok = true;
if (route.getFromName()!=wild) {
ok = ok && (route.getFromName()==alt.getFromName());
}
if (route.getToName()!=wild) {
ok = ok && (route.getToName()==alt.getToName());
}
if (route.getCarrierName()!=wild) {
ok = ok && (route.getCarrierName()==alt.getCarrierName());
}
if (ok) {
needReap = true;
break;
}
}
}
}
//printf("Reporting %s as %d\n", route.toString().c_str(), needReap);
return needReap;
}
bool PortCore::removeUnit(const Route& route, bool synch) {
// a request to remove a unit
// this is the trickiest case, since any thread could here
// affect any other thread
// how about waking up the manager to do this?
stateMutex.wait();
bool needReap = false;
if (!finished) {
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
Route alt = unit->getRoute();
String wild = "*";
bool ok = true;
if (route.getFromName()!=wild) {
ok = ok && (route.getFromName()==alt.getFromName());
}
if (route.getToName()!=wild) {
ok = ok && (route.getToName()==alt.getToName());
}
if (route.getCarrierName()!=wild) {
ok = ok && (route.getCarrierName()==alt.getCarrierName());
}
if (ok) {
YARP_DEBUG(log,
String("removing unit ") + alt.toString());
unit->setDoomed();
needReap = true;
if (route.getToName()!="*") {
// not needed any more
//Companion::disconnectInput(alt.getToName().c_str(),
// alt.getFromName().c_str(),
// true);
break;
}
}
}
}
}
stateMutex.post();
YARP_DEBUG(log,"should I reap?");
if (needReap) {
YARP_DEBUG(log,"reaping...");
// death will happen in due course; we can speed it up a bit
// by waking up the grim reaper
try {
YARP_DEBUG(log,"reaping - send message...");
OutputProtocol *op = face->write(address);
if (op!=NULL) {
op->close();
delete op;
}
YARP_DEBUG(log,"reaping - sent message...");
if (synch) {
YARP_DEBUG(log,"reaping - synch...");
// wait until disconnection process is complete
bool cont = false;
do {
//printf("Waiting for close to finish...\n");
stateMutex.wait();
cont = isUnit(route);
if (cont) {
connectionListeners++;
}
stateMutex.post();
if (cont) {
connectionChange.wait();
}
} while (cont);
//printf("Waited for close to finish...\n");
}
} catch (IOException e) {
YARP_DEBUG(log,"could not write to self");
// no problem
}
}
return needReap;
}
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
//
// PortManager interface
//
void PortCore::addOutput(const String& dest, void *id, OutputStream *os) {
BufferedConnectionWriter bw(true);
Address parts = Name(dest).toAddress();
Address address = NameClient::getNameClient().queryName(parts.getRegName());
if (address.isValid()) {
// as a courtesy, remove any existing connections between
// source and destination
removeUnit(Route(getName(),address.getRegName(),"*"),true);
OutputProtocol *op = NULL;
try {
op = Carriers::connect(address);
if (op!=NULL) {
op->open(Route(getName(),address.getRegName(),
parts.hasCarrierName()?parts.getCarrierName():"tcp"));
}
} catch (IOException e) {
YARP_DEBUG(log,e.toString() + " <<< open route error");
if (op!=NULL) {
delete op;
op = NULL;
}
}
if (op!=NULL) {
addOutput(op);
bw.appendLine(String("Added output connection from ") + getName() + " to " + dest);
} else {
bw.appendLine(String("Cannot connect to ") + dest);
}
} else {
bw.appendLine(String("Do not know how to connect to ") + dest);
}
if(os!=NULL) {
bw.write(*os);
}
cleanUnits();
}
void PortCore::removeOutput(const String& dest, void *id, OutputStream *os) {
BufferedConnectionWriter bw(true);
if (removeUnit(Route("*",dest,"*"),true)) {
bw.appendLine(String("Removed connection from ") + getName() +
" to " + dest);
} else {
bw.appendLine(String("Could not find an outgoing connection to ") +
dest);
}
if(os!=NULL) {
bw.write(*os);
}
cleanUnits();
}
void PortCore::removeInput(const String& dest, void *id, OutputStream *os) {
BufferedConnectionWriter bw(true);
if (removeUnit(Route(dest,"*","*"),true)) {
bw.appendLine(String("Removing connection from ") + dest + " to " +
getName());
} else {
bw.appendLine(String("Could not find an incoming connection from ") +
dest);
}
if(os!=NULL) {
bw.write(*os);
}
cleanUnits();
}
// Produce a textual description of this port and its live connections.
//
// The report names the port and its address, then lists every unfinished
// output unit followed by every unfinished input unit (or a line saying
// there are none of that kind).  The list is built while holding
// stateMutex and written to `os` afterwards, when `os` is not NULL.
//
// id: not used by this method.
// os: stream that receives the report; may be NULL.
void PortCore::describe(void *id, OutputStream *os) {
    cleanUnits();
    BufferedConnectionWriter bw(true);
    stateMutex.wait();
    bw.appendLine(String("This is ") + address.getRegName() + " at " +
                  address.toString());

    // first pass: outgoing connections
    int outgoing = 0;
    for (unsigned int idx=0; idx<units.size(); idx++) {
        PortCoreUnit *unit = units[idx];
        if (unit==NULL) {
            continue;
        }
        if (!unit->isOutput() || unit->isFinished()) {
            continue;
        }
        Route route = unit->getRoute();
        String msg = "There is an output connection from " +
            route.getFromName() +
            " to " + route.getToName() + " using " +
            route.getCarrierName();
        bw.appendLine(msg);
        outgoing++;
    }
    if (outgoing==0) {
        bw.appendLine("There are no outgoing connections");
    }

    // second pass: incoming connections
    int incoming = 0;
    for (unsigned int idx=0; idx<units.size(); idx++) {
        PortCoreUnit *unit = units[idx];
        if (unit==NULL) {
            continue;
        }
        if (!unit->isInput() || unit->isFinished()) {
            continue;
        }
        Route route = unit->getRoute();
        String msg = "There is an input connection from " +
            route.getFromName() +
            " to " + route.getToName() + " using " +
            route.getCarrierName();
        bw.appendLine(msg);
        incoming++;
    }
    if (incoming==0) {
        bw.appendLine("There are no incoming connections");
    }
    stateMutex.post();

    if (os!=NULL) {
        bw.write(*os);
    }
}
// Hand incoming data to the registered reader, or consume and discard it
// if no reader is registered.
//
// We are in the context of one of the input threads, so contact with the
// PortCore must be kept to an absolute minimum.  Picking up the reader
// pointer is safe because it is constant over the lifetime of the input
// threads.
//
// reader: source of the incoming data.
// id, os: not used by this method.
void PortCore::readBlock(ConnectionReader& reader, void *id, OutputStream *os) {
    if (this->reader==NULL) {
        // nobody is listening: read and ignore
        YARP_DEBUG(Logger::get(),"data received in PortCore, no reader for it");
        Bottle discarded;
        discarded.read(reader);
        return;
    }
    // pass the data on out
    this->reader->read(reader);
}
// Send a message to every unfinished output unit, one after another.
//
// A packet taken from `packets` tracks the message: its count is raised
// once per unit before that unit's send, and lowered again whenever a
// unit hands a tracker back, plus once more when the loop is done.
// Packet bookkeeping is always done under packetMutex.
//
// writer: the content to send.
// reader: passed through to each unit's send().
void PortCore::send(Writable& writer, Readable *reader) {
    String envelopeString = envelope;
    //envelope = ""; // let user control wiping

    // pass the data to all output units.
    // for efficiency, it should be converted to block form first.
    // some ports may want text-mode, some may want binary, so there
    // may need to be two caches.

    // for now, just doing a sequential send with no caching.
    YMSG(("------- send in real\n"));
    stateMutex.wait();
    YMSG(("------- send in\n"));
    // The whole darned port is blocked on this operation.
    // How long the operation lasts will depend on these flags:
    //   waitAfterSend and waitBeforeSend,
    // set by setWaitAfterSend() and setWaitBeforeSend()
    if (!finished) {
        packetMutex.wait();
        PortCorePacket *packet = packets.getFreePacket();
        // validate the packet before its first use, not after
        YARP_ASSERT(packet!=NULL);
        packet->setContent(&writer);
        packetMutex.post();
        for (unsigned int i=0; i<units.size(); i++) {
            PortCoreUnit *unit = units[i];
            if (unit!=NULL) {
                if (unit->isOutput() && !unit->isFinished()) {
                    YMSG(("------- -- inc\n"));
                    packet->inc();
                    YMSG(("------- -- presend\n"));
                    void *out = unit->send(writer,reader,
                                           static_cast<void *>(packet),
                                           envelopeString,
                                           waitAfterSend,waitBeforeSend);
                    YMSG(("------- -- send\n"));
                    if (out!=NULL) {
                        // the unit handed back a tracker: drop one count
                        // on the packet it refers to
                        PortCorePacket *returned =
                            static_cast<PortCorePacket *>(out);
                        packetMutex.wait();
                        returned->dec();
                        packets.checkPacket(returned);
                        packetMutex.post();
                    }
                    YMSG(("------- -- dec\n"));
                }
            }
        }
        YMSG(("------- pack check\n"));
        packetMutex.wait();
        packet->dec();
        packets.checkPacket(packet);
        packetMutex.post();
        YMSG(("------- packed\n"));
    }
    //writer.onCompletion();
    YMSG(("------- send out\n"));
    //packetMutex.post();
    stateMutex.post();
    YMSG(("------- send out real\n"));
}
bool PortCore::isWriting() {
bool writing = false;
stateMutex.wait();
if (!finished) {
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
if (unit->isOutput() && !unit->isFinished()) {
if (unit->isBusy()) {
writing = true;
} else {
// not needed anymore, done by callback
/*
void *tracker = unit->takeTracker();
if (tracker!=NULL) {
//YARP_INFO(log,"tracker returned...");
packetMutex.wait();
((PortCorePacket *)tracker)->dec();
packets.checkPacket((PortCorePacket *)tracker);
packetMutex.post();
}
*/
}
}
}
}
}
stateMutex.post();
return writing;
}
void PortCore::notifyCompletion(void *tracker) {
YMSG(("starting notifyCompletion\n"));
packetMutex.wait();
if (tracker!=NULL) {
((PortCorePacket *)tracker)->dec();
packets.checkPacket((PortCorePacket *)tracker);
}
packetMutex.post();
YMSG(("stopping notifyCompletion\n"));
}
// Set the envelope from a writable object.
//
// The object is written into a buffered writer; if that succeeds, the
// buffer's string form is stored through the String overload.
//
// Returns the result of the write; the envelope is left untouched when
// the write fails.
bool PortCore::setEnvelope(Writable& envelope) {
    BufferedConnectionWriter buf(true);
    if (!envelope.write(buf)) {
        return false;
    }
    setEnvelope(buf.toString());
    return true;
}
// Store a new envelope string, cut off just before the first character
// whose value is below 32.
void PortCore::setEnvelope(const String& envelope) {
    this->envelope = envelope;
    // look for the first character with a value below 32
    unsigned int cut = 0;
    while (cut<envelope.length() && !(this->envelope[cut]<32)) {
        cut++;
    }
    if (cut<envelope.length()) {
        // keep only what comes before that character
        this->envelope = this->envelope.substr(0,cut);
    }
    YARP_DEBUG(log,String("set envelope to ") + this->envelope);
}
// Return a copy of the currently stored envelope string.
String PortCore::getEnvelope() {
    return this->envelope;
}
// Parse the stored envelope string into a readable object.
//
// The envelope text, followed by "\r\n", is fed through a string input
// stream and a stream connection reader, from which `envelope` reads
// itself.  The stored envelope is not cleared; wiping is left to the user.
//
// Returns the result of envelope.read().
bool PortCore::getEnvelope(Readable& envelope) {
    StringInputStream textStream;
    textStream.add(this->envelope.c_str());
    textStream.add("\r\n");
    StreamConnectionReader textReader;
    Route emptyRoute;
    textReader.reset(textStream,NULL,emptyRoute,0,true);
    const bool parsed = envelope.read(textReader);
    return parsed;
}
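// Keyboard shortcuts of the web code viewer (page residue, not part of the program):
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -
//   Show shortcuts:     ?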