📄 twoph0initiator.java
字号:
* <code>REPLY_KEY</code> key.
* @param b the Behaviour that will handle this state
*/
public void registerHandlePropose(Behaviour b) {
registerState(b, HANDLE_PROPOSE);
b.setDataStore(getDataStore());
}
/**
* This method allows to register a user defined <code>Behaviour</code> in the
* HANDLE_ALL_RESPONSES state. This behaviour would override the homonymous method.
* This method also set the data store of the registered <code>Behaviour</code> to
* the DataStore of this current behaviour. The registered behaviour can retrieve
* the vector of ACLMessage proposes, failures, pending and responses from the
* datastore at <code>ALL_PROPOSES_KEY</code>, <code>ALL_FAILURES_KEY</code>,
* <code>ALL_PH0_PENDINGS_KEY</code> and <code>output</code> field.
* @param b the Behaviour that will handle this state
*/
public void registerHandleAllResponses(Behaviour b) {
registerState(b, HANDLE_ALL_RESPONSES);
b.setDataStore(getDataStore());
}
/* User CAN'T override these methods */
//#APIDOC_EXCLUDE_BEGIN
/**
*/
protected String[] getToBeReset() {
if (toBeReset == null) {
toBeReset = new String[] {
HANDLE_PROPOSE,
HANDLE_NOT_UNDERSTOOD,
HANDLE_FAILURE,
HANDLE_OUT_OF_SEQ
};
}
return toBeReset;
}
/**
* Prepare vector containing cfps.
* @param initiation cfp passed in the constructor
* @return Vector of cfps
*/
protected final Vector prepareInitiations(ACLMessage initiation) {
return prepareCfps(initiation);
}
/**
* This method sets for all prepared cfps <code>conversation-id</code> slot (with
* value passed in the constructor), <code>protocol</code> slot and
* <code>reply-with</code> slot with a unique value
* constructed by concatenating receiver's agent name and phase number (i.e. 0).
* After that it sends all cfps.
* @param initiations vector prepared in PREPARE_CFPS state
*/
protected final void sendInitiations(Vector initiations) {
getDataStore().put(ALL_PENDINGS_KEY, new Vector());
super.sendInitiations(initiations);
totSessions = sessions.size();
}
/**
* Check whether a reply is in-sequence and than update the appropriate Session
* and removes corresponding cfp from vector of pendings.
* @param reply message received
* @return true if reply is compliant with flow of protocol, false otherwise
*/
protected final boolean checkInSequence(ACLMessage reply) {
boolean ret = false;
String inReplyTo = reply.getInReplyTo();
Session s = (Session) sessions.get(inReplyTo);
if(s != null) {
int perf = reply.getPerformative();
if(s.update(perf)) {
// The reply is compliant to the protocol
((Vector) getDataStore().get(ALL_RESPONSES_KEY)).add(reply);
if (perf == ACLMessage.PROPOSE) {
((Vector) getDataStore().get(ALL_PROPOSES_KEY)).add(reply);
}
updatePendings(inReplyTo);
ret = true;
}
if(s.isCompleted()) {
sessions.remove(inReplyTo);
}
}
return ret;
}
private void updatePendings(String key) {
Vector pendings = (Vector) getDataStore().get(ALL_PENDINGS_KEY);
for(int i=0; i<pendings.size(); i++) {
ACLMessage pendingMsg = (ACLMessage) pendings.get(i);
if(pendingMsg.getReplyWith().equals(key)) {
pendings.remove(i);
break;
}
}
}
/**
* Check if there are still active sessions or if timeout is expired.
* @param reply last message received
* @return ALL_RESPONSES_RECEIVED or -1 (still active sessions)
*/
protected final int checkSessions(ACLMessage reply) {
if (reply == null) {
// Timeout expired --> clear all sessions
sessions.clear();
}
if (sessions.size() == 0) {
// We have finished --> fill the Vector of initiation messages for next
// phase (unless already filled by the user)
DataStore ds = getDataStore();
Vector nextPhMsgs = (Vector) ds.get(outputKey);
if (nextPhMsgs.size() == 0) {
Vector proposes = (Vector) ds.get(ALL_PROPOSES_KEY);
Vector pendings = (Vector) ds.get(ALL_PENDINGS_KEY);
fillNextPhInitiations(nextPhMsgs, proposes, pendings);
}
return ALL_RESPONSES_RECEIVED;
}
else {
// We are still waiting for some responses
return -1;
}
}
private void fillNextPhInitiations(Vector nextPhMsgs, Vector proposes, Vector pendings) {
if (proposes.size() == totSessions) {
// All responders replied with PROPOSE --> Fill the vector
// of initiation messages for next phase with QUERY_IF
for(int i=0; i<proposes.size(); i++) {
ACLMessage msg = (ACLMessage) proposes.get(i);
ACLMessage queryIf = msg.createReply();
queryIf.setPerformative(ACLMessage.QUERY_IF);
nextPhMsgs.add(queryIf);
}
}
else {
// At least one responder failed or didn't reply --> Fill the vector
// of initiation messages for next phase with REJECT_PROPOSALS
for(int i=0; i<proposes.size(); i++) {
ACLMessage msg = (ACLMessage) proposes.get(i);
ACLMessage reject = msg.createReply();
reject.setPerformative(ACLMessage.REJECT_PROPOSAL);
nextPhMsgs.add(reject);
}
for(int i=0; i<pendings.size(); i++) {
ACLMessage msg = (ACLMessage) pendings.get(i);
ACLMessage reject = (ACLMessage) msg.clone();
reject.setPerformative(ACLMessage.REJECT_PROPOSAL);
nextPhMsgs.add(reject);
}
}
}
/**
* Initialize the data store.
* @param msg Message passed in the constructor
*/
protected void initializeDataStore(ACLMessage msg) {
super.initializeDataStore(msg);
getDataStore().put(ALL_RESPONSES_KEY, new Vector());
getDataStore().put(ALL_PROPOSES_KEY, new Vector());
getDataStore().put(outputKey, new Vector());
}
//#APIDOC_EXCLUDE_END
protected ProtocolSession getSession(ACLMessage msg, int sessionIndex) {
Vector pendings = (Vector) getDataStore().get(ALL_PENDINGS_KEY);
pendings.add(msg);
return new Session("R" + hashCode()+ "_" + Integer.toString(sessionIndex) + "_" + TwoPhConstants.PH0);
}
/**
* Inner class Session
*/
class Session implements ProtocolSession, Serializable {
// Session states
static final int INIT = 0;
static final int REPLY_RECEIVED = 1;
private int state = INIT;
private String myId;
public Session(String id) {
myId = id;
}
public String getId() {
return myId;
}
/**
* Return true if received ACLMessage is consistent with the protocol.
* @param perf
* @return Return true if received ACLMessage is consistent with the protocol
*/
public boolean update(int perf) {
if(state == INIT) {
switch(perf) {
case ACLMessage.PROPOSE:
case ACLMessage.FAILURE:
case ACLMessage.NOT_UNDERSTOOD:
state = REPLY_RECEIVED;
return true;
default:
return false;
}
}
else {
return false;
}
}
public int getState() {
return state;
}
public boolean isCompleted() {
return (state == REPLY_RECEIVED);
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -