portcore.cpp
来自「一个语言识别引擎」· C++ 代码 · 共 914 行 · 第 1/2 页
CPP
914 行
// -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
/*
* Copyright (C) 2006 Paul Fitzpatrick
* CopyPolicy: Released under the terms of the GNU GPL v2.0.
*
*/
#include <yarp/InputProtocol.h>
#include <yarp/Logger.h>
#include <yarp/PortCore.h>
#include <yarp/BufferedConnectionWriter.h>
#include <yarp/NameClient.h>
#include <yarp/PortCoreInputUnit.h>
#include <yarp/PortCoreOutputUnit.h>
#include <yarp/StreamConnectionReader.h>
#include <yarp/Name.h>
#include <yarp/Companion.h>
#include <yarp/os/Network.h>
#include <yarp/os/Bottle.h>
#include <ace/OS_NS_stdio.h>
//#define YMSG(x) ACE_OS::printf x;
//#define YTRACE(x) YMSG(("at %s\n",x))
#define YMSG(x)
#define YTRACE(x)
using namespace yarp;
using namespace yarp::os;
/*
Phases:
dormant
listening
running
*/
PortCore::~PortCore() {
closeMain();
}
bool PortCore::listen(const Address& address) {
bool success = false;
YTRACE("PortCore::listen");
if (!address.isValid()) {
YARP_ERROR(log, "Port does not have a valid address");
return false;
}
YARP_ASSERT(address.isValid());
// try to enter listening phase
stateMutex.wait();
YARP_ASSERT(listening==false);
YARP_ASSERT(running==false);
YARP_ASSERT(closing==false);
YARP_ASSERT(finished==false);
YARP_ASSERT(face==NULL);
this->address = address;
setName(address.getRegName());
try {
face = Carriers::listen(address);
if (face==NULL) {
throw IOException("no carrier");
}
} catch (IOException e) {
//YMSG(("listen failed: %s\n",e.toString().c_str()));
if (face!=NULL) {
face->close();
delete face;
}
stateMutex.post();
throw e;
}
if (face!=NULL) {
listening = true;
success = true;
}
if (success) {
log.setPrefix(address.getRegName().c_str());
}
stateMutex.post();
// we have either entered listening phase (face=valid, listening=true)
// or remained in dormant phase
return success;
}
void PortCore::setReadHandler(Readable& reader) {
YARP_ASSERT(running==false);
YARP_ASSERT(this->reader==NULL);
this->reader = &reader;
}
void PortCore::setReadCreator(ReadableCreator& creator) {
YARP_ASSERT(running==false);
YARP_ASSERT(this->readableCreator==NULL);
this->readableCreator = &creator;
}
void PortCore::run() {
YTRACE("PortCore::run");
// enter running phase
YARP_ASSERT(listening==true);
YARP_ASSERT(running==false);
YARP_ASSERT(closing==false);
YARP_ASSERT(finished==false);
YARP_ASSERT(starting==true); // can only run if called from start
running = true;
starting = false;
stateMutex.post();
YTRACE("PortCore::run running");
// main loop
bool shouldStop = false;
while (!shouldStop) {
// block and wait for an event
InputProtocol *ip = NULL;
try {
ip = face->read();
YARP_DEBUG(log,"PortCore got something");
} catch (IOException e) {
YMSG(("read failed: %s\n",e.toString().c_str()));
}
// got an event, but before processing it, we check whether
// we should shut down
stateMutex.wait();
shouldStop |= closing;
events++;
//YMSG(("*** event count boost to %d\n", events));
stateMutex.post();
if (!shouldStop) {
// process event
//YMSG(("PortCore::run got something, but no processing yet\n"));
addInput(ip);
ip = NULL;
}
// the event normally gets handed off. If it remains, delete it.
if (ip!=NULL) {
try {
ip->close();
delete ip;
} catch (IOException e) {
YMSG(("input protocol close failed: %s\n",e.toString().c_str()));
}
ip = NULL;
}
reapUnits();
stateMutex.wait();
for (int i=0; i<connectionListeners; i++) {
connectionChange.post();
}
connectionListeners = 0;
stateMutex.post();
}
YTRACE("PortCore::run closing");
// closing phase
stateMutex.wait();
for (int i=0; i<connectionListeners; i++) {
connectionChange.post();
}
connectionListeners = 0;
finished = true;
stateMutex.post();
}
void PortCore::close() {
closeMain();
}
bool PortCore::start() {
YTRACE("PortCore::start");
stateMutex.wait();
YARP_ASSERT(listening==true);
YARP_ASSERT(running==false);
YARP_ASSERT(starting==false);
YARP_ASSERT(finished==false);
YARP_ASSERT(closing==false);
starting = true;
bool started = ThreadImpl::start();
if (!started) {
// run() won't be happening
stateMutex.post();
} else {
// wait for run() to change state
stateMutex.wait();
YARP_ASSERT(running==true);
stateMutex.post();
}
return started;
}
void PortCore::closeMain() {
YTRACE("PortCore::closeMain");
// Politely pre-disconnect inputs
finishing = true;
bool done = false;
String prevName = "";
while (!done) {
done = true;
String removeName = "";
stateMutex.wait();
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
if (unit->isInput()) {
if (!unit->isDoomed()) {
Route r = unit->getRoute();
String s = r.getFromName();
if (s.length()>=1) {
if (s[0]=='/') {
if (s!=getName()) {
if (s!=prevName) {
removeName = s;
done = false;
break;
}
}
}
}
}
}
}
}
stateMutex.post();
if (!done) {
yarp::os::Network::disconnect(removeName.c_str(),
getName().c_str(),
true);
prevName = removeName;
}
}
// politely remove all outputs
done = false;
while (!done) {
done = true;
Route removeRoute;
stateMutex.wait();
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
if (unit->isOutput()&&!unit->isFinished()) {
removeRoute = unit->getRoute();
if (removeRoute.getFromName()==getName()) {
done = false;
break;
}
}
}
}
stateMutex.post();
if (!done) {
// stray debugging message
//printf("SHOULD remove %s for %s\n",
// removeRoute.toString().c_str(),
// getName().c_str());
removeUnit(removeRoute,true);
}
}
stateMutex.wait();
bool stopRunning = running;
stateMutex.post();
if (stopRunning) {
// we need to stop the thread
stateMutex.wait();
closing = true;
stateMutex.post();
try {
// wake it up
OutputProtocol *op = face->write(address);
if (op!=NULL) {
op->close();
delete op;
}
} catch (IOException e) {
// no problem
}
join();
// should be finished
stateMutex.wait();
YARP_ASSERT(finished==true);
stateMutex.post();
// should down units - this is the only time it is valid to do this
closeUnits();
stateMutex.wait();
finished = false;
closing = false;
running = false;
stateMutex.post();
String name = getName();
if (name!=String("")) {
NameClient::getNameClient().unregisterName(name);
}
}
// there should be no other threads at this point
// can stop listening
if (listening) {
YARP_ASSERT(face!=NULL);
try {
face->close();
delete face;
} catch (IOException e) {
YMSG(("face close failed: %s\n",e.toString().c_str()));
}
face = NULL;
listening = false;
}
// Check if someone is waiting for input. If so, wake them up
if (reader!=NULL) {
// send empty data out
YARP_DEBUG(log,"sending end-of-port message to listener");
StreamConnectionReader sbr;
reader->read(sbr);
reader = NULL;
}
// fresh as a daisy
YARP_ASSERT(listening==false);
YARP_ASSERT(running==false);
YARP_ASSERT(starting==false);
YARP_ASSERT(closing==false);
YARP_ASSERT(finished==false);
YARP_ASSERT(face==NULL);
}
int PortCore::getEventCount() {
stateMutex.wait();
int ct = events;
stateMutex.post();
return ct;
}
void PortCore::closeUnits() {
stateMutex.wait();
YARP_ASSERT(finished==true); // this is the only valid phase for this
stateMutex.post();
// in the "finished" phase, nobody else touches the units,
// so we can go ahead and shut them down and delete them
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
YARP_DEBUG(log,"closing a unit");
unit->close();
YARP_DEBUG(log,"joining a unit");
unit->join();
delete unit;
YARP_DEBUG(log,"deleting a unit");
units[i] = NULL;
}
}
units.clear();
//YMSG(("closeUnits: there are now %d units\n", units.size()));
}
void PortCore::reapUnits() {
stateMutex.wait();
if (!finished) {
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
if (unit->isDoomed()&&!unit->isFinished()) {
YARP_DEBUG(log,"REAPING a unit");
//printf("Reaping...%s\n", unit->getRoute().toString().c_str());
unit->close();
YARP_DEBUG(log,"closed REAPING a unit");
unit->join();
//printf("done Reaping...%s\n", unit->getRoute().toString().c_str());
YARP_DEBUG(log,"done REAPING a unit");
}
}
}
}
stateMutex.post();
cleanUnits();
}
void PortCore::cleanUnits() {
YARP_DEBUG(log,"CLEANING scan");
stateMutex.wait();
if (!finished) {
for (unsigned int i=0; i<units.size(); i++) {
PortCoreUnit *unit = units[i];
if (unit!=NULL) {
YARP_DEBUG(log,String("checking ") + unit->getRoute().toString());
if (unit->isFinished()) {
YARP_DEBUG(log,"CLEANING a unit");
try {
unit->close();
unit->join();
} catch (IOException e) {
YARP_DEBUG(log,e.toString() + " <<< cleanUnits error");
}
delete unit;
units[i] = NULL;
YARP_DEBUG(log,"done CLEANING a unit");
}
}
}
unsigned int rem = 0;
for (unsigned int i2=0; i2<units.size(); i2++) {
if (units[i2]!=NULL) {
if (rem<i2) {
units[rem] = units[i2];
units[i2] = NULL;
}
rem++;
}
}
for (unsigned int i3=0; i3<units.size()-rem; i3++) {
units.pop_back();
}
//YMSG(("cleanUnits: there are now %d units\n", units.size()));
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?