📄 messagedispatcher.java
字号:
/* Stream-2-Stream - Peer to peer television and radio
* October 13, 2005 - This file has been modified from the original P2P-Radio source
* Project homepage: http://s2s.sourceforge.net/
* Copyright (C) 2005-2006 Jason Hooks
*/
/*
* P2P-Radio - Peer to peer streaming system
* Project homepage: http://p2p-radio.sourceforge.net/
* Copyright (C) 2003-2004 Michael Kaufmann <hallo@michael-kaufmann.ch>
*
* ---------------------------------------------------------------------------
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
* ---------------------------------------------------------------------------
*/
package stream2stream.network;
import p2pradio.logging.Logger;
import p2pradio.packets.*;
import p2pradio.monitor.*;
import p2pradio.*;
import stream2stream.XML.SettingsXML;
import java.net.*;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.io.*;
/**
* Processes all packets that it receives while listening on a
* specified socket (a UniversalSocket, which may or may not be in UDP mode).
*
* @author Michael Kaufmann
*/
public class MessageDispatcher extends Thread
{
// The local peer this dispatcher works for; obtained from the listener in the constructor.
private Peer peer;
// Settings object obtained from the peer via peer.getXML().
private SettingsXML xml;
// Socket that packets are received from and that replies are sent on.
private UniversalSocket socket;
// Checked throughout run(); once true, the receive loop returns.
private boolean shutdown;
// Requester addresses that were accepted after a ConnectPacket (added in run()).
// Static, so the set is shared by every dispatcher; it is created lazily by the
// first constructor call. Raw type: the elements added in this class are InetSocketAddress.
public static HashSet acceptedPeers;
// isServer: value of peer.isServer() at construction time.
// streamDispatch: constructor argument; when true, a LeavePacket or IdentificationPacket
// is handed to a new StreamDispatcherForChildren.
// udp: true when the socket's mode equals SettingsXML.UDP.
// usesVerifiedBandwidth / serverUsesVerifiedBandwidth: computed while handling a ConnectPacket.
private boolean isServer, streamDispatch, udp, usesVerifiedBandwidth, serverUsesVerifiedBandwidth;
// Created and started in run() when a packet is handed off for stream dispatching.
private StreamDispatcherForChildren streamDispatcher;
// Listener passed to the constructor; also passed on to StreamDispatcherForChildren.
private Listener listener;
// Remote socket address of the sender, read from the socket on the first received packet.
private InetSocketAddress requester;
// Number of leading address bytes compared between a packet's source address and
// the IP carried inside a ConnectPacket.
public static final int octetsChecked = 3;
// Class name; used as the logging source and as the prefix of message/log keys.
public static final String c = "MessageDispatcher";
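/**
* Creates a new message dispatcher.
*
* @param listener The listener that created this dispatcher; the peer is taken from listener.getPeer()
* @param socket The socket that this dispatcher receives packets from and sends replies on
* @param streamDispatch If true, a received LeavePacket or IdentificationPacket is handed to a new StreamDispatcherForChildren
*/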
public MessageDispatcher(Listener listener, UniversalSocket socket, boolean streamDispatch)
{
super(Messages.getString(c + ".THREAD_NAME")); //$NON-NLS-1$
setDaemon(true);
this.streamDispatch = streamDispatch;
this.listener = listener;
this.peer = listener.getPeer();
xml = peer.getXML();
isServer = peer.isServer();
this.socket = socket;
shutdown = false;
if (acceptedPeers == null)
acceptedPeers = new HashSet();
udp = socket.getMode() == SettingsXML.UDP;
}
public void run()
{
Logger.finer(c, c + ".THREAD_RUNNING"); //$NON-NLS-1$ //$NON-NLS-2$
try
{
byte[] buffer = new byte[PacketFactory.MESSAGE_PACKET_MAX_SIZE];
Packet inPacket = null;
Packet outPacket = null;
// Endlosschleife -- Continuous Loop
while (!shutdown)
{
if (shutdown)
return;
// Auf ein Paket warten
try
{
inPacket = socket.receive();
}
catch(Exception e)
{
shutdown();
Logger.fine(c, c + ".LISTENER_DISCONNECTED", requester);
return;
}
if (shutdown)
return;
// debugMessage("Länge der Paketdaten: " + inPacket.getLength());
if (requester == null)
{
requester = (InetSocketAddress) socket.getRemoteSocketAddress();
if (requester.equals(peer.getSocketAddress()))
{
// Someone is trying to trick us and has sent a packet
// with a forged sender address
Logger.fine(c, c + ".PACKET_WITH_FAKED_SENDER"); //$NON-NLS-1$ //$NON-NLS-2$
return;
}
}
// System.out.println(actionPacket);
if(streamDispatch && (inPacket instanceof LeavePacket || inPacket instanceof IdentificationPacket))
{
//UniversalSocket socket = new UniversalSocket(this.socket.getMode());
//socket.connect(requester);
//socket.setBuffer(new byte[PacketFactory.UDP_MAX_DATAGRAM_SIZE]);
streamDispatcher = new StreamDispatcherForChildren(listener, socket, inPacket);
streamDispatcher.start();
return;
}
synchronized (peer)
{
if ((inPacket instanceof ConnectPacket) && isServer)
{
//Prepare the data
ConnectPacket connect = (ConnectPacket) inPacket;
byte[] sourceAddress = requester.getAddress().getAddress();
byte[] sourceOctets = new byte[octetsChecked];
System.arraycopy(sourceAddress, 0, sourceOctets, 0, octetsChecked);
byte[] bandwidthAddress = connect.getIP();
byte[] bandwidthOctets = new byte[octetsChecked];
System.arraycopy(bandwidthAddress, 0, bandwidthOctets, 0, octetsChecked);
DatagramPacket udpPacket = null;
double actualDownload;
double maxDownload = connect.getVerifiedDownloadMax();
double actualUpload;
double maxUpload = connect.getVerifiedUploadMax();
double currentByterate = peer.getUI().getMetadata().getAverageByterate() / 1024;
int childrenMustServe = peer.getXML().getMinimumChildrenClientsMustServe();
double uploadRequired = currentByterate * childrenMustServe;
serverUsesVerifiedBandwidth = peer.getVerifiedBandwidth();
InetAddress inet = InetAddress.getLocalHost();
if (serverUsesVerifiedBandwidth)
{
usesVerifiedBandwidth = !(requester.getAddress().isLoopbackAddress() || inet.equals(requester.getAddress()) || InetAddress.getByAddress(bandwidthAddress).equals(InetAddress.getByName(xml.getIP())));
}
if (usesVerifiedBandwidth)
{
actualUpload = connect.getVerifiedUploadLimit();
actualDownload = connect.getVerifiedDownloadLimit();
}
else
{
actualUpload = connect.getUnverifiedUpload();
actualDownload = connect.getUnverifiedDownload();
int nActualDownload = (int) actualDownload;
int nActualUpload = (int) actualUpload;
if (nActualDownload < 0)
actualDownload = -nActualDownload * currentByterate;
if (nActualUpload < 0)
actualUpload = -nActualUpload * currentByterate;
}
if (shutdown)
return;
//Check the data
if(connect.getNetworkVersion() < Radio.networkVersion)
{
outPacket = new DenyPacket(DenyPacket.OLD_VERSION);
}
else if (usesVerifiedBandwidth && !Arrays.equals(sourceOctets, bandwidthOctets) )
{
outPacket = new DenyPacket(DenyPacket.IP_INCORRECT);
}
else if(usesVerifiedBandwidth && (actualDownload > maxDownload || actualDownload < 0))
{
outPacket = new DenyPacket(DenyPacket.VERIFIED_DOWNLOAD_BAD);
}
else if(usesVerifiedBandwidth && (actualUpload > maxUpload || actualUpload < 0))
{
outPacket = new DenyPacket(DenyPacket.VERIFIED_UPLOAD_BAD);
}
else if(actualDownload < currentByterate)
{
outPacket = new DenyPacket(DenyPacket.NOT_ENOUGH_DOWNLOAD);
}
else if(actualUpload < uploadRequired)
{
outPacket = new DenyPacket(DenyPacket.NOT_ENOUGH_UPLOAD);
}
//Deny or accept the peer
if (outPacket == null)
{
acceptedPeers.add(requester);
outPacket = new AcceptPacket(peer.getStreamMode(), usesVerifiedBandwidth, peer.getVerifiedBandwidth(), peer.getMulticastAddress(), peer.getMulticastTTL(), System.currentTimeMillis() - peer.getStartTime(), peer.getUI().getMetadata());
}
if (shutdown)
return;
try
{
socket.send(outPacket);
}
catch (Exception e)
{
if (shutdown)
return;
Logger.fine(c, c + ".ERROR_SENDING_TIME", requester);
}
if (shutdown)
return;
}
else if (inPacket instanceof PingPacket)
{
// Ein anderer Peer will ein "Pong" haben
int senderPort = ((PingPacket)inPacket).getSenderPort();
RemotePeer sender;
if (senderPort == 0)
{
// Der Sender kennt seinen Absenderport gar nicht
sender = new RemotePeer(requester);
}
else
{
// Der Sender hat seinen Absenderport angegeben
sender = composeAddressAndPort(requester.getAddress(), senderPort);
}
try
{
Packet pongPacket = new PongPacket();
socket.send(pongPacket);
}
catch (Exception e)
{
// Die Pong-Nachricht konnte nicht gesendet werden
// Das ist nicht weiter schlimm
if (shutdown)
return;
}
if (shutdown)
return;
}
else if (!isServer || peerIsAccepted(requester))
{
if (inPacket instanceof JoinPacket)
{
// Der Peer will aufgenommen werden
RemotePeer newPeer = composeAddressAndPort(requester.getAddress(), ((JoinPacket)inPacket).getSenderPort());
// Ist dieser Peer getrennt? Dann keine
// Kinder mehr aufnehmen
if (!peer.isDisconnected())
{
// Has the peer already been accepted?
// (The confirmation message could have been lost)
if (peer.isChild(newPeer))
{
// OK-Nachricht nochmals schicken
try
{
Packet okPacket = createOKPacket();
socket.send(okPacket);
}
catch (Exception e)
{
if (shutdown)
return;
// Die OK-Nachricht konnte nicht gesendet werden
// Peer wieder entfernen
Logger.fine(c, c + ".ERROR_SENDING_OK_MESSAGE", newPeer); //$NON-NLS-1$ //$NON-NLS-2$
peer.removeChild(newPeer);
}
if(shutdown)
return;
}
else
{
// Soll sich dieser Peer fies verhalten?
if (peer.getMisbehavior(Commands.NO_NEW_CHILDREN))
{
// Fies sein und keine Antwort geben
}
else if (peer.getMisbehavior(Commands.REDIRECT_ALL_CHILDREN_TO_SUPPLIER))
{
if (peer.getSupplier() != null)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -