⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcptransport.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            } catch (SecurityException e2) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("ServerSocket.accept() on port " +
                serverSocketPort +
                " has failed : " + e2.toString());
                continue;
            }
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runUnicastServer: received a connection");
            processReceivingSocket(socket);
        }
    }

    public void runMulticastServer() {

        byte[] buffer;

        for (;;) {
            buffer = new byte [propagateSize];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            try {
                multicastSocket.receive(packet);
            } catch (Exception e) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runMulticastServer " + e);
                break;
            }
            processReceivingBuffer(buffer);
        }
    }

    public void processMulticast(byte[] buffer) {

        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processMulticast starts");
        if ((buffer == null) || (buffer.length < Header.length) ) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   buffer null or truncated.");
            return;
        }

        try {
            // Get the header
            Header header = new Header();
            if (! header.initFromNetwork(buffer, 0)) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "header is corrupted. Msg ignored." );
                throw new IOException( "header is corrupted. Msg ignored." );
            }
            if( header.size > buffer.length ) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "size from header is larger than buffer. Msg was truncated." );
                throw new IOException( "size from header is larger than buffer. Msg was truncated." );
            }

            ByteArrayInputStream bais = new ByteArrayInputStream(buffer, Header.length, header.size);
            Message msg = endpoint.newMessage();
            MessageWireFormatFactory.newMessageWireFormat(
            new MimeMediaType( "application/x-jxta-msg" ) ).readMessage(bais, msg);

            // Give the message to the EndpointService Manager
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("  handing message to EndpointService");
	    if (!allowMulticast) {
		return;
	    }

	    // Notify an existing TcpConnection for the source of the message
	    // that the peer might be up again.
	    EndpointAddress destAddr = msg.getSourceAddress();
	    if (destAddr != null) {
		checkConnection (destAddr);
	    }

	    // Demux the message for the upper layers.
            endpoint.demux(msg);

            // Make sure we are not hogging the cpu with input
            // thus preventing it from being processed.
            // It is better to slow the sending side a bit.
            Thread.yield();

        } catch (IOException e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   discard message - exception " + e);
            // Just discard the message. Multicast are not reliable
            return;
        }
    }

    public void runReceive(Socket socket, boolean keep) {

        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceive starts");
        if (socket == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   no socket");
            return;
        }

        InputStream inputStream = null;

        try {
            inputStream = socket.getInputStream();
            if (inputStream == null) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   no inputStream");
                socket.close();
                return;
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("  failed to create inputStream");
            try {
                socket.close();
            } catch ( Exception ignored ) {
            }
            return;
        }

        while (true) {
            int size = 0;
            try {
                // Get the header
                byte[] headerBuffer = new byte [Header.length];

		size = inputStream.read(headerBuffer, 0, Header.length);
                if( -1 == size ) {
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "   EOF on inputsteam. Closing socket" );
                    inputStream.close();
                    socket.close();
                    return;
                }

                // Send keepalive info as soon as the cnx is extablished.
                // That'll speed-up the closing of this transaction for the
                // other side.
                try {
                    OutputStream op = socket.getOutputStream();
                    if (op != null) {
                        byte rep;
                        if (keep) {
                            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   keep alive");
                            // Keep alive ok
                            rep = 1;
                        } else {
                            // Keep alive not ok
                            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   not keep alive");
                            rep = 0;
                        }
                        op.write(rep);
                        op.flush();
                    }
                    else
                        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Cannot send keepalive");
                } catch (Exception e) {
                    // Not fatal
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Cannot send keepalive ok");
                }


                if (size != Header.length) {
                    // We assume that the header is transmitted in one chunk
                    // Discard message
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   wrong header size= " + size +
                    " expected= " + Header.length);
                    inputStream.close();
                    socket.close();
                    return;
                }
                Header header = new Header();
                if (! header.initFromNetwork(headerBuffer, 0)) {
                        // The stream is broken. Discard the message
                        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   header corrupted, closing inputStream.");
                        inputStream.close();
                        socket.close();
                        return;
                }

                size = 0;
                int tmpSize = 0;
                byte[] buffer = new byte [header.size];
                while (size < header.size) {
                    try {
                        tmpSize = inputStream.read(buffer,
                        size,
                        header.size - size);
                    } catch (Exception e) {
                        // The stream is broken. Discard the message
                        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   inputStream is broken");
                        inputStream.close();
                        socket.close();
                        return;
                    }
                    if (tmpSize == -1) {
                        // No more bytes to receive. This is an error. Discard the
                        // message.
                        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("  no more bytes to receive - incomplete message");
                        inputStream.close();
                        socket.close();
                        return;
                    }
                    size += tmpSize;
                }

                if (size != header.size) {
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("    header size is invalid");
                    inputStream.close();
                    socket.close();
                    return;
                }


                // Give the message to the endpoint manager
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("   handing incoming message to EndpointService");
                ByteArrayInputStream bais = new ByteArrayInputStream(buffer, 0, header.size);
                Message msg = endpoint.newMessage();
                MessageWireFormatFactory.newMessageWireFormat(
                new MimeMediaType( "application/x-jxta-msg" ) )
                .readMessage(bais, msg);

		// Notify an existing TcpConnection for the source of the message
		// that the peer might be up again.
		EndpointAddress destAddr = msg.getSourceAddress();
		if (destAddr != null) {
		    checkConnection (destAddr);
		}

		// Demux the message for the upper layers.
                endpoint.demux(msg);

                // Make sure we are not hogging the cpu with input
                // thus preventing it from being processed.
                // It is better to slow the sending side a bit.
                Thread.yield();

                if (!keep) {
                    // No keep alive. Close the socket and return
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Not keepalive. close socket");
                    inputStream.close();
                    socket.close();
                    return;
                }
            } catch (Exception e) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError " + e);
		if (inputStream != null) {
		    try {
			inputStream.close();
		    } catch (Exception ez1) {
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError cannot close inputStream " + ez1);
		    }
		}
		if (socket != null) {
		    try {
			socket.close();
		    } catch (Exception ez1) {
			if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("runReceiveError cannot close socket " + ez1);
		    }
		}
                return;
            }
        }
    }

    /**
     * Creates and return an EndpointMessenger
     */
    public EndpointMessenger getMessenger(EndpointAddress dst)
        throws IOException {

        // XXX: the following is a work around in order to
        //      strip out peers that advertise a local IP address
        //      as their EndpointService (127.0.0.1)
        // lomax@jxta.org

        String addr = dst.getProtocolAddress();
        if (addr.indexOf("127.0.0.1") >= 0) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("getMessenger is trying to get 127.0.0.1 - discard");
            return null;
        }

	return new TcpNonBlockingMessenger(dst, this, connManager);
    }


    /**
     * Propagates a TransportMessage on this EndpointProtocol
     *
     * @param message the TransportMessage to be propagated
     * @param pName the name of a service
     * @param pParams parameters for this message.
     * @param prunePeer ????
     * @exception IOException   thrown if the message could not be sent for some reason.
     */
    public synchronized void propagate(Message message,
                                       String pName,
                                       String pParams,
                                       String prunePeer) throws IOException {

        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("propagate");
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("prunepeer = " + prunePeer);

        // First build the destination and source addresses
	// While mAddress is a global variable which is initialized once with
	// the local EndpointAddress for TCP multicast, the following manipulation
	// is thread-safe: mAddress is used only in this method, which itself is
	// synchronized. Synchronizing this method is not such a bad idea anyway, since
	// it might be a good thing to not allow concurrent IP multicast: that naturally
	// bonds the usage of ip-multicast in a linear matter and not exponantial.

        mAddress.setServiceName(pName);
        mAddress.setServiceParameter(pParams);

        message.setDestinationAddress(mAddress);
        message.setSourceAddress(publicAddress);

        // Allocate a buffer to contain the message and the header
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        MessageWireFormatFactory.newMessageWireFormat(
            new MimeMediaType( "application/x-jxta-msg" ) )
            .writeMessage(baos, message);
        baos.close();
        byte bytes[] = baos.toByteArray();
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

        byte[] buffer = new byte [multicastPacketSize];

        // Store the message into the buffer. We must loop because the input
        // stream may not return the whole message just because we ask for it.

        int msgSize = Header.length;

        for(;;) {
            int res = bais.read( buffer, msgSize, multicastPacketSize - msgSize );

            if( -1 == res )
                break;

            msgSize += res;

            // we have to make sure that we don't allow message size to grow
            // larger than multicast size or we will loop forever reading.
            if( msgSize >= multicastPacketSize ) {
                try {
                    bais.close();
                } catch( IOException ignored ) {}
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Message discarded : larger than multicast packet size ("
                + multicastPacketSize + ")"  );
                throw new IOException(
                "Message discarded : larger than multicast packet size ("
                + multicastPacketSize + ")" );
            }
        }

        try {
            bais.close();
        } catch( IOException ignored ) {}

        if( Header.length == msgSize ) {
            // Something odd happened. Discard the message
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Message discarded : could not read whole message from stream" );
            throw new IOException( "Message discarded : could not read whole message from stream" );
        }

        // First build the header
        Header header = new Header();
        header.cmd = Header.PROPAGATE;

        // XXX: This part could probably be initialized only once. To be fixed.
        header.srcAddr = usingInterface.getAddress();

        header.srcPort = serverSocketPort;
        header.size = msgSize - header.length;

        header.buildForNetwork(buffer, 0);

        DatagramPacket packet = new DatagramPacket(buffer, msgSize,
            propagateInetAddress,
            propagatePort);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -