⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jmudpconnection.java

📁 JMule是一个基于Java开发
💻 JAVA
字号:
/* *  JMule - Java file sharing client *  Copyright (C) 2007-2008 JMule team ( jmule@jmule.org / http://jmule.org ) * *  Any parts of this program derived from other projects, or contributed *  by third-party developers are copyrighted by their respective authors. * *  This program is free software; you can redistribute it and/or *  modify it under the terms of the GNU General Public License *  as published by the Free Software Foundation; either version 2 *  of the License, or (at your option) any later version. * *  This program is distributed in the hope that it will be useful, *  but WITHOUT ANY WARRANTY; without even the implied warranty of *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *  GNU General Public License for more details. * *  You should have received a copy of the GNU General Public License *  along with this program; if not, write to the Free Software *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA. * */package org.jmule.core.net;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.DatagramChannel;import java.util.concurrent.ConcurrentLinkedQueue;import org.jmule.core.JMThread;import org.jmule.core.JMuleCore;import org.jmule.core.JMuleCoreFactory;import org.jmule.core.configmanager.ConfigurationAdapter;import org.jmule.core.configmanager.ConfigurationManager;import org.jmule.core.configmanager.ConfigurationManagerFactory;import org.jmule.core.edonkey.ServerManager;import org.jmule.core.edonkey.impl.Server;import org.jmule.core.edonkey.packet.PacketReader;import org.jmule.core.edonkey.packet.UDPPacket;import org.jmule.core.edonkey.packet.scannedpacket.ScannedUDPPacket;import org.jmule.util.Average;/** *  * @author binary256 * @version $$Revision: 1.3 $$ * Last changed by $$Author: binary256_ $$ on $$Date: 2008/09/07 14:55:47 $$ */public class JMUDPConnection {		public static final int UDP_SOCKET_OPENED = 0x01;	public static final int UDP_SOCKET_CLOSED = 0x00;		private JMuleCore _core;		private static JMUDPConnection singleton = null;		private DatagramChannel listenChannel;	private UDPListenThread udpListenThread;		private Average wrongPacketCount = new Average(10);	private int currentlyWrongPackets = 0;	private WrongPacketCheckingThread wrongPacketChekingThread;			private ConcurrentLinkedQueue<UDPPacket> sendQueue = new ConcurrentLinkedQueue<UDPPacket>();	private PacketSenderThread packetSenderThread;		private ConcurrentLinkedQueue<ScannedUDPPacket> receiveQueue = new ConcurrentLinkedQueue<ScannedUDPPacket>();		private PacketProcessorThread packetProcessorThread = new PacketProcessorThread();		private int connectionStatus = UDP_SOCKET_CLOSED;		public static JMUDPConnection getInstance(){		if (singleton == null)			 singleton = new JMUDPConnection();		return singleton;	}		private JMUDPConnection(){		_core = JMuleCoreFactory.getSingleton();				_core.getConfigurationManager().addConfigurationListener(new ConfigurationAdapter() {			public void UDPPortChanged(int udp) {				try {					reopen();				} catch (JMUDPConnectionException e) {					e.printStackTrace();				}			}						public void isUDPEnabledChanged(boolean enabled) {				try {					if (enabled)						reopen();					else						close();				} catch (JMUDPConnectionException e) {					e.printStackTrace();				}			}		});	}		public void open() throws JMUDPConnectionException {		try {			int listenPort = ConfigurationManagerFactory.getInstance().getUDP();			listenChannel = DatagramChannel.open();			listenChannel.socket().bind(new InetSocketAddress(listenPort));			listenChannel.configureBlocking(true);						udpListenThread = new UDPListenThread();			udpListenThread.start();						wrongPacketChekingThread = new WrongPacketCheckingThread();			wrongPacketChekingThread.start();						packetProcessorThread = new PacketProcessorThread();			packetProcessorThread.start();						packetSenderThread = new PacketSenderThread();			packetSenderThread.start();						connectionStatus = UDP_SOCKET_OPENED;		} catch (Throwable t) {			throw new JMUDPConnectionException(t);		}	}		public boolean isOpen() {		return connectionStatus ==  UDP_SOCKET_OPENED;	}		public void close() throws JMUDPConnectionException {				if (!isOpen()) return;		try {			connectionStatus = UDP_SOCKET_CLOSED;			wrongPacketChekingThread.JMStop();			udpListenThread.JMStop();			packetSenderThread.JMStop();			packetProcessorThread.JMStop();			listenChannel.close();		} catch (IOException e) {			throw new JMUDPConnectionException(e);		}	}		public void reopen() throws JMUDPConnectionException {		if (isOpen())			close();		open();	}	private void addReceivedPacket(ScannedUDPPacket packet ){		receiveQueue.offer(packet);		if (packetProcessorThread.isSleeping()) 			packetProcessorThread.wakeUp();				}		// Send UDP packet code 	public void sendPacket(UDPPacket packet ) {		if (!isOpen()) return ;		sendQueue.offer(packet);		if (packetSenderThread.isSleeping())			packetSenderThread.wakeUp();				}				private void ban() {	}				/** Packet listener **/	private class UDPListenThread extends JMThread {				private volatile boolean stop = false;				public UDPListenThread() {			super("UDP packets listener");		}				public void run() {			while(!stop){				UDPPacket packet = PacketReader.readPacket(listenChannel);				if (packet != null) {					ScannedUDPPacket scanned_packet = PacketScanner.scanPacket(packet);					// scanned_packet == null is is not supported or decoding was failed					if (scanned_packet != null)						addReceivedPacket(scanned_packet);					else						currentlyWrongPackets++;				}				else {					if (stop) return ;					if (!listenChannel.isConnected()) break;					currentlyWrongPackets++;				}			}		}				public void JMStop() {			stop = true;			interrupt();		}	}				private class PacketProcessorThread extends JMThread {		private boolean stop = false;		private boolean sleeping = false;		private ServerManager server_manager = JMuleCoreFactory.getSingleton().getServerManager();		public PacketProcessorThread() {			super("UDP Packet processor");		}				public void run() {			while(!stop) {				if (receiveQueue.size()==0) {					// no more packets, sleeping...					sleeping = true;					synchronized(this) {						try {							this.wait();						} catch (InterruptedException e1) {} 					}					sleeping = false;					continue ;				}				ScannedUDPPacket packet = receiveQueue.poll();				// call server 				Server server = server_manager.getServer(packet.getSenderAddress());				if (server == null) continue ; // Packet from unknown source				server.processPacket(packet);			}		}				public void JMStop() {			stop = true;			interrupt();		}				public boolean isSleeping() {			return sleeping;		}				public void wakeUp() {			synchronized (this) {				notify();			}		}	}		private class PacketSenderThread extends JMThread {		private boolean stop = false;		private boolean sleeping = false;				public PacketSenderThread() {			super("UDP packet sender thread");		}				public void run() {			while(!stop) {				if (sendQueue.size() == 0) {					// no more packets, sleeping...					sleeping = true;					synchronized(this) {						try {							this.wait();						} catch (InterruptedException e1) {} 					}					sleeping = false;					continue ;				}				UDPPacket packet = sendQueue.poll();				InetSocketAddress destination = packet.getAddress();				try {					packet.getAsByteBuffer().position(0);					listenChannel.send(packet.getAsByteBuffer(), destination);				} catch (IOException e) {					if (stop) return;					if (!listenChannel.isConnected()) return;				}			}		}				public void JMStop() {			stop = true;			synchronized (this) {				interrupt();			}		}				public boolean isSleeping() {			return sleeping;		}				public void wakeUp() {			synchronized (this) {				notify();			}		}			}		private class WrongPacketCheckingThread extends JMThread {				private volatile boolean stop = false;				public WrongPacketCheckingThread()  {			super("UDP Wrong packet checking thread");		}				public void run() {			while(!stop) {				try {					Thread.sleep( ConfigurationManager.WRONG_PACKET_CHECK_INTERVAL );				} catch (InterruptedException e) {					if (stop)						return ;					else continue;				}				wrongPacketCount.add(currentlyWrongPackets);				currentlyWrongPackets = 0;				if ( wrongPacketCount.getAverage()>= ConfigurationManager.MAX_WRONG_PACKET_COUNT ) 					ban();							}		}				public void JMStop() {			stop = true;			interrupt();		}	}	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -