📄 network.java
字号:
// Get the logging monitor from current server MonologLoggerFactory // Be careful, logmon is initialized from name and not this.name !! logmon = Debug.getLogger(Debug.A3Network + '.' + name); logmon.log(BasicLevel.DEBUG, name + ", initialized"); WDActivationPeriod = Long.getLong("WDActivationPeriod", WDActivationPeriod).longValue(); WDActivationPeriod = Long.getLong(name + ".WDActivationPeriod", WDActivationPeriod).longValue(); WDNbRetryLevel1 = Integer.getInteger("WDNbRetryLevel1", WDNbRetryLevel1).intValue(); WDNbRetryLevel1 = Integer.getInteger(name + ".WDNbRetryLevel1", WDNbRetryLevel1).intValue(); WDRetryPeriod1 = Long.getLong("WDRetryPeriod1", WDRetryPeriod1).longValue(); WDRetryPeriod1 = Long.getLong(name + ".WDRetryPeriod1", WDRetryPeriod1).longValue(); WDNbRetryLevel2 = Integer.getInteger("WDNbRetryLevel2", WDNbRetryLevel2).intValue(); WDNbRetryLevel2 = Integer.getInteger(name + ".WDNbRetryLevel2", WDNbRetryLevel2).intValue(); WDRetryPeriod2 = Long.getLong("WDRetryPeriod2", WDRetryPeriod2).longValue(); WDRetryPeriod2 = Long.getLong(name + ".WDRetryPeriod2", WDRetryPeriod2).longValue(); WDRetryPeriod3 = Long.getLong("WDRetryPeriod3", WDRetryPeriod3).longValue(); WDRetryPeriod3 = Long.getLong(name + ".WDRetryPeriod3", WDRetryPeriod3).longValue(); // Sorts the array of server ids into ascending numerical order. Arrays.sort(servers); this.servers = servers; this.serversFN = name + "Servers"; this.bootTSFN = name + "BootTS"; restore(); } /** * Adds the server sid in the network configuration. * * @param id the unique server id. */ synchronized void addServer(short id) throws Exception { // First we have to verify that id is not already in servers int idx = index(id); if (idx >= 0) return; idx = -idx -1; // Allocates new array for stamp and server int[] newStamp = new int[servers.length+1]; byte[] newStampBuf = new byte[4*(servers.length+1)]; int[] newBootTS = new int[servers.length+1]; short[] newServers = new short[servers.length+1]; // Copy old data from stamp and server, let a free room for the new one. int j = 0; for (int i=0; i<servers.length; i++) { if (i == idx) j++; newServers[j] = servers[i]; newBootTS[j] = bootTS[i]; newStamp[j] = stamp[i]; j++; } if (idx > 0) System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4); if (idx < servers.length) System.arraycopy(stampbuf, idx*4, newStampBuf, (idx+1)*4, (servers.length-idx)*4); newServers[idx] = id; newBootTS[idx] = -1; newStamp[idx] = -1; // useless newStampBuf[idx] = 0; // useless newStampBuf[idx+1] = 0; // useless newStampBuf[idx+2] = 0; // useless newStampBuf[idx+3] = 0; // useless stamp = newStamp; stampbuf = newStampBuf; servers = newServers; bootTS = newBootTS; // be careful, set again the index of local server. idxLS = index(sid); // Save the servers configuration and the logical time stamp. AgentServer.getTransaction().save(servers, serversFN); AgentServer.getTransaction().save(bootTS, bootTSFN); AgentServer.getTransaction().saveByteArray(stampbuf, name); } /** * Removes the server sid in the network configuration. * * @param id the unique server id. */ synchronized void delServer(short id) throws Exception { // First we have to verify that id is already in servers int idx = index(id); if (idx < 0) return; int[] newStamp = new int[servers.length-1]; byte[] newStampBuf = new byte[4*(servers.length-1)]; int[] newBootTS = new int[servers.length-1]; short[] newServers = new short[servers.length-1]; int j = 0; for (int i=0; i<servers.length; i++) { if (id == servers[i]) { idx = i; continue; } newServers[j] = servers[i]; newBootTS[j] = bootTS[i]; newStamp[j] = stamp[i]; j++; } if (idx > 0) System.arraycopy(stampbuf, 0, newStampBuf, 0, idx*4); if (idx < (servers.length-1)) System.arraycopy(stampbuf, (idx+1)*4, newStampBuf, idx*4, (servers.length-idx-1)*4); stamp = newStamp; stampbuf = newStampBuf; servers = newServers; bootTS = newBootTS; // be careful, set again the index of local server. idxLS = index(sid); // Save the servers configuration and the logical time stamp. AgentServer.getTransaction().save(servers, serversFN); AgentServer.getTransaction().save(bootTS, bootTSFN); AgentServer.getTransaction().saveByteArray(stampbuf, name); } /** * Reset all information related to server sid in the network configuration. * * @param id the unique server id. */ synchronized void resetServer(short id, int boot) throws IOException { // First we have to verify that id is already in servers int idx = index(id); if (idx < 0) return; // TODO... // Save the servers configuration and the logical time stamp. AgentServer.getTransaction().save(servers, serversFN); AgentServer.getTransaction().save(bootTS, bootTSFN); AgentServer.getTransaction().saveByteArray(stampbuf, name); } /** * Adds a message in "ready to deliver" list. This method allocates a * new time stamp to the message ; be Careful, changing the stamp imply * the filename change too. */ public void post(Message msg) throws Exception { if ((msg.not.expiration > 0) && (msg.not.expiration < System.currentTimeMillis())) { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ": removes expired notification " + msg.from + ", " + msg.not); return; } short to = AgentServer.getServerDesc(msg.to.to).gateway; // Allocates a new timestamp. Be careful, if the message needs to be // routed we have to use the next destination in timestamp generation. msg.source = AgentServer.getServerId(); msg.dest = to; msg.stamp = getSendUpdate(to); // Saves the message. msg.save(); // Push it in "ready to deliver" queue. qout.push(msg); } /** * Returns the index in internal table of the specified server. * The servers array must be ordered. * * @param id the unique server id. */ protected final int index(short id) { int idx = Arrays.binarySearch(servers, id); return idx; } protected final byte[] getStamp() { return stampbuf; } protected final void setStamp(byte[] stampbuf) { this.stampbuf = stampbuf; stamp = new int[servers.length]; for (int i=0; i<stamp.length; i++) { stamp[i] = ((stampbuf[(i*4)+0] & 0xFF) << 24) + ((stampbuf[(i*4)+1] & 0xFF) << 16) + ((stampbuf[(i*4)+2] & 0xFF) << 8) + (stampbuf[(i*4)+3] & 0xFF); } } private void updateStamp(int idx, int update) throws IOException { stamp[idx] = update; stampbuf[(idx*4)+0] = (byte)((update >>> 24) & 0xFF); stampbuf[(idx*4)+1] = (byte)((update >>> 16) & 0xFF); stampbuf[(idx*4)+2] = (byte)((update >>> 8) & 0xFF); stampbuf[(idx*4)+3] = (byte)(update & 0xFF); AgentServer.getTransaction().saveByteArray(stampbuf, name); } /** The message can be delivered. */ static final int DELIVER = 0;// /**// * There is other message in the causal ordering before this one.// * This cannot happened with a FIFO ordering.// */// static final int WAIT_TO_DELIVER = 1; /** The message has already been delivered. */ static final int ALREADY_DELIVERED = 2; /** * Test if a received message with the specified clock must be * delivered. If the message is ready to be delivered, the method returns * <code>DELIVER</code> and the matrix clock is updated. If the message has * already been delivered, the method returns <code>ALREADY_DELIVERED</code>, * and if other messages are waited before this message the method returns * <code>WAIT_TO_DELIVER</code>. In the last two case the matrix clock * remains unchanged. * * @param update The message matrix clock (list of update). * @return <code>DELIVER</code>, <code>ALREADY_DELIVERED</code>, * or <code>WAIT_TO_DELIVER</code> code. */ private synchronized int testRecvUpdate(short source, int update) throws IOException { int fromIdx = index(source); if (update > stamp[fromIdx]) { updateStamp(fromIdx, update); return DELIVER; } return ALREADY_DELIVERED; } /** * Computes the matrix clock of a send message. The server's * matrix clock is updated. * * @param to The identification of receiver. * @return The message matrix clock (list of update). */ private synchronized int getSendUpdate(short to) throws IOException { int update = stamp[idxLS] +1; updateStamp(idxLS, update); return update; } final int getBootTS() { return bootTS[idxLS]; } final void testBootTS(short source, int boot) throws IOException { int fromIdx = index(source); if (boot != bootTS[fromIdx]) { if (logmon.isLoggable(BasicLevel.WARN)) logmon.log(BasicLevel.WARN, getName() + ", reset stamp #" + source + ", " + bootTS[fromIdx] + " -> " + boot); bootTS[fromIdx] = boot; AgentServer.getTransaction().save(bootTS, bootTSFN); updateStamp(fromIdx, -1); } } /** * Try to deliver the received message to the right consumer. * * @param msg the message. */ protected void deliver(Message msg) throws Exception { // Get real from serverId. short source = msg.getSource(); // Test if the message is really for this node (final destination or // router). short dest = msg.getDest(); if (dest != AgentServer.getServerId()) { logmon.log(BasicLevel.ERROR, getName() + ", recv bad msg#" + msg.getStamp() + " really to " + dest + " by " + source); throw new Exception("recv bad msg#" + msg.getStamp() + " really to " + dest + " by " + source); } if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", recv msg#" + msg.getStamp() + " from " + msg.from + " to " + msg.to + " by " + source); AgentServer.getServerDesc(source).active = true; AgentServer.getServerDesc(source).retry = 0; // Start a transaction in order to ensure atomicity of clock updates // and queue changes. AgentServer.getTransaction().begin(); // Test if the message can be delivered then deliver it // else put it in the waiting list int todo = testRecvUpdate(source, msg.getStamp()); if (todo == DELIVER) { // Deliver the message then try to deliver alls waiting message. // Allocate a local time to the message to order it in // local queue, and save it. Channel.post(msg); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", deliver msg#" + msg.getStamp()); Channel.save(); AgentServer.getTransaction().commit(); // then commit and validate the message. Channel.validate(); AgentServer.getTransaction().release(); } else {// it's an already delivered message, we have just to re-send an// aknowledge (see below). AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); } } /** * Deletes the component, removes all persistent datas. The component * may have been previously stopped, and removed from MessageConsumer * list. * This operation use Transaction calls, you may use commit to validate it. * * @see fr.dyade.aaa.util.Transaction */ public void delete() throws IllegalStateException { if (isRunning()) throw new IllegalStateException(); AgentServer.getTransaction().delete(serversFN); AgentServer.getTransaction().delete(bootTSFN); AgentServer.getTransaction().delete(name); } /** * Validates all messages pushed in queue during transaction session. */ public void validate() { qout.validate(); } public MessageQueue getQueue() { return qout; } /** * Updates the network port. */ public void setPort(int port) { this.port = port; } public final int getPort() { return port; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -