⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clusterqueueimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
  }  /**   * wake up, and call factorCheck to evaluate the loading factor...   * if msg stay more a periode time in timeTable send to an other   * (no visited) queue in cluster.   */  protected void doReact(WakeUpNot not) {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "--- " + this +                                    " ClusterQueueImpl.doReact(" + not + ")");    super.doReact(not);    if (clusters.size() > 1)      loadingFactor.factorCheck(clusters,                                getNumberOfPendingMessages(),                                getNumberOfPendingRequests());    // check if msg arrived befor "period".    // if is true send msg to the next (no visited) clusterQueue.    Vector toGive = new Vector();    long oldTime = System.currentTimeMillis() - period;    for (Enumeration e = timeTable.keys(); e.hasMoreElements(); ) {      String msgId = (String) e.nextElement();      if (((Long) timeTable.get(msgId)).longValue() < oldTime) {        toGive.add(msgId);        storeMsgIdInVisitTable(msgId,destId);      }    }    if (toGive.isEmpty()) return;    Hashtable table = new Hashtable();    for (int i = 0; i < toGive.size(); i++) {      String msgId = (String) toGive.get(i);      Vector visit = (Vector) visitTable.get(msgId);      for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) {        AgentId id = (AgentId) e.nextElement();        if (! visit.contains(id)) {          LBCycleLife cycle = (LBCycleLife) table.get(id);          if (cycle == null) {            cycle = new LBCycleLife(loadingFactor.getRateOfFlow());            cycle.setClientMessages(new ClientMessages());          }          ClientMessages cm = cycle.getClientMessages();          Message msg = removeMessage(msgId);          if (msg != null) {            cm.addMessage(msg);            cycle.putInVisitTable(msgId,visit);            table.put(id,cycle);            break;          }        }      }    }    for (Enumeration e = table.keys(); e.hasMoreElements(); ) {      AgentId id = (AgentId) e.nextElement();      loadingFactor.processGive(id,(LBCycleLife) table.get(id));    }  }  /**   * The messages are not consumed by an other cluster's queue   * in a periode time, try to consume in this queue.   * update visitTable, and process clientMessages.    */  protected void doReact(AgentId from, LBCycleLife not) {    clusters.put(from,new Float(not.getRateOfFlow()));    Hashtable vT = not.getVisitTable();    for (Enumeration e = vT.keys(); e.hasMoreElements(); ) {      String msgId = (String) e.nextElement();      visitTable.put(msgId,vT.get(msgId));    }    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                    " ClusterQueueImpl.doReact(" + not + ")" +                                    "\nvisitTable=" + clusters);    ClientMessages cm = not.getClientMessages();    if (cm != null)      doProcess(cm);  }  /**   * new queue come in cluster, update clusters.   * and spread to clusters the AckjoiningQueue.   */  protected void doReact(JoinQueueCluster not) {    for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) {      AgentId id = (AgentId) e.nextElement();      if (! clusters.containsKey(id))        clusters.put(id,not.clusters.get(id));    }    for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) {      AgentId user = (AgentId) e.nextElement();      if (clients.containsKey(user)) {        Integer right = (Integer) not.clients.get(user);        if (right.compareTo((Integer) clients.get(user)) > 0)          clients.put(user,right);      } else        clients.put(user,not.clients.get(user));    }    freeReading = freeReading | not.freeReading;    freeWriting = freeWriting | not.freeWriting;    sendToCluster(      new AckJoinQueueCluster(loadingFactor.getRateOfFlow(),                              clusters,                              clients,                              freeReading,                              freeWriting));      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                    " ClusterQueueImpl.doReact(" + not + ")" +                                    "\nclusters=" + clusters +                                    "\nclients=" + clients);  }  protected void doReact(AckJoinQueueCluster not) {    for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) {      AgentId id = (AgentId) e.nextElement();      if (! clusters.containsKey(id))        clusters.put(id,not.clusters.get(id));    }    for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) {      AgentId user = (AgentId) e.nextElement();      if (clients.containsKey(user)) {        Integer right = (Integer) not.clients.get(user);        if (right.compareTo((Integer) clients.get(user)) > 0)          clients.put(user,right);      } else        clients.put(user,not.clients.get(user));    }    freeReading = freeReading | not.freeReading;    freeWriting = freeWriting | not.freeWriting;      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                    " ClusterQueueImpl.doReact(" + not + ")" +                                    "\nclusters=" + clusters +                                    "\nclients=" + clients);  }  /**   *    */  protected void doReact(ReceiveRequest not) {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                     " ClusterQueueImpl.doReact(" + not + ")");    //loadingFactor.setWait();    if (getNumberOfPendingRequests() > loadingFactor.consumThreshold)      loadingFactor.factorCheck(clusters,                                getNumberOfPendingMessages(),                                getNumberOfPendingRequests());  }  /**    * load balancing message give by an other cluster queue.   * process ClientMessages, no need to check if sender is writer.   */  protected void doReact(AgentId from, LBMessageGive not)     throws UnknownNotificationException {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "--- " + this +                                    " ClusterQueueImpl.doReact(" + from + "," + not + ")");    clusters.put(from,new Float(not.getRateOfFlow()));    ClientMessages cm = not.getClientMessages();    if (cm != null)      doProcess(cm);  }  /**    * load balancing message hope by the "from" queue.   */  protected void doReact(AgentId from, LBMessageHope not) {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                     " ClusterQueueImpl.doReact(" + from + "," + not + ")");        clusters.put(from,new Float(not.getRateOfFlow()));    int hope = not.getNbMsg();    // TODO: if validityperiod    if (loadingFactor.getRateOfFlow() < 1) {      int possibleGive = getNumberOfPendingMessages() - getNumberOfPendingRequests();      LBMessageGive msgGive =         new LBMessageGive(waitAfterClusterReq,loadingFactor.getRateOfFlow());      ClientMessages cm = new ClientMessages();      for (int i = 0; (i < possibleGive) && (i < hope); i++) {        if (! messagesIsEmpty()) {          Message msg = removeMessage(0);          cm.addMessage(msg);        } else           break;      }      msgGive.setClientMessages(cm);      msgGive.setRateOfFlow(        loadingFactor.evalRateOfFlow(getNumberOfPendingMessages(),                                     getNumberOfPendingRequests()));      Channel.sendTo(from,msgGive);            if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                       "--- " + this +                                      " ClusterQueueImpl.doReact LBMessageHope : nbMsgSend = " +                                       cm.getMessages().size());      for (Enumeration e = cm.getMessages().elements(); e.hasMoreElements(); ) {        Message msg = (Message) e.nextElement();        messageSendToCluster(msg.getIdentifier());        deletePersistenceMessage(msg);      }    }  }  /**   * send to all queue in cluster.   */  protected void sendToCluster(QueueClusterNot not) {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                     " ClusterQueueImpl.sendToCluster(" + not + ")");    if (clusters.size() < 2) return;    for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) {      AgentId id = (AgentId) e.nextElement();      if (! id.equals(destId))        Channel.sendTo(id,not);    }  }  /**    * return the number of Message send to cluster.   */  public long getClusterDeliveryCount() {    return clusterDeliveryCount;  }  /**   * return index of message.   */  private int getIndexOfMessage(String msgId) {    for (int i = 0; i < messages.size(); i++) {      Message msg = (Message) messages.get(i);      if (msgId.equals(msg.getIdentifier()))        return i;    }    return -1;  }  private void storeMsgIdInTimeTable(String msgId, Long date) {    try {      timeTable.put(msgId,date);    } catch (NullPointerException exc) {}  }  private void storeMsgIdInVisitTable(String msgId, AgentId destId) {    Vector alreadyVisit = (Vector) visitTable.get(msgId);    if (alreadyVisit == null) alreadyVisit = new Vector();    alreadyVisit.add(destId);    visitTable.put(msgId,alreadyVisit);  }  protected void messageDelivered(String msgId) {    timeTable.remove(msgId);    visitTable.remove(msgId);  }  protected void messageRemoved(String msgId) {    timeTable.remove(msgId);    visitTable.remove(msgId);  }  protected void messageSendToCluster(String msgId) {    timeTable.remove(msgId);    visitTable.remove(msgId);    clusterDeliveryCount++;  }  Message removeMessage(String msgId) {    for (Enumeration e = messages.elements(); e.hasMoreElements(); ) {      Message msg = (Message) e.nextElement();      if (msgId.equals(msg.getIdentifier())) {        // fix bug for softRefMessage        msg.setPin(true);        if (messages.remove(msg))          return msg;        else           return null;      }    }    return null;  }  Message removeMessage(int index) {    Message msg = (Message) messages.remove(0);    // fix bug for softRefMessage    msg.setPin(true);    return msg;  }  boolean messagesIsEmpty() {   return messages.isEmpty();  }  void deletePersistenceMessage(Message msg) {    msg.delete();  }  AgentId getDestId() {    return destId;  }  public int getNumberOfPendingMessages() {    return messages.size();  }    public int getNumberOfPendingRequests() {    return requests.size();  }  private void readObject(java.io.ObjectInputStream in)    throws IOException, ClassNotFoundException {    in.defaultReadObject();    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                    "--- " + this +                                     " ClusterQueueImpl.readObject" +                                    " loadingFactor = " + loadingFactor);  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -