⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 httptransport.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        String fn = cm.getOldestFile(dn);
        
        if (fn == null) {
            // No more messages in the spooler for that peer
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("No messages for client");
            if (!blocking) {
                return HttpMessage.EMPTYMESSAGE;
            } else {
                // if we are blocking then we return no message at all. the
                // client will simply have to wait until something happens.
                return null;
            }
        }
        
        // There is a message to be sent in the spooler. Send it.
        // For the time being, just return a message at the time.
        // Maybe a good optimization would be to return a bunch
        // of them...
        // XXX: to revist. lomax@jxta.org
        
        byte[] buffer = (new HttpMessage(dn, fn)).getBytes();
        
        // Remove the file.
        // XXX: maybe it would be wise to wait for an OK from the other
        // side before removing the file, but that might be too heavy.
        // To be revisited. lomax@jxta.org
        
        try {
            cm.remove(dn, fn);
        } catch( IOException failed ) {
            if (LOG.isEnabledFor(Priority.WARN)) LOG.warn( "failed to remove cm file : " + dn +
            File.separatorChar + fn );
        }
        
        try {
	    client = server.getClientConnection(clientId);

            if (client != null) {
                Vector buffers = new Vector(1);
                buffers.addElement(buffer);
                client.sendToClient( buffers, buffer.length );
            } else {
                Vector buffers = new Vector();
                buffers.addElement(buffer);
                sendResponse(outputStream, buffers, buffer.length);
                closeSocket(inputSocket, inputStream, outputStream);
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.WARN)) LOG.warn( "couldnt send reply?" );
            return null;
        }
        
        return null;
    }
    
    /**
     *  Given a serialized message stored in a string create a message object
     *  and then provide it to the endpoint for processing.
     *
     *  @param data the message in wire format.
     *  @return HttpMessage message indicating the result of the processing.
     *
     **/
    private HttpMessage processIncoming(String data) {
        // Everything that can go wrong while decoding or dispatching is handled
        // the same way: the message is dropped and FAILEDMESSAGE is returned.
        try {
            // Wrap the raw bytes of the wire-format text so it can be parsed.
            ByteArrayInputStream wire = new ByteArrayInputStream(data.getBytes());

            // Ask the endpoint for an empty message and fill it from the stream.
            Message parsed = endpoint.newMessage();
            MimeMediaType xmlType = new MimeMediaType( "text/xml" );
            MessageWireFormatFactory.newMessageWireFormat(xmlType).readMessage(wire, parsed);

            // Hand the decoded message to the endpoint for processing.
            endpoint.demux(parsed);
            return HttpMessage.OKMESSAGE;
        } catch (Exception failure) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("failure processing incoming", failure );
            }
            return HttpMessage.FAILEDMESSAGE;
        }
    }
    
    /**
     * Creates and returns an EndpointMessenger
     */
    public EndpointMessenger getMessenger( EndpointAddress dst)
    throws IOException {

        final String protoAddr = dst.getProtocolAddress();

        // A server messenger is chosen only when the protocol address begins
        // with MagicWord AND has at least one more character after it.
        final boolean hasMagicPrefix =
            (protoAddr.length() > MagicWord.length()) && protoAddr.startsWith(MagicWord);

        if (hasMagicPrefix) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("   getting a server messenger");
            }
            return new HttpServerMessenger( dst, this);
        }

        // Every other address gets a non-blocking client messenger.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("    getting a client messenger");
        }
        return new HttpNonBlockingMessenger(dst, this);
    }
    
    /**
     * Propagates a TransportMessage on this TransportProtocol
     *
     * @param message the TransportMessage to be propagated
     */
    public void propagate(Message message,
                          String pName,
                          String pParams,
                          String prunePeer) throws IOException {
        // Deliberately empty: HttpTransport does not implement propagate,
        // so all four arguments are ignored.
    }
    
    /**
     *  Switch polling on.
     *
     *  If a polling thread already exists it is told to poll again via
     *  setPolling(true). Otherwise the public address is set to the client
     *  address and a new HttpClient thread is created in myThreadGroup and
     *  started.
     **/
    private void startPolling() {

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("startPolling");
        }

        if (pollingThread == null) {
            // Polling means that we are a client
            publicAddress = publicClientAddress;

            // No poller yet: create the listening thread and start it
            pollingThread = new HttpClient( myThreadGroup, this );
            pollingThread.start();
        } else {
            // A poller already exists; just re-enable it
            pollingThread.setPolling( true );
        }
    }
    
    /**
     *  Switch polling off.
     *
     *  Does nothing beyond logging when there is no polling thread. Otherwise
     *  the thread is asked to end polling, the reference to it is cleared and
     *  the public address is set to the server address.
     **/
    private void stopPolling() {

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("stopPolling");
        }

        if (pollingThread != null) {
            // tell the thread to quit, then forget about it
            pollingThread.endPolling();
            pollingThread = null;

            // We stopped polling, maybe are we server now
            publicAddress = publicServerAddress;
        }
    }
    
    /**
     *  Poll the remote addr for messages for this peer.
     *
     *  @param addr URL of the remote server to poll
     *  @param type type of polling to perform, blocking or non-blocking
     *  @return boolean true if a message was received.
     **/
    boolean pollRemote( URL addr, int type ) {

        // Any exception raised along the way is logged and reported as
        // "nothing received".
        try {
            final String response = sendHttpGET( addr, type, true );
            if (response == null) {
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("   no message to poll (no reponse)");
                }
                return false;
            }

            // Decode the HTTP-level envelope and see whether it carries anything.
            final HttpMessage reply = new HttpMessage(response);
            if ( reply.getCode() == HttpMessage.Empty ) {
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("   no message to poll(by server code)");
                }
                return false;
            }

            final byte[] payload = reply.getData();
            if (payload == null) {
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("   no message to poll(empty msg body)");
                }
                return false;
            }

            // Parse the payload into a fresh endpoint message and dispatch it.
            ByteArrayInputStream wire = new ByteArrayInputStream(payload);
            Message polled = endpoint.newMessage();
            MimeMediaType xmlType = new MimeMediaType( "text/xml" );
            MessageWireFormatFactory.newMessageWireFormat(xmlType).readMessage(wire, polled);

            endpoint.demux(polled);
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("   polled one message");
            }
            return true;
        } catch (Exception failure) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("pollRemote failed", failure);
            }
            return false;
        }
    }
    
    /**
     *  register this client with the server named by the string authority
     *
     *  @param  authority   the server this client should try to register with
     *  @return    boolean true if the register was acknowledged otherwise
     *   false
     **/
    private boolean registerWithRemote( String authority ) {
        
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Register with Remote : " + authority );
        
        try {
            // The authority is either "host" or "host:port". Without an
            // explicit port we fall back to port 80.
            String host = authority;
            int port = 80;
            
            int sepAt = authority.lastIndexOf( ':' );
            if( -1 != sepAt ) {
                host = authority.substring(0, sepAt );
                try {
                    port = Integer.parseInt( authority.substring(sepAt + 1) );
                } catch (NumberFormatException badPort) {
                    // parseInt signals a malformed port ("host:" or "host:abc")
                    // with an unchecked exception that the IOException handler
                    // below would not catch. Treat it as a failed registration
                    // rather than letting it escape to the caller.
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote failed, bad port in authority : " + authority, badPort );
                    return false;
                }
            }
            
            // Registration is a GET on /reg/<localClientId>/ at the remote server.
            URL addr = new URL( "http", host, port, "/reg/" + localClientId
                                + "/" );
            
            String result = sendHttpGET( addr, PollingSend, true );
            
            if (result != null) {
                // The server answered: only an Ok code counts as success.
                int code  = (new HttpMessage( result )).getCode();
                if( HttpMessage.Ok == code ) {
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote succeeded" );
                    return true;
                } else {
                    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote failed with code : " + code );
                }
            }
            else {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote got no response" );
                
                // FIXME 20010910 bondolo@jxta.org This occurs when we are
                // talking to an old server.
                // No response is deliberately reported as success.
                return true;
            }
        } catch (IOException e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote failed for some reason", e );
        }
        
        // Reached on a non-Ok reply code or on an I/O failure.
        return false;
    }
    
    /**
     *  Read a message from the input stream.
     *
     *  @param ip   the InputStream to read from.
     *  @param blocking read until EOF
     *  @return String the http response or null if none could be retrieved.
     **/
    private String getResult( InputStream ip, boolean blocking ) {
        
        final int   MAXRETRIES = 8;
        final int   RETRYINTERVAL = 2 * 1000;
        
        String tmp = "";
        byte[] buffer = null;
        
        if (!blocking) {
            // non-blocking receive. We never block on I/O unless available() lies
            try {
                int    cnt = 0;
                
                while (true) {
                    int avail = ip.available();
                    
                    // are we at the end of the stream?
                    if (ip.available() == -1) {
                        // we never saw the tags we were looking for
                        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:EOF before we found the message end" );
                        return null;
                    }
                    
                    if (ip.available() == 0) {
                        try {
                            if (++cnt > MAXRETRIES ) {
                                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:Retry count exceeded. Had " +
                                    tmp.length() + " bytes" );
                                return null;
                            }
                            Thread.sleep( RETRYINTERVAL );
                        } catch (InterruptedException e) {
                            // if we are woken prematurely, give up
                            if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:Thread interrupted, giving up" );
                            Thread.interrupted();
                            return null;
                        }
                        continue;
                    }
                    
                    buffer = new byte [avail];
                    int got = ip.read(buffer);
                    
                    // if we didnt read anything, just loop.
                    if (got == 0)
                        continue;
                    
                    // reset the retry. we are still being fed
                    cnt = 0;
                    
                    //if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult : non-blocking read of " + got + " bytes" );
                    
                    tmp += new String(buffer, 0, got);
                    
                    // are we done?
                    if (tmp.startsWith("GET")) {
                        break;
                    }
                    
                    // are we done?
                    if (tmp.lastIndexOf("</HTML>") >= 0) {
                        break;
                    }
                }
            } catch (Exception e) {
                if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:caught exception, giving up.", e );
                return null;
            }
        } else {
            // blocking receive. We block on I/O
            // FIXME    20010918   bondolo@jxta.org The way in which we termintate
            // waiting is not really appropriate. What we should be doing here is
            // read the http 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -