⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcptransport.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
			// The multicast socket needs first to be allocated.
			try {
				multicastSocket = new MulticastSocket(propagatePort);
				multicastSocket.setInterface( usingInterface );
				multicastSocket.joinGroup(propagateInetAddress);
			} catch (Exception e) {
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
			}
		}
		try {
			multicastSocket.send(packet);
			//XXX: Shouldn't we close the socket?!
		} catch (Exception e) {
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
		}
	}


	/**
	 *  Gives the name of the protocol handled by this transport instance.
	 *  According to the original documentation the TCP transport supports a
	 *  single protocol, so this is the same value that
	 *  getSupportedProtocolName() reports.
	 *
	 *  @return the protocol name held in the <code>protocolName</code> field.
	 **/
	public String getProtocolName() {
		return this.protocolName;
	}

	/**
	 * Gives the endpoint address under which this peer can be reached through
	 * this transport.
	 *
	 * @return the address held in the <code>publicAddress</code> field.
	 */
	public EndpointAddress getPublicAddress() {
		return this.publicAddress;
	}

	/**
	 * Tells whether this transport is connection oriented.
	 *
	 * @return always <code>true</code> for this transport.
	 */
	public boolean isConnectionOriented() {
		return true;
	}

	/**
	 * Tells whether the EndpointRouter is allowed to use this endpoint
	 * protocol.
	 *
	 * @return always <code>true</code> for this transport.
	 */
	public boolean allowRouting() {
		return true;
	}


	/**
	 * Ping a remote host.
	 *
	 * The connection manager is asked for a connection to the given address
	 * and the connection's state is reported. The original author notes that
	 * an already open connection is reused rather than a new one being
	 * created. Addresses whose protocol address contains "127.0.0.1" are
	 * rejected without contacting the connection manager.
	 *
	 * @param addr the address of the host to ping.
	 * @return <code>true</code> if a connection was obtained and reports
	 *         itself as connected; <code>false</code> if the address is a
	 *         local one, cannot be read, or no connection is available.
	 **/
	public boolean ping(EndpointAddress addr) {

		// Trace the target. Failing to render the address only affects the
		// trace; the ping itself carries on.
		try {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("ping to " + addr.toString());
			}
		} catch (Exception printFailure) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("ping cannot print address because: ", printFailure);
			}
		}

		// Workaround (lomax@jxta.org): weed out peers that advertise a local
		// IP address (127.0.0.1) as their EndpointService.
		try {
			String protocolAddress = addr.getProtocolAddress();
			boolean advertisesLocalhost = protocolAddress.indexOf("127.0.0.1") != -1;
			if (advertisesLocalhost) {
				if (LOG.isEnabledFor(Priority.DEBUG)) {
					LOG.debug("   pinging localhost - discard");
				}
				return false;
			}
		} catch (Exception lookupFailure) {
			// An unreadable address (including a null one) cannot be pinged.
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("  failed ", lookupFailure);
			}
			return false;
		}

		TcpConnection connection = connManager.getTcpConnection(addr);
		if (connection != null) {
			if (LOG.isEnabledFor(Priority.DEBUG)) {
				LOG.debug("  return " + connection.isConnected());
			}
			return connection.isConnected();
		}

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("  no connection return false ");
		}
		return false;
	}

	/**
	 * Signals activity to the TcpConnection registered for the given host,
	 * if there is one, so that it learns the connection might be up again
	 * (in case it was disconnected). Does nothing when the connection
	 * manager returns no connection for the address.
	 *
	 * @param addr the address of the host whose connection should be notified.
	 */
	private void checkConnection(EndpointAddress addr) {

		TcpConnection existing = connManager.getTcpConnection(addr);
		if (existing != null) {
			existing.notifyActivity();
		}
		// Otherwise there is no pending connection, so nothing to do.
	}

	/**
	 * Self-sizing set of worker threads that handle incoming unicast sockets.
	 *
	 * Sockets are queued with {@link #newSocket(Socket)} and consumed by
	 * worker threads running {@link #run()}. A new worker is started whenever
	 * a socket is queued while no worker is waiting. An idle worker retires
	 * when the number of workers exceeds <code>MaxNbOfUnicastThreads</code>
	 * or when more than one worker is already waiting.
	 *
	 * All counters and the queue are guarded by this object's monitor.
	 */
	public class IncomingUnicastThreads implements Runnable {

		/** Queue of sockets waiting to be handled (oldest first). */
		protected Vector sockets = null;
		/** Transport whose runReceive() processes each socket. */
		protected TcpTransport tp = null;
		/** Number of workers currently blocked in wait() inside waitForSocket(). */
		private volatile int waitingThreads = 0;
		/** Number of live worker threads. */
		private   int nbOfThreads = 0;
		/** Number of receptions currently running with the keep-alive flag set. */
		private   int nbOfKeepAlive = 0;

		/**
		 * Creates the pool and starts the initial workers.
		 *
		 * @param tp            the transport that processes received sockets.
		 * @param initialNumber the number of worker threads to start right away.
		 */
		public IncomingUnicastThreads(TcpTransport tp, int initialNumber) {

			this.tp = tp;
			sockets = new Vector();

			for (int i = 0; i < initialNumber; ++i) {
				++nbOfThreads;
				Thread t = new Thread(this, "TCP Incoming Unicast:" + nbOfThreads);
				t.start();
			}
		}

		/**
		 * Worker loop: takes sockets from the queue and hands them to the
		 * transport until waitForSocket() tells this worker to retire.
		 *
		 * The thread count and the keep-alive slot are released in
		 * <code>finally</code> blocks so that a worker killed by an unexpected
		 * Throwable does not leave the pool's counters permanently inflated.
		 */
		public void run() {

			try {
				while (true) {
					Socket socket = waitForSocket();
					if (socket == null) {
						// That means that this thread must die
						return;
					}
					boolean keep = doesKeepAlive();
					try {
						tp.runReceive(socket, keep);
					} finally {
						// Give the keep-alive slot back even if runReceive failed.
						if (keep) {
							stopKeepAlive();
						}
					}
				}
			} catch ( Throwable all ) {
				if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
			} finally {
				// Whatever the reason this worker stops, it no longer counts.
				decThreads();
			}
		}

		/** Records that one worker thread has terminated. */
		synchronized private void decThreads() {
			--nbOfThreads;
		}

		/**
		 * Blocks until a socket is available or this worker has to retire.
		 *
		 * @return the oldest queued socket, or <code>null</code> if the calling
		 *         worker must die.
		 */
		synchronized protected Socket waitForSocket() {

			Socket socket = null;

			while (true) {

				if (sockets.size() > 0) {
					try {
						socket = (Socket) sockets.elementAt(0);
					} catch (Exception e) {
						// Strange error. XXX: to handle better
						if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket error: " + e);
					}
					if (socket != null) {
						try {
							sockets.removeElementAt(0);
						} catch (Exception e) {
							if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket error: " + e);
						}
						return socket;
					}
					// We should not get here.
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket cannot get incoming socket");
				}

				// No more socket to consume... Just wait.
				// Check if this thread must die or wait for a new socket
				if (mustDie()) {
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket: killing thread");
					return null;
				}

				++waitingThreads;
				try {
					wait();
				} catch (InterruptedException ignored) {
					// Best effort: an interrupt is treated like a wake-up and the
					// queue is simply re-examined on the next loop iteration.
				} finally {
					// Must run even when wait() throws; otherwise the counter
					// stays inflated and newSocket() stops creating workers.
					--waitingThreads;
				}
			}
		}

		/**
		 * Decides whether an idle worker should retire.
		 *
		 * @return <code>true</code> if there are more workers than
		 *         <code>MaxNbOfUnicastThreads</code> or more than one worker
		 *         is already waiting.
		 */
		synchronized private boolean mustDie() {

			if ((nbOfThreads > MaxNbOfUnicastThreads) ||
			        (waitingThreads > 1)) {
				return true;
			}
			return false;
		}

		/**
		 * Tries to reserve a keep-alive slot for the next reception.
		 *
		 * @return <code>true</code> if a slot was reserved (the caller must
		 *         release it with stopKeepAlive()); <code>false</code> if
		 *         <code>MaxNbOfUnicastKeepAliveThreads</code> slots are
		 *         already taken.
		 */
		synchronized private boolean doesKeepAlive() {

			if (nbOfKeepAlive >= MaxNbOfUnicastKeepAliveThreads) {
				return false;
			}
			++nbOfKeepAlive;
			return true;
		}

		/** Releases a keep-alive slot reserved by doesKeepAlive(). */
		synchronized private void stopKeepAlive() {
			--nbOfKeepAlive;
		}


		/**
		 * Queues a newly received socket for processing. If more than
		 * <code>MaxNbOfPendingSockets</code> sockets are already queued the
		 * socket is closed and dropped. A new worker thread is started when
		 * no worker is currently waiting.
		 *
		 * @param socket the socket to hand to a worker.
		 */
		synchronized protected void newSocket(Socket socket) {

			if (sockets.size() > MaxNbOfPendingSockets) {
				// Too many request. Just drop it.
				try {
					socket.close();
				} catch (Exception e) {
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket socket.close failed with " + e);
				}
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket: too many incoming request, dropped incoming message");
				return;
			}

			sockets.addElement(socket);
			// Check if we need to allocate a new Thread
			if (waitingThreads == 0) {
				++nbOfThreads;
				// Allocate a new thread.
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket: create a new Thread");
				Thread t = new Thread(this, "TCP Incoming Unicast:" + nbOfThreads );
				t.start();
			}
			// Wake one waiting worker, if any. The monitor is held here because
			// the method is synchronized.
			notify();
		}
	}



	/**
	 * Hands a received socket over to the unicast worker threads.
	 *
	 * @param socket the socket to be queued for processing.
	 */
	void processReceivingSocket(Socket socket) {
		this.unicastThreads.newSocket(socket);
	}


	/**
	 * Self-sizing set of worker threads that handle incoming multicast
	 * buffers.
	 *
	 * Buffers are queued with {@link #newBuffer(byte[])} and consumed by
	 * worker threads running {@link #run()}. A new worker is started whenever
	 * a buffer is queued while no worker is waiting. An idle worker retires
	 * when the number of workers exceeds <code>MaxNbOfMulticastThreads</code>
	 * or when another worker is already waiting.
	 *
	 * All counters and the queue are guarded by this object's monitor.
	 */
	public class IncomingMulticastThreads implements Runnable {

		/** Queue of received buffers waiting to be processed (oldest first). */
		protected Vector buffers = null;
		/** Transport whose processMulticast() handles each buffer. */
		protected TcpTransport tp = null;
		/** Number of workers currently blocked in wait() inside waitForBuffer(). */
		private volatile int waitingThreads = 0;
		/** Number of live worker threads. */
		private   int nbOfThreads = 0;

		/**
		 * Creates the pool and starts the initial workers.
		 *
		 * @param tp            the transport that processes received buffers.
		 * @param initialNumber the number of worker threads to start right away.
		 */
		public IncomingMulticastThreads(TcpTransport tp, int initialNumber) {

			this.tp = tp;
			buffers = new Vector();

			for (int i = 0; i < initialNumber; ++i) {
				++nbOfThreads;
				Thread t = new Thread(this, "TCP Incoming Multicast:" + nbOfThreads);
				t.start();
			}
		}

		/**
		 * Worker loop: takes buffers from the queue and hands them to the
		 * transport until waitForBuffer() tells this worker to retire.
		 *
		 * The thread count is released in a <code>finally</code> block so that
		 * a worker killed by an unexpected Throwable does not leave the
		 * pool's counter permanently inflated.
		 */
		public void run() {

			try {
				while (true) {
					byte[] buffer = waitForBuffer();
					if (buffer == null) {
						// That means that this thread must die
						return;
					}
					tp.processMulticast(buffer);
				}
			} catch ( Throwable all ) {
				if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
			} finally {
				// Whatever the reason this worker stops, it no longer counts.
				decThreads();
			}
		}

		/** Records that one worker thread has terminated. */
		synchronized private void decThreads() {
			--nbOfThreads;
		}

		/**
		 * Blocks until a buffer is available or this worker has to retire.
		 *
		 * @return the oldest queued buffer, or <code>null</code> if the calling
		 *         worker must die.
		 */
		synchronized protected byte[] waitForBuffer() {

			byte[] buffer = null;

			while (true) {

				if (buffers.size() > 0) {
					try {
						buffer = (byte[]) buffers.elementAt(0);
					} catch (Exception e) {
						// Strange error. XXX: to handle better
						if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer error: " + e);
					}
					if (buffer != null) {
						try {
							buffers.removeElementAt(0);
						} catch (Exception e) {
							if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer error: " + e);
						}
						return buffer;
					}
					// We should not get here.
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer cannot get incoming buffer");
				}

				// No more buffer to consume... Just wait.
				// Check if this thread must die or wait for a new buffer
				if (mustDie()) {
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer: killing thread " + Thread.currentThread().getName() );
					return null;
				}

				++waitingThreads;
				try {
					wait();
				} catch (InterruptedException ignored) {
					// Best effort: an interrupt is treated like a wake-up and the
					// queue is simply re-examined on the next loop iteration.
				} finally {
					// Must run even when wait() throws; otherwise the counter
					// stays above zero, idle workers retire and newBuffer()
					// never starts replacements.
					--waitingThreads;
				}
			}
		}

		/**
		 * Decides whether an idle worker should retire.
		 *
		 * @return <code>true</code> if there are more workers than
		 *         <code>MaxNbOfMulticastThreads</code> or another worker is
		 *         already waiting.
		 */
		synchronized private boolean mustDie() {

			if ((nbOfThreads > MaxNbOfMulticastThreads) ||
			        (waitingThreads > 0)) {
				return true;
			}
			return false;
		}

		/**
		 * Queues a newly received buffer for processing. A new worker thread
		 * is started when no worker is currently waiting.
		 *
		 * @param buffer the received data to hand to a worker.
		 */
		synchronized protected void newBuffer(byte[] buffer) {

			buffers.addElement(buffer);
			// Check if we need to allocate a new Thread
			if (waitingThreads == 0) {
				++nbOfThreads;
				// Allocate a new thread.
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newBuffer: create a new Multicast receive Thread");
				Thread t = new Thread(this, "TCP Incoming Multicast:" + nbOfThreads);
				t.start();
			}
			// Wake one waiting worker, if any. The monitor is held here because
			// the method is synchronized.
			notify();
		}
	}

	/**
	 * Hands a received multicast buffer over to the multicast worker threads.
	 *
	 * @param buffer the received data to be queued for processing.
	 */
	void processReceivingBuffer(byte[] buffer) {
		this.multicastThreads.newBuffer(buffer);
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -