⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 loadingfactor.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                   pendingRequests);    evalActivity();    if ( status == Status.INIT || status == Status.RUN) {      if (isOverloaded()) {        dispatchAndSendTo(clusters,                          pendingMessages,                          pendingRequests);        status = Status.WAIT;        statusTime = System.currentTimeMillis() + validityPeriod;      }    }    updateThreshol();    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "<< LoadingFactor.factorCheck "                                     + this);  }  /**    * return true if cluster queue is overloaded.   * depends on activity.   */  public boolean isOverloaded() {    overLoaded = false;    if ((consumerStatus ==          ConsumerStatus.CONSUMER_HIGH_ACTIVITY) ||        (producerStatus ==          ProducerStatus.PRODUCER_HIGH_ACTIVITY))      overLoaded = true;        if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "LoadingFactor.isOverloaded "                                     + overLoaded);    return overLoaded;  }  /**   * use to dispatch request hope or give messages   * in clusters.   */  private void dispatchAndSendTo(Hashtable clusters,                                 int nbOfPendingMessages,                                 int nbOfPendingRequests) {    int nbMsgHope = -1;    int nbMsgGive = -1;        if ((consumerStatus == ConsumerStatus.CONSUMER_NO_ACTIVITY) &&        (producerStatus == ProducerStatus.PRODUCER_NO_ACTIVITY))      return;        if (producThreshold < nbOfPendingMessages)      nbMsgGive = nbOfPendingMessages - producThreshold;    if (consumThreshold < nbOfPendingRequests)      nbMsgHope = nbOfPendingRequests;//      if (nbOfPendingRequests > nbOfPendingMessages)//        nbMsgHope = nbOfPendingRequests - nbOfPendingMessages;//      else//        nbMsgGive = nbOfPendingMessages - nbOfPendingRequests;        if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "LoadingFactor.dispatchAndSendTo" +                                    "\nnbMsgHope=" + nbMsgHope +                                    ", nbMsgGive=" + nbMsgGive);    if (consumerStatus == ConsumerStatus.CONSUMER_HIGH_ACTIVITY)      processHope(nbMsgHope,clusters);    if (producerStatus == ProducerStatus.PRODUCER_HIGH_ACTIVITY)      processGive(nbMsgGive,clusters);  }  /**   * send  nb messages on clusters.   */  private void processGive(int nbMsgGive, Hashtable clusters) {    if (nbMsgGive < 1) return;    // select queue in cluster who have a rateOfFlow > 1    Vector selected = new Vector();    for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) {      AgentId id = (AgentId) e.nextElement();      if (((Float) clusters.get(id)).floatValue() >= 1 &&           !id.equals(clusterQueueImpl.getDestId()))        selected.add(id);    }        if (selected.size() == 0) return;        int givePerQueue = nbMsgGive / selected.size();        LBMessageGive msgGive = new LBMessageGive(validityPeriod,rateOfFlow);    ClientMessages cm = new ClientMessages();        if (givePerQueue == 0 && nbMsgGive > 0) {      AgentId id = (AgentId) selected.get(0);      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                       "LoadingFactor.processGive" +                                      " nbMsgGive = " + nbMsgGive +                                      ", id = " + id);      for (int i = 0; i < givePerQueue; i++) {        if (! clusterQueueImpl.messagesIsEmpty()) {          Message msg = clusterQueueImpl.removeMessage(0);          cm.addMessage(msg);        } else           break;      }      msgGive.setClientMessages(cm);      Channel.sendTo(id,msgGive);            for (Enumeration e = cm.getMessages().elements(); e.hasMoreElements(); ) {        Message msg = (Message) e.nextElement();        clusterQueueImpl.messageSendToCluster(msg.getIdentifier());        clusterQueueImpl.deletePersistenceMessage(msg);      }    } else {      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                       "LoadingFactor.processGive" +                                      " givePerQueue = " + givePerQueue +                                      ", selected = " + selected);            for (Enumeration e = selected.elements(); e.hasMoreElements(); ) {        AgentId id = (AgentId) e.nextElement();                for (int i = 0; i < givePerQueue; i++) {          if (clusterQueueImpl.messagesIsEmpty()) break;          Message msg = clusterQueueImpl.removeMessage(0);          cm.addMessage(msg);        }        msgGive.setClientMessages(cm);        Channel.sendTo(id,msgGive);        for (Enumeration e2 = cm.getMessages().elements(); e2.hasMoreElements(); ) {          Message msg = (Message) e2.nextElement();          clusterQueueImpl.messageSendToCluster(msg.getIdentifier());          clusterQueueImpl.deletePersistenceMessage(msg);        }        cm = new ClientMessages();      }    }  }  /**   *    */  private void processHope(int nbMsgHope, Hashtable clusters) {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "LoadingFactor.processHope" +                                    " nbMsgHope = " + nbMsgHope);    if (nbMsgHope < 1) return;    Vector selected = new Vector();    for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) {      AgentId id = (AgentId) e.nextElement();      if (((Float) clusters.get(id)).floatValue() <= 1 &&           !id.equals(clusterQueueImpl.getDestId()))        selected.add(id);    }    if (selected.size() == 0) return;    int hopePerQueue = nbMsgHope / selected.size();    if (hopePerQueue == 0 && nbMsgHope > 0) {      AgentId id = (AgentId) selected.get(0);            if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                       "LoadingFactor.processHope" +                                      " nbMsgHope = " + nbMsgHope +                                      ", id = " + id);      LBMessageHope msgHope = new LBMessageHope(validityPeriod,rateOfFlow);      msgHope.setNbMsg(nbMsgHope);      Channel.sendTo(id,msgHope);          } else {      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                       "LoadingFactor.processHope" +                                      " hopePerQueue = " + hopePerQueue +                                      ", selected = " + selected);            LBMessageHope msgHope = new LBMessageHope(validityPeriod,rateOfFlow);      for (Enumeration e = selected.elements(); e.hasMoreElements(); ) {        AgentId id = (AgentId) e.nextElement();        msgHope.setNbMsg(hopePerQueue);        Channel.sendTo(id,msgHope);      }    }  }  public void processGive(AgentId to, LBCycleLife cycle) {    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG,                                     "LoadingFactor.processGive" +                                    " to = " + to +                                    ", cycle = " + cycle);    Channel.sendTo(to,cycle);    ClientMessages cm = cycle.getClientMessages();    for (Enumeration e = cm.getMessages().elements(); e.hasMoreElements(); ) {      Message msg = (Message) e.nextElement();      clusterQueueImpl.messageSendToCluster(msg.getIdentifier());      clusterQueueImpl.deletePersistenceMessage(msg);    }  }  public String toString() {    StringBuffer str = new StringBuffer();    str.append("LoadingFactor (status=");    str.append(Status.names[status]);    str.append(", consumerStatus=");    str.append(ConsumerStatus.names[consumerStatus]);    str.append(", producerStatus=");    str.append(ProducerStatus.names[producerStatus]);    str.append(", producThreshold=");    str.append(producThreshold);    str.append(", consumThreshold=");    str.append(consumThreshold);    str.append(", autoEvalThreshold=");    str.append(autoEvalThreshold);    str.append(", nbOfPendingMessages=");    str.append(nbOfPendingMessages);    str.append(", nbOfPendingRequests=");    str.append(nbOfPendingRequests);    str.append(", rateOfFlow=");    str.append(rateOfFlow);    str.append(", overLoaded=");    str.append(overLoaded);    str.append(")");    return str.toString();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -