⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 twoph0initiator.java

📁 java实现的P2P多agent中间件
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     * <code>REPLY_KEY</code> key.
     * @param b the Behaviour that will handle this state
     */
    public void registerHandlePropose(Behaviour b) {
        registerState(b, HANDLE_PROPOSE);
        b.setDataStore(getDataStore());
    }

    /**
     * This method allows to register a user defined <code>Behaviour</code> in the
     * HANDLE_ALL_RESPONSES state. This behaviour would override the homonymous method.
     * This method also set the data store of the registered <code>Behaviour</code> to
     * the DataStore of this current behaviour. The registered behaviour can retrieve
     * the vector of ACLMessage proposes, failures, pending and responses from the
     * datastore at <code>ALL_PROPOSES_KEY</code>, <code>ALL_FAILURES_KEY</code>,
     * <code>ALL_PH0_PENDINGS_KEY</code> and <code>output</code> field.
     * @param b the Behaviour that will handle this state
     */
    public void registerHandleAllResponses(Behaviour b) {
        registerState(b, HANDLE_ALL_RESPONSES);
        b.setDataStore(getDataStore());
    }

    /* User CAN'T override these methods */
		//#APIDOC_EXCLUDE_BEGIN
	  /**
	   */
	  protected String[] getToBeReset() {
	  	if (toBeReset == null) {
				toBeReset = new String[] {
					HANDLE_PROPOSE, 
					HANDLE_NOT_UNDERSTOOD,
					HANDLE_FAILURE,
					HANDLE_OUT_OF_SEQ
				};
	  	}
	  	return toBeReset;
	  }
    
    /**
     * Prepare vector containing cfps.
     * @param initiation cfp passed in the constructor
     * @return Vector of cfps
     */
    protected final Vector prepareInitiations(ACLMessage initiation) {
        return prepareCfps(initiation);
    }

    /**
     * This method sets for all prepared cfps <code>conversation-id</code> slot (with
     * value passed in the constructor), <code>protocol</code> slot and
     * <code>reply-with</code> slot with a unique value
     * constructed by concatenating receiver's agent name and phase number (i.e. 0).
     * After that it sends all cfps.
     * @param initiations vector prepared in PREPARE_CFPS state
     */
    protected final void sendInitiations(Vector initiations) {
      getDataStore().put(ALL_PENDINGS_KEY, new Vector());
      
      super.sendInitiations(initiations);
      
      totSessions = sessions.size();
    }

    /**
     * Check whether a reply is in-sequence and than update the appropriate Session
     * and removes corresponding cfp from vector of pendings.
     * @param reply message received
     * @return true if reply is compliant with flow of protocol, false otherwise
     */
    protected final boolean checkInSequence(ACLMessage reply) {
      boolean ret = false;
      String inReplyTo = reply.getInReplyTo();
      Session s = (Session) sessions.get(inReplyTo);
      if(s != null) {
        int perf = reply.getPerformative();
        if(s.update(perf)) {
          // The reply is compliant to the protocol
          ((Vector) getDataStore().get(ALL_RESPONSES_KEY)).add(reply);
          if (perf == ACLMessage.PROPOSE) {
        		((Vector) getDataStore().get(ALL_PROPOSES_KEY)).add(reply);
          }
          updatePendings(inReplyTo);
          
          ret = true;
        }
        if(s.isCompleted()) {
          sessions.remove(inReplyTo);
        }
      }
      return ret;
    }

    private void updatePendings(String key) {
    	Vector pendings = (Vector) getDataStore().get(ALL_PENDINGS_KEY);
      for(int i=0; i<pendings.size(); i++) {
          ACLMessage pendingMsg = (ACLMessage) pendings.get(i);
          if(pendingMsg.getReplyWith().equals(key)) {
              pendings.remove(i);
              break;
          }
      }
    }
    	
    /**
     * Check if there are still active sessions or if timeout is expired.
     * @param reply last message received
     * @return ALL_RESPONSES_RECEIVED or -1 (still active sessions)
     */
    protected final int checkSessions(ACLMessage reply) {
    	if (reply == null) {
    		// Timeout expired --> clear all sessions 
    		sessions.clear();
    	}
    	if (sessions.size() == 0) {
    		// We have finished --> fill the Vector of initiation messages for next 
    		// phase (unless already filled by the user)
        DataStore ds = getDataStore();
    		Vector nextPhMsgs = (Vector) ds.get(outputKey);
    		if (nextPhMsgs.size() == 0) {
	        Vector proposes = (Vector) ds.get(ALL_PROPOSES_KEY);
	        Vector pendings = (Vector) ds.get(ALL_PENDINGS_KEY);
	        fillNextPhInitiations(nextPhMsgs, proposes, pendings);
    		}
        return ALL_RESPONSES_RECEIVED;
    	}
    	else {
    		// We are still waiting for some responses
    		return -1;
    	}
    }
    
    private void fillNextPhInitiations(Vector nextPhMsgs, Vector proposes, Vector pendings) {
    	if (proposes.size() == totSessions) {
    		// All responders replied with PROPOSE --> Fill the vector 
    		// of initiation messages for next phase with QUERY_IF
        for(int i=0; i<proposes.size(); i++) {
          ACLMessage msg = (ACLMessage) proposes.get(i);
          ACLMessage queryIf = msg.createReply();
          queryIf.setPerformative(ACLMessage.QUERY_IF);
          nextPhMsgs.add(queryIf);
        }
    	}
    	else {
    		// At least one responder failed or didn't reply --> Fill the vector 
    		// of initiation messages for next phase with REJECT_PROPOSALS
        for(int i=0; i<proposes.size(); i++) {
          ACLMessage msg = (ACLMessage) proposes.get(i);
          ACLMessage reject = msg.createReply();
          reject.setPerformative(ACLMessage.REJECT_PROPOSAL);
          nextPhMsgs.add(reject);
        }
        for(int i=0; i<pendings.size(); i++) {
          ACLMessage msg = (ACLMessage) pendings.get(i);
          ACLMessage reject = (ACLMessage) msg.clone();
          reject.setPerformative(ACLMessage.REJECT_PROPOSAL);
          nextPhMsgs.add(reject);
        }
    	}
    }
    		
    		

    /**
     * Initialize the data store.
     * @param msg Message passed in the constructor
     */
    protected void initializeDataStore(ACLMessage msg) {
        super.initializeDataStore(msg);
        getDataStore().put(ALL_RESPONSES_KEY, new Vector());
        getDataStore().put(ALL_PROPOSES_KEY, new Vector());
        getDataStore().put(outputKey, new Vector());
    }
    //#APIDOC_EXCLUDE_END


  protected ProtocolSession getSession(ACLMessage msg, int sessionIndex) {
    Vector pendings = (Vector) getDataStore().get(ALL_PENDINGS_KEY);
    pendings.add(msg);
		
  	return new Session("R" + hashCode()+  "_" + Integer.toString(sessionIndex) + "_" + TwoPhConstants.PH0);
  }
  
    /**
     * Inner class Session
     */
    class Session implements ProtocolSession, Serializable {
        // Session states 
        static final int INIT = 0;
        static final int REPLY_RECEIVED = 1;

        private int state = INIT;
        private String myId;

        public Session(String id) {
        	myId = id;
        }
        
				public String getId() {
					return myId;
				}
				
        /**
         * Return true if received ACLMessage is consistent with the protocol.
         * @param perf
         * @return Return true if received ACLMessage is consistent with the protocol
         */
        public boolean update(int perf) {
            if(state == INIT) {
                switch(perf) {
                    case ACLMessage.PROPOSE:
                    case ACLMessage.FAILURE:
                    case ACLMessage.NOT_UNDERSTOOD:
                      state = REPLY_RECEIVED;
	                    return true;
                    default: 
                    	return false;
                }
            }
            else {
                return false;
            }
        }

        public int getState() {
            return state;
        }

        public boolean isCompleted() {
            return (state == REPLY_RECEIVED);
        }
    }
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -