⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagedispatcher.java

📁 java语言开发的P2P流媒体系统
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* Stream-2-Stream - Peer to peer television and radio
 * October 13, 2005 - This file has been modified from the original P2P-Radio source
 * Project homepage: http://s2s.sourceforge.net/
 * Copyright (C) 2005-2006 Jason Hooks
 */

/* 
 * P2P-Radio - Peer to peer streaming system
 * Project homepage: http://p2p-radio.sourceforge.net/
 * Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
 * 
 * ---------------------------------------------------------------------------
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 * ---------------------------------------------------------------------------
 */

package stream2stream.network;

import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.monitor.*;
import p2pradio.*;
import stream2stream.XML.SettingsXML;

import java.net.*;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.io.*;


/**
 * Processes all packets that it receives listening to a
 * specified UDP socket.
 *
 * @author Michael Kaufmann
 */
public class MessageDispatcher extends Thread
{
	private Peer peer;
	private SettingsXML xml;
	private UniversalSocket socket;
	private boolean shutdown;
	public static HashSet acceptedPeers;
	
	private boolean isServer, streamDispatch, udp, usesVerifiedBandwidth, serverUsesVerifiedBandwidth;
	private StreamDispatcherForChildren streamDispatcher;
	private Listener listener;
	private InetSocketAddress requester;
	public static final int octetsChecked = 3;
	public static final String c = "MessageDispatcher";
	/**
	 * Creates a new UDP Dispatcher.
	 *
	 * @param peer The peer that this dispatcher belongs to
	 * @param udpSocket The socket that this dispatcher will listen to
	 */
	public MessageDispatcher(Listener listener, UniversalSocket socket, boolean streamDispatch)
	{
		super(Messages.getString(c + ".THREAD_NAME")); //$NON-NLS-1$
		setDaemon(true);
		this.streamDispatch = streamDispatch;
		this.listener = listener;
		this.peer = listener.getPeer();
		xml = peer.getXML();
		isServer = peer.isServer();
		
		this.socket = socket;
		shutdown = false;
		if (acceptedPeers == null)
			acceptedPeers = new HashSet();
		udp = socket.getMode() == SettingsXML.UDP;
	}
	
	public void run()
	{
		Logger.finer(c, c + ".THREAD_RUNNING"); //$NON-NLS-1$ //$NON-NLS-2$
		
			
		
		try
		{
			byte[] buffer = new byte[PacketFactory.MESSAGE_PACKET_MAX_SIZE];
			Packet inPacket = null;
			Packet outPacket = null;
			
			// Endlosschleife  -- Continuous Loop
			while (!shutdown)
			{
				if (shutdown)
					return;
				
					
				// Auf ein Paket warten
				try
				{
					inPacket = socket.receive();
				}
				catch(Exception e)
				{
					shutdown();	
					Logger.fine(c, c + ".LISTENER_DISCONNECTED", requester);
					return;
				}
				
				if (shutdown)
					return;
				// debugMessage("L鋘ge der Paketdaten: " + inPacket.getLength());	
				if (requester == null)
				{
					requester = (InetSocketAddress) socket.getRemoteSocketAddress();
				    if (requester.equals(peer.getSocketAddress()))
				    {
				    	// Da will uns jemand reinlegen und hat ein Paket
				    	// mit gef鋖schter Absenderadresse geschickt
				    	Logger.fine(c, c + ".PACKET_WITH_FAKED_SENDER"); //$NON-NLS-1$ //$NON-NLS-2$
				    	return;
				    }
				}
				
				// System.out.println(actionPacket);	
				if(streamDispatch && (inPacket instanceof LeavePacket || inPacket instanceof IdentificationPacket))
				{
					//UniversalSocket socket = new UniversalSocket(this.socket.getMode());
					//socket.connect(requester);
					//socket.setBuffer(new byte[PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
					streamDispatcher = new StreamDispatcherForChildren(listener, socket, inPacket);
					streamDispatcher.start();
					return;
				}
				
				synchronized (peer)
				{
					if ((inPacket instanceof ConnectPacket) && isServer)
					{
						//Prepare the data
							
						ConnectPacket connect = (ConnectPacket) inPacket;
						byte[] sourceAddress = requester.getAddress().getAddress();
						byte[] sourceOctets = new byte[octetsChecked];
						System.arraycopy(sourceAddress, 0, sourceOctets, 0, octetsChecked);
						byte[] bandwidthAddress = connect.getIP();
						byte[] bandwidthOctets = new byte[octetsChecked];
						System.arraycopy(bandwidthAddress, 0, bandwidthOctets, 0, octetsChecked);
						DatagramPacket udpPacket = null;
						double actualDownload;
						double maxDownload = connect.getVerifiedDownloadMax();
						double actualUpload;
						double maxUpload = connect.getVerifiedUploadMax();
						double currentByterate = peer.getUI().getMetadata().getAverageByterate() / 1024;
						int childrenMustServe = peer.getXML().getMinimumChildrenClientsMustServe();
						double uploadRequired = currentByterate * childrenMustServe;
						serverUsesVerifiedBandwidth = peer.getVerifiedBandwidth();
						InetAddress inet = 	InetAddress.getLocalHost();
						if (serverUsesVerifiedBandwidth)
						{
							usesVerifiedBandwidth = !(requester.getAddress().isLoopbackAddress() || inet.equals(requester.getAddress()) || InetAddress.getByAddress(bandwidthAddress).equals(InetAddress.getByName(xml.getIP())));
						}
						if (usesVerifiedBandwidth)
						{
							actualUpload = connect.getVerifiedUploadLimit();
							actualDownload = connect.getVerifiedDownloadLimit();
						}
						else 
						{
							actualUpload = connect.getUnverifiedUpload();
							actualDownload = connect.getUnverifiedDownload();
							int nActualDownload = (int) actualDownload;
							int nActualUpload = (int) actualUpload;
							if (nActualDownload < 0)
								actualDownload = -nActualDownload * currentByterate;
							if (nActualUpload < 0)
								actualUpload = -nActualUpload * currentByterate;
						}
							
						if (shutdown)
							return;
						
						
						//Check the data
						if(connect.getNetworkVersion() < Radio.networkVersion)
						{
							outPacket = new DenyPacket(DenyPacket.OLD_VERSION);
						}
						else if (usesVerifiedBandwidth && !Arrays.equals(sourceOctets, bandwidthOctets) )
						{
							outPacket = new DenyPacket(DenyPacket.IP_INCORRECT);
						}
						else if(usesVerifiedBandwidth && (actualDownload > maxDownload || actualDownload < 0))
						{
							outPacket = new DenyPacket(DenyPacket.VERIFIED_DOWNLOAD_BAD);
						}
						else if(usesVerifiedBandwidth && (actualUpload > maxUpload || actualUpload < 0))
						{
							outPacket = new DenyPacket(DenyPacket.VERIFIED_UPLOAD_BAD);
						}
						else if(actualDownload < currentByterate)
						{
							outPacket = new DenyPacket(DenyPacket.NOT_ENOUGH_DOWNLOAD);
						}
						else if(actualUpload < uploadRequired)
						{
							outPacket = new DenyPacket(DenyPacket.NOT_ENOUGH_UPLOAD);
						}
						
					
						//Deny or accept the peer
						if (outPacket == null)
						{
							acceptedPeers.add(requester);
							
							
							outPacket = new AcceptPacket(peer.getStreamMode(), usesVerifiedBandwidth, peer.getVerifiedBandwidth(), peer.getMulticastAddress(), peer.getMulticastTTL(), System.currentTimeMillis() - peer.getStartTime(), peer.getUI().getMetadata());
						}
						
						if (shutdown)
							return;
						try
						{
							socket.send(outPacket);
						}
						catch (Exception e)
						{
							if (shutdown)
								return;
							Logger.fine(c, c + ".ERROR_SENDING_TIME", requester);
						}
						if (shutdown)
							return;
					}
					else if (inPacket instanceof PingPacket)
					{
						// Ein anderer Peer will ein "Pong" haben
				
						int senderPort = ((PingPacket)inPacket).getSenderPort();
						RemotePeer sender;
					
						if (senderPort == 0)
						{
							// Der Sender kennt seinen Absenderport gar nicht
							sender = new RemotePeer(requester);
						}
						else
						{
							// Der Sender hat seinen Absenderport angegeben
							sender = composeAddressAndPort(requester.getAddress(), senderPort);	
						}
						
						try
						{
							Packet pongPacket = new PongPacket();
							socket.send(pongPacket);
						}
						catch (Exception e)
						{
							// Die Pong-Nachricht konnte nicht gesendet werden
							// Das ist nicht weiter schlimm
							if (shutdown)
								return;
						}
						if (shutdown)
							return;
					}
					else if (!isServer || peerIsAccepted(requester))
						{
							if (inPacket instanceof JoinPacket)
							{
								// Der Peer will aufgenommen werden
								
								RemotePeer newPeer = composeAddressAndPort(requester.getAddress(), ((JoinPacket)inPacket).getSenderPort());
						
								// Ist dieser Peer getrennt? Dann keine
								// Kinder mehr aufnehmen
								if (!peer.isDisconnected())
								{
									// Wurde der Peer schon aufgenommen?
									// (Die Best鋞igungsnachricht k鰊nte verloren gegangen sein)				
									if (peer.isChild(newPeer))
									{
										// OK-Nachricht nochmals schicken
										try
										{
											Packet okPacket = createOKPacket();
											socket.send(okPacket);
										}
										catch (Exception e)
										{
											if (shutdown)
												return;
											// Die OK-Nachricht konnte nicht gesendet werden
											// Peer wieder entfernen
											Logger.fine(c, c + ".ERROR_SENDING_OK_MESSAGE", newPeer); //$NON-NLS-1$ //$NON-NLS-2$
											peer.removeChild(newPeer);
										}
										if(shutdown)
											return;
									}
									else
									{
										// Soll sich dieser Peer fies verhalten?
										
										if (peer.getMisbehavior(Commands.NO_NEW_CHILDREN))
										{
											// Fies sein und keine Antwort geben
										}
										else if (peer.getMisbehavior(Commands.REDIRECT_ALL_CHILDREN_TO_SUPPLIER))
										{
											if (peer.getSupplier() != null)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -