⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 diskaccesscontrollerinstance.java

📁 这是一个基于 Java 编写的 BitTorrent P2P 源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
					
					request_sem.release();
					
					if ( thread == null ){
						
						thread = 
							new AEThread("DiskAccessController:requestDispatcher[" + index + "]", true )
							{
								/**
								 * Dispatcher loop for this request queue.
								 *
								 * Each iteration waits on request_sem (timeout 30000 - presumably
								 * milliseconds, TODO confirm against the semaphore implementation).
								 * When the reserve succeeds, the head of "requests" is removed and,
								 * if aggregation is enabled and the request's priority is negative,
								 * further queued requests on the same file whose offset equals the
								 * end of the previous one are chained onto it. The single request or
								 * the aggregated batch is then run outside the "requests" lock, the
								 * I/O timing/byte counters are updated and the space allowance of
								 * every request handled is released.
								 *
								 * When the reserve times out and the queue is empty, the "thread"
								 * field is cleared and the loop exits.
								 */
								public void
								runSupport()
								{
										// record this dispatcher thread in tls
									
									tls.set( this );
									
									while( true ){
										
										if ( request_sem.reserve( 30000 )){
											
											DiskAccessRequestImpl	request;
											
												// stays null unless at least one follow-on request is chained to "request"
											
											List	aggregated = null;
											
											synchronized( requests ){
	
													// take the head of the queue
												
												request = (DiskAccessRequestImpl)requests.remove(0);
												
												if ( enable_aggregation && request.getPriority() < 0 ){
													
													CacheFile	file = request.getFile();
													
														// per-file map keyed by request offset (as Long)
														// NOTE(review): assumes an entry for this file always exists here - confirm against the queueing code
													
													Map	file_map = (Map)request_map.get( file );
														
													file_map.remove( new Long( request.getOffset()));
													
													if ( !request.isCancelled()){
														
														DiskAccessRequestImpl	current = request;
															
														long	aggregated_bytes = 0;
														
														try{
															while( true ){
																
																int	current_size = current.getSize();
																
																	// offset at which a contiguous follow-on request would start
																
																long	end = current.getOffset() + current_size;
																				
																	// doesn't matter if we remove from this and don't end up using it
																
																DiskAccessRequestImpl next = (DiskAccessRequestImpl)file_map.remove( new Long( end ));
																
																	// compatibility is checked against the first request of the chain, not "current"
																
																if ( 	next == null || next.isCancelled() ||
																		!next.canBeAggregatedWith( request )){
																	
																	break;
																}
																
																	// pull the chained request out of the main queue and consume
																	// the semaphore permit that was associated with it
																
																requests.remove( next );
																
																if ( !request_sem.reserve( 30000 )){
																	
																		// semaphore should already be > 0 as we've removed an element...
																	
																	Debug.out( "shouldn't happen" );
																}
																
																if ( aggregated == null ){
																	
																		// first chained request: start the batch with the request we began from
																	
																	aggregated = new ArrayList(8);
																	
																	aggregated.add( current );
																	
																	aggregated_bytes += current_size;
																}
																
																aggregated.add( next );
																	
																aggregated_bytes += next.getSize();
																
																	// stop growing the batch once either configured limit is reached
																
																if ( aggregated.size() > aggregation_request_limit || aggregated_bytes >= aggregation_byte_limit ){
																	
																	break;
																}
																
																current = next;
															}
														}finally{
															
																// NOTE(review): these counters are only updated on the aggregation-eligible,
																// non-cancelled path; other requests are not counted here
									
															if ( aggregated != null ){
																
																total_aggregated_requests_made++;
																
																/*
																System.out.println( 
																		"aggregated read: requests=" + aggregated.size() + 
																		", size=" + aggregated_bytes + 
																		", a_reqs=" + requests.size() + 
																		", f_reqs=" + file_map.size());
																*/

															}else{
																
																total_single_requests_made++;
															}
														}
													}
												}
											}
											
												// the actual I/O is performed outside the "requests" lock
											
											try{
												
												long	io_start = SystemTime.getHighPrecisionCounter();
												
												if ( aggregated == null ){
													
													try{
														request.runRequest();
														
													}finally{
														
															// accounting happens even if the request threw
														
														long	io_end = SystemTime.getHighPrecisionCounter();

														io_time += ( io_end - io_start );

														total_single_bytes += request.getSize();
														
														releaseSpaceAllowance( request );
													}		
												}else{
													
														// note: this local array shadows the outer "requests" queue for the rest of this branch
													
													DiskAccessRequestImpl[]	requests = (DiskAccessRequestImpl[])aggregated.toArray( new DiskAccessRequestImpl[ aggregated.size()]);
													
													try{
														
														DiskAccessRequestImpl.runAggregated( request, requests );
														
													}finally{
														
														long	io_end = SystemTime.getHighPrecisionCounter();

														io_time += ( io_end - io_start );
														
															// every request in the batch gets its bytes counted and its allowance released
														
														for (int i=0;i<requests.length;i++){
															
															DiskAccessRequestImpl	r = requests[i];
															
															total_aggregated_bytes += r.getSize();
															
															releaseSpaceAllowance( r );
														}
													}		
												}
											}catch( Throwable e ){
												
													// a failing request is logged and the dispatcher keeps running
												
												Debug.printStackTrace(e);	
											}
										}else{
											
												// reserve timed out: retire this dispatcher if there is nothing queued
											
											synchronized( requests ){
	
												if ( requests.size() == 0 ){
													
													thread	= null;
																																					
													break;
												}
											}
										}
									}
								}
							};
							
						thread.start();
					}
				}
			}
		}
	
		/**
		 * Returns the current value of the last_request_time field.
		 *
		 * @return the stored last-request time value
		 */
		protected long getLastRequestTime() {
			return last_request_time;
		}
		
		/**
		 * Overwrites the last_request_time field.
		 *
		 * @param l the new last-request time value to store
		 */
		protected void setLastRequestTime(long l) {
			this.last_request_time = l;
		}
		
		/**
		 * Reports how many entries the requests collection currently holds.
		 *
		 * NOTE(review): the read is not performed under the requests monitor,
		 * unlike the dispatcher's accesses - confirm callers tolerate a stale value.
		 *
		 * @return the current number of queued entries
		 */
		protected int size() {
			final int pending = requests.size();

			return pending;
		}
	}
	
	/**
	 * Counting semaphore whose permits are reserved and released in groups.
	 *
	 * Callers that cannot be satisfied immediately are queued in arrival order;
	 * released permits are handed to the queue head first, so a large request is
	 * not starved by later small ones. All state (value, waiters, blocks) is
	 * guarded by this object's monitor; each waiter blocks on its own
	 * {@link mutableInteger} monitor.
	 *
	 * Invariant maintained by reserveGroup/releaseGroup: whenever the waiter
	 * queue is non-empty, value is 0.
	 */
	protected static class
	groupSemaphore
	{
			// permits currently available
		
		private int value;
		
			// queued mutableInteger entries, oldest first; each holds the number of permits still owed
		
		private List	waiters = new LinkedList();
		
			// number of reserveGroup calls that had to queue
		
		private long	blocks;
		
		/**
		 * @param _value initial number of available permits
		 */
		protected
		groupSemaphore(
			int	_value )
		{
			value	= _value;
		}
		
		/**
		 * @return how many reserveGroup calls have had to queue so far
		 */
		protected long
		getBlockCount()
		{
				// blocks is a long written under the monitor; read it under the monitor
				// too so the value is neither stale nor torn
			
			synchronized( this ){
				
				return( blocks );
			}
		}
		
		/**
		 * Reserves num permits, blocking until they have all been allotted.
		 *
		 * If the wait fails (the thread is interrupted or the spurious wakeup
		 * limit is hit) the reservation is withdrawn and any permits already
		 * allotted to it are handed back before the exception propagates.
		 *
		 * @param num number of permits to reserve
		 * @throws RuntimeException if the wait is interrupted (the thread's
		 *         interrupt status is restored) or the wakeup limit is exceeded
		 */
		protected void
		reserveGroup(
			int	num )
		{
			mutableInteger	wait;
			
			synchronized( this ){
				
					// for fairness we only return immediately if we can and there are no waiters
				
				if ( num <= value && waiters.size() == 0 ){
					
					value -= num;
					
					return;
				}
				
				blocks++;
				
					// take whatever is available now and queue for the remainder
				
				wait = new mutableInteger( num - value );
				
				value	= 0;
				
				waiters.add( wait );
			}
			
			boolean	acquired = false;
			
			try{
				wait.reserve();
				
				acquired = true;
				
			}finally{
				
				if ( !acquired ){
					
					abandonReservation( wait, num );
				}
			}
		}
		
		/**
		 * Withdraws a reservation whose wait failed and returns whatever part of
		 * it had already been allotted, so the semaphore is left consistent.
		 */
		private void
		abandonReservation(
			mutableInteger	wait,
			int				num )
		{
			synchronized( this ){
				
				int	held;
				
				if ( waiters.remove( wait )){
					
						// still queued: only the portion no longer owed has been allotted to us
					
					held = num - wait.getValue();
					
				}else{
					
						// releaseGroup already satisfied us in full before the failure was seen
					
					held = num;
				}
				
				if ( held > 0 ){
					
						// monitor is reentrant, so this is safe while holding the lock
					
					releaseGroup( held );
				}
			}
		}
			
		/**
		 * Returns num permits, waking queued waiters in order as their
		 * outstanding amounts are covered.
		 *
		 * @param num number of permits to release
		 */
		protected void
		releaseGroup(
			int	num )
		{
			synchronized( this ){

				if ( waiters.size() == 0 ){
					
						// no waiters we just increment the value
					
					value += num;
					
				}else{
					
						// otherwise we share num out amongst the waiters in order
					
					while( waiters.size() > 0 ){
						
						mutableInteger wait	= (mutableInteger)waiters.get(0);
						
						int	wait_num = wait.getValue();
						
						if ( wait_num <= num ){
							
								// we've got enough now to release this waiter
							
							wait.release();
							
							waiters.remove(0);
							
							num -= wait_num;
							
						}else{
							
								// not enough for the head waiter: reduce what it is owed and stop
							
							wait.setValue( wait_num - num );
							
							num	= 0;
							
							break;
						}
					}
					
						// if we have any left over then save it
					
					value = num;
				}
			}
		}
		
		/**
		 * Queue entry for one blocked reserveGroup call: holds the number of
		 * permits still owed and acts as the monitor the caller blocks on.
		 * The owed count is only read/written by groupSemaphore while it holds
		 * its own monitor.
		 */
		protected static class
		mutableInteger
		{
			private int		i;
			private boolean	released;
			
			protected
			mutableInteger(
				int	_i )
			{
				i	= _i;
			}
			
			protected int
			getValue()
			{
				return( i );
			}
			
			protected void
			setValue(
				int	_i )
			{
				i	= _i;
			}
			
			/**
			 * Marks this entry as satisfied and wakes the thread blocked in reserve().
			 */
			protected void
			release()
			{
				synchronized( this ){
					
					released	= true;
					
					notify();
				}
			}
			
			/**
			 * Blocks until release() has been called.
			 *
			 * @throws RuntimeException if the thread is interrupted while waiting
			 *         (interrupt status is restored and the cause is attached), or
			 *         if more than 1024 wakeups occur without a release
			 */
			protected void
			reserve()
			{
				synchronized( this ){
					
					int	spurious_count = 0;
					
					while( !released ){
						
						try{
							wait();
							
						}catch( InterruptedException e ){
							
								// keep the interrupt visible to the caller and preserve the cause
							
							Thread.currentThread().interrupt();
							
							throw( new RuntimeException( "Semaphore: operation interrupted", e ));
						}
						
						if ( !released ){
							
								// woken without a release
							
							spurious_count++;

							if ( spurious_count > 1024 ){
							
								Debug.out( "DAC::mutableInteger: spurious wakeup limit exceeded" );
								
								throw( new RuntimeException( "die die die" ));
								
							}else{
							
								Debug.out("DAC::mutableInteger: spurious wakeup, ignoring" );
							}
						}
					}
				}
			}
		}
	}
	
	/**
	 * Manual stress harness for groupSemaphore: starts ten threads that each
	 * loop forever reserving a random-sized group from a nine-permit semaphore,
	 * sleeping briefly, releasing the group and printing progress every
	 * hundred iterations. Never returns normally.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		final groupSemaphore permits = new groupSemaphore(9);

		// The task keeps all of its state in locals, so one instance can back every thread.
		final Runnable worker = new Runnable() {
			public void run() {
				for (int completed = 1; ; completed++) {
					final int wanted = RandomUtils.generateRandomIntUpto(10);

					System.out.println(Thread.currentThread().getName() + " reserving " + wanted);

					permits.reserveGroup(wanted);

					try {
						final int extra = RandomUtils.generateRandomIntUpto(5);

						Thread.sleep(5 + extra);
					} catch (Throwable e) {
						// deliberately ignored: the harness simply carries on
					}

					permits.releaseGroup(wanted);

					if (completed % 100 == 0) {
						System.out.println(Thread.currentThread().getName() + ": " + completed + " ops");
					}
				}
			}
		};

		int toStart = 10;

		while (toStart-- > 0) {
			new Thread(worker).start();
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -