hmtpclient.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 697 行

JAVA
697
字号
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT.  See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * *   Free Software Foundation, Inc. *   59 Temple Place, Suite 330 *   Boston, MA 02111-1307  USA * * @author Scott Ferguson */package com.caucho.hmtp;import com.caucho.bam.BamQueryCallback;import com.caucho.bam.BamStream;import com.caucho.bam.BamError;import com.caucho.bam.BamConnection;import com.caucho.hessian.io.*;import java.io.*;import java.net.*;import java.util.*;import java.util.concurrent.*;import java.util.logging.*;/** * HMTP client protocol */public class HmtpClient implements BamConnection {  private static final Logger log    = Logger.getLogger(HmtpClient.class.getName());  private String _url;  private String _scheme;  private String _host;  private int _port;  private String _path;    private InetAddress _address;    private String _to;  protected Socket _s;  protected InputStream _is;  protected OutputStream _os;  private ClientBrokerStream _clientStream;  private String _jid;  private BamStream _streamHandler;  private HashMap<Long,QueryItem> _queryMap    = new HashMap<Long,QueryItem>();      private long _qId;  private boolean _isFinest;  public HmtpClient(String url)  {    _url = url;    parseURL(url);    _isFinest = log.isLoggable(Level.FINEST);  }  protected void parseURL(String url)  {    int p = url.indexOf("://");    if (p < 0)      throw new IllegalArgumentException("URL '" + url + "' is not well-formed");    _scheme = url.substring(0, p);    url = url.substring(p + 3);        p = url.indexOf("/");    if (p >= 0) {      _path = url.substring(p);      url = url.substring(0, p);    }    else {      _path = "/";    }    p = url.indexOf(':');    if (p > 0) {      _host = url.substring(0, p);      _port = Integer.parseInt(url.substring(p + 1));    }    else {      _host = url;      if ("https".equals(_scheme))	_port = 443;      else	_port = 80;    }  }  public String getHost()  {    return _host;  }  public int getPort()  {    return _port;  }  public void connect()    throws IOException  {    if (_s != null)      throw new IllegalStateException(this + " is already connected");    openSocket(_host, _port);    print(_os, "POST " + _path + " HTTP/1.1\r\n");    print(_os, "Host: " + _to + ":" + _port + "\r\n");    print(_os, "Upgrade: HMTP/0.9\r\n");    print(_os, "Content-Length: 0\r\n");    print(_os, "\r\n");    _os.flush();    String result;    result = readLine(_is);    if (result.startsWith("HTTP/1.1 101")) {      if (log.isLoggable(Level.FINE))	log.fine(this + " " + result);            while (! (result = readLine(_is)).trim().equals("")) {	if (log.isLoggable(Level.FINE))	  log.fine(this + " " + result);      }      _clientStream = new ClientBrokerStream(_is, _os);      executeThread(new ClientAgentStream(this));    }    else {      if (log.isLoggable(Level.FINE))	log.fine(this + " " + result);            throw new IOException("Unexpected result: " + result);    }  }  protected void executeThread (Runnable r)  {    Thread thread = new Thread(r);    thread.setDaemon(true);    thread.start();  }        protected void openSocket(String host, int port)    throws IOException  {    _s = new Socket(_host, _port);    InputStream is = _s.getInputStream();    OutputStream os = _s.getOutputStream();        _os = new BufferedOutputStream(os);    _is = new BufferedInputStream(is);  }  protected void print(OutputStream os, String s)    throws IOException  {    int len = s.length();    for (int i = 0; i < len; i++)      os.write(s.charAt(i));  }  protected String readLine(InputStream is)    throws IOException  {    StringBuilder sb = new StringBuilder();    int ch;    while ((ch = is.read()) >= 0 && ch != '\n') {      sb.append((char) ch);    }    return sb.toString();  }  /**   * Login to the server   */  public void login(String uid, String password)  {    try {      AuthResult result;      result = (AuthResult) querySet("", new AuthQuery(uid, password));      _jid = result.getJid();    } catch (Exception e) {      throw new RuntimeException(e);    }  }  Hessian2StreamingInput getStreamingInput()  {    return _clientStream.getStreamingInput();  }  /**   * Returns the jid   */  public String getJid()  {    return _jid;  }  /**   * Sets the message handler   */  public void setStreamHandler(BamStream handler)  {    _streamHandler = handler;  }  /**   * Gets the message listener   */  public BamStream getStreamHandler()  {    return _streamHandler;  }  /**   * Returns the client stream   */  public BamStream getBrokerStream()  {    return _clientStream;  }  /**   * Sends a message to a given jid   */  public void message(String to, Serializable value)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        stream.message(to, null, value);  }  /**   * Sends a presence packet to the server   */  public void presence(Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presence(null, null, data);  }  /**   * Sends a presence packet to the server   */  public void presence(String to, Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presence(to, null, data);  }  /**   * Sends a presence packet to the server   */  public void presenceUnavailable(Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceUnavailable(null, null, data);  }  /**   * Sends a presence packet to the server   */  public void presenceUnavailable(String to, Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceUnavailable(to, null, data);  }  /**   * Sends a presence probe packet to the server   */  public void presenceProbe(String to, Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceProbe(to, null, data);  }  /**   * Sends a presence subscribe packet to the server   */  public void presenceSubscribe(String to, Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceSubscribe(to, null, data);  }  /**   * Sends a presence subscribed packet to the server   */  public void presenceSubscribed(String to, Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceSubscribed(to, null, data);  }  /**   * Sends a presence subscribe packet to the server   */  public void presenceUnsubscribe(String to, Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceUnsubscribe(to, null, data);  }  /**   * Sends a presence subscribed packet to the server   */  public void presenceUnsubscribed(String to, Serializable data)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceUnsubscribed(to, null, data);  }  /**   * Sends a presence packet to the server   */  public void presenceError(String to, Serializable data, BamError error)  {    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.presenceError(to, null, data, error);  }  /**   * Sends a query-set packet to the server   */  public Serializable queryGet(String to,			       Serializable query)  {    WaitQueryCallback callback = new WaitQueryCallback();    queryGet(to, query, callback);    if (callback.waitFor())      return callback.getResult();    else      throw new RuntimeException(String.valueOf(callback.getError()));  }  /**   * Sends a query-get packet to the server   */  public void queryGet(String to,		       Serializable value,		       BamQueryCallback callback)  {    long id;          synchronized (this) {      id = _qId++;      _queryMap.put(id, new QueryItem(id, callback));    }    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        _clientStream.queryGet(id, to, null, value);  }  /**   * Sends a query-set packet to the server   */  public Serializable querySet(String to,			       Serializable query)  {    WaitQueryCallback callback = new WaitQueryCallback();    querySet(to, query, callback);    if (callback.waitFor())      return callback.getResult();    else      throw new RuntimeException("No valid return for " + query				 + "\n" + callback.getError());  }  /**   * Sends a query-set packet to the server   */  public void querySet(String to,		       Serializable value,		       BamQueryCallback callback)  {    long id;          synchronized (this) {      id = _qId++;      _queryMap.put(id, new QueryItem(id, callback));    }    BamStream stream = _clientStream;    if (stream == null)      throw new IllegalStateException("connection is closed");        stream.querySet(id, to, null, value);  }  /**   * Callback for the response   */  void onQueryResult(long id, String to, String from, Serializable value)  {    QueryItem item = null;        synchronized (this) {      item = _queryMap.remove(id);    }    if (item != null)      item.onQueryResult(to, from, value);  }  /**   * Callback for the response   */  void onQueryError(long id,		    String to,		    String from,		    Serializable value,		    BamError error)  {    QueryItem item = null;        synchronized (this) {      item = _queryMap.remove(id);    }    if (item != null)      item.onQueryError(to, from, value, error);  }  /**   * Low-level query response   */  /*  public void queryResult(long id, String to, Serializable value)    throws IOException  {    _clientStream.queryResult(id, to, null, value);  }  */  /**   * Low-level query error   */  /*  public void queryError(long id,			 String to,			 Serializable value,			 BamError error)  {    _clientStream.queryError(id, to, null, value, error);  }  */  public void flush()    throws IOException  {    ClientBrokerStream stream = _clientStream;    if (stream != null)      stream.flush();  }  public boolean isClosed()  {    return _clientStream == null;  }  public void close()  {    if (log.isLoggable(Level.FINE))      log.fine(this + " close");        try {      Socket s;      InputStream is;      OutputStream os;      ClientBrokerStream stream;            synchronized (this) {	s = _s;	_s = null;		is = _is;	_is = null;	stream = _clientStream;	_clientStream = null;		os = _os;	_os = null;      }      if (stream != null)	stream.close();      if (os != null) {	try { os.close(); } catch (IOException e) {}      }      if (is != null) {	is.close();      }      if (s != null) {	s.close();      }    } catch (Exception e) {      log.log(Level.WARNING, e.toString(), e);    }  }  @Override  public String toString()  {    return getClass().getSimpleName() + "[" + _jid + "," + _url + "]";  }  @Override  protected void finalize()  {    close();  }  static class QueryItem {    private final long _id;    private final BamQueryCallback _callback;    QueryItem(long id, BamQueryCallback callback)    {      _id = id;      _callback = callback;    }    void onQueryResult(String to, String from, Serializable value)    {      if (_callback != null)	_callback.onQueryResult(to, from, value);    }    void onQueryError(String to,		      String from,		      Serializable value,		      BamError error)    {      if (_callback != null)	_callback.onQueryError(to, from, value, error);    }        @Override    public String toString()    {      return getClass().getSimpleName() + "[" + _id + "," + _callback + "]";    }  }  static class WaitQueryCallback implements BamQueryCallback {    private Serializable _result;    private BamError _error;    private boolean _isResult;    public Serializable getResult()    {      return _result;    }        public BamError getError()    {      return _error;    }    boolean waitFor()    {      try {	synchronized (this) {	  if (! _isResult)	    this.wait(10000);	}      } catch (Exception e) {	log.log(Level.FINE, e.toString(), e);      }      return _isResult;    }        public void onQueryResult(String fromJid, String toJid,			      Serializable value)    {      _result = value;      synchronized (this) {	_isResult = true;	notifyAll();      }    }      public void onQueryError(String fromJid, String toJid,			     Serializable value, BamError error)    {      _error = error;      synchronized (this) {	_isResult = true;	notifyAll();      }    }  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?