⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcptransport.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
			socket = null;
		}
	}

	/**
	 * Receive loop for the multicast socket.
	 *
	 * Does nothing when multicast is not allowed. Otherwise it blocks on
	 * multicastSocket.receive() forever, allocating a new buffer of
	 * propagateSize bytes for every datagram and passing that buffer to
	 * processReceivingBuffer(). The first failure while receiving is
	 * logged at ERROR level and ends the loop.
	 */
	public void runMulticastServer() {

		if (allowMulticast) {
			while (true) {
				// A fresh buffer is allocated for each datagram; it is
				// handed over as-is once the datagram has been received.
				final byte[] datagramBytes = new byte [propagateSize];
				final DatagramPacket incoming =
				    new DatagramPacket(datagramBytes, datagramBytes.length);

				try {
					multicastSocket.receive(incoming);
				} catch (Exception e) {
					if (LOG.isEnabledFor(Priority.ERROR)) {
						LOG.error("failure in runMulticastServer ", e);
					}
					// Any receive failure terminates the server loop.
					return;
				}

				processReceivingBuffer(datagramBytes);
			}
		}
	}

	/**
	 * Decodes one multicast datagram and hands the resulting message to
	 * the endpoint.
	 *
	 * The buffer is expected to start with a Header, immediately followed
	 * by header.size bytes of message in the "application/x-jxta-msg" wire
	 * format. A buffer that is null, shorter than a header, carries a
	 * corrupted header, or is too short for the announced payload is
	 * discarded; the only trace is a DEBUG log entry.
	 *
	 * @param buffer raw content of the received datagram
	 */
	public void processMulticast(byte[] buffer) {

		if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processMulticast starts");
		if ((buffer == null) || (buffer.length < Header.length) ) {
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   buffer null or truncated.");
			return;
		}

		try {
			// Get the header
			Header header = new Header();
			if (! header.initFromNetwork(buffer, 0)) {
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "header is corrupted. Msg ignored." );
				throw new IOException( "header is corrupted. Msg ignored." );
			}

			// header.size counts only the payload, which starts at offset
			// Header.length. The payload therefore has to fit in what is left
			// of the buffer after the header, not in the whole buffer.
			// (buffer.length >= Header.length is guaranteed by the check above,
			// so the subtraction cannot go negative.)
			if( header.size > buffer.length - Header.length ) {
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "size from header is larger than buffer. Msg was truncated." );
				throw new IOException( "size from header is larger than buffer. Msg was truncated." );
			}

			ByteArrayInputStream bais = new ByteArrayInputStream(buffer, Header.length, header.size);
			Message msg = endpoint.newMessage();
			MessageWireFormatFactory.newMessageWireFormat(
			    new MimeMediaType( "application/x-jxta-msg" ) ).readMessage(bais, msg);

			// Give the message to the EndpointService Manager
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("  handing message to EndpointService");
			if (!allowMulticast) {
				// Multicast is disabled: the decoded message is dropped here.
				return;
			}

			// Notify an existing TcpConnection for the source of the message
			// that the peer might be up again.
			EndpointAddress destAddr = msg.getSourceAddress();
			if (destAddr != null) {
				checkConnection(destAddr);
			}

			// Demux the message for the upper layers.
			endpoint.demux(msg);

			// Make sure we are not hogging the cpu with input
			// thus preventing it from being processed.
			// It is better to slow the sending side a bit.
			Thread.yield();

		} catch (IOException e) {
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   discard message - exception " + e);
			// Just discard the message. Multicast are not reliable
			return;
		}
	}

	/**
	 * Reads messages from a connected socket and hands each of them to
	 * the endpoint.
	 *
	 * Every message on the wire is a Header followed by header.size bytes
	 * in the "application/x-jxta-msg" wire format. After the first bytes
	 * of each header have arrived, a one-byte keep-alive reply (1 = keep,
	 * 0 = do not keep) is written back to the peer.
	 *
	 * The method returns, with the input stream and the socket closed,
	 * on end of stream, on an incomplete or corrupted header, on an
	 * incomplete message body, on any exception, or after the first
	 * message when keep is false. With keep set to true it keeps reading
	 * messages until one of those conditions occurs.
	 *
	 * @param socket the connected socket to read from; a null socket is ignored
	 * @param keep   true to keep the connection open for further messages,
	 *               false to close it after one message
	 */
	public void runReceive(Socket socket, boolean keep) {

		if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceive starts");
		if (socket == null) {
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   no socket");
			return;
		}

		InputStream inputStream = null;

		try {
			inputStream = socket.getInputStream();
			if (inputStream == null) {
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   no inputStream");
				socket.close();
				return;
			}
		} catch (Exception e) {
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("  failed to create inputStream");
			try {
				socket.close();
			} catch ( Exception ignored ) {
				// Best effort: the socket is being abandoned anyway.
			}
			return;
		}

		while (true) {
			try {
				// Get the header
				byte[] headerBuffer = new byte [Header.length];

				int size = inputStream.read(headerBuffer, 0, Header.length);
				if( -1 == size ) {
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "   EOF on inputsteam. Closing socket" );
					closeReceiveResources(inputStream, socket);
					return;
				}

				// Send keepalive info as soon as the first bytes of a message
				// have arrived. That'll speed-up the closing of this
				// transaction for the other side.
				replyKeepAlive(socket, keep);

				// A single read() on a TCP stream may deliver fewer bytes than
				// requested, so keep reading until the header is complete or
				// the stream ends.
				size = readUntilFilled(inputStream, headerBuffer, size, Header.length);
				if (size != Header.length) {
					// The stream ended in the middle of the header.
					// Discard message
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   wrong header size= " + size +
						        " expected= " + Header.length);
					closeReceiveResources(inputStream, socket);
					return;
				}
				Header header = new Header();
				if (! header.initFromNetwork(headerBuffer, 0)) {
					// The stream is broken. Discard the message
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   header corrupted, closing inputStream.");
					closeReceiveResources(inputStream, socket);
					return;
				}

				// NOTE(review): header.size comes from the peer and no upper
				// bound is enforced here before allocating - consider capping it.
				byte[] buffer = new byte [header.size];
				try {
					size = readUntilFilled(inputStream, buffer, 0, header.size);
				} catch (Exception e) {
					// The stream is broken. Discard the message
					if (LOG.isEnabledFor(Priority.DEBUG))
						LOG.debug("   inputStream is broken");
					closeReceiveResources(inputStream, socket);
					return;
				}
				if (size != header.size) {
					// No more bytes to receive. This is an error. Discard the
					// message.
					if (LOG.isEnabledFor(Priority.DEBUG))
						LOG.debug("  no more bytes to receive - incomplete message");
					closeReceiveResources(inputStream, socket);
					return;
				}

				// Give the message to the endpoint manager
				if (LOG.isEnabledFor(Priority.DEBUG))
					LOG.debug("   handing incoming message from " + socket.getInetAddress().getHostAddress() + " to EndpointService");
				ByteArrayInputStream bais = new ByteArrayInputStream(buffer, 0, header.size);
				Message msg = endpoint.newMessage();
				MessageWireFormatFactory.newMessageWireFormat(
				    new MimeMediaType( "application/x-jxta-msg" ) )
				.readMessage(bais, msg);

				// Notify an existing TcpConnection for the source of the message
				// that the peer might be up again.
				EndpointAddress destAddr = msg.getSourceAddress();
				if (destAddr != null) {
					checkConnection(destAddr);
				}

				// Demux the message for the upper layers.
				endpoint.demux(msg);

				// Make sure we are not hogging the cpu with input
				// thus preventing it from being processed.
				// It is better to slow the sending side a bit.
				Thread.yield();

				if (!keep) {
					// No keep alive. Close the socket and return
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Not keepalive. close socket");
					closeReceiveResources(inputStream, socket);
					return;
				}
			} catch (Exception e) {
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError " + e);
				closeReceiveResources(inputStream, socket);
				return;
			}
		}
	}

	/**
	 * Writes the one-byte keep-alive reply (1 when keep is true, 0 otherwise)
	 * to the socket. Failures are logged at DEBUG level and otherwise ignored.
	 */
	private void replyKeepAlive(Socket socket, boolean keep) {
		try {
			OutputStream op = socket.getOutputStream();
			if (op != null) {
				byte rep;
				if (keep) {
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   keep alive");
					// Keep alive ok
					rep = 1;
				} else {
					// Keep alive not ok
					if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   not keep alive");
					rep = 0;
				}
				op.write(rep);
				op.flush();
			}
			else
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Cannot send keepalive");
		} catch (Exception e) {
			// Not fatal
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Cannot send keepalive ok");
		}
	}

	/**
	 * Reads into dest[filled..wanted) until that range is full or the stream
	 * ends, and returns the new fill position (equal to wanted on success).
	 */
	private int readUntilFilled(InputStream in, byte[] dest, int filled, int wanted)
	throws IOException {
		while (filled < wanted) {
			int got = in.read(dest, filled, wanted - filled);
			if (got == -1) {
				break;
			}
			filled += got;
		}
		return filled;
	}

	/**
	 * Closes the input stream and then the socket, skipping null arguments;
	 * a failure to close either one is only logged at DEBUG level.
	 */
	private void closeReceiveResources(InputStream inputStream, Socket socket) {
		if (inputStream != null) {
			try {
				inputStream.close();
			} catch (Exception ez1) {
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError cannot close inputStream " + ez1);
			}
		}
		if (socket != null) {
			try {
				socket.close();
			} catch (Exception ez1) {
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError cannot close socket " + ez1);
			}
		}
	}

	/**
	 * Builds a messenger for the given destination.
	 *
	 * @param dst address of the destination endpoint
	 * @return a new TcpNonBlockingMessenger for dst, or null when the
	 *         protocol address of dst contains "127.0.0.1"
	 * @exception IOException declared by the signature; nothing in this
	 *            method throws it directly (presumably it comes from the
	 *            messenger constructor - to be confirmed)
	 */
	public EndpointMessenger getMessenger(EndpointAddress dst)
	throws IOException {

		// XXX (lomax@jxta.org): work around used to strip out peers that
		//      advertise a local IP address (127.0.0.1) as their
		//      EndpointService.
		final String protocolAddress = dst.getProtocolAddress();
		final boolean advertisesLoopback = protocolAddress.indexOf("127.0.0.1") != -1;

		if (!advertisesLoopback) {
			return new TcpNonBlockingMessenger(dst, this, connManager);
		}

		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("getMessenger is trying to get 127.0.0.1 - discard");
		}
		return null;
	}


	/**
	 * Propagates a TransportMessage on this EndpointProtocol
	 *
	 * @param message the TransportMessage to be propagated
	 * @param pName the name of a service
	 * @param pParams parameters for this message.
	 * @param prunePeer ????
	 * @exception IOException   thrown if the message could not be sent for some reason.
	 */
	public synchronized void propagate(Message message,
	                                   String pName,
	                                   String pParams,
	                                   String prunePeer) throws IOException {

		if (! allowMulticast)
			return;
		if (LOG.isEnabledFor(Priority.DEBUG)) {
			LOG.debug("propagate");
			LOG.debug("prunepeer = " + prunePeer);
		}

		// First build the destination and source addresses.
		// Although mAddress is a shared field, initialized once with the local
		// EndpointAddress for TCP multicast, the following manipulation is
		// thread-safe: mAddress is used only in this method, which is itself
		// synchronized. Synchronizing this method is not such a bad idea anyway:
		// disallowing concurrent IP multicast naturally bounds the usage of
		// ip-multicast so that it grows linearly rather than exponentially.

		mAddress.setServiceName(pName);
		mAddress.setServiceParameter(pParams);

		message.setDestinationAddress(mAddress);
		message.setSourceAddress(publicAddress);

		// Allocate a buffer to contain the message and the header
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		MessageWireFormatFactory.newMessageWireFormat(
		    new MimeMediaType( "application/x-jxta-msg" ) )
		.writeMessage(baos, message);
		baos.close();
		byte bytes[] = baos.toByteArray();
		ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

		byte[] buffer = new byte [multicastPacketSize];

		// Store the message into the buffer. We must loop because the input
		// stream may not return the whole message just because we ask for it.

		int msgSize = Header.length;

		for(;;) {
			int res = bais.read( buffer, msgSize, multicastPacketSize - msgSize );

			if( -1 == res )
				break;

			msgSize += res;

			// we have to make sure that we don't allow message size to grow
			// larger than multicast size or we will loop forever reading.
			if( msgSize >= multicastPacketSize ) {
				try {
					bais.close();
				} catch( IOException ignored ) {}
				if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Message discarded : larger than multicast packet size ("
					        + multicastPacketSize + ")"  );
				throw new IOException(
				    "Message discarded : larger than multicast packet size ("
				    + multicastPacketSize + ")" );
			}
		}

		try {
			bais.close();
		} catch( IOException ignored ) {}

		if( Header.length == msgSize ) {
			// Something odd happened. Discard the message
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Message discarded : could not read whole message from stream" );
			throw new IOException( "Message discarded : could not read whole message from stream" );
		}

		// First build the header
		Header header = new Header();
		header.cmd = Header.PROPAGATE;

		// XXX: This part could probably be initialized only once. To be fixed.
		header.srcAddr = usingInterface.getAddress();

		header.srcPort = serverSocketPort;
		header.size = msgSize - header.length;

		header.buildForNetwork(buffer, 0);

		DatagramPacket packet = new DatagramPacket(buffer, msgSize,
		                        propagateInetAddress,
		                        propagatePort);
		if (multicastSocket == null) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -