📄 simplefacconnection.java
字号:
package com.sri.oaa2.simplefac;
import com.sri.oaa2.icl.*;
import com.sri.oaa2.lib.*;
import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.net.*;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
import java.io.*;
import org.apache.log4j.NDC;
/**
* Represents a connection to an agent
*/
public class SimpleFacConnection implements Runnable
{
// Logger
static Logger logger = Logger.getLogger(SimpleFacConnection.class.getName());
static Logger runLogger = Logger.getLogger(SimpleFacConnection.class.getName() + ".run");
static Logger sendTermLogger = Logger.getLogger(SimpleFacConnection.class.getName() + ".sendTerm");
// Maximum number of pending messages allowed; the lower this number is,
// the more in sync senders and receivers are. The higher it is, the
// more a sender can send, even when the receiver can't receive any more
// data.
public static final int MAXBACKLOG = 64;
// Socket timeout
public static final int SOCKETTIMEOUT = 30000;
// ID of agent on other side of connection
private ConnectionId id;
// List of incoming IclTerms that have not been processed
private ExecutorService pendingIncoming;
// List of outgoing IclTerms on this link
private ExecutorService pendingOutgoing;
private ArrayBlockingQueue inBuffer;
private ArrayBlockingQueue outBuffer;
// Object to get next IclTerm
private IclTermReceiver termReceiver;
// Socket
private Socket clientSocket;
// The facilitator
private SimpleFac fac;
// Connected?
private boolean connected;
// Name
private IclTerm name;
// Language
private IclTerm lang;
// Type
private IclTerm type;
// Format
private IclTerm format;
// Version
private IclTerm version;
// port number of ServerSocket which created this connection
private int listenPort;
// The thing that will do the output writing
private FormatWriter writer;
// Our address as addr(tcp(...))
private IclStruct ourAddress = null;
// Our address as address(ourAddress, id)
private IclStruct ourAddressWithId = null;
// Semaphore for waiting until output is ready
private Semaphore outputReadySem = new Semaphore(0);
// Boolean indicating output is ready
private boolean outputReady = false;
// Number of read_bbs that have been requested
private int numReadBB = 0;
// Goal number for OAA Version 2 messages
private long v2GoalNum = 0;
protected SimpleFacConnection()
{
}
/**
* Create using the given socket and facilitator
*/
public SimpleFacConnection(Socket s, SimpleFac f)
{
setFac(f);
setClientSocket(s);
try {
getClientSocket().setSoTimeout(20000);
}
catch(Exception e) {
throw new RuntimeException("SimpleFacConnection " + e.toString());
}
setupQueues();
}
/**
* Use given socket and id
*/
public SimpleFacConnection(ConnectionId id, Socket s, SimpleFac f)
{
setFac(f);
setId(id);
setClientSocket(s);
try {
getClientSocket().setSoTimeout(SOCKETTIMEOUT);
}
catch(Exception e) {
throw new RuntimeException("SimpleFacConnection " + e.toString());
}
connected = true;
setupQueues();
}
private void setupQueues()
{
inBuffer = new ArrayBlockingQueue(MAXBACKLOG);
outBuffer = new ArrayBlockingQueue(MAXBACKLOG);
pendingIncoming = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, inBuffer);
pendingOutgoing = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, outBuffer);
}
/**
* Set name
*/
protected final void setName(IclTerm n)
{
name = n;
}
/**
* Get name
*/
protected final IclTerm getName()
{
return name;
}
/**
* Set language
*/
protected final void setLang(IclTerm n)
{
lang = n;
}
/**
* Get language
*/
protected final IclTerm getLang()
{
return lang;
}
/**
* Set term receiver
*/
public final void setTermReceiver(IclTermReceiver t)
{
termReceiver = t;
}
/**
* Get term receiver
*/
protected final IclTermReceiver getTermReceiver()
{
return termReceiver;
}
/**
* Set type
*/
protected final void setType(IclTerm n)
{
type = n;
}
/**
* Get type
*/
protected final IclTerm getType()
{
return type;
}
protected final Semaphore getOutputReadySem()
{
return outputReadySem;
}
protected final void setOutputReadySem(Semaphore s)
{
outputReadySem = s;
}
protected synchronized final boolean getOutputReady()
{
return outputReady;
}
protected synchronized final void setOutputReady(boolean b)
{
outputReady = b;
}
protected final synchronized void setNumReadBB(int i)
{
numReadBB = i;
}
protected final synchronized int getNumReadBB()
{
return numReadBB;
}
protected final synchronized IclTerm getV2GoalId()
{
String s = "g_" + v2GoalNum;
++v2GoalNum;
return new IclStr(s);
}
/**
* Set format
*/
public final void setFormat(IclTerm n)
{
if(format != null) {
throw new RuntimeException("SimpleFacConnection.setFormat() called twice");
}
format = n;
try {
setFormatWriter(FormatWriterFactory.createFormatWriter(format, getClientSocket().getOutputStream(), getClientSocket().getSendBufferSize()));
}
catch(IOException ioe) {
throw new RuntimeException(ioe.toString());
}
}
/**
* Verify format is the same as the one set
*/
public final void verifyFormat(IclTerm n)
{
if(n == null) {
throw new RuntimeException("SimpleFacConnection.verifyFormat() null comparison");
}
if(getFormat() == null) {
throw new RuntimeException("SimpleFacConnection.verifyFormat() no previous format");
}
String toVerify = n.toString();
if(toVerify.equals("pure_text")) {
toVerify = "default";
}
if(!toVerify.equals(getFormat().toString())) {
throw new RuntimeException("SimpleFacConnection.verifyFormat() different formats; set format = " + getFormat().toString() + "; and format to verify = " + toVerify);
}
}
/**
* Get format
*/
public final IclTerm getFormat()
{
return format;
}
/**
* Set FormatWriter
*/
protected final void setFormatWriter(FormatWriter f)
{
writer = f;
}
/**
* Get FormatWriter
*/
protected final FormatWriter getFormatWriter()
{
return writer;
}
/**
* Set version
*/
protected final void setVersion(IclTerm n)
{
version = n;
}
/**
* Get version
*/
protected final IclTerm getVersion()
{
return version;
}
/**
* Set socket to use
*/
protected final void setClientSocket(Socket s)
{
clientSocket = s;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -