⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpmonitorserver.java

📁 java实现的P2P多agent中间件
💻 JAVA
字号:
/*****************************************************************
 JADE - Java Agent DEvelopment Framework is a framework to develop 
 multi-agent systems in compliance with the FIPA specifications.
 Copyright (C) 2000 CSELT S.p.A. 

 GNU Lesser General Public License

 This library is free software; you can redistribute it and/or
 modify it under the terms of the GNU Lesser General Public
 License as published by the Free Software Foundation, 
 version 2.1 of the License. 

 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.

 You should have received a copy of the GNU Lesser General Public
 License along with this library; if not, write to the
 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 Boston, MA  02111-1307, USA.
 *****************************************************************/

package jade.core.nodeMonitoring;

//#APIDOC_EXCLUDE_FILE
// Take care that the DOTNET build file (dotnet.xml) uses this file (it is copied just after the preprocessor excluded it)
//#J2ME_EXCLUDE_FILE

import jade.core.Profile;

import java.io.IOException;
import java.nio.ByteBuffer;

//#DOTNET_EXCLUDE_BEGIN
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.BindException;
//#DOTNET_EXCLUDE_END

/*#DOTNET_INCLUDE_BEGIN
 import System.Net.*;
 import System.Net.Sockets.*;
 #DOTNET_INCLUDE_END*/

import java.util.Calendar;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

import jade.util.Logger;

/**
 * The <code>UDPMonitorServer</code> is used by any instance of the class
 * <code>UDPNodeFailureMonitor</code> to receive UDP ping messages from nodes.
 * 
 * The server is only running if there are nodes to supervise. By default it
 * waits at port 28000 for incomming UDP datagrams. The maximum time between
 * two ping messages is by default 3 second. After 5 seconds a node is marked 
 * for removing from the platform.
 * <p>
 * 
 * @author Roland Mungenast - Profactor
 * @since JADE 3.3
 * @author Federico Pieri - ERXA
 * @since JADE 3.3.NET
 */
class UDPMonitorServer {

	private Logger logger;
	
	private UDPNodeMonitoringService myService = null;
	
	private String host;
	private int port;
	private int pingDelay;
	private int pingDelayLimit;
	private int unreachLimit;

	//#DOTNET_EXCLUDE_BEGIN
	private DatagramChannel server;
	private Selector selector;
	//#DOTNET_EXCLUDE_END
	
	private Hashtable targets = new Hashtable();
	private PingHandler pingHandler;
	private Timer timer;
	private Hashtable deadlines = new Hashtable();

	/*#DOTNET_INCLUDE_BEGIN
	 private Socket server;
	 #DOTNET_INCLUDE_END*/

	/**
	 * Class to store a deadline for the next ping
	 * of a targeted node
	 */
	private class Deadline extends TimerTask {

		private String nodeID;

		private long time;

		public Deadline(String nodeID) {
			this.nodeID = nodeID;
			this.time = System.currentTimeMillis();
		}

		public long getTime() {
			return time;
		}

		public String getNodeID() {
			return nodeID;
		}

		public void run() {
			UDPNodeFailureMonitor mon = (UDPNodeFailureMonitor) targets.get(nodeID);

			// node is still supervised and there are no new deadlines
			if (mon != null) {
				synchronized (mon) { // Mutual exclusion with pingReceived()
					if (mon.getDeadline() == time) {
						timeout(nodeID, mon);
					}
				}
			}
		}
	}

	/**
	 * Class to handles incomming ping messages
	 */
	private class PingHandler implements Runnable {

		private final byte TERMINATING_INFO = 1; // bit 1
		private boolean interrupted = false;
		private Thread thread;

		public PingHandler(String name) {
			thread = new Thread(this, name);
		}

		private void handlePing() throws IOException {
			// allocate maximum size of one UDP packet
			ByteBuffer datagramBuffer = ByteBuffer.allocate(1 << 16);

			//#DOTNET_EXCLUDE_BEGIN
			SocketAddress address = server.receive(datagramBuffer);
			//#DOTNET_EXCLUDE_END

			/*#DOTNET_INCLUDE_BEGIN
			 ubyte[] recData = new ubyte[datagramBuffer.getUByte().length];

			 if ( server != null)
			 {
			 try
			 {
			 if (server.get_Available() <= 0)
			 return;
			 }
			 catch (System.ObjectDisposedException ode)
			 {
			 return;
			 }
			 }
			 else
			 return;

			 try
			 {
			 server.Receive(recData, 0, server.get_Available(), SocketFlags.None);
			 }
			 catch (SocketException se)
			 {
			 int socketError = se.get_ErrorCode();
			 return;
			 }
			 IPEndPoint IPendPt  = (IPEndPoint) server.get_LocalEndPoint();
			 IPAddress address	= IPendPt.get_Address();
			 datagramBuffer.copyUByte(recData);
			 #DOTNET_INCLUDE_END*/

			datagramBuffer.position(0);

			if (address != null) {

				int nodeIDLength = datagramBuffer.getInt();

				// get node ID
				byte[] bb = new byte[nodeIDLength];
				datagramBuffer.get(bb, 0, nodeIDLength);
				String nodeID = new String(bb);

				// analyse info byte
				byte info = datagramBuffer.get();
				boolean isTerminating = (info & TERMINATING_INFO) != 0;

				// cancel arleady existing deadline
				TimerTask currDeadline = (TimerTask) deadlines.get(nodeID);
				if (currDeadline != null) {
					currDeadline.cancel();
				}
				pingReceived(nodeID, isTerminating);
			}
		}

		public void run() {
			while (!interrupted) { // endless loop
				try {
					//#DOTNET_EXCLUDE_BEGIN
					selector.select();

					Set keys = selector.selectedKeys();
					interrupted = keys.size() == 0;
					Iterator i = keys.iterator();

					while (i.hasNext()) {
						SelectionKey key = (SelectionKey) i.next();
						i.remove();
						if (key.isValid() && key.isReadable()) {
							//#DOTNET_EXCLUDE_END
							handlePing();
							//#DOTNET_EXCLUDE_BEGIN
						}
					}
					//#DOTNET_EXCLUDE_END 
				} catch (Exception e) // .net requires I catch Exception instead of IOException
				{
					if (logger.isLoggable(Logger.SEVERE))
						logger.log(Logger.SEVERE, "UDP Connection error ");
				}
			} // for
		}

		public void start() {
			thread.start();
		}

		public void stop() {
			interrupted = true;
		}
	}

	/**
	 * Constructs a new UDPMonitorServer object
	 */
	UDPMonitorServer(UDPNodeMonitoringService s, String h, int p, int pd, int pdl, int ul) {
		myService = s;
		host = h;
		port = p;
		pingDelay = pd;
		pingDelayLimit = pdl;
		unreachLimit = ul;

		logger = Logger.getMyLogger(UDPNodeMonitoringService.NAME);
		try {
			//#DOTNET_EXCLUDE_BEGIN
			server = DatagramChannel.open();
			//#DOTNET_EXCLUDE_END

			/*#DOTNET_INCLUDE_BEGIN
			 server = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
			 #DOTNET_INCLUDE_END*/
		} catch (Exception e) { // .net requires I catch Exception instead of IOException
			logger.log(Logger.SEVERE, "Cannot open UDP channel. " + e);
			e.printStackTrace();
		}
	}

	String getHost() {
		return host;
	}
	
	int getPort() {
		return port;
	}
	
	int getPingDelay() {
		return pingDelay;
	}
	
	int getPingDelayLimit() {
		return pingDelayLimit;
	}
	
	int getUnreachableLimit() {
		return unreachLimit;
	}
	
	/**
	 * Starts the UDP server
	 */
	synchronized void start() {
		try {
			// Start UDP server

			//#DOTNET_EXCLUDE_BEGIN
			server.configureBlocking(false);
            try {
                server.socket().bind(new InetSocketAddress(host, port));
            } catch (BindException e){
                server.socket().bind(null);
                port  = server.socket().getLocalPort();
                logger.log(Logger.INFO, "New UDP monitoring server port  " + port);
            }
            //#DOTNET_EXCLUDE_END

			/*#DOTNET_INCLUDE_BEGIN
			 server = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
			 server.set_Blocking( false );
			 server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
			 String defaultNetworkName = Profile.getDefaultNetworkName();
			 System.Net.IPHostEntry hostEntry = System.Net.Dns.GetHostByName( defaultNetworkName );
			 System.Net.IPAddress[] ipAddresses = hostEntry.get_AddressList();
			 long ipAddressLong = ipAddresses[0].get_Address();
			 if ( !server.get_Connected() )
			 {
			 IPEndPoint ipPoint = new IPEndPoint(IPAddress.Any, port);
			 try
			 {
			 server.Bind( ipPoint );
			 }
			 catch(SocketException se)
			 {
			 int socketError = se.get_ErrorCode();
			 }
			 }
			 #DOTNET_INCLUDE_END*/

			//#DOTNET_EXCLUDE_BEGIN
			// Create and register Selector
			selector = Selector.open();
			server.register(selector, SelectionKey.OP_READ);
			//#DOTNET_EXCLUDE_END

			// Start PingHandler thread
			pingHandler = new PingHandler("UDPNodeFailureMonitor-PingHandler");
			pingHandler.start();

			// start timer for deadlines
			timer = new Timer();

			if (logger.isLoggable(Logger.CONFIG))
				logger.log(Logger.CONFIG, "UDP monitoring server started.");
		} catch (Throwable t) { // .net requires I catch Exception instead of IOException
			logger.log(Logger.SEVERE, "UDP monitoring server cannot be started. " + t);
			t.printStackTrace();
		}
	}

	/**
	 * Stops the UDP server
	 */
	synchronized void stop() {
		try {
			pingHandler.stop();
			timer.cancel();
			deadlines.clear();

			//#DOTNET_EXCLUDE_BEGIN
			server.disconnect();
			//#DOTNET_EXCLUDE_END

			/*#DOTNET_INCLUDE_BEGIN
			 server.Shutdown(SocketShutdown.Both);
			 server.Close();
			 #DOTNET_INCLUDE_END*/

			if (logger.isLoggable(Logger.INFO))
				logger.log(Logger.INFO, "UDP monitoring server has been stopped.");

		} catch (Exception e) {
			if (logger.isLoggable(Logger.SEVERE))
				logger.log(Logger.SEVERE, "Error shutting down the UDP monitor server");
		}
	}

	/**
	 * Registers a <code>UDPNodeFailureMonitor</code>.
	 * All nodes targeted by this monitor are now supervised
	 * by the <code>UDPMonitorServer</code>. Its listener
	 * gets informed about any state changes.
	 */
	public void register(UDPNodeFailureMonitor m) {
		String nodeID = m.getNode().getName();
		targets.put(nodeID, m);
		addDeadline(nodeID, pingDelayLimit);
	}

	/**
	 * Deregisters a <code>UDPNodeFailureMonitor</code> and all
	 * its targeted nodes from monitoring.
	 */
	public void deregister(UDPNodeFailureMonitor m) {
		String nodeId = m.getNode().getName();
		targets.remove(nodeId);
	}

	/**
	 * This method is invoced by a PingHandler thread when a 
	 * new ping message has been received
	 * @param nodeID identification of the sender node
	 * @param isTerminating true if the sender is currently shutting down
	 */
	protected void pingReceived(String nodeID, boolean isTerminating) {

		if (logger.isLoggable(Logger.FINEST)) {
			logger.log(Logger.FINEST, "UDP ping message for node '" + nodeID + "' received. (termination-flag: " + isTerminating + ")");
		}
		UDPNodeFailureMonitor mon = (UDPNodeFailureMonitor) targets.get(nodeID);

		if (mon != null) {
			synchronized (mon) { // Mutual exclusion with Timer expiration
				mon.setLastPing(System.currentTimeMillis()); // update time for last ping
				addDeadline(nodeID, pingDelayLimit);
				int state = mon.getState();
	
				// state transitions
				if (state == UDPNodeFailureMonitor.STATE_CONNECTED && isTerminating) {
					mon.setState(UDPNodeFailureMonitor.STATE_FINAL);
	
				} else if (state == UDPNodeFailureMonitor.STATE_UNREACHABLE && !isTerminating) {
					mon.setState(UDPNodeFailureMonitor.STATE_CONNECTED);
					addDeadline(nodeID, pingDelayLimit);
	
				} else if (state == UDPNodeFailureMonitor.STATE_UNREACHABLE && isTerminating) {
					mon.setState(UDPNodeFailureMonitor.STATE_FINAL);
				}
			}
		} 
		else {
			logger.log(Logger.WARNING, "UDP ping message with the unknown node ID '" + nodeID + "' received");
			myService.handleOrphanNode(nodeID);
		}
	}

	/**
	 * This method is invoced by a TimeoutHandler at a timeout
	 */
	protected void timeout(String nodeID, UDPNodeFailureMonitor mon) {
		int oldState = mon.getState();
		int newState = oldState;

		if (logger.isLoggable(Logger.FINEST)) {
			logger.log(Logger.FINEST, "Timeout for '" + nodeID + "'");
		}

		if (oldState == UDPNodeFailureMonitor.STATE_CONNECTED) {
			newState = UDPNodeFailureMonitor.STATE_UNREACHABLE;
			addDeadline(nodeID, unreachLimit);

		} else if (oldState == UDPNodeFailureMonitor.STATE_UNREACHABLE) {
			newState = UDPNodeFailureMonitor.STATE_FINAL;
		}
		
		if (newState != oldState)
			mon.setState(newState);
	}

	private void addDeadline(String nodeID, int delay) {
		Calendar now = Calendar.getInstance();
		now.add(Calendar.MILLISECOND, delay);

		Deadline deadline = new Deadline(nodeID);
		UDPNodeFailureMonitor mon = (UDPNodeFailureMonitor) targets.get(nodeID);
		if (mon != null) {
			mon.setDeadline(deadline.getTime());
			deadlines.put(nodeID, deadline);
			timer.schedule(deadline, delay);
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -