⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 resolverserviceimpl.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        catch(IOException e) {
            if (LOG.isEnabledFor(Priority.DEBUG) ) {
                LOG.debug("Error propagating query",e);
	    }
        }
    }
    
    
    private void resendQuery(Message msg, String srcPeer) {
        
        if (rendezvous == null)
            return;
        
        if (!myGroup.isRendezvous()) {
            // We are not a Rendez vous peer. Just drop the message.
            return;
        }
        
        // Rebuild a query based on the existing one
        MessageElementEnumeration elements = msg.getElements();
        
        String tagString = null;
        
        Vector tmp = new Vector();
        ResolverQuery query = null;
        
        // We need to change the source peer value of each query.
        while (elements.hasMoreMessageElements()) {
            MessageElement el = elements.nextMessageElement();
            tagString=el.getName();
            if ( !tagString.endsWith(outQueNameShort) ) {
                // Tag for something else - just ignore
                continue;
            }
            
            try {
                query = new ResolverQuery(el.getStream());
                // Remove the element
                msg.removeElement(el);
                query.setSrc(srcPeer);
                tmp.addElement(query);
            }
            catch (Exception e) {
                continue;
            }
        }
        // At this point, all the resolver queries from the message have been
        // removed, updated, and stored into tmp. Put them back into the
        // new the message
        
        if (tmp.size() == 0) {
            // There was no query. Discard.
            return;
        }
        
        Message newMsg = endpoint.newMessage();
        
        for (int i = 0; i < tmp.size(); ++i) {
            
            try {
                query = (ResolverQuery) tmp.elementAt(i);
                newMsg.addElement(newMsg.newMessageElement(outQueName,
                                                           textXml,
							   (InputStream)((Document)
                                                           (query.getDocument(textXml))).getStream()));
            }
            catch (Exception e) {
                // Something weird happened. Discard.
                return;
            }
        }
        
        // Send the message
        try {
            rendezvous.propagateInGroup(newMsg,
                                        handlerName,
					outQueName,
					7,
					localPeerId);
        }
        catch(IOException e) {
            if (LOG.isEnabledFor(Priority.DEBUG) ) {
                LOG.debug("Error propagating resending query",e);
	    }
        }
    }
    
    
    /**
     * This method will be called by the endpoint or the rendezvous service
     * to handle query messages
     *
     * @param message
     */
    public void processIncomingMessage(Message message,
                                       EndpointAddress srcAddr,
				       EndpointAddress dstAddr) {
        
        MessageElementEnumeration elements;
        String tagString;
        
        if (LOG.isEnabledFor(Priority.DEBUG))
            LOG.debug("demuxing a query");
        
        Message srcMsg = (Message) message.clone();
        
        elements = message.getElements();
        while (elements.hasMoreElements()) {
            MessageElement element = elements.nextMessageElement();
            tagString= element.getName();
            if ( !tagString.endsWith(outQueNameShort) ) {
                // Tag for something else - just ignore
                continue;
            }
            
            try {
                // Remove the element fromt the message
                message.removeElement(element);
                ResolverQuery doc =
                new ResolverQuery(element.getStream());
                
                ResolverResponseMsg res =
                (ResolverResponseMsg) processQuery((ResolverQueryMsg)doc);
                
                if (res != null) {
                    // There is a response to be sent
                    respond(doc.getSrc(),
                            handlerName,
			    inQueName,
			    inQueName,
			    (InputStream)((Document) (res.getDocument(textXml))).getStream());
                }
                // This ResolverService Service allow to re-propagate a request even if
                // a local answer has been found.
                
                propagateQuery(srcMsg);
                
            } catch (ResendQueryException e2) {
                // The resolver service has no response, but is interested to get
                // a response itself. Resend the query, but just like if that
                // was the source of the query. The resolver service will be
                // reponsible for resending the response to the original sender.
                propagateQuery((Message) srcMsg.clone());
                resendQuery(srcMsg, localPeerId);
            }
            catch (NoResponseException e1) {
                // The resolver service has no response. Process the query to
                // see if there is something else to do.
                propagateQuery(srcMsg);
            }
            catch (DiscardQueryException e3) {
                // We are asked to discard this query.
                return;
            }
            catch (IOException e4) {
                // The query was invalid somehow. Just discard.
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("query was malformed", e4);
		}
                return;
            }
        }
    }
    
    
    /**
     * Inner class to handle the Response queue for incoming responses
     *
     */
    
    class RecvDemux implements EndpointListener {
        
        public void processIncomingMessage(Message message,
                                           EndpointAddress srcAddr,
					   EndpointAddress dstAddr) {
            
            if (message == null ) {
                if (LOG.isEnabledFor(Priority.DEBUG))
                    LOG.debug("RecvDemux.demux: got a null message");
                return;
            }
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug("demuxing a response ");
            
            MessageElementEnumeration elements = message.getElements();
            while (elements.hasMoreElements()) {
                MessageElement el = elements.nextMessageElement();
                String tagString=el.getName();
                if (! tagString.endsWith(inQueNameShort) ) {
                    // Tag for something else - just ignore
                    continue;
                }
                try {
                    // Remove the element from the message
                    message.removeElement(el);
                    ResolverResponse resp = new ResolverResponse(el.getStream());
                    processResponse(resp);
                }
                catch (IOException e) {
                    if (LOG.isEnabledFor(Priority.DEBUG))
                        LOG.debug("Ill formatted resolver response, ignoring.", e );
                }
            }
        }
    }
    
    
    /**
     * Process the message and respond
     * @return ResolverResponseMsg response
     * @param query
     * @throws NoResponseException
     */
    
    public ResolverResponseMsg processQuery(ResolverQueryMsg query)
            throws NoResponseException, 
	           ResendQueryException,
		   DiscardQueryException,
		   IOException {
        
        if (LOG.isEnabledFor(Priority.DEBUG))
            LOG.debug("handing a query to  "+((ResolverQuery)query).getHandlerName());
        
        QueryHandler theHandler = getHandler( ((ResolverQuery)query).getHandlerName());
        
        if ( theHandler == null) {
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug("throwing away "+((ResolverQuery)query).getHandlerName());
            return null;
        }
        if (LOG.isEnabledFor(Priority.DEBUG))
            LOG.debug("handing a query to  "+((ResolverQuery)query).getHandlerName());
        ResolverResponseMsg result;
        
        result = theHandler.processQuery(query);
        
        return((ResolverResponseMsg) result);
    }
    
    
    /**
     * push Response back to the handler
     * @return ResolverResponseMsg response
     * @param query
     * @throws NoResponseException
     */
    
    public void processResponse(ResolverResponseMsg resp) {
	
    String handlerName = ((ResolverResponseMsg)resp).getHandlerName();
        if (LOG.isEnabledFor(Priority.DEBUG)) {
	    LOG.debug("pushing a response to  " + handlerName );
	}
	QueryHandler theHandler = getHandler( handlerName );
	theHandler.processResponse(resp);
    }
    
    
    /**
     * Send a response to a peer.
     * This is used by resolver services that are handling query/response in
     * an asynchronous way, which is the case of Rendezvous peers.
     *
     * @param destPeer is the destination of the response
     * @param response is the response to be sent
     */
    public void sendResponse(String destPeer, ResolverResponseMsg response) {
        
        if (destPeer == null) {
            propagateResponse(response);
        }
        else {
            try {
                respond(destPeer,
                        handlerName,
			inQueName,
			inQueName,
			(InputStream)((Document)(response.getDocument(textXml))).getStream());
            }
            catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug("error in sending response", e );
            }
        }
    }
    
    private void propagateResponse(ResolverResponseMsg response) {
        
        if (rendezvous == null)
            return;
        
        Message propagateMsg = endpoint.newMessage();
        
        try {
            propagateMsg.addElement(
	                propagateMsg.newMessageElement(inQueName,
                                                       textXml,
                                                       (InputStream)((Document)
						       (response.getDocument(textXml))).getStream()));
            
            rendezvous.propagateInGroup(propagateMsg,
                                        handlerName,
					inQueName,
					1,
					null);
            
        }
        catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug( "failure on propagateResponse", e);
        }
        
    }
    
    /**
     * respond with a ResolverResponse Message
     * @param address
     * @param tagName
     * @param response
     * @throws RuntimeException
     */
    private void respond(String destPeer,
                         String pName,
			 String pParam,
			 String tagName,
			 InputStream response)  throws RuntimeException {
        
        if (LOG.isEnabledFor(Priority.DEBUG))
            LOG.debug("destPeer :\n"+destPeer);
        if (LOG.isEnabledFor(Priority.DEBUG))
            LOG.debug("responding to "+pName+"  "+pParam+" "+tagName);
        if (response == null) {
            throw new RuntimeException("Attempting to respond with a empty message");
        }
        
        Message msg = endpoint.newMessage();
        try {
            msg.addElement( msg.newMessageElement(tagName,
            null,
            response));
            // FIXME: we should make sure that it will not be re-propagated
            // To do that we'd need to add a propagation header, which
            // the rendezvous service does when propagating.
            // However, at this time there is no unicast method
            // among the rendezvous propagation service...
            //
            // updatePropHeader(msg, 1);
        }
        catch (Exception ez1) {
            // Not much we can do
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug("Exception ", ez1);
        }
        
        EndpointMessenger messenger = null;
        try {
            messenger = endpoint.getMessenger(mkAddress(destPeer,
                                              pName,
					      pParam));
            
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug( "endpoint.getMessenger failed to get messenger", e );
            return;
        }
        
        if (messenger == null) {
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug( "endpoint.getMessenger failed to get messenger");
            return;
        }
        
        try {
            messenger.sendMessage(msg);
        }
        catch (IOException e) {
            if (LOG.isEnabledFor(Priority.DEBUG))
                LOG.debug( "messenger.sendMessage failed to send", e );
            throw new RuntimeException( "messenger.sendMessage failed to send" );
        }
    }
    
    /**
     * init is called by the group running this instance
     * @param g
     * @throws PeerGroupException
     */
    
    public void init(PeerGroup g, ID assignedID, Advertisement impl)
    throws PeerGroupException {
        
        implAdvertisement     = (ModuleImplAdvertisement) impl;
        
        myGroup           = g;
        endpoint          = g.getEndpointService();
        localPeerId       = g.getPeerID().toString();
        handlerName       = assignedID.toString();
        String uniqueStr  = (g.getPeerGroupID().getUniqueValue()).toString();
        
        outQueName        = uniqueStr + outQueNameShort;
        inQueName         = uniqueStr + inQueNameShort;
        localPeerId       = g.getPeerID().toString();
        recvMux           = new RecvDemux();
        
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -