⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clusterqueueimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2004 France Telecom R&D * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): ScalAgent Distributed Technologies * Contributor(s): */package org.objectweb.joram.mom.dest;import java.io.*;import java.util.*;import org.objectweb.joram.mom.notifications.*;import org.objectweb.joram.shared.excepts.*;import org.objectweb.joram.shared.messages.*;import org.objectweb.joram.shared.selectors.*;import org.objectweb.joram.mom.dest.*;import org.objectweb.joram.shared.admin.*;import org.objectweb.joram.shared.messages.Message;import fr.dyade.aaa.agent.AgentId;import fr.dyade.aaa.agent.Notification;import fr.dyade.aaa.agent.Channel;import fr.dyade.aaa.agent.UnknownNotificationException;import org.objectweb.util.monolog.api.BasicLevel;import org.objectweb.joram.mom.MomTracing;/** * The <code>ClusterQueueImpl</code> class implements the MOM queue behaviour, * basically storing messages and delivering them upon clients requests or * delivering to an other cluster queue. */public class ClusterQueueImpl extends QueueImpl {  /**    * key = agentId of ClusterQueue    * value = rateOfFlow (Float)   */  protected Hashtable clusters;  /** to calcul the loading factor, overloaded, ... */  protected LoadingFactor loadingFactor;  /** key = msgId   * value = date    */  private Hashtable timeTable;  /** key = msgId   * value = Vector (alreadyVisit)   */  private Hashtable visitTable;  /** number of message send to cluster */  private long clusterDeliveryCount;  /** waiting after a cluster request */  private long waitAfterClusterReq = -1;  /**   * Constructs a <code>ClusterQueueImpl</code> instance.   *   * @param destId  Identifier of the agent hosting the queue.   * @param adminId  Identifier of the administrator of the queue.   */  public ClusterQueueImpl(AgentId destId, AgentId adminId, Properties prop) {    super(destId, adminId, prop);    /** producer threshold */    int producThreshold = -1;    /** consumer threshold */    int consumThreshold = -1;    /** automatic eval threshold */    boolean autoEvalThreshold = false;    if (prop != null) {      try {        waitAfterClusterReq =           Long.valueOf(prop.getProperty("waitAfterClusterReq")).longValue();      } catch (NumberFormatException exc) {        waitAfterClusterReq = 60000;      }      try {        producThreshold =           Integer.valueOf(prop.getProperty("producThreshold")).intValue();      } catch (NumberFormatException exc) {        producThreshold = 10000;      }      try {        consumThreshold =           Integer.valueOf(prop.getProperty("consumThreshold")).intValue();      } catch (NumberFormatException exc) {        consumThreshold = 10000;      }      autoEvalThreshold =        Boolean.valueOf(prop.getProperty("autoEvalThreshold")).booleanValue();    }    clusters = new Hashtable();    clusters.put(destId, new Float(1));    loadingFactor = new LoadingFactor(this,                                      producThreshold,                                      consumThreshold,                                      autoEvalThreshold,                                      waitAfterClusterReq);    timeTable = new Hashtable();    visitTable = new Hashtable();    clusterDeliveryCount = 0;  }  public String toString() {    return "ClusterQueueImpl:" + destId.toString();  }  /**    * implement special process (see QueueImpl).    */  protected void specialProcess(Notification not) {    if (not instanceof ClientMessages)      doProcess((ClientMessages) not);    else if (not instanceof SetRightRequest)      doProcess((SetRightRequest) not);    else      super.specialProcess(not);  }  /** propagate right to all cluster. */  protected void doProcess(SetRightRequest not) {    super.doProcess(not);    sendToCluster(      new SetRightQueueCluster(        loadingFactor.getRateOfFlow(),        not,        clients));    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                    " ClusterQueueImpl.doReact(" + not + ")" +                                    "\nclients=" + clients);  }  /**    * use to add or remove ClusterQueue to cluster.    */  public Object specialAdminProcess(SpecialAdminRequest not)     throws RequestException {    Object ret = null;    try {      SpecialAdmin req = not.getRequest();            if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                       "--- " + this +                                      " specialAdminProcess : " +                                      req);      if (req instanceof AddQueueCluster) {        addQueueCluster(((AddQueueCluster) req).joiningQueue,                        loadingFactor.getRateOfFlow());      } else if (req instanceof RemoveQueueCluster) {        broadcastLeave(((RemoveQueueCluster) req).removeQueue);        removeQueueCluster(((RemoveQueueCluster) req).removeQueue);      } else if(req instanceof ListClusterQueue) {        ret = doList((ListClusterQueue) req);      }    } catch (Exception exc) {      if (MomTracing.dbgDestination.isLoggable(BasicLevel.WARN))        MomTracing.dbgDestination.log(BasicLevel.WARN,                                       "--- " + this +                                      " specialAdminProcess",                                      exc);      throw new RequestException(exc.getMessage());    }    return ret;  }    /** return the cluster list (vector). */  protected Object doList(ListClusterQueue req) {    Vector vect = new Vector();    for (Enumeration e = clusters.keys(); e.hasMoreElements(); )      vect.add(e.nextElement().toString());    return vect;  }   /**   *  send to joiningQueue a JoinQueueCluster not.   */  protected void addQueueCluster(String joiningQueue, float rateOfFlow) {    AgentId id = AgentId.fromString(joiningQueue);    if (clusters.containsKey(id)) return;//    clusters.put(id,new Float(rateOfFlow));    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                    " ClusterQueueImpl.addQueueCluster in " + destId +                                    "\njoiningQueue=" + joiningQueue +                                    "\nclusters=" + clusters);    Channel.sendTo(id,                   new JoinQueueCluster(loadingFactor.getRateOfFlow(),                                        clusters,                                        clients,                                        freeReading,                                        freeWriting));  }    /**    * broadcast to cluster the removeQueue.    */  protected void broadcastLeave(String removeQueue) {    sendToCluster(new LeaveQueueCluster(removeQueue));  }  /**    * removeQueue leave the cluster.   */  protected void removeQueueCluster(String removeQueue) {    AgentId id = AgentId.fromString(removeQueue);    if (destId.equals(id)) {      clusters.clear();    } else      clusters.remove(id);    for (Enumeration e = visitTable.elements(); e.hasMoreElements(); ) {      Vector visit = (Vector) e.nextElement();      if (visit.contains(id))        visit.remove(id);    }    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                    " ClusterQueueImpl.removeQueueCluster in " + destId +                                    "\nremoveQueue=" + removeQueue +                                    "\nclusters=" + clusters);  }    /**    * overload doProcess(ClientMessages)   * store all msgId in timeTable and visitTable,   * store message and deliver message if consumer   * wait.   * call factorCheck to evaluate the loading factor,   * activity, ... and send message to cluster if need.   */  protected void doProcess(ClientMessages not) {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "--- " + this +                                     " " + not);    receiving = true;    long date = System.currentTimeMillis();        Message msg;    // Storing each received message:    for (Enumeration msgs = not.getMessages().elements();         msgs.hasMoreElements();) {      if (arrivalsCounter == Long.MAX_VALUE)        arrivalsCounter = 0;      msg = (Message) msgs.nextElement();      msg.order = arrivalsCounter++;      storeMessage(msg);      storeMsgIdInTimeTable(msg.getIdentifier(),                            new Long(date));//        storeMsgIdInVisitTable(msg.getIdentifier(),//                               destId);    }    // Lauching a delivery sequence:    deliverMessages(0);    if (getNumberOfPendingMessages() > loadingFactor.producThreshold)      loadingFactor.factorCheck(clusters,                                getNumberOfPendingMessages(),                                getNumberOfPendingRequests());    else      loadingFactor.evalRateOfFlow(getNumberOfPendingMessages(),                                   getNumberOfPendingRequests());    receiving = false;  }  /**   * Distributes the received notifications to the appropriate reactions.   *   * @exception UnknownNotificationException  When receiving an unexpected   *              notification.   */  public void react(AgentId from, Notification not)    throws UnknownNotificationException {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this +                                    " react(" + from + "," + not + ")");    if (not instanceof AckJoinQueueCluster)      doReact((AckJoinQueueCluster) not);    else if (not instanceof JoinQueueCluster)      doReact((JoinQueueCluster) not);    else if (not instanceof LeaveQueueCluster)      removeQueueCluster(((LeaveQueueCluster) not).removeQueue);    else if (not instanceof ReceiveRequest) {      super.react(from, not);      doReact((ReceiveRequest) not);    } else if (not instanceof LBMessageGive)      doReact(from, (LBMessageGive) not);    else if (not instanceof LBMessageHope)      doReact(from, (LBMessageHope) not);    else if (not instanceof LBCycleLife)      doReact(from, (LBCycleLife) not);    else if (not instanceof WakeUpNot)      doReact((WakeUpNot) not);    else if (not instanceof SetRightQueueCluster)      doReact((SetRightQueueCluster) not);    else      super.react(from, not);  }  /** set the same right to all cluster */  protected void doReact(SetRightQueueCluster not) {    try {      AgentId user = not.setRightRequest.getClient();      int right = not.setRightRequest.getRight();      super.processSetRight(user,right);    } catch (RequestException exc) {}    super.doProcess(not.setRightRequest);    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                    " ClusterQueueImpl.doReact(" + not + ")" +                                    "\nclients=" + clients);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -