⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcptransport.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        if (multicastSocket == null) {
            // The multicast socket needs first to be allocated.
            try {
                multicastSocket = new MulticastSocket(propagatePort);
                multicastSocket.setInterface( usingInterface );
                multicastSocket.joinGroup(propagateInetAddress);
            } catch (Exception e) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
            }
        }
        try {
            multicastSocket.send(packet);
            //XXX: Shouldn't we close the socket?!
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
        }
    }


    /**
     *  Return the protocol name we support with this instance. For the TCP
     *  transport this is the only protocol supported so we return the same
     *  result as getSupportedProtocolName()
     *
     *  @return String containing the name of the protocol we are supporting.
     **/
    public String getProtocolName() {
        // Hand back whatever name is currently stored in the protocolName field.
        final String name = protocolName;
        return name;
    }

    /**
     * Return the endpoint address by which this peer is accessible via this
     * transport.
     */
    public EndpointAddress getPublicAddress() {
        // Plain accessor: the stored publicAddress is returned as-is (no copy).
        final EndpointAddress address = publicAddress;
        return address;
    }

    /**
     * Reports whether this transport is connection oriented.
     *
     * @return boolean always <code>true</code> for this transport
     */
    public boolean isConnectionOriented() {
        final boolean connectionOriented = true;
        return connectionOriented;
    }

    /**
     * Returns true if the endpoint protocol can be used by the EndpointRouter
     *
     * @return boolean true if the protocol can be used by the EndpointRouter
     */
    public boolean allowRouting() {
        // This transport unconditionally reports itself as usable for routing.
        final boolean routable = true;
        return routable;
    }


    /**
     * Ping a remote host.
     *
     * This implementation tries to open a connection and then tests the result. Note
     * that if there is already an open connection, no new connection is actually created.
     **/
    public  boolean ping(EndpointAddress addr) {

        // Trace the target. Building the message can itself fail (for example
        // when addr is null); in that case the failure is traced instead and
        // the method carries on.
        try {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("ping to " + addr.toString());
            }
        } catch (Exception printFailure) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("ping cannot print address because: " + printFailure);
            }
        }

        // Work-around: discard peers that advertise the loopback address
        // (127.0.0.1) as their endpoint. Any failure while inspecting the
        // address is treated as "not reachable".
        try {
            String protocolAddress = addr.getProtocolAddress();
            if (protocolAddress.indexOf("127.0.0.1") != -1) {
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("   pinging localhost - discard");
                }
                return false;
            }
        } catch (Exception lookupFailure) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("  failed " + lookupFailure);
            }
            return false;
        }

        // Ask the connection manager for a connection to the peer and report
        // its connected state; no connection at all counts as a failed ping.
        TcpConnection existing = connManager.getTcpConnection(addr);
        if (existing != null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("  return " + existing.isConnected());
            }
            return existing.isConnected();
        }

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("  no connection return false ");
        }
        return false;
    }

    // If there is already a TcpConnection for the given host, notify it
    // that the connection might be up again (if it was disconnected)
    private void checkConnection (EndpointAddress addr) {

        TcpConnection existing = connManager.getTcpConnection(addr);

        // Only an already-known connection is told about the activity;
        // when the manager returns nothing there is nothing to do.
        if (existing != null) {
            existing.notifyActivity();
        }
    }

    /**
     * Self-sizing group of worker threads that consume the sockets queued by
     * {@link #newSocket(Socket)} and hand each one to
     * <code>tp.runReceive(socket, keep)</code>.
     *
     * All bookkeeping (queue, thread count, idle count, keep-alive count) is
     * guarded by this object's monitor. The counters are always restored in
     * <code>finally</code> blocks so that an interrupted wait or a failing
     * receive cannot leave them permanently inflated.
     */
    public class IncomingUnicastThreads implements Runnable {

        /** FIFO queue of sockets waiting for a worker. */
        protected Vector sockets = null;
        /** Transport whose runReceive() is invoked for every dequeued socket. */
        protected TcpTransport tp = null;
        /** Number of workers currently blocked in wait(). */
        private volatile int waitingThreads = 0;
        /** Number of workers that have been started and have not yet exited. */
        private   int nbOfThreads = 0;
        /** Number of workers currently running with the keep-alive flag set. */
        private   int nbOfKeepAlive = 0;

        /**
         * Creates the pool and immediately starts the initial workers.
         *
         * @param tp the transport that processes the received sockets
         * @param initialNumber number of worker threads to start right away
         */
        public IncomingUnicastThreads(TcpTransport tp, int initialNumber) {

            this.tp = tp;
            sockets = new Vector();

            for (int i = 0; i < initialNumber; ++i) {
                startWorker();
            }
        }

        /**
         * Accounts for and starts one more worker thread. Synchronized so the
         * counter update cannot race with workers that are already running.
         */
        private synchronized void startWorker() {
            ++nbOfThreads;
            Thread t = new Thread(this, "TCP Incoming Unicast:" + nbOfThreads);
            t.start();
        }

        /**
         * Worker loop: take a socket, process it, repeat until
         * waitForSocket() returns null (meaning this worker is surplus).
         */
        public void run() {

            try {
                while (true) {
                    Socket socket = waitForSocket();
                    if (socket == null) {
                        // That means that this thread must die
                        return;
                    }
                    boolean keep = doesKeepAlive();
                    try {
                        tp.runReceive(socket, keep);
                    } finally {
                        // Give the keep-alive slot back even when runReceive throws.
                        if (keep) {
                            stopKeepAlive();
                        }
                    }
                }
            } catch ( Throwable all ) {
                if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
            } finally {
                // The thread is going away on every path out of run(), so the
                // count must drop on every path too; otherwise mustDie() would
                // see a permanently inflated thread count.
                decThreads();
            }
        }

        private synchronized void decThreads() {
            --nbOfThreads;
        }

        /**
         * Blocks until a socket is available or this worker should terminate.
         *
         * @return the next queued socket, or null when the calling worker
         *         must exit
         */
        protected synchronized Socket waitForSocket() {

            while (true) {

                // Always remove the head entry, even if it is unusable: leaving
                // a null or foreign entry in place would block every socket
                // queued behind it forever.
                while (!sockets.isEmpty()) {
                    Object head = sockets.elementAt(0);
                    sockets.removeElementAt(0);
                    if (head instanceof Socket) {
                        return (Socket) head;
                    }
                    // We should not get here.
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket cannot get incoming socket");
                }

                // No more socket to consume... Just wait.
                // Check if this thread must die or wait for a new socket
                if (mustDie()) {
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket: killing thread");
                    return null;
                }

                ++waitingThreads;
                try {
                    wait();
                } catch (InterruptedException ignored) {
                    // Best effort, as before: an interrupt just causes the
                    // queue and the mustDie() condition to be re-checked.
                } finally {
                    // Must run on the interrupt path as well, otherwise the
                    // idle count drifts upward and newSocket() stops creating
                    // workers.
                    --waitingThreads;
                }
            }
        }

        /**
         * Decides whether the calling (about to idle) worker is surplus:
         * either the pool is above MaxNbOfUnicastThreads or more than one
         * other worker is already idle.
         */
        private synchronized boolean mustDie() {
            return (nbOfThreads > MaxNbOfUnicastThreads) || (waitingThreads > 1);
        }

        /**
         * Tries to reserve a keep-alive slot for the calling worker.
         *
         * @return true if a slot was reserved (the caller must later call
         *         stopKeepAlive()), false if MaxNbOfUnicastKeepAliveThreads
         *         slots are already taken
         */
        private synchronized boolean doesKeepAlive() {

            if (nbOfKeepAlive >= MaxNbOfUnicastKeepAliveThreads) {
                return false;
            }
            ++nbOfKeepAlive;
            return true;
        }

        private synchronized void stopKeepAlive() {
            --nbOfKeepAlive;
        }

        /**
         * Queues a freshly accepted socket for processing. When the backlog
         * already exceeds MaxNbOfPendingSockets the socket is closed and
         * dropped instead.
         *
         * @param socket the incoming socket to hand to a worker
         */
        protected synchronized void newSocket(Socket socket) {

            if (sockets.size() > MaxNbOfPendingSockets) {
                // Too many request. Just drop it.
                try {
                    socket.close();
                } catch (Exception e) {
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket socket.close failed with " + e);
                }
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket: too many incoming request, dropped incoming message");
                return;
            }

            sockets.addElement(socket);

            // Nobody is idle: allocate a new worker for this socket.
            if (waitingThreads == 0) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket: create a new Thread");
                startWorker();
            }

            // Wake one idle worker, if any; the monitor is held here so
            // notify() cannot fail.
            notify();
        }
    }



    /**
     * Hands an incoming socket over to the unicast worker threads.
     *
     * @param socket the socket to queue via unicastThreads.newSocket()
     */
    void processReceivingSocket(Socket socket) {
        this.unicastThreads.newSocket(socket);
    }


    /**
     * Self-sizing group of worker threads that consume the byte buffers
     * queued by {@link #newBuffer(byte[])} and hand each one to
     * <code>tp.processMulticast(buffer)</code>.
     *
     * All bookkeeping (queue, thread count, idle count) is guarded by this
     * object's monitor. The counters are always restored in
     * <code>finally</code> blocks so that an interrupted wait or a failing
     * processMulticast() cannot leave them permanently inflated.
     */
    public class IncomingMulticastThreads implements Runnable {

        /** FIFO queue of received buffers waiting for a worker. */
        protected Vector buffers = null;
        /** Transport whose processMulticast() is invoked for every buffer. */
        protected TcpTransport tp = null;
        /** Number of workers currently blocked in wait(). */
        private volatile int waitingThreads = 0;
        /** Number of workers that have been started and have not yet exited. */
        private   int nbOfThreads = 0;

        /**
         * Creates the pool and immediately starts the initial workers.
         *
         * @param tp the transport that processes the received buffers
         * @param initialNumber number of worker threads to start right away
         */
        public IncomingMulticastThreads(TcpTransport tp, int initialNumber) {

            this.tp = tp;
            buffers = new Vector();

            for (int i = 0; i < initialNumber; ++i) {
                startWorker();
            }
        }

        /**
         * Accounts for and starts one more worker thread. Synchronized so the
         * counter update cannot race with workers that are already running.
         */
        private synchronized void startWorker() {
            ++nbOfThreads;
            Thread t = new Thread(this, "TCP Incoming Multicast:" + nbOfThreads);
            t.start();
        }

        /**
         * Worker loop: take a buffer, process it, repeat until
         * waitForBuffer() returns null (meaning this worker is surplus).
         */
        public void run() {

            try {
                while (true) {
                    byte[] buffer = waitForBuffer();
                    if (buffer == null) {
                        // That means that this thread must die
                        return;
                    }
                    tp.processMulticast(buffer);
                }
            } catch ( Throwable all ) {
                if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
            } finally {
                // The thread is going away on every path out of run(), so the
                // count must drop on every path too; otherwise mustDie() would
                // see a permanently inflated thread count.
                decThreads();
            }
        }

        private synchronized void decThreads() {
            --nbOfThreads;
        }

        /**
         * Blocks until a buffer is available or this worker should terminate.
         *
         * @return the next queued buffer, or null when the calling worker
         *         must exit
         */
        protected synchronized byte[] waitForBuffer() {

            while (true) {

                // Always remove the head entry, even if it is unusable: leaving
                // a null or foreign entry in place would block every buffer
                // queued behind it forever.
                while (!buffers.isEmpty()) {
                    Object head = buffers.elementAt(0);
                    buffers.removeElementAt(0);
                    if (head instanceof byte[]) {
                        return (byte[]) head;
                    }
                    // We should not get here.
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer cannot get incoming buffer");
                }

                // No more buffer to consume... Just wait.
                // Check if this thread must die or wait for a new buffer
                if (mustDie()) {
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer: killing thread " + Thread.currentThread().getName() );
                    return null;
                }

                ++waitingThreads;
                try {
                    wait();
                } catch (InterruptedException ignored) {
                    // Best effort, as before: an interrupt just causes the
                    // queue and the mustDie() condition to be re-checked.
                } finally {
                    // Must run on the interrupt path as well, otherwise the
                    // idle count drifts upward and newBuffer() stops creating
                    // workers.
                    --waitingThreads;
                }
            }
        }

        /**
         * Decides whether the calling (about to idle) worker is surplus:
         * either the pool is above MaxNbOfMulticastThreads or another worker
         * is already idle.
         */
        private synchronized boolean mustDie() {
            return (nbOfThreads > MaxNbOfMulticastThreads) || (waitingThreads > 0);
        }

        /**
         * Queues a received buffer for processing and makes sure a worker
         * will pick it up.
         *
         * @param buffer the received data to hand to a worker
         */
        protected synchronized void newBuffer(byte[] buffer) {

            buffers.addElement(buffer);

            // Nobody is idle: allocate a new worker for this buffer.
            if (waitingThreads == 0) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newBuffer: create a new Multicast receive Thread");
                startWorker();
            }

            // Wake one idle worker, if any; the monitor is held here so
            // notify() cannot fail.
            notify();
        }
    }

    /**
     * Hands a received buffer over to the multicast worker threads.
     *
     * @param buffer the data to queue via multicastThreads.newBuffer()
     */
    void processReceivingBuffer(byte[] buffer) {
        this.multicastThreads.newBuffer(buffer);
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -