📄 loadingfactor.java
字号:
pendingRequests); evalActivity(); if ( status == Status.INIT || status == Status.RUN) { if (isOverloaded()) { dispatchAndSendTo(clusters, pendingMessages, pendingRequests); status = Status.WAIT; statusTime = System.currentTimeMillis() + validityPeriod; } } updateThreshol(); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "<< LoadingFactor.factorCheck " + this); } /** * return true if cluster queue is overloaded. * depends on activity. */ public boolean isOverloaded() { overLoaded = false; if ((consumerStatus == ConsumerStatus.CONSUMER_HIGH_ACTIVITY) || (producerStatus == ProducerStatus.PRODUCER_HIGH_ACTIVITY)) overLoaded = true; if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.isOverloaded " + overLoaded); return overLoaded; } /** * use to dispatch request hope or give messages * in clusters. */ private void dispatchAndSendTo(Hashtable clusters, int nbOfPendingMessages, int nbOfPendingRequests) { int nbMsgHope = -1; int nbMsgGive = -1; if ((consumerStatus == ConsumerStatus.CONSUMER_NO_ACTIVITY) && (producerStatus == ProducerStatus.PRODUCER_NO_ACTIVITY)) return; if (producThreshold < nbOfPendingMessages) nbMsgGive = nbOfPendingMessages - producThreshold; if (consumThreshold < nbOfPendingRequests) nbMsgHope = nbOfPendingRequests;// if (nbOfPendingRequests > nbOfPendingMessages)// nbMsgHope = nbOfPendingRequests - nbOfPendingMessages;// else// nbMsgGive = nbOfPendingMessages - nbOfPendingRequests; if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.dispatchAndSendTo" + "\nnbMsgHope=" + nbMsgHope + ", nbMsgGive=" + nbMsgGive); if (consumerStatus == ConsumerStatus.CONSUMER_HIGH_ACTIVITY) processHope(nbMsgHope,clusters); if (producerStatus == ProducerStatus.PRODUCER_HIGH_ACTIVITY) processGive(nbMsgGive,clusters); } /** * send nb messages on clusters. */ private void processGive(int nbMsgGive, Hashtable clusters) { if (nbMsgGive < 1) return; // select queue in cluster who have a rateOfFlow > 1 Vector selected = new Vector(); for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); if (((Float) clusters.get(id)).floatValue() >= 1 && !id.equals(clusterQueueImpl.getDestId())) selected.add(id); } if (selected.size() == 0) return; int givePerQueue = nbMsgGive / selected.size(); LBMessageGive msgGive = new LBMessageGive(validityPeriod,rateOfFlow); ClientMessages cm = new ClientMessages(); if (givePerQueue == 0 && nbMsgGive > 0) { AgentId id = (AgentId) selected.get(0); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.processGive" + " nbMsgGive = " + nbMsgGive + ", id = " + id); for (int i = 0; i < givePerQueue; i++) { if (! clusterQueueImpl.messagesIsEmpty()) { Message msg = clusterQueueImpl.removeMessage(0); cm.addMessage(msg); } else break; } msgGive.setClientMessages(cm); Channel.sendTo(id,msgGive); for (Enumeration e = cm.getMessages().elements(); e.hasMoreElements(); ) { Message msg = (Message) e.nextElement(); clusterQueueImpl.messageSendToCluster(msg.getIdentifier()); clusterQueueImpl.deletePersistenceMessage(msg); } } else { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.processGive" + " givePerQueue = " + givePerQueue + ", selected = " + selected); for (Enumeration e = selected.elements(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); for (int i = 0; i < givePerQueue; i++) { if (clusterQueueImpl.messagesIsEmpty()) break; Message msg = clusterQueueImpl.removeMessage(0); cm.addMessage(msg); } msgGive.setClientMessages(cm); Channel.sendTo(id,msgGive); for (Enumeration e2 = cm.getMessages().elements(); e2.hasMoreElements(); ) { Message msg = (Message) e2.nextElement(); clusterQueueImpl.messageSendToCluster(msg.getIdentifier()); clusterQueueImpl.deletePersistenceMessage(msg); } cm = new ClientMessages(); } } } /** * */ private void processHope(int nbMsgHope, Hashtable clusters) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.processHope" + " nbMsgHope = " + nbMsgHope); if (nbMsgHope < 1) return; Vector selected = new Vector(); for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); if (((Float) clusters.get(id)).floatValue() <= 1 && !id.equals(clusterQueueImpl.getDestId())) selected.add(id); } if (selected.size() == 0) return; int hopePerQueue = nbMsgHope / selected.size(); if (hopePerQueue == 0 && nbMsgHope > 0) { AgentId id = (AgentId) selected.get(0); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.processHope" + " nbMsgHope = " + nbMsgHope + ", id = " + id); LBMessageHope msgHope = new LBMessageHope(validityPeriod,rateOfFlow); msgHope.setNbMsg(nbMsgHope); Channel.sendTo(id,msgHope); } else { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.processHope" + " hopePerQueue = " + hopePerQueue + ", selected = " + selected); LBMessageHope msgHope = new LBMessageHope(validityPeriod,rateOfFlow); for (Enumeration e = selected.elements(); e.hasMoreElements(); ) { AgentId id = (AgentId) e.nextElement(); msgHope.setNbMsg(hopePerQueue); Channel.sendTo(id,msgHope); } } } public void processGive(AgentId to, LBCycleLife cycle) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "LoadingFactor.processGive" + " to = " + to + ", cycle = " + cycle); Channel.sendTo(to,cycle); ClientMessages cm = cycle.getClientMessages(); for (Enumeration e = cm.getMessages().elements(); e.hasMoreElements(); ) { Message msg = (Message) e.nextElement(); clusterQueueImpl.messageSendToCluster(msg.getIdentifier()); clusterQueueImpl.deletePersistenceMessage(msg); } } public String toString() { StringBuffer str = new StringBuffer(); str.append("LoadingFactor (status="); str.append(Status.names[status]); str.append(", consumerStatus="); str.append(ConsumerStatus.names[consumerStatus]); str.append(", producerStatus="); str.append(ProducerStatus.names[producerStatus]); str.append(", producThreshold="); str.append(producThreshold); str.append(", consumThreshold="); str.append(consumThreshold); str.append(", autoEvalThreshold="); str.append(autoEvalThreshold); str.append(", nbOfPendingMessages="); str.append(nbOfPendingMessages); str.append(", nbOfPendingRequests="); str.append(nbOfPendingRequests); str.append(", rateOfFlow="); str.append(rateOfFlow); str.append(", overLoaded="); str.append(overLoaded); str.append(")"); return str.toString(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -