⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    // Get the logging monitor from current server MonologLoggerFactory    // Be careful, logmon is initialized from name and not this.name !!    logmon = Debug.getLogger(Debug.A3Network + '.' + name);    logmon.log(BasicLevel.DEBUG, name + ", initialized");    WDActivationPeriod = Long.getLong("WDActivationPeriod",                                      WDActivationPeriod).longValue();    WDActivationPeriod = Long.getLong(name + ".WDActivationPeriod",                                      WDActivationPeriod).longValue();    WDNbRetryLevel1 = Integer.getInteger("WDNbRetryLevel1",                                         WDNbRetryLevel1).intValue();    WDNbRetryLevel1 = Integer.getInteger(name + ".WDNbRetryLevel1",                                         WDNbRetryLevel1).intValue();    WDRetryPeriod1 = Long.getLong("WDRetryPeriod1",                                  WDRetryPeriod1).longValue();    WDRetryPeriod1 = Long.getLong(name + ".WDRetryPeriod1",                                  WDRetryPeriod1).longValue();    WDNbRetryLevel2 = Integer.getInteger("WDNbRetryLevel2",                                         WDNbRetryLevel2).intValue();    WDNbRetryLevel2 = Integer.getInteger(name + ".WDNbRetryLevel2",                                         WDNbRetryLevel2).intValue();    WDRetryPeriod2 = Long.getLong("WDRetryPeriod2",                                  WDRetryPeriod2).longValue();    WDRetryPeriod2 = Long.getLong(name + ".WDRetryPeriod2",                                  WDRetryPeriod2).longValue();    WDRetryPeriod3 = Long.getLong("WDRetryPeriod3",                                  WDRetryPeriod3).longValue();    WDRetryPeriod3 = Long.getLong(name + ".WDRetryPeriod3",                                  WDRetryPeriod3).longValue();    // Sorts the array of server ids into ascending numerical order.    Arrays.sort(servers);    this.servers = servers;    this.serversFN = name + "Servers";    this.bootTSFN = name + "BootTS";    restore();  }  /**   * Adds the server sid in the network configuration.   *   * @param id	the unique server id.   */  synchronized void addServer(short id) throws Exception {    // First we have to verify that id is not already in servers    int idx = index(id);    if (idx >= 0) return;    idx = -idx -1;    // Allocates new array for stamp and server    int[] newStamp = new int[servers.length+1];    byte[] newStampBuf = new byte[4*(servers.length+1)];    int[] newBootTS = new int[servers.length+1];    short[] newServers = new short[servers.length+1];    // Copy old data from stamp and server, let a free room for the new one.    int j = 0;    for (int i=0; i<servers.length; i++) {      if (i == idx) j++;      newServers[j] = servers[i];      newBootTS[j] = bootTS[i];      newStamp[j] = stamp[i];      j++;    }    if (idx > 0)      System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4);    if (idx < servers.length)      System.arraycopy(stampbuf, idx*4,                       newStampBuf, (idx+1)*4, (servers.length-idx)*4);    newServers[idx] = id;    newBootTS[idx] = -1;    newStamp[idx] = -1;		// useless    newStampBuf[idx] = 0;	// useless    newStampBuf[idx+1] = 0;	// useless    newStampBuf[idx+2] = 0; 	// useless    newStampBuf[idx+3] = 0; 	// useless    stamp = newStamp;    stampbuf = newStampBuf;    servers = newServers;    bootTS = newBootTS;    // be careful, set again the index of local server.    idxLS = index(sid);    // Save the servers configuration and the logical time stamp.    AgentServer.getTransaction().save(servers, serversFN);    AgentServer.getTransaction().save(bootTS, bootTSFN);    AgentServer.getTransaction().saveByteArray(stampbuf, name);  }  /**   * Removes the server sid in the network configuration.   *   * @param id	the unique server id.   */  synchronized void delServer(short id) throws Exception {    // First we have to verify that id is already in servers    int idx = index(id);    if (idx < 0) return;    int[] newStamp = new int[servers.length-1];    byte[] newStampBuf = new byte[4*(servers.length-1)];    int[] newBootTS = new int[servers.length-1];    short[] newServers = new short[servers.length-1];    int j = 0;    for (int i=0; i<servers.length; i++) {      if (id == servers[i]) {        idx = i;        continue;      }      newServers[j] = servers[i];      newBootTS[j] = bootTS[i];      newStamp[j] = stamp[i];      j++;    }    if (idx > 0)      System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4);    if (idx < (servers.length-1))      System.arraycopy(stampbuf, (idx+1)*4,                       newStampBuf, idx*4, (servers.length-idx-1)*4);    stamp = newStamp;    stampbuf = newStampBuf;    servers = newServers;    bootTS = newBootTS;    // be careful, set again the index of local server.    idxLS = index(sid);    // Save the servers configuration and the logical time stamp.    AgentServer.getTransaction().save(servers, serversFN);    AgentServer.getTransaction().save(bootTS, bootTSFN);    AgentServer.getTransaction().saveByteArray(stampbuf, name);  }  /**   * Reset all information related to server sid in the network configuration.   *   * @param id	the unique server id.   */  synchronized void resetServer(short id, int boot) throws IOException {    // First we have to verify that id is already in servers    int idx = index(id);    if (idx < 0) return;    // TODO...    // Save the servers configuration and the logical time stamp.    AgentServer.getTransaction().save(servers, serversFN);    AgentServer.getTransaction().save(bootTS, bootTSFN);    AgentServer.getTransaction().saveByteArray(stampbuf, name);  }  /**   *  Adds a message in "ready to deliver" list. This method allocates a   * new time stamp to the message ; be Careful, changing the stamp imply   * the filename change too.   */  public void post(Message msg) throws Exception {    if ((msg.not.expiration > 0) &&        (msg.not.expiration < System.currentTimeMillis())) {      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG,                   getName() + ": removes expired notification " +                   msg.from + ", " + msg.not);      return;    }    short to = AgentServer.getServerDesc(msg.to.to).gateway;    // Allocates a new timestamp. Be careful, if the message needs to be    // routed we have to use the next destination in timestamp generation.    msg.source = AgentServer.getServerId();    msg.dest = to;    msg.stamp = getSendUpdate(to);    // Saves the message.    msg.save();    // Push it in "ready to deliver" queue.    qout.push(msg);  }  /**   *  Returns the index in internal table of the specified server.   * The servers array must be ordered.   *   * @param id	the unique server id.   */  protected final int index(short id) {    int idx = Arrays.binarySearch(servers, id);    return idx;  }  protected final byte[] getStamp() {    return stampbuf;  }  protected final void setStamp(byte[] stampbuf) {    this.stampbuf = stampbuf;    stamp = new int[servers.length];    for (int i=0; i<stamp.length; i++) {      stamp[i] = ((stampbuf[(i*4)+0] & 0xFF) << 24) +        ((stampbuf[(i*4)+1] & 0xFF) << 16) +        ((stampbuf[(i*4)+2] & 0xFF) <<  8) +        (stampbuf[(i*4)+3] & 0xFF);    }      }  private void updateStamp(int idx, int update) throws IOException {    stamp[idx] = update;    stampbuf[(idx*4)+0] = (byte)((update >>> 24) & 0xFF);    stampbuf[(idx*4)+1] = (byte)((update >>> 16) & 0xFF);    stampbuf[(idx*4)+2] = (byte)((update >>>  8) & 0xFF);    stampbuf[(idx*4)+3] = (byte)(update & 0xFF);    AgentServer.getTransaction().saveByteArray(stampbuf, name);  }  /** The message can be delivered. */  static final int DELIVER = 0;//   /**//    *  There is other message in the causal ordering before this one.//    * This cannot happened with a FIFO ordering.//    *///   static final int WAIT_TO_DELIVER = 1;  /** The message has already been delivered. */  static final int ALREADY_DELIVERED = 2;  /**   *  Test if a received message with the specified clock must be   * delivered. If the message is ready to be delivered, the method returns   * <code>DELIVER</code> and the matrix clock is updated. If the message has   * already been delivered, the method returns <code>ALREADY_DELIVERED</code>,   * and if other messages are waited before this message the method returns   * <code>WAIT_TO_DELIVER</code>. In the last two case the matrix clock   * remains unchanged.   *   * @param update	The message matrix clock (list of update).   * @return		<code>DELIVER</code>, <code>ALREADY_DELIVERED</code>,   * 			or <code>WAIT_TO_DELIVER</code> code.   */  private synchronized int testRecvUpdate(short source, int update) throws IOException {    int fromIdx = index(source);    if (update > stamp[fromIdx]) {      updateStamp(fromIdx, update);      return DELIVER;    }    return ALREADY_DELIVERED;  }  /**   * Computes the matrix clock of a send message. The server's   * matrix clock is updated.   *   * @param to	The identification of receiver.	   * @return	The message matrix clock (list of update).   */  private synchronized int getSendUpdate(short to) throws IOException {    int update =  stamp[idxLS] +1;    updateStamp(idxLS, update);    return update;  }  final int getBootTS() {    return bootTS[idxLS];  }  final void testBootTS(short source, int boot) throws IOException {    int fromIdx = index(source);    if (boot != bootTS[fromIdx]) {      if (logmon.isLoggable(BasicLevel.WARN))        logmon.log(BasicLevel.WARN,                   getName() + ", reset stamp #" + source + ", "                   + bootTS[fromIdx] + " -> " + boot);      bootTS[fromIdx] = boot;      AgentServer.getTransaction().save(bootTS, bootTSFN);      updateStamp(fromIdx, -1);    }  }  /**   * Try to deliver the received message to the right consumer.   *   * @param msg		the message.   */  protected void deliver(Message msg) throws Exception {    // Get real from serverId.    short source = msg.getSource();    // Test if the message is really for this node (final destination or    // router).    short dest = msg.getDest();    if (dest != AgentServer.getServerId()) {      logmon.log(BasicLevel.ERROR,                 getName() + ", recv bad msg#" + msg.getStamp() +                 " really to " + dest +                 " by " + source);      throw new Exception("recv bad msg#" + msg.getStamp() +                          " really to " + dest +                          " by " + source);    }    if (logmon.isLoggable(BasicLevel.DEBUG))      logmon.log(BasicLevel.DEBUG,                 getName() + ", recv msg#" + msg.getStamp() +                 " from " + msg.from +                 " to " + msg.to +                 " by " + source);    AgentServer.getServerDesc(source).active = true;    AgentServer.getServerDesc(source).retry = 0;    // Start a transaction in order to ensure atomicity of clock updates    // and queue changes.    AgentServer.getTransaction().begin();    // Test if the message can be delivered then deliver it    // else put it in the waiting list    int todo = testRecvUpdate(source, msg.getStamp());    if (todo == DELIVER) {      // Deliver the message then try to deliver alls waiting message.      // Allocate a local time to the message to order it in      // local queue, and save it.      Channel.post(msg);      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG,                   getName() + ", deliver msg#" + msg.getStamp());      Channel.save();      AgentServer.getTransaction().commit();      // then commit and validate the message.      Channel.validate();      AgentServer.getTransaction().release();    } else {//    it's an already delivered message, we have just to re-send an//    aknowledge (see below).      AgentServer.getTransaction().commit();      AgentServer.getTransaction().release();    }  }  /**   * Deletes the component, removes all persistent datas. The component   * may have been previously stopped, and removed from MessageConsumer   * list.   * This operation use Transaction calls, you may use commit to validate it.   *   * @see fr.dyade.aaa.util.Transaction    */  public void delete() throws IllegalStateException {    if (isRunning()) throw new IllegalStateException();    AgentServer.getTransaction().delete(serversFN);    AgentServer.getTransaction().delete(bootTSFN);    AgentServer.getTransaction().delete(name);  }  /**   * Validates all messages pushed in queue during transaction session.   */  public void validate() {    qout.validate();  }  public MessageQueue getQueue() {    return qout;  }  /**   * Updates the network port.   */  public void setPort(int port) {    this.port = port;  }  public final int getPort() {    return port;  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -