⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 targetselector.java

📁 简单的分布式算法
💻 JAVA
字号:
/* ***************************************************************************** * $Id: TargetSelector.java,v 1.1 2003/05/02 00:40:21 jheiss Exp $ ***************************************************************************** * Marshall new connections through the process of having distribution * algorithms pick a target for them, and then hand the completed * connections to the targets to perform the bulk data transfer for the * life of the connection. ***************************************************************************** * Copyright 2003 Jason Heiss *  * This file is part of Distributor. *  * Distributor is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. *  * Distributor is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. *  * You should have received a copy of the GNU General Public License * along with Distributor; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA ***************************************************************************** */package oss.distributor;import java.util.List;import java.util.LinkedList;import java.util.Map;import java.util.HashMap;import java.util.Iterator;import java.util.logging.Logger;import java.io.IOException;import java.nio.channels.SocketChannel;public class TargetSelector implements Runnable{	Distributor distributor;	Logger logger;	List distributionAlgorithms;	Map currentAlgorithm;	List needsDistributing;	List finishedDistributing;	Thread thread;	protected TargetSelector(Distributor distributor)	{		this.distributor = distributor;		currentAlgorithm = new HashMap();		needsDistributing = new LinkedList();		finishedDistributing = new LinkedList();		thread = new Thread(this, getClass().getName());	}	/*	 * This allows Distributor to delay some of our initialization until	 * it is ready.  There are some things we need that Distributor may	 * not have ready at the point at which it constructs us, so we wait	 * and retrieve them at the start of run().	 */	protected void startThread()	{		thread.start();	}	/*	 * Used by Distributor to give us a new client	 */	protected void addNewClient(SocketChannel client)	{		try		{			logger.finest("Setting client channel to non-blocking mode");			client.configureBlocking(false);			addUnconnectedClient(client);		}		catch (IOException e)		{			logger.warning(				"Error setting channels to non-blocking mode: " +				e.getMessage());			try			{				logger.fine("Closing client channel");				client.close();			}			catch (IOException ioe)			{				logger.warning(					"Error closing client channel: " + ioe.getMessage());			}		}	}	/*	 * Used by addNewClient(), or by a DistributionAlgorithm to give us	 * a client which that algorithm was not able to connect to a	 * Target.	 *	 * See the note near the wait() call in run() as to why this method	 * is synchronized.	 */	public synchronized void addUnconnectedClient(SocketChannel client)	{		synchronized (needsDistributing)		{			needsDistributing.add(client);		}		// Wake up this class' thread so it can process the client		notify();	}	/*	 * Used by a DistributionAlgorithm to give us a completed	 * connection.	 *	 * See the note near the wait() call in run() as to why this method	 * is synchronized.	 */	public synchronized void addFinishedClient(Connection conn)	{		synchronized (finishedDistributing)		{			finishedDistributing.add(conn);		}		// Wake up this class' thread so it can process the client		notify();	}	public void run()	{		List needsDistProcessQueue;		List finishedDistProcessQueue;		Iterator iter;		SocketChannel client;		Connection conn;		Iterator algoIter;		DistributionAlgorithm algo;		int i;		// Finish our initialization		logger = distributor.getLogger();		distributionAlgorithms = distributor.getDistributionAlgorithms();		while (true)		{			// If the queues are empty, give up our synchronization lock			// on 'this' and wait for addUnconnectedClient or			// addFinishedClient to notify us that there is a client to			// process.  Synchronization on 'this' is to prevent the			// addUnconnectedClient() and addFinishedClient() methods			// from sneaking another entry into the queue while we're			// checking the sizes.			synchronized (this)			{				if (needsDistributing.size() == 0 &&					finishedDistributing.size() == 0)				{					try { wait(); } catch (InterruptedException e) {}				}			}			//			// Handle clients which need to be distributed			//			// Roll over the needsDistributing queue so that we can			// process it without holding anyone else up			needsDistProcessQueue = needsDistributing;			needsDistributing = new LinkedList();			// Now that we've rolled over the queue, we don't need to			// synchronize, as no one else is going to touch it.			iter = needsDistProcessQueue.iterator();			while (iter.hasNext())			{				client = (SocketChannel) iter.next();				//				// Figure out which algorithm to use for this client				//				// Get the last algorithm used				algo = (DistributionAlgorithm) currentAlgorithm.get(client);				// New clients won't be in the map and thus we'll				// get null.  Start them off with the first algorithm.				if (algo == null)				{					algo =						(DistributionAlgorithm) distributionAlgorithms.get(0);				}				// Otherwise advance to the next algorithm				else				{					i = distributionAlgorithms.indexOf(algo);					if (i < distributionAlgorithms.size())					{						algo =							(DistributionAlgorithm)								distributionAlgorithms.get(i + 1);					}					else					{						// No more algorithms available, disconnect						logger.warning(							"Unable to find a working target for client " +							client);						try { client.close(); } catch (IOException e) {}					}				}				// Record the current algorithm in case it fails to				// find a working target and the client needs				// another trip through this section				currentAlgorithm.put(client, algo);				//				// Ask the algorithm to attempt to find a Target for				// this client				//				logger.finer("Asking " + algo +					" to try to find a target for " + client);				algo.tryToConnect(client);			}			//			// Handle clients that distribution algorithms have			// finished distributing			//			// Roll over the finishedDistributing queue so that we can			// process it without holding anyone else up			finishedDistProcessQueue = finishedDistributing;			finishedDistributing = new LinkedList();			// Now that we've rolled over the queue, we don't need to			// synchronize, as no one else is going to touch it.			iter = finishedDistProcessQueue.iterator();			while (iter.hasNext())			{				conn = (Connection) iter.next();				// Let each distribution algorithm know that a				// successful connection has occurred.  Some				// algorithms want to record that information.				logger.finer(					"Notifying distribution algorithms of successful " +					"connection " + conn);				algoIter = distributionAlgorithms.iterator();				while (algoIter.hasNext())				{					algo = (DistributionAlgorithm) algoIter.next();					algo.connectionNotify(conn);				}				// Yank them from currentAlgorithm				algo =					(DistributionAlgorithm)						currentAlgorithm.remove(conn.getClient());				// Register them with the Target				logger.finer("Registering connection " + conn + "with target");				conn.getTarget().addConnection(conn);			}		}	}}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -