📄 incomingmessagehandler.java
字号:
getConn().setVersion(version);
getConn().sendTerm(getConn().generateConnected(), true);
}
/**
* Handle update data request
*/
protected final void handleUpdateData(IclTerm event, IclTerm params)
{
//RECEIVE: term(event(ev_update_data(g_11,add,agent_host(addr(tcp('130.107.65.150',3378),1),'TestOaaListener',agent1),[address(addr(tcp('130.107.65.150',3378)))]),[])) }
//REPLY : term(event(ev_data_updated(g_11,add,agent_host(addr(tcp('130.107.65.150',3378),1),'TestOaaListener',agent1),[address(addr(tcp('130.107.65.150',3378)))],[addr(tcp('130.107.65.150',3378))],[addr(tcp('130.107.65.150',3378))]),[address(addr(tcp('130.107.65.150',3378),1))])).
if(event.size() != 4) {
getConn().shutdown(new Exception("handleUpdateData: Received an event whose size was not 4:\n" +
event + "\nparams: " + params));
return;
}
IclTerm goalId = event.getTerm(0);
IclTerm mode = event.getTerm(1);
IclTerm host = event.getTerm(2);
IclTerm goalParams = event.getTerm(3);
IclTerm reply = new IclStruct("ev_data_updated");
reply.add(goalId);
reply.add(mode);
reply.add(host);
reply.add(goalParams);
reply.add(new IclList(getConn().getAddress()));
reply.add(new IclList(getConn().getAddress()));
IclStruct wrappedReply = new IclStruct("event");
wrappedReply.add(reply);
IclStruct addrWithId = (IclStruct)(getConn().getAddress().clone());
addrWithId.add(getConn().getId().toIclTerm());
wrappedReply.add(new IclList(new IclStruct("address", addrWithId)));
getConn().sendTerm(wrappedReply, true);
}
/**
* Register the solvables with the facilitator
*/
protected final void handleRegisterSolvables(IclTerm event, IclTerm params)
{
// term(event(ev_register_solvables(add,[solvable(broadcastif(X1,X2),[],[]),solvable(echo_me(P1,P2,P3,P4),[],[]),solvable(echo_me(P1,P2,P3),[],[]),solvable(echo_me(Msg),[],[])],'TestOaaListener',[if_exists(overwrite)]),[]))
// ev_register_solvables(add,[solvable(echo_me(Msg),[callback(myDefaultCallback)],[]),solvable(echo_me(P1,P2,P3),[callback(myDefaultCallback)],[]),solvable(echo_me(P1,P2,P3,P4),[callback(myDefaultCallback)],[]),solvable(broadcastif(X1,X2),[callback(myDefaultCallback)],[])],'OaaListener',[if_exists(overwrite)])
//IclTerm mode = event.getTerm(0);
IclTerm solvables = event.getTerm(1);
//IclTerm name = event.getTerm(2);
//IclTerm eventParams = event.getTerm(3);
// 1 DEBUG [IncomingMessageHandler:1:ev_register_solvables] (IncomingMessageHandler.handleRegisterSolvables) - Registering solvables: [solvable(broadcastif(X1,X2),[],[]),solvable(echo_me(P1,P2,P3,P4),[],[]),solvable(echo_me(P1,P2,P3),[],[]),solvable(echo_me(Msg),[],[])]
// 6617 DEBUG [IncomingMessageHandler:2:ev_register_solvables] (IncomingMessageHandler.handleRegisterSolvables) - Registering solvables: [solvable(echo_me(Msg),[callback(myDefaultCallback)],[]),solvable(echo_me(P1,P2,P3),[callback(myDefaultCallback)],[]),solvable(echo_me(P1,P2,P3,P4),[callback(myDefaultCallback)],[]),solvable(broadcastif(X1,X2),[callback(myDefaultCallback)],[])]
if(loggerRegisterSolvables.isDebugEnabled()) {
loggerRegisterSolvables.debug("Registering solvables: " + solvables);
}
HashSet solvSet = new HashSet();
IclTerm oneSolv;
for(int i = 0; i < solvables.size(); ++i) {
oneSolv = StandardizeVars.getInstance().from(solvables.getTerm(i));
solvSet.add(oneSolv.getTerm(0));
}
getConn().getFac().addSolvablesForId(getConn().getId(), solvSet);
}
/**
* Release the semaphore controlling whether an unforced message can be sent
*/
protected final void handleReady(IclTerm event, IclTerm params)
{
// term(event(ev_ready('TestOaaListener'),[]))
getConn().setOutputReady(true);
getConn().getOutputReadySem().release();
}
public final String makeLongMessage(int index)
{
int third = 50;
StringBuffer b = new StringBuffer(1024);
b.append("echo_me(");
b.append(Integer.toString(index));
b.append(",blah(");
for(int i = 0; i < third; ++i) {
b.append("goodbye,");
}
b.deleteCharAt(b.length() - 1);
b.append("),blah(");
for(int i = 0; i < third; ++i) {
b.append("hello,");
}
b.deleteCharAt(b.length() - 1);
b.append("),[");
for(int i = 0; i < third; ++i) {
b.append("what,");
}
b.deleteCharAt(b.length() - 1);
b.append("])");
return b.toString();
}
/**
* Send solvable out to all agents that can handle the solvable request.
* For queries that request some kind of blocking behaviour, we would
* normally create a future in the SimpleFacConnection and get() the
* result, but for now, we'll just output some message saying we don't
* handle that kind of message.
*/
protected final void handleSolve(IclTerm event, IclTerm params)
{
long start, end;
start = System.currentTimeMillis();
/* This is just for testing how fast a receiver can receive messages
for(int i = 0; i < 1000; ++i) {
String goalIdStr = "g_" + Integer.toString(i + 12);
StringBuffer eventStr = new StringBuffer();
eventStr.append("ev_solve(");
eventStr.append(goalIdStr);
eventStr.append(',');
eventStr.append(makeLongMessage(i));
eventStr.append(",[reply(none)])");
IclTerm newEvent = IclTerm.fromString(true, eventStr.toString());
event = newEvent;
*/
// term(event(ev_solve(g_12,echo_me(0,blah(goodbye)),[reply(none)]),[]))
if(event.size() != 3) {
loggerSolve.error("IncomingMessageHandler.handleSolve(): ev_solve message has incorrect number of arguments. Ignoring event: " + event.toString());
return;
}
IclTerm goalId = event.getTerm(0);
IclTerm goal = event.getTerm(1);
IclTerm goalParamsTerm = null;
if(event.size() > 2) {
goalParamsTerm = event.getTerm(2);
}
else {
loggerSolve.warn("IncomingMessageHandler.handleSolve(): will assume reply(none) as parameter");
if(loggerSolve.isDebugEnabled()) {
loggerSolve.debug("IncomingMessageHandler.handleSolve(): event that had parameters missing was: " + event.toString());
}
goalParamsTerm = IclTermCache.get("[reply(none)]");
}
IclList goalParams;
if(goalParamsTerm == null) {
loggerSolve.error("IncomingMessageHandler.handleSolve(): goalParamsTerm is null--unexpected");
return;
}
if(goalParamsTerm.isList()) {
goalParams = (IclList)event.getTerm(2);
}
else {
goalParams = null;
}
IclTerm reply = IclUtils.getParamValue("reply", new IclStr("true"), goalParams);
IclTerm blocking = IclUtils.getParamValue("block", new IclStr("false"), goalParams);
String replyVal = reply.toIdentifyingString();
String blockingVal = blocking.toIdentifyingString();
StandardizeVars.getInstance().from(goal);
HashSet ids = getConn().getFac().whoCanSolve(goal);
if(loggerSolve.isDebugEnabled()) {
loggerSolve.debug("IncomingMessageHandler.handleSolve(): able to solve: " + ids);
}
if(ids == null) {
if(loggerSolve.isDebugEnabled()) {
loggerSolve.debug("IncomingMessageHandler.handleSolve() No agents can solve goal: " + goal.toString() + "]");
}
else if(loggerSolve.isInfoEnabled()) {
loggerSolve.info("IncomingMessageHandler.handleSolve() No agents can solve goal");
}
return;
}
if(replyVal.equals("none") || blockingVal.equals("false")) {
}
else {
if(loggerSolve.isEnabledFor(Priority.WARN)) {
loggerSolve.warn("IncomingMessageHandler.handleSolve() Solvable would block; ignoring: [" + goal.toString() + "]");
}
return;
}
// term(event(ev_solve(id(g_32,1),echo_me(0,blah(goodbye)),[reply(none)]),[address(addr(tcp('127.0.0.1',3378),1)),origin(addr(tcp('127.0.0.1',3378),3))]))
SimpleFacConnection conn;
Iterator it = ids.iterator();
ConnectionId id;
IclTerm connGoalId = null;
IclTerm origin = null;
IclTerm dest = null;
IclTerm outEvSolv;
IclTerm toSend;
if(SimpleFacConnection.sendTermLogger.isDebugEnabled()) {
++getConn().numTermsSent;
if((getConn().numTermsSent % 10) == 6) {
SimpleFacConnection.sendTermLogger.debug("Sending term number " + getConn().numTermsSent);
}
}
origin = new IclStruct("origin", getConn().getAddressWithId());
while(it.hasNext()) {
id = (ConnectionId)it.next();
if(loggerSolve.isDebugEnabled()) {
loggerSolve.debug("IncomingMessageHandler.handleSolve(): will try to send message [" +
goal.toString() + "] to agent " + id);
}
conn = getConn().getFac().getConnectionForId(id);
if(conn != null) {
if((conn.getVersion() != null) && (conn.getVersion().toString().equals("2.0"))) {
// term(event(KS_NAME, solve(goalId, goal, params))).
connGoalId = new IclStruct("id", goalId, id.toIclTerm());
outEvSolv = new IclStruct("solve", connGoalId, goal, goalParams);
toSend = new IclStruct("event", connGoalId, outEvSolv);
conn.sendTerm(toSend);
}
else {
connGoalId = new IclStruct("id", goalId, id.toIclTerm());
dest = new IclStruct("address", (IclTerm)conn.getAddressWithId().clone());
outEvSolv = new IclStruct("ev_solve", connGoalId, goal, goalParams);
toSend = new IclStruct("event", outEvSolv, new IclList(dest, origin));
conn.sendTerm(toSend);
}
}
}
if(loggerSolve.isInfoEnabled()) {
end = System.currentTimeMillis();
loggerSolve.info("IncomingMessageHandler.handleSolve(): time to end of handleSolve = " + (end - start));
}
}
protected final void handleVersion2(IclTerm t)
{
String name;
if(t.isStruct()) {
try {
name = ToFunctor.getInstance().from(t);
}
catch(UnsupportedOperationException uoe) {
loggerV2.warn("IncomingMessageHandler.handleVersion2() expecting IclStruct, got: [" + t.toString() + "]");
return;
}
}
else if(t.isStr()) {
loggerV2.warn("IncomingMessageHnandler.handleVersion2() ignoring IclStr: [" + t.toString() + "]");
return;
}
else {
loggerV2.warn("IncomingMessageHandler.handleVersion2() expecting IclStruct or IclStr, got: [" + t.toString() + "]");
return;
}
if(name.equals("post_query")) {
if(loggerV2.isDebugEnabled()) {
loggerV2.debug("IncomingMessageHandler.handleVersion2() found post_query");
}
handleV2PostQuery(t);
}
else if(name.equals("write_bb")) {
if(loggerV2.isDebugEnabled()) {
loggerV2.debug("IncomingMessageHandler.handleVersion2() found write_bb");
}
handleV2WriteBB(t);
}
else if(name.equals("read_bb")) {
if(loggerV2.isDebugEnabled()) {
loggerV2.debug("IncomingMessageHandler.handleVersion2() found read_bb");
}
handleV2ReadBB(t);
}
else if(name.equals("replace_bb")) {
if(loggerV2.isDebugEnabled()) {
loggerV2.debug("IncomingMessageHandler.handleVersion2() found replace_bb");
}
if((t.size() == 3) &&
(t.getTerm(1).isList() && (t.getTerm(1).size() == 4)) &&
(t.getTerm(2).isList() && (t.getTerm(2).size() == 4)) &&
(t.getTerm(2).getTerm(1).isStr() && t.getTerm(2).getTerm(1).toIdentifyingString().equals("ready"))) {
handleReady(null, null);
return;
}
loggerV2.warn("IncomingMessageHandler.handleVersion2() ignoring replace_bb [" + t.toString() + "]");
return;
}
else if(name.equals("register_solvable_goals")) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -