📄 clusterqueueimpl.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2004 France Telecom R&D * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies * Contributor(s): */package org.objectweb.joram.mom.dest;import java.io.*;import java.util.*;import org.objectweb.joram.mom.notifications.*;import org.objectweb.joram.shared.excepts.*;import org.objectweb.joram.shared.messages.*;import org.objectweb.joram.shared.selectors.*;import org.objectweb.joram.mom.dest.*;import org.objectweb.joram.shared.admin.*;import org.objectweb.joram.shared.messages.Message;import fr.dyade.aaa.agent.AgentId;import fr.dyade.aaa.agent.Notification;import fr.dyade.aaa.agent.Channel;import fr.dyade.aaa.agent.UnknownNotificationException;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.joram.mom.MomTracing;/** * The <code>ClusterQueueImpl</code> class implements the MOM queue behaviour, * basically storing messages and delivering them upon clients requests or * delivering to an other cluster queue. */public class ClusterQueueImpl extends QueueImpl { /** * key = agentId of ClusterQueue * value = rateOfFlow (Float) */ protected Hashtable clusters; /** to calcul the loading factor, overloaded, ... */ protected LoadingFactor loadingFactor; /** key = msgId * value = date */ private Hashtable timeTable; /** key = msgId * value = Vector (alreadyVisit) */ private Hashtable visitTable; /** number of message send to cluster */ private long clusterDeliveryCount; /** waiting after a cluster request */ private long waitAfterClusterReq = -1; /** * Constructs a <code>ClusterQueueImpl</code> instance. * * @param destId Identifier of the agent hosting the queue. * @param adminId Identifier of the administrator of the queue. */ public ClusterQueueImpl(AgentId destId, AgentId adminId, Properties prop) { super(destId, adminId, prop); /** producer threshold */ int producThreshold = -1; /** consumer threshold */ int consumThreshold = -1; /** automatic eval threshold */ boolean autoEvalThreshold = false; if (prop != null) { try { waitAfterClusterReq = Long.valueOf(prop.getProperty("waitAfterClusterReq")).longValue(); } catch (NumberFormatException exc) { waitAfterClusterReq = 60000; } try { producThreshold = Integer.valueOf(prop.getProperty("producThreshold")).intValue(); } catch (NumberFormatException exc) { producThreshold = 10000; } try { consumThreshold = Integer.valueOf(prop.getProperty("consumThreshold")).intValue(); } catch (NumberFormatException exc) { consumThreshold = 10000; } autoEvalThreshold = Boolean.valueOf(prop.getProperty("autoEvalThreshold")).booleanValue(); } clusters = new Hashtable(); clusters.put(destId, new Float(1)); loadingFactor = new LoadingFactor(this, producThreshold, consumThreshold, autoEvalThreshold, waitAfterClusterReq); timeTable = new Hashtable(); visitTable = new Hashtable(); clusterDeliveryCount = 0; } public String toString() { return "ClusterQueueImpl:" + destId.toString(); } /** * implement special process (see QueueImpl). */ protected void specialProcess(Notification not) { if (not instanceof ClientMessages) doProcess((ClientMessages) not); else if (not instanceof SetRightRequest) doProcess((SetRightRequest) not); else super.specialProcess(not); } /** propagate right to all cluster. */ protected void doProcess(SetRightRequest not) { super.doProcess(not); sendToCluster( new SetRightQueueCluster( loadingFactor.getRateOfFlow(), not, clients)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + not + ")" + "\nclients=" + clients); } /** * use to add or remove ClusterQueue to cluster. */ public Object specialAdminProcess(SpecialAdminRequest not) throws RequestException { Object ret = null; try { SpecialAdmin req = not.getRequest(); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " specialAdminProcess : " + req); if (req instanceof AddQueueCluster) { addQueueCluster(((AddQueueCluster) req).joiningQueue, loadingFactor.getRateOfFlow()); } else if (req instanceof RemoveQueueCluster) { broadcastLeave(((RemoveQueueCluster) req).removeQueue); removeQueueCluster(((RemoveQueueCluster) req).removeQueue); } else if(req instanceof ListClusterQueue) { ret = doList((ListClusterQueue) req); } } catch (Exception exc) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.WARN)) MomTracing.dbgDestination.log(BasicLevel.WARN, "--- " + this + " specialAdminProcess", exc); throw new RequestException(exc.getMessage()); } return ret; } /** return the cluster list (vector). */ protected Object doList(ListClusterQueue req) { Vector vect = new Vector(); for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) vect.add(e.nextElement().toString()); return vect; } /** * send to joiningQueue a JoinQueueCluster not. */ protected void addQueueCluster(String joiningQueue, float rateOfFlow) { AgentId id = AgentId.fromString(joiningQueue); if (clusters.containsKey(id)) return;// clusters.put(id,new Float(rateOfFlow)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.addQueueCluster in " + destId + "\njoiningQueue=" + joiningQueue + "\nclusters=" + clusters); Channel.sendTo(id, new JoinQueueCluster(loadingFactor.getRateOfFlow(), clusters, clients, freeReading, freeWriting)); } /** * broadcast to cluster the removeQueue. */ protected void broadcastLeave(String removeQueue) { sendToCluster(new LeaveQueueCluster(removeQueue)); } /** * removeQueue leave the cluster. */ protected void removeQueueCluster(String removeQueue) { AgentId id = AgentId.fromString(removeQueue); if (destId.equals(id)) { clusters.clear(); } else clusters.remove(id); for (Enumeration e = visitTable.elements(); e.hasMoreElements(); ) { Vector visit = (Vector) e.nextElement(); if (visit.contains(id)) visit.remove(id); } if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.removeQueueCluster in " + destId + "\nremoveQueue=" + removeQueue + "\nclusters=" + clusters); } /** * overload doProcess(ClientMessages) * store all msgId in timeTable and visitTable, * store message and deliver message if consumer * wait. * call factorCheck to evaluate the loading factor, * activity, ... and send message to cluster if need. */ protected void doProcess(ClientMessages not) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " " + not); receiving = true; long date = System.currentTimeMillis(); Message msg; // Storing each received message: for (Enumeration msgs = not.getMessages().elements(); msgs.hasMoreElements();) { if (arrivalsCounter == Long.MAX_VALUE) arrivalsCounter = 0; msg = (Message) msgs.nextElement(); msg.order = arrivalsCounter++; storeMessage(msg); storeMsgIdInTimeTable(msg.getIdentifier(), new Long(date));// storeMsgIdInVisitTable(msg.getIdentifier(),// destId); } // Lauching a delivery sequence: deliverMessages(0); if (getNumberOfPendingMessages() > loadingFactor.producThreshold) loadingFactor.factorCheck(clusters, getNumberOfPendingMessages(), getNumberOfPendingRequests()); else loadingFactor.evalRateOfFlow(getNumberOfPendingMessages(), getNumberOfPendingRequests()); receiving = false; } /** * Distributes the received notifications to the appropriate reactions. * * @exception UnknownNotificationException When receiving an unexpected * notification. */ public void react(AgentId from, Notification not) throws UnknownNotificationException { if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " react(" + from + "," + not + ")"); if (not instanceof AckJoinQueueCluster) doReact((AckJoinQueueCluster) not); else if (not instanceof JoinQueueCluster) doReact((JoinQueueCluster) not); else if (not instanceof LeaveQueueCluster) removeQueueCluster(((LeaveQueueCluster) not).removeQueue); else if (not instanceof ReceiveRequest) { super.react(from, not); doReact((ReceiveRequest) not); } else if (not instanceof LBMessageGive) doReact(from, (LBMessageGive) not); else if (not instanceof LBMessageHope) doReact(from, (LBMessageHope) not); else if (not instanceof LBCycleLife) doReact(from, (LBCycleLife) not); else if (not instanceof WakeUpNot) doReact((WakeUpNot) not); else if (not instanceof SetRightQueueCluster) doReact((SetRightQueueCluster) not); else super.react(from, not); } /** set the same right to all cluster */ protected void doReact(SetRightQueueCluster not) { try { AgentId user = not.setRightRequest.getClient(); int right = not.setRightRequest.getRight(); super.processSetRight(user,right); } catch (RequestException exc) {} super.doProcess(not.setRightRequest); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueueImpl.doReact(" + not + ")" + "\nclients=" + clients);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -