📄 clusterqueueimpl.java
字号:
} /** * wake up, and call factorCheck to evaluate the loading factor... * if msg stay more a periode time in timeTable send to an other * (no visited) queue in cluster. */ protected void doReact(WakeUpNot not) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + not + ")"); super.doReact(not); if (clusters.size() > 1) loadingFactor.factorCheck(clusters, getNumberOfPendingMessages(), getNumberOfPendingRequests()); // check if msg arrived befor "period". // if is true send msg to the next (no visited) clusterQueue. Vector toGive = new Vector(); long oldTime = System.currentTimeMillis() - period; for (Enumeration e = timeTable.keys(); e.hasMoreElements(); ) { String msgId = (String) e.nextElement(); if (((Long) timeTable.get(msgId)).longValue() < oldTime) { toGive.add(msgId); storeMsgIdInVisitTable(msgId,destId); } } if (toGive.isEmpty()) return; Hashtable table = new Hashtable(); for (int i = 0; i < toGive.size(); i++) { String msgId = (String) toGive.get(i); Vector visit = (Vector) visitTable.get(msgId); for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); if (! visit.contains(id)) { LBCycleLife cycle = (LBCycleLife) table.get(id); if (cycle == null) { cycle = new LBCycleLife(loadingFactor.getRateOfFlow()); cycle.setClientMessages(new ClientMessages()); } ClientMessages cm = cycle.getClientMessages(); Message msg = removeMessage(msgId); if (msg != null) { cm.addMessage(msg); cycle.putInVisitTable(msgId,visit); table.put(id,cycle); break; } } } } for (Enumeration e = table.keys(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); loadingFactor.processGive(id,(LBCycleLife) table.get(id)); } } /** * The messages are not consumed by an other cluster's queue * in a periode time, try to consume in this queue. * update visitTable, and process clientMessages. */ protected void doReact(AgentId from, LBCycleLife not) { clusters.put(from,new Float(not.getRateOfFlow())); Hashtable vT = not.getVisitTable(); for (Enumeration e = vT.keys(); e.hasMoreElements(); ) { String msgId = (String) e.nextElement(); visitTable.put(msgId,vT.get(msgId)); } if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + not + ")" + "\nvisitTable=" + clusters); ClientMessages cm = not.getClientMessages(); if (cm != null) doProcess(cm); } /** * new queue come in cluster, update clusters. * and spread to clusters the AckjoiningQueue. */ protected void doReact(JoinQueueCluster not) { for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); if (! clusters.containsKey(id)) clusters.put(id,not.clusters.get(id)); } for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) { AgentId user = (AgentId) e.nextElement(); if (clients.containsKey(user)) { Integer right = (Integer) not.clients.get(user); if (right.compareTo((Integer) clients.get(user)) > 0) clients.put(user,right); } else clients.put(user,not.clients.get(user)); } freeReading = freeReading | not.freeReading; freeWriting = freeWriting | not.freeWriting; sendToCluster( new AckJoinQueueCluster(loadingFactor.getRateOfFlow(), clusters, clients, freeReading, freeWriting)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + not + ")" + "\nclusters=" + clusters + "\nclients=" + clients); } protected void doReact(AckJoinQueueCluster not) { for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); if (! clusters.containsKey(id)) clusters.put(id,not.clusters.get(id)); } for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) { AgentId user = (AgentId) e.nextElement(); if (clients.containsKey(user)) { Integer right = (Integer) not.clients.get(user); if (right.compareTo((Integer) clients.get(user)) > 0) clients.put(user,right); } else clients.put(user,not.clients.get(user)); } freeReading = freeReading | not.freeReading; freeWriting = freeWriting | not.freeWriting; if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + not + ")" + "\nclusters=" + clusters + "\nclients=" + clients); } /** * */ protected void doReact(ReceiveRequest not) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + not + ")"); //loadingFactor.setWait(); if (getNumberOfPendingRequests() > loadingFactor.consumThreshold) loadingFactor.factorCheck(clusters, getNumberOfPendingMessages(), getNumberOfPendingRequests()); } /** * load balancing message give by an other cluster queue. * process ClientMessages, no need to check if sender is writer. */ protected void doReact(AgentId from, LBMessageGive not) throws UnknownNotificationException { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + from + "," + not + ")"); clusters.put(from,new Float(not.getRateOfFlow())); ClientMessages cm = not.getClientMessages(); if (cm != null) doProcess(cm); } /** * load balancing message hope by the "from" queue. */ protected void doReact(AgentId from, LBMessageHope not) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + from + "," + not + ")"); clusters.put(from,new Float(not.getRateOfFlow())); int hope = not.getNbMsg(); // TODO: if validityperiod if (loadingFactor.getRateOfFlow() < 1) { int possibleGive = getNumberOfPendingMessages() - getNumberOfPendingRequests(); LBMessageGive msgGive = new LBMessageGive(waitAfterClusterReq,loadingFactor.getRateOfFlow()); ClientMessages cm = new ClientMessages(); for (int i = 0; (i < possibleGive) && (i < hope); i++) { if (! messagesIsEmpty()) { Message msg = removeMessage(0); cm.addMessage(msg); } else break; } msgGive.setClientMessages(cm); msgGive.setRateOfFlow( loadingFactor.evalRateOfFlow(getNumberOfPendingMessages(), getNumberOfPendingRequests())); Channel.sendTo(from,msgGive); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact LBMessageHope : nbMsgSend = " + cm.getMessages().size()); for (Enumeration e = cm.getMessages().elements(); e.hasMoreElements(); ) { Message msg = (Message) e.nextElement(); messageSendToCluster(msg.getIdentifier()); deletePersistenceMessage(msg); } } } /** * send to all queue in cluster. */ protected void sendToCluster(QueueClusterNot not) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.sendToCluster(" + not + ")"); if (clusters.size() < 2) return; for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); if (! id.equals(destId)) Channel.sendTo(id,not); } } /** * return the number of Message send to cluster. */ public long getClusterDeliveryCount() { return clusterDeliveryCount; } /** * return index of message. */ private int getIndexOfMessage(String msgId) { for (int i = 0; i < messages.size(); i++) { Message msg = (Message) messages.get(i); if (msgId.equals(msg.getIdentifier())) return i; } return -1; } private void storeMsgIdInTimeTable(String msgId, Long date) { try { timeTable.put(msgId,date); } catch (NullPointerException exc) {} } private void storeMsgIdInVisitTable(String msgId, AgentId destId) { Vector alreadyVisit = (Vector) visitTable.get(msgId); if (alreadyVisit == null) alreadyVisit = new Vector(); alreadyVisit.add(destId); visitTable.put(msgId,alreadyVisit); } protected void messageDelivered(String msgId) { timeTable.remove(msgId); visitTable.remove(msgId); } protected void messageRemoved(String msgId) { timeTable.remove(msgId); visitTable.remove(msgId); } protected void messageSendToCluster(String msgId) { timeTable.remove(msgId); visitTable.remove(msgId); clusterDeliveryCount++; } Message removeMessage(String msgId) { for (Enumeration e = messages.elements(); e.hasMoreElements(); ) { Message msg = (Message) e.nextElement(); if (msgId.equals(msg.getIdentifier())) { // fix bug for softRefMessage msg.setPin(true); if (messages.remove(msg)) return msg; else return null; } } return null; } Message removeMessage(int index) { Message msg = (Message) messages.remove(0); // fix bug for softRefMessage msg.setPin(true); return msg; } boolean messagesIsEmpty() { return messages.isEmpty(); } void deletePersistenceMessage(Message msg) { msg.delete(); } AgentId getDestId() { return destId; } public int getNumberOfPendingMessages() { return messages.size(); } public int getNumberOfPendingRequests() { return requests.size(); } private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.readObject" + " loadingFactor = " + loadingFactor); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -