📄 simplefacconnection.java
字号:
/**
 * Returns the socket this connection is using.
 *
 * @return the current value of the {@code clientSocket} field
 */
protected final Socket getClientSocket()
{
    return this.clientSocket;
}
/**
 * Stores the facilitator associated with this connection.
 *
 * @param f the facilitator to remember
 */
protected final void setFac(SimpleFac f)
{
    this.fac = f;
}
/**
 * Returns the facilitator associated with this connection.
 *
 * @return the current value of the {@code fac} field
 */
protected final SimpleFac getFac()
{
    return this.fac;
}
/**
 * Stores the id of the agent on the other side of this connection.
 *
 * @param id the remote agent's connection id
 */
protected final void setId(ConnectionId id)
{
    this.id = id;
}
/**
 * Returns the id of the agent on the other side of this connection.
 *
 * @return the current value of the {@code id} field
 */
protected final ConnectionId getId()
{
    return this.id;
}
/**
 * Requests a disconnect by clearing the {@code connected} flag.
 * This method only records the request; it does not close anything itself.
 */
public final void disconnect()
{
    this.connected = false;
}
/**
 * Reports whether this connection is still marked as connected.
 *
 * @return the current value of the {@code connected} flag; {@code false}
 *         once {@code disconnect()} has been called
 */
public final boolean isConnected()
{
    return this.connected;
}
/**
 * Blocks the calling thread until {@code getOutputReady()} returns true.
 * <p>
 * If the thread is interrupted while waiting, the connection is shut down
 * with the {@code InterruptedException} as the cause, the thread's interrupt
 * status is restored so that callers can still observe the interruption,
 * and the method returns without output having become ready.
 */
protected final void waitOutputUntilReady()
{
    while (!getOutputReady()) {
        try {
            // Block on the semaphore, then hand the permit straight back;
            // the loop condition decides whether another wait is needed.
            getOutputReadySem().acquire();
            getOutputReadySem().release();
        }
        catch (InterruptedException ie) {
            try {
                shutdown(ie);
            }
            finally {
                // Catching InterruptedException clears the interrupt flag;
                // set it again (after shutdown, so cleanup is not disturbed)
                // instead of silently consuming the interruption.
                Thread.currentThread().interrupt();
            }
            return;
        }
    }
}
/**
 * Shuts the connection down when the caller has no specific cause to report.
 * Delegates to {@code shutdown(Throwable)} with a generic
 * "Unknown error" exception.
 */
public synchronized void shutdown()
{
    Exception unknownCause = new Exception("Unknown error");
    shutdown(unknownCause);
}
/**
 * Tears this connection down.
 * <p>
 * Steps, in order: mark the connection as disconnected, log the shutdown
 * (including its cause), close the client socket, shut down the
 * {@code pendingIncoming} and {@code pendingOutgoing} executors, and release
 * this connection's id on the facilitator if one is set. While this runs,
 * the agent name (when available) is pushed on the logging NDC.
 * <p>
 * The executor shutdown, id release and NDC pop are performed even when
 * closing the socket fails, so a failed close no longer leaves them behind.
 *
 * @param ex the reason for the shutdown; used for logging only, may be null
 * @throws RuntimeException if the client socket cannot be closed (the
 *         original {@code IOException} is attached as the cause)
 */
public synchronized void shutdown(Throwable ex)
{
    IclTerm iclName = getName();
    boolean pushed = false;
    if (iclName != null) {
        NDC.push(iclName.toString());
        pushed = true;
    }
    try {
        disconnect();
        if (logger.isInfoEnabled()) {
            logger.info("SimpleFacConnection.shutdown(): for agent " + id);
            // Record why we are shutting down; previously the cause was dropped.
            logger.info("SimpleFacConnection.shutdown(): cause: " + ex);
            if (logger.isDebugEnabled()) {
                logger.debug(" Number of pending incoming tasks: " + inBuffer.size());
                logger.debug(" Number of pending outgoing tasks: " + outBuffer.size());
                // Capture the current call stack to show who requested the shutdown.
                StringWriter sw = new StringWriter();
                PrintWriter pw = new PrintWriter(sw);
                new RuntimeException().printStackTrace(pw);
                pw.flush();
                logger.debug("SimpleFacConnection.shutdown() called: " + sw.toString());
            }
        }
        try {
            logger.warn("SimpleFacConnection.shutdown(): closing client socket for agent " + id);
            getClientSocket().close();
        }
        catch (IOException ioe) {
            throw new RuntimeException("SimpleFacConnection::shutdown could not close client socket", ioe);
        }
        finally {
            // Always release the remaining resources, even if the close failed.
            pendingIncoming.shutdown();
            pendingOutgoing.shutdown();
            if (getFac() != null) {
                getFac().releaseId(getId());
            }
        }
    }
    finally {
        // Keep the NDC stack balanced on every exit path.
        if (pushed) {
            NDC.pop();
        }
    }
}
/**
 * Sends a term to the other side of this connection.
 * <p>
 * The term is wrapped in a {@code term(...)} struct and handed, together
 * with a new {@code OutgoingMessageHandler}, to
 * {@code sendTerm(IclTerm, OutgoingMessageHandler)}. If the connection is
 * not connected, a warning is logged and nothing is sent.
 * <p>
 * Design note from the original author: this should really obtain the
 * handler from a factory chosen by the format the connection accepts.
 *
 * @param toSend the term to send
 */
public void sendTerm(IclTerm toSend)
{
    if (isConnected()) {
        IclStruct wrapped = new IclStruct("term", toSend);
        OutgoingMessageHandler outgoing = new OutgoingMessageHandler(wrapped, this);
        sendTerm(wrapped, outgoing);
        return;
    }
    if (logger.isEnabledFor(Priority.WARN)) {
        logger.warn("SimpleFacConnection.sendTerm(IclTerm): could not send; not connected");
    }
}
/**
 * Sends a term, optionally forcing it. Per the original author's note,
 * forcing means ignoring whether the client has sent an ev_ready event.
 * <p>
 * The term is wrapped in a {@code term(...)} struct; the {@code force} value
 * is passed to the new handler via {@code setForce} before it is queued.
 * If the connection is not connected, a warning is logged and nothing is sent.
 *
 * @param toSend the term to send
 * @param force  value handed to {@code OutgoingMessageHandler.setForce}
 * @see #sendTerm(IclTerm,OutgoingMessageHandler)
 */
public final void sendTerm(IclTerm toSend, boolean force)
{
    if (isConnected()) {
        IclStruct wrapped = new IclStruct("term", toSend);
        OutgoingMessageHandler outgoing = new OutgoingMessageHandler(wrapped, this);
        outgoing.setForce(force);
        sendTerm(wrapped, outgoing);
        return;
    }
    if (logger.isEnabledFor(Priority.WARN)) {
        logger.warn("SimpleFacConnection.sendTerm(IclTerm): could not send; not connected");
    }
}
/**
 * Hands an already-built handler to the {@code pendingOutgoing} executor.
 * If the executor rejects the handler, the connection is shut down with the
 * rejection as the cause.
 *
 * @param toSend  the term being sent; used here only for debug logging
 * @param handler the handler to submit to {@code pendingOutgoing}
 */
protected final void sendTerm(IclTerm toSend, OutgoingMessageHandler handler)
{
    try {
        boolean debugOn = logger.isDebugEnabled();
        if (debugOn) {
            logger.debug("Sending " + toSend.toString());
        }
        // The handler is not run inline; it is submitted to the executor.
        pendingOutgoing.execute(handler);
    }
    catch (RejectedExecutionException rejected) {
        shutdown(rejected);
    }
}
// Public counter, initialised to zero.
// NOTE(review): nothing in the visible part of this file reads or updates it;
// presumably it counts terms sent — confirm against its users before relying on it.
public int numTermsSent = 0;
/**
 * Receive loop for this connection.
 * <p>
 * Repeats until the connection ends: fetch the next term from the term
 * receiver, wrap it in an {@code IncomingMessageHandler} and submit that
 * handler to the {@code pendingIncoming} executor.
 * <p>
 * The loop ends, after calling {@code shutdown(...)}, when
 * <ul>
 *   <li>this connection is no longer marked connected,</li>
 *   <li>the term receiver reports it is no longer connected (its close
 *       exception is passed on as the shutdown cause), or</li>
 *   <li>{@code pendingIncoming} rejects a handler.</li>
 * </ul>
 * An exception thrown while fetching a term does not end the loop by
 * itself; it is logged as a warning and the loop continues with the
 * receiver-connected check.
 */
public void run()
{
    while (true) {
        if (!isConnected()) {
            shutdown(new Exception("Unexpected: not connected"));
            return;
        }

        IclTerm t = null;
        try {
            if (runLogger.isDebugEnabled()) {
                runLogger.debug("SimpleFacConnection.run() requesting term");
            }
            t = getTermReceiver().getNextTerm();
            if (runLogger.isDebugEnabled()) {
                runLogger.debug("SimpleFacConnection.run() got term");
            }
        }
        catch (Exception e) {
            // Best effort, as before: keep the loop alive, but no longer
            // hide the failure. The receiver check below decides whether
            // the connection is actually gone.
            logger.warn("SimpleFacConnection.run(): error while getting next term: " + e);
        }

        if (!getTermReceiver().isConnected()) {
            shutdown(getTermReceiver().getCloseException());
            return;
        }

        if (t == null) {
            // Nothing was received this round; try again.
            continue;
        }

        if (runLogger.isDebugEnabled()) {
            runLogger.debug("SimpleFacConnection.run() giving term " + t.toString() + " to IncomingMessageHandler");
        }
        IncomingMessageHandler handler = new IncomingMessageHandler(t, this);
        try {
            pendingIncoming.execute(handler);
            if (runLogger.isDebugEnabled()) {
                runLogger.debug("SimpleFacConnection.run() pendingIncoming queue is length " + inBuffer.size());
            }
        }
        catch (RejectedExecutionException ree) {
            shutdown(ree);
            return;
        }
    }
}
/**
 * Stores the server socket port number.
 *
 * @param p the port number to remember
 */
protected final void setListenPort(int p)
{
    this.listenPort = p;
}
/**
 * Returns the server socket port number.
 *
 * @return the current value of the {@code listenPort} field
 */
protected final int getListenPort()
{
    return this.listenPort;
}
/**
 * Stores our own address term.
 *
 * @param t the address struct to remember in {@code ourAddress}
 */
protected final void setAddress(IclStruct t)
{
    this.ourAddress = t;
}
/**
 * Returns our address, computing and caching it on first use.
 * <p>
 * When no address has been set yet, one is built as
 * {@code addr(tcp('<local host address>', <listen port>))} from
 * {@code InetAddress.getLocalHost()} and {@code getListenPort()}, stored via
 * {@code setAddress}, and returned. Later calls return the stored value.
 *
 * @return the stored or newly computed address struct
 * @throws RuntimeException if the local host address cannot be determined
 *         (the {@code UnknownHostException} is attached as the cause)
 */
public synchronized final IclStruct getAddress()
{
    if (ourAddress == null) {
        String hostAddr;
        try {
            hostAddr = InetAddress.getLocalHost().getHostAddress();
        }
        catch (UnknownHostException uhe) {
            // Same message as before, but keep the original exception as cause.
            throw new RuntimeException(uhe.toString(), uhe);
        }
        IclStruct addrPort = new IclStruct("tcp",
                                           new IclStr("'" + hostAddr + "'"),
                                           new IclInt(getListenPort()));
        setAddress(new IclStruct("addr", addrPort));
    }
    return ourAddress;
}
/**
 * Returns our address with this connection's id appended, computing and
 * caching it on first use.
 * <p>
 * The value is a clone of {@code getAddress()} to which
 * {@code getId().toIclTerm()} has been added. The cache field is only
 * assigned once the struct is complete, so a failure while appending the id
 * cannot leave an id-less address cached for later callers.
 *
 * @return the cached or newly built address-with-id struct
 */
public synchronized final IclStruct getAddressWithId()
{
    if (ourAddressWithId == null) {
        IclStruct withId = (IclStruct)getAddress().clone();
        withId.add(getId().toIclTerm());
        // Publish only the fully built value.
        ourAddressWithId = withId;
    }
    return ourAddressWithId;
}
/**
 * Builds the "connected" event term describing this side of the connection.
 * <p>
 * The result has the shape {@code event(ev_connected(Params), [])}, where
 * {@code Params} is a list containing, in order: {@code oaa_address} (a
 * clone of our address with this connection's id appended),
 * {@code other_id(0)}, {@code other_type("facilitator")},
 * {@code other_name("root")}, {@code other_language("java")},
 * {@code other_version} (from {@code LibOaa.getLibraryVersion()}),
 * {@code other_dialect("sicstus")} and {@code format("default")}.
 *
 * @return the newly built event term
 * @throws RuntimeException if the connection id cannot be appended to the
 *         address (the original exception is attached as the cause)
 */
public final IclTerm generateConnected()
{
    // Work on a clone so the cached address itself is never modified.
    IclStruct addrWithId = (IclStruct)getAddress().clone();
    try {
        addrWithId.add(getId().toIclTerm());
    }
    catch (Exception e) {
        // Same message as before, but keep the original exception as cause.
        throw new RuntimeException(e.toString(), e);
    }

    IclList params = new IclList();
    params.add(new IclStruct("oaa_address", addrWithId));
    params.add(new IclStruct("other_id", new IclInt(0)));
    params.add(new IclStruct("other_type", new IclStr("facilitator")));
    params.add(new IclStruct("other_name", new IclStr("root")));
    params.add(new IclStruct("other_language", new IclStr("java")));
    params.add(new IclStruct("other_version", LibOaa.getLibraryVersion()));
    // Was built with the functor "other_version" (copy-paste slip), which
    // produced a second other_version entry instead of a dialect entry.
    params.add(new IclStruct("other_dialect", new IclStr("sicstus")));
    params.add(new IclStruct("format", new IclStr("default")));

    IclStruct evConnected = new IclStruct("ev_connected");
    evConnected.add(params);

    IclStruct event = new IclStruct("event");
    event.add(evConnected);
    event.add(new IclList());
    return event;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -