⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 wirepipe.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    /**
     * Creates an OutputPipe for this wire restricted to an Enumeration of
     * PeerId(s). Only peers in that Enumeration will receive messages.
     *
     * @param peers an enumeration of the PeerId of the peers that will receive
     * the messages; it is handed unchanged to the WirePrivateOutputPipe
     * constructor.
     * @param timeout not used by this implementation.
     *
     * @return the newly created OutputPipe
     * @throws IOException if this wire is not valid
     */
    public OutputPipe createOutputPipe(Enumeration peers, long timeout)
        throws IOException {

        if (!valid) {
            // Give the caller a reason instead of a message-less exception.
            throw new IOException("This WIRE is not valid");
        }

        return new WirePrivateOutputPipe(this, wireId, peers);
    }
    
    /**
     * Sends a message on this wire: it is first handed to localDemux() for
     * local delivery, then to propagate() for remote delivery.
     *
     * @param msg the message to send
     * @param header the wire header passed to propagate() with the message
     * @throws IOException declared by the signature; neither helper called
     * here declares it
     */
    public void sendMessage(Message msg, WireHeader header)
        throws IOException {

        // Local delivery first; localDemux hands back the message to forward.
        Message outgoing = localDemux(msg);

        if (LOG.isEnabledFor(Priority.INFO)) {
            LOG.info("propagating message");
        }

        // Remote delivery.
        propagate(outgoing, header);
    }
    
    
    /**
     * Handles a message received from the network for this wire.
     *
     * The wire header is read from the message element named WireTagName.
     * The message is silently discarded when the header is missing or
     * unreadable, when isLoopback() reports a loopback, or when isAlive()
     * reports it dead. Otherwise it is delivered locally through
     * localDemux() and, if isToBePropagated() allows it, forwarded with
     * propagate().
     *
     * @param msg the incoming message
     * @param srcAddr source address of the message (not used here)
     * @param dstAddr destination address of the message (not used here)
     */
    public void processIncomingMessage(Message msg,
                                       EndpointAddress srcAddr,
                                       EndpointAddress dstAddr) {

        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processIncominMessage starts");

        // Check if there is a JXTA-WIRE header
        WireHeader header = null;
        try {
            MessageElement elem = msg.getElement(WireTagName);
            // Test for a missing element explicitly rather than relying on a
            // NullPointerException being caught below.
            InputStream ip = (elem == null) ? null : elem.getStream();
            if (ip != null) {
                try {
                    header = new WireHeader(ip);
                } finally {
                    // Release the stream even when the header cannot be parsed.
                    ip.close();
                }
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("No JxtaWireHeader tag. Message is discarded");
            return;
        }

        if (header == null) {
            // No element or no stream: without this guard the checks below
            // would dereference a null header.
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("No JxtaWireHeader tag. Message is discarded");
            return;
        }

        if (isLoopback(header)) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Loopback detected - discard");
            return;
        }

        if (!isAlive(header)) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Dead on arrival - discard");
            return;
        }

        // Message is valid

        // Queue the message to local InputPipes
        msg = localDemux(msg);

        if (!isToBePropagated(header)) {
            // This message ran out of fuel.
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Max TTL reached - discard");
            return;
        }
        if (LOG.isEnabledFor(Priority.INFO)) LOG.info("repropagating message");
        propagate(msg, header);
    }
    
    /**
     * Tells whether the "peers" list carried by the header contains the
     * local peer.
     *
     * A header with a null or empty peers list is also reported as a
     * loopback, so that the caller discards the message.
     *
     * @param header the wire header to inspect
     * @return true if the local peer id is in the list, or if the list is
     * null or empty; false otherwise
     */
    private boolean isLoopback(WireHeader header) {

        Vector visited = header.getPeers();
        if (visited == null || visited.isEmpty()) {
            // A header without any peer is suspicious: pretend it is a
            // loopback (messages are loopback "by default").
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("isLoopback: No peers in header");
            }
            return true;
        }

        for (int idx = 0; idx < visited.size(); ++idx) {
            try {
                String candidate = (String) visited.elementAt(idx);
                if (candidate.equals(localPeerId)) {
                    // The local peer is already in the list: loopback.
                    if (LOG.isEnabledFor(Priority.DEBUG)) {
                        LOG.debug("isLoopback: local peer is in peers");
                    }
                    return true;
                }
            } catch (Exception e) {
                // An entry that cannot be examined is logged and skipped.
                if (LOG.isEnabledFor(Priority.WARN)) {
                    LOG.warn("isLoopback failed with " + e);
                }
            }
        }

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("isLoopback: this message is not a loopback");
        }
        return false;
    }
    
    /**
     * Liveness check for an incoming message.
     *
     * Not implemented yet (XXX lomax@jxta.org): the header is ignored and
     * every message is reported as alive.
     *
     * @param header the wire header of the message (currently unused)
     * @return always true
     */
    private boolean isAlive(WireHeader header) {
        return true;
    }
    
    /**
     * Decides whether a message may be forwarded further.
     *
     * For the time being this just decrements the TTL stored in the header
     * and checks that it is still strictly positive. The header is modified
     * as a side effect.
     *
     * @param header the wire header whose TTL is decremented
     * @return true if the TTL read back after the decrement is at least 1
     */
    private boolean isToBePropagated(WireHeader header) {

        header.setTTL(header.getTTL() - 1);

        // Re-read the TTL from the header rather than reusing the computed value.
        return header.getTTL() >= 1;
    }
    
    /**
     * Delivers a message to every local receiver of this wire.
     *
     * Each non-null entry of "inputs" gets its own clone of the message
     * pushed onto it, then each non-null entry of "pipemsglisteners" is
     * notified with a PipeMsgEvent wrapping another clone. A failure on one
     * receiver is logged and does not prevent delivery to the others.
     *
     * @param msg the message to deliver locally
     * @return the msg reference that was passed in
     */
    private synchronized Message localDemux(Message msg) {

        // Queue a private copy of the message on every receive queue.
        for (int q = 0; q < inputs.size(); ++q) {
            try {
                EndpointReceiveQueue target = (EndpointReceiveQueue) inputs.elementAt(q);
                if (target != null) {
                    Message copy = (Message) msg.clone();
                    target.push(copy);
                }
            } catch (Exception e) {
                if (LOG.isEnabledFor(Priority.WARN)) {
                    LOG.warn("localDemux failed with " + e);
                }
            }
        }

        // Fire a message event, carrying its own copy, on every listener.
        for (int l = 0; l < pipemsglisteners.size(); ++l) {
            try {
                PipeMsgListener target = (PipeMsgListener) pipemsglisteners.elementAt(l);
                if (target != null) {
                    Message copy = (Message) msg.clone();
                    target.pipeMsgEvent(new PipeMsgEvent(this, copy));
                }
            } catch (Exception e) {
                if (LOG.isEnabledFor(Priority.WARN)) {
                    LOG.warn("localDemux failed with " + e);
                }
            }
        }

        return msg;
    }
    
    /**
     * Makes sure the local peer id is in the header's peers list.
     *
     * If the list already contains the local peer id the header is left
     * untouched. Otherwise the id is appended (to a new list when the header
     * had none) and the list is stored back into the header.
     *
     * @param header the wire header to update
     */
    private void addLocalPeer(WireHeader header) {

        Vector known = header.getPeers();
        if (known == null) {
            known = new Vector();
        }

        for (int idx = 0; idx < known.size(); ++idx) {
            try {
                String candidate = (String) known.elementAt(idx);
                if (candidate.equals(localPeerId)) {
                    // Already listed: nothing to do.
                    return;
                }
            } catch (Exception e) {
                // An entry that cannot be examined is logged and skipped.
                if (LOG.isEnabledFor(Priority.WARN)) {
                    LOG.warn("addLocalPeer failed with " + e);
                }
            }
        }

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Adding local peer id into the header");
        }
        known.addElement(localPeerId);
        header.setPeers(known);
    }
    
    /**
     * Propagates a message to the group through the rendezvous service.
     *
     * The local peer is first recorded in the header's peers list (if not
     * already there), the header is attached to the message as the
     * WireTagName element, and the message is handed to
     * rendezvous.propagateInGroup(). Any exception is logged and swallowed.
     *
     * XXX lomax@jxta.org: the message is given directly to the rendezvous,
     * which can be a "semi-blocking" (read: slow) operation. A better
     * implementation would queue the message and let a background thread
     * do the actual sending. To do.
     *
     * @param msg the message to propagate; it gains a WireTagName element
     * @param header the wire header to attach to the message
     */
    private void propagate(Message msg, WireHeader header) {

        if (LOG.isEnabledFor(Priority.INFO)) {
            LOG.info("propagate starts");
        }

        // Record the local peer in the peers list unless it is already there.
        addLocalPeer(header);

        try {
            // Push the header onto the message.
            msg.addElement(
                msg.newMessageElement(WireTagName, null, header.getInputStream()));

            // NOTE(review): 7 is presumably the default TTL expected by
            // propagateInGroup - confirm against the rendezvous service API.
            rendezvous.propagateInGroup(msg, WireName, wireId, 7, null);
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("propagate failed with " + e);
            }
        }
    }
    
    
    
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -