⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.java

📁 一个基于JAVA的多torrent下载程序
💻 JAVA
字号:
/*
 * File    : ThreadPool.java
 * Created : 21-Nov-2003
 * By      : parg
 * 
 * Azureus - a Java Bittorrent client
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details ( see the LICENSE file ).
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

package org.gudy.azureus2.core3.util;

/**
 * @author parg
 *
 */

import java.util.*;

public class 
ThreadPool 
{
	protected static final int	IDLE_LINGER_TIME	= 10000;
	
	protected static final boolean	LOG_WARNINGS	= false;
	protected static final int		WARN_TIME		= 10000;
	
	protected static List		busy_pools			= new ArrayList();
	protected static boolean	busy_pool_timer_set	= false;
	
	
	private static ThreadLocal		tls	= 
		new ThreadLocal()
		{
			public Object
			initialValue()
			{
				return( null );
			}
		};
		
	protected static void
	checkAllTimeouts()
	{
		List	pools;	
	
			// copy the busy pools to avoid potential deadlock due to synchronization
			// nestings
		
		synchronized( busy_pools ){
			
			pools	= new ArrayList( busy_pools );
		}
		
		for (int i=0;i<pools.size();i++){
			
			((ThreadPool)pools.get(i)).checkTimeouts();
		}
	}
	
	
	private String	name;
	private int		thread_name_index	= 1;
	
	private long	execution_limit;
	
	private Stack	thread_pool;
	private List	busy;
	private boolean	queue_when_full;
	private List	task_queue	= new ArrayList();
	
	private AESemaphore	thread_sem;
	
	public
	ThreadPool(
		String	_name,
		int		max_size )
	{
		this( _name, max_size, false );
	}
	
	public
	ThreadPool(
		String	_name,
		int		_max_size,
		boolean	_queue_when_full )
	{
		name			= _name;
		queue_when_full	= _queue_when_full;
		
		thread_sem = new AESemaphore( "ThreadPool::" + name, _max_size );
		
		thread_pool	= new Stack();
					
		busy		= new ArrayList( _max_size );
	}

	public void
	setExecutionLimit(
		long		millis )
	{
		execution_limit	= millis;
	}
	
	public threadPoolWorker
	run(
		AERunnable	runnable )
	{
		// System.out.println( "Thread pool:" + name + " - sem = " + thread_sem.getValue() + ", queue = " + task_queue.size());
		
			// not queueing, grab synchronous sem here
		
		if ( !queue_when_full ){
		
			if ( !thread_sem.reserveIfAvailable()){
			
					// defend against recursive entry when in queuing mode (yes, it happens)
				
				threadPoolWorker	recursive_worker = (threadPoolWorker)tls.get();
				
				if ( recursive_worker == null || recursive_worker.getOwner() != this ){
	
						// do a blocking reserve here, not recursive 
					
					thread_sem.reserve();
	
				}else{
						// run immediately
							
					if ( runnable instanceof ThreadPoolTask ){
						
						ThreadPoolTask task = (ThreadPoolTask)runnable;
						                        
						task.worker = recursive_worker;
						
						try{
							task.taskStarted();
							
							task.run();
							
						}finally{
							
							task.taskCompleted();  
						}
					}else{
					
						runnable.runSupport();
					}
					
					return( recursive_worker );
				}
			}
		}
						
		threadPoolWorker allocated_worker;
						
		synchronized( this ){
		
				// reserve if available is non-blocking
			
			if ( queue_when_full && !thread_sem.reserveIfAvailable()){
			
				allocated_worker	= null;
			
				task_queue.add( runnable );
				
			}else{
				
				if ( thread_pool.isEmpty()){
							
					allocated_worker = new threadPoolWorker( this );	
		
				}else{
									
					allocated_worker = (threadPoolWorker)thread_pool.pop();
				}
				
				if ( runnable instanceof ThreadPoolTask ){
					
					((ThreadPoolTask)runnable).worker = allocated_worker;
				}
				
				allocated_worker.run( runnable );
			}
		}
		
		return( queue_when_full?null:allocated_worker );
	}
	
	public AERunnable[]
	getQueuedTasks()
	{
		synchronized( this ){

			AERunnable[]	res = new AERunnable[task_queue.size()];
			
			task_queue.toArray(res);
			
			return( res );
		}
	}
	
	public boolean
	isQueued(
		AERunnable	task )
	{
		synchronized( this ){

			return( task_queue.contains( task ));
		}
	}
	
	public AERunnable[]
	getRunningTasks()
	{
		List	runnables	= new ArrayList();
		
		synchronized( this ){

			Iterator	it = busy.iterator();
			
			while( it.hasNext()){
				
				threadPoolWorker	worker = (threadPoolWorker)it.next();
				
				AERunnable	runnable = worker.getRunnable();
				
				if ( runnable != null ){
					
					runnables.add( runnable );
				}
			}
		}
		
		AERunnable[]	res = new AERunnable[runnables.size()];
			
		runnables.toArray(res);
			
		return( res );
	}
	
	protected void
	checkTimeouts()
	{
		synchronized( ThreadPool.this ){
		
			long	now = SystemTime.getCurrentTime();
			
			for (int i=0;i<busy.size();i++){
					
				threadPoolWorker	x = (threadPoolWorker)busy.get(i);
			
				long	elapsed = now - x.run_start_time ;
					
				if ( elapsed > ( WARN_TIME * (x.warn_count+1))){
		
					x.warn_count++;
					
					if ( LOG_WARNINGS ){
						
						DebugLight.out( x.getWorkerName() + ": running, elapsed = " + elapsed + ", state = " + x.state );
					}
					
					if ( execution_limit > 0 && elapsed > execution_limit ){
						
						if ( LOG_WARNINGS ){
							
							DebugLight.out( x.getWorkerName() + ": interrupting" );
						}
						
						AERunnable r = x.runnable;

						try{
							if ( r instanceof ThreadPoolTask ){
								
								((ThreadPoolTask)r).interruptTask();
								
							}else{
								
								x.worker_thread.interrupt();
							}
						}catch( Throwable e ){
							
							DebugLight.printStackTrace( e );
						}
					}
				}
			}
		}
	}
	
	public class
	threadPoolWorker
	{
		private ThreadPool	owner;
		private String	worker_name;
		
		private Thread	worker_thread;
		
		private AESemaphore my_sem = new AESemaphore("TPWorker");
		
		private AERunnable	runnable;
		private long		run_start_time;
		private int			warn_count;
		
		private String	state	= "<none>";
		
		protected
		threadPoolWorker(
			ThreadPool	_owner )
		{
			owner	= _owner;
			
			worker_name = name + "[" + (thread_name_index++) +  "]";
			
			worker_thread = new AEThread( worker_name )
				{
					public void 
					runSupport()
					{
						tls.set( threadPoolWorker.this );
						
						boolean	time_to_die = false;
			
outer:
						while(true){
							
							try{
								while( !my_sem.reserve(IDLE_LINGER_TIME)){
																		
									synchronized( ThreadPool.this ){
										
										if ( runnable == null ){
											
											time_to_die	= true;
											
											thread_pool.remove( threadPoolWorker.this );
																						
											break outer;
										}
									}
								}
								
								while( runnable != null ){
									
									try{
										
										synchronized( ThreadPool.this ){
												
											run_start_time	= SystemTime.getCurrentTime();
											warn_count		= 0;
											
											busy.add( threadPoolWorker.this );
											
											if ( busy.size() == 1 ){
												
												synchronized( busy_pools ){
													
													busy_pools.add( ThreadPool.this );
													
													if  ( !busy_pool_timer_set ){
														
															// we have to defer this action rather
															// than running as a static initialiser
															// due to the dependency between
															// ThreadPool, Timer and ThreadPool again
														
														busy_pool_timer_set	= true;
														
														SimpleTimer.addPeriodicEvent(
																WARN_TIME,
																new TimerEventPerformer()
																{
																	public void
																	perform(
																		TimerEvent	event )
																	{
																		checkAllTimeouts();
																	}
																});
													}
												}
											}
										}
										
										if ( runnable instanceof ThreadPoolTask ){
										
											ThreadPoolTask	tpt = (ThreadPoolTask)runnable;
											
											try{
												tpt.taskStarted();
												
												runnable.run();
												
											}finally{
												
												tpt.taskCompleted();
											}
										}else{
											
											runnable.run();
										}
										
									}catch( Throwable e ){
										
										DebugLight.printStackTrace( e );		
	
									}finally{
																					
										synchronized( ThreadPool.this ){
												
											long	elapsed = SystemTime.getCurrentTime() - run_start_time;
											
											if ( elapsed > WARN_TIME && LOG_WARNINGS ){
												
												DebugLight.out( getWorkerName() + ": terminated, elapsed = " + elapsed + ", state = " + state );
											}
											
											busy.remove( threadPoolWorker.this );
											
											if ( busy.size() == 0 ){
												
												synchronized( busy_pools ){
												
													busy_pools.remove( ThreadPool.this );
												}
											}
										
											if ( task_queue.size() > 0 ){
												
												runnable = (AERunnable)task_queue.remove(0);
												
											}else{
											
												runnable	= null;
											}
										}
									}
								}
							}catch( Throwable e ){
									
								DebugLight.printStackTrace( e );
											
							}finally{
										
								if ( !time_to_die ){
									
									synchronized( ThreadPool.this ){
															
										thread_pool.push( threadPoolWorker.this );
									}
								
									thread_sem.release();
								}
							}
						}
					}
				};
				
			worker_thread.setDaemon(true);
			
			worker_thread.start();
		}
		
		public void
		setState(
			String	_state )
		{
			//System.out.println( "state = " + _state );
			
			state	= _state;
		}
		
		public String
		getState()
		{
			return( state );
		}
		
		protected String
		getWorkerName()
		{
			return( worker_name );
		}
		
		protected ThreadPool
		getOwner()
		{
			return( owner );
		}
		
		protected void
		run(
			AERunnable	_runnable )
		{
			runnable	= _runnable;
			
			my_sem.release();
		}
		
		protected AERunnable
		getRunnable()
		{
			return( runnable );
		}
	}
	
	public String
	getName()
	{
		return( name );
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -