⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prudppackethandlerimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
				
				SHA1Hasher hasher = new SHA1Hasher();

				String	user_name 	= auth.getUserName();
				String	password	= new String(auth.getPassword());
				
				byte[]	sha1_password;
				
				if ( user_name.equals( "<internal>")){
					
					sha1_password = Base64.decode(password);

				}else{
					
					sha1_password = hasher.calculateHash(password.getBytes());			
				}
				
				byte[]	user_bytes = new byte[8];
				
				Arrays.fill( user_bytes, (byte)0);
				
				for (int i=0;i<user_bytes.length&&i<user_name.length();i++){
					
					user_bytes[i] = (byte)user_name.charAt(i);
				}
				
				hasher = new SHA1Hasher();
				
				hasher.update( buffer );
				hasher.update( user_bytes );
				hasher.update( sha1_password );
				
				byte[]	overall_hash = hasher.getDigest();
				
				//System.out.println("PRUDPHandler - auth = " + auth.getUserName() + "/" + new String(auth.getPassword()));
								
				baos.write( user_bytes );
				baos.write( overall_hash, 0, 8 );
				
				buffer = baos.toByteArray();
			}
			
			DatagramPacket dg_packet = new DatagramPacket(buffer, buffer.length, destination_address );
			
			PRUDPPacketHandlerRequestImpl	request = new PRUDPPacketHandlerRequestImpl( receiver, timeout );
		
			try{
				requests_mon.enter();
					
				requests.put( new Integer( request_packet.getTransactionId()), request );
				
			}finally{
				
				requests_mon.exit();
			}
			
			try{
				// System.out.println( "Outgoing to " + dg_packet.getAddress());

				if ( send_delay > 0 && priority != PRUDPPacketHandler.PRIORITY_IMMEDIATE ){
									
					try{
						send_queue_mon.enter();
						
						if ( send_queue_data_size > MAX_SEND_QUEUE_DATA_SIZE ){
														
							request.sent();
							
								// synchronous write holding lock to block senders
							
							socket.send( dg_packet );
							
							stats.packetSent( buffer.length );
							
							if ( TRACE_REQUESTS ){
								Logger.log(new LogEvent(LOGID,
										"PRUDPPacketHandler: request packet sent to "
												+ destination_address + ": "
												+ request_packet.getString()));
							}
								
							Thread.sleep( send_delay );
							
						}else{
							
							send_queue_data_size	+= dg_packet.getLength();
								
							send_queues[priority].add( new Object[]{ dg_packet, request });
							
							if ( TRACE_REQUESTS ){
								
								String	str = "";
								
								for (int i=0;i<send_queues.length;i++){
									str += (i==0?"":",") + send_queues[i].size();
								}
								System.out.println( "send queue sizes: " + str );
							}
							
							send_queue_sem.release();
					
							if ( send_thread == null ){
								
								send_thread = 
									new AEThread( "PRUDPPacketHandler:sender" )
									{
										public void
										runSupport()
										{
											int[]		consecutive_sends = new int[send_queues.length];
											
											while( true ){
												
												try{
													send_queue_sem.reserve();
													
													Object[]	data;
													int			selected_priority	= 0;
													
													try{
														send_queue_mon.enter();
													
															// invariant: at least one queue must have an entry
														
														for (int i=0;i<send_queues.length;i++){
															
															List	queue = send_queues[i];
															
															int	queue_size = queue.size();
															
															if ( queue_size > 0 ){
																
																selected_priority	= i;
																
																if ( 	consecutive_sends[i] >= 4 ||
																		(	i < send_queues.length - 1 &&
																			send_queues[i+1].size() - queue_size > 500 )){	
																	
																		// too many consecutive or too imbalanced, see if there are
																		// lower priority queues with entries
																	
																	consecutive_sends[i]	= 0;
																	
																}else{
																	
																	consecutive_sends[i]++;
																	
																	break;
																}
															}else{
																
																consecutive_sends[i]	= 0;
															}
														}
														
														data = (Object[])send_queues[selected_priority].remove(0);
														
													}finally{
														
														send_queue_mon.exit();
													}
																									
													DatagramPacket				p	= (DatagramPacket)data[0];
													PRUDPPacketHandlerRequestImpl	r	= (PRUDPPacketHandlerRequestImpl)data[1];

														// mark as sent before sending in case send fails
														// and we then rely on timeout to pick this up
													
													send_queue_data_size	-= p.getLength();
													
													r.sent();
													
													socket.send( p );
													
													stats.packetSent( p.getLength() );
							
													if ( TRACE_REQUESTS ){
														Logger.log(new LogEvent(LOGID,
															"PRUDPPacketHandler: request packet sent to "
																	+ p.getAddress()));											
													}														
												
													long	delay = send_delay;
													
													if ( selected_priority == PRIORITY_HIGH ){
														
														delay	= delay/2;
													}
													
													Thread.sleep( delay );
													
												}catch( Throwable e ){
													// get occasional send fails, not very interesting
													Logger.log(
														new LogEvent(
															LOGID, 
															LogEvent.LT_WARNING,
															"PRUDPPacketHandler: send failed: " + Debug.getNestedExceptionMessage(e)));
												}
											}
										}
									};
								
									send_thread.setDaemon( true );
								
									send_thread.start();
							}
						}
					}finally{
						
						send_queue_mon.exit();
					}
				}else{
					
					request.sent();
					
					socket.send( dg_packet );
					
					// System.out.println( "sent:" + buffer.length );
					
					stats.packetSent( buffer.length );
					
					if ( TRACE_REQUESTS ){
						Logger.log(new LogEvent(LOGID, "PRUDPPacketHandler: "
								+ "request packet sent to " + destination_address + ": "
								+ request_packet.getString()));
					}
				}
					// if the send is ok then the request will be removed from the queue
					// either when a reply comes back or when it gets timed-out
				
				return( request );
				
			}catch( Throwable e ){
				
					// never got sent, remove it immediately
				
				try{
					requests_mon.enter();
					
					requests.remove( new Integer( request_packet.getTransactionId()));
					
				}finally{
					
					requests_mon.exit();
				}
				
				throw( e );
			}
		}catch( PRUDPPacketHandlerException e ){
			
			throw( e );
			
		}catch( Throwable e ){
			
			Logger.log(new LogEvent(LOGID,LogEvent.LT_ERROR,
					"PRUDPPacketHandler: sendAndReceive to " + destination_address + " failed: " + Debug.getNestedExceptionMessage(e))); 
			
			throw( new PRUDPPacketHandlerException( "PRUDPPacketHandler:sendAndReceive failed", e ));
		}
	}
	
	public void
	send(
		PRUDPPacket				request_packet,
		InetSocketAddress		destination_address )
	
		throws PRUDPPacketHandlerException
	{
		if ( socket == null ){
			
			if ( init_error != null ){
				
				throw( new PRUDPPacketHandlerException( "Transport unavailable", init_error ));
			}
			
			throw( new PRUDPPacketHandlerException( "Transport unavailable" ));
		}
		
		try{
			checkTargetAddress( destination_address );
			
			ByteArrayOutputStream	baos = new ByteArrayOutputStream();
			
			DataOutputStream os = new DataOutputStream( baos );
					
			request_packet.serialise(os);
			
			byte[]	buffer = baos.toByteArray();
			
			request_packet.setSerialisedSize( buffer.length );

			DatagramPacket dg_packet = new DatagramPacket(buffer, buffer.length, destination_address );
			
			// System.out.println( "Outgoing to " + dg_packet.getAddress());	
			
			if ( TRACE_REQUESTS ){
				Logger.log(new LogEvent(LOGID,
						"PRUDPPacketHandler: reply packet sent: "
								+ request_packet.getString()));
			}
			
			socket.send( dg_packet );
			
			stats.packetSent( buffer.length );
			
				// this is a reply to a request, no time delays considered here 
			
		}catch( PRUDPPacketHandlerException e ){
			
			throw( e );
			
		}catch( Throwable e ){
			
			e.printStackTrace();
			
			Logger.log(new LogEvent(LOGID, LogEvent.LT_ERROR, "PRUDPPacketHandler: send to " + destination_address + " failed: " + Debug.getNestedExceptionMessage(e)));
			
			throw( new PRUDPPacketHandlerException( "PRUDPPacketHandler:send failed", e ));
		}
	}
	
	protected void
	checkTargetAddress(
		InetSocketAddress	address )
	
		throws PRUDPPacketHandlerException
	{
		if ( address.getPort() == 0 ){
			
			throw( new PRUDPPacketHandlerException( "Invalid port - 0" ));
		}
	}
	
	public void
	setDelays(
		int		_send_delay,
		int		_receive_delay,
		int		_queued_request_timeout )
	{
		send_delay				= _send_delay;
		receive_delay			= _receive_delay;
		
			// trim a bit off this limit to include processing time
		
		queued_request_timeout	= _queued_request_timeout-5000;
		
		if ( queued_request_timeout < 5000 ){
			
			queued_request_timeout = 5000;
		}
	}
	
	public long
	getSendQueueLength()
	{
		int	res = 0;
		for (int i=0;i<send_queues.length;i++){
			res += send_queues[i].size();
		}
		return(res);
	}
	
	public long
	getReceiveQueueLength()
	{
		return( recv_queue.size());
	}
	
	public void
	primordialSend(
		byte[]				buffer,
		InetSocketAddress	target )
	
		throws PRUDPPacketHandlerException
	{
		try{
			checkTargetAddress( target );
			
			DatagramPacket dg_packet = new DatagramPacket(buffer, buffer.length, target );
			
			// System.out.println( "Outgoing to " + dg_packet.getAddress());	
			
			if ( TRACE_REQUESTS ){
				Logger.log(new LogEvent(LOGID,
						"PRUDPPacketHandler: reply packet sent: " + buffer.length + " to " + target ));
			}
			
			socket.send( dg_packet );
			
			stats.primordialPacketSent( buffer.length );
			
		}catch( Throwable e ){
			
			throw( new PRUDPPacketHandlerException( e.getMessage()));
		}
	}
	
	public PRUDPPacketHandlerStats
	getStats()
	{
		return( stats );
	}
	
	protected void
	destroy()
	{
		destroyed	= true;
		
		destroy_sem.reserve();
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -