📄 bayeuxclient.java
字号:
// ========================================================================// Copyright 2006-20078 Mort Bay Consulting Pty. Ltd.// ------------------------------------------------------------------------// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at// http://www.apache.org/licenses/LICENSE-2.0// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.// ========================================================================package org.mortbay.cometd.client;import java.io.IOException;import java.net.URLEncoder;import java.util.ArrayList;import java.util.LinkedList;import java.util.List;import java.util.Map;import java.util.Queue;import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.ConcurrentHashMap;import javax.servlet.http.Cookie;import org.cometd.Bayeux;import org.cometd.Client;import org.cometd.ClientListener;import org.cometd.Extension;import org.cometd.Message;import org.cometd.MessageListener;import org.cometd.RemoveListener;import org.mortbay.cometd.MessageImpl;import org.mortbay.cometd.MessagePool;import org.mortbay.component.AbstractLifeCycle;import org.mortbay.io.Buffer;import org.mortbay.io.ByteArrayBuffer;import org.mortbay.jetty.HttpHeaders;import org.mortbay.jetty.HttpSchemes;import org.mortbay.jetty.HttpURI;import org.mortbay.jetty.client.Address;import org.mortbay.jetty.client.ContentExchange;import org.mortbay.jetty.client.HttpClient;import org.mortbay.jetty.client.HttpExchange;import org.mortbay.log.Log;import org.mortbay.util.ArrayQueue;import org.mortbay.util.LazyList;import org.mortbay.util.QuotedStringTokenizer;import org.mortbay.util.ajax.JSON;/* ------------------------------------------------------------ *//** * Bayeux protocol Client. * <p> * Implements a Bayeux Ajax Push client as part of the cometd project. * <p> * The HttpClient attributes are used to share a Timer and MessagePool instance * between all Bayeux clients sharing the same HttpClient. * * @see http://cometd.org * @author gregw * */public class BayeuxClient extends AbstractLifeCycle implements Client{ private final static String __TIMER="org.mortbay.cometd.client.Timer"; private final static String __JSON="org.mortbay.cometd.client.JSON"; private final static String __MSGPOOL="org.mortbay.cometd.MessagePool"; protected HttpClient _httpClient; protected MessagePool _msgPool; private ArrayQueue<Message> _inQ = new ArrayQueue<Message>(); // queue of incoming messages private ArrayQueue<Message> _outQ = new ArrayQueue<Message>(); // queue of outgoing messages protected Address _cometdAddress; private Exchange _pull; private Exchange _push; private String _path = "/cometd"; private boolean _initialized = false; private boolean _disconnecting = false; private boolean _handshook = false; private String _clientId; private org.cometd.Listener _listener; private List<RemoveListener> _rListeners; private List<MessageListener> _mListeners; private int _batch; private boolean _formEncoded; private Map<String, Cookie> _cookies = new ConcurrentHashMap<String, Cookie>(); private Advice _advice; private Timer _timer; private int _backoffInterval = 1000; private int _backoffMaxRetries = 60; // equivalent to 60 seconds private Buffer _scheme; protected Extension[] _extensions; protected JSON _jsonOut; /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, String url) { this(client,url,null); } /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, String url, Timer timer) { HttpURI uri = new HttpURI(url); _httpClient = client; _cometdAddress = new Address(uri.getHost(),uri.getPort()); _path=uri.getPath(); _timer = timer; _scheme = (HttpSchemes.HTTPS.equals(uri.getScheme()))?HttpSchemes.HTTPS_BUFFER:HttpSchemes.HTTP_BUFFER; } /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, Address address, String path, Timer timer) { _httpClient = client; _cometdAddress = address; _path = path; _timer = timer; } /* ------------------------------------------------------------ */ public BayeuxClient(HttpClient client, Address address, String uri) { this(client,address,uri,null); } /* ------------------------------------------------------------ */ public void addExtension(Extension ext) { _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class); } /* ------------------------------------------------------------ */ Extension[] getExtensions() { return _extensions; } /* ------------------------------------------------------------ */ /** * If unable to connect/handshake etc, even if following the interval in the * advice, wait for this interval and try again, up to a maximum of * _backoffRetries * * @param interval */ public void setBackOffInterval(int interval) { _backoffInterval = interval; } /* ------------------------------------------------------------ */ public int getBackoffInterval() { return _backoffInterval; } /* ------------------------------------------------------------ */ public void setBackoffMaxRetries(int retries) { _backoffMaxRetries = retries; } /* ------------------------------------------------------------ */ public int getBackoffMaxRetries() { return _backoffMaxRetries; } /* ------------------------------------------------------------ */ /* * (non-Javadoc) Returns the clientId * * @see dojox.cometd.Client#getId() */ public String getId() { return _clientId; } /* ------------------------------------------------------------ */ protected void doStart() throws Exception { if (!_httpClient.isStarted()) throw new IllegalStateException("!HttpClient.isStarted()"); synchronized (_httpClient) { if (_jsonOut == null) { _jsonOut = (JSON)_httpClient.getAttribute(__JSON); if (_jsonOut==null) { _jsonOut = new JSON(); _httpClient.setAttribute(__JSON,_jsonOut); } } if (_timer == null) { _timer = (Timer)_httpClient.getAttribute(__TIMER); if (_timer==null) { _timer = new Timer(__TIMER+"@"+hashCode(),true); _httpClient.setAttribute(__TIMER,_timer); } } if (_msgPool == null) { _msgPool = (MessagePool)_httpClient.getAttribute(__MSGPOOL); if (_msgPool==null) { _msgPool = new MessagePool(); _httpClient.setAttribute(__MSGPOOL,_msgPool); } } } _disconnecting=false; _pull=null; _push=null; super.doStart(); synchronized (_outQ) { if (!_initialized && _pull == null) { _pull = new Handshake(); send((Exchange)_pull,false); } } } /* ------------------------------------------------------------ */ protected void doStop() throws Exception { if (!_disconnecting) disconnect(); super.doStop(); } /* ------------------------------------------------------------ */ public boolean isPolling() { synchronized (_outQ) { return isRunning() && (_pull != null); } } /* ------------------------------------------------------------ */ /** * (non-Javadoc) */ public void deliver(Client from, Message message) { if (!isRunning()) throw new IllegalStateException("Not running"); synchronized (_inQ) { if (_mListeners == null) _inQ.add(message); else { for (MessageListener l : _mListeners) l.deliver(from,this,message); } } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#deliver(dojox.cometd.Client, java.lang.String, * java.lang.Object, java.lang.String) */ public void deliver(Client from, String toChannel, Object data, String id) { if (!isRunning()) throw new IllegalStateException("Not running"); MessageImpl message = _msgPool.newMessage(); message.put(Bayeux.CHANNEL_FIELD,toChannel); message.put(Bayeux.DATA_FIELD,data); if (id != null) message.put(Bayeux.ID_FIELD,id); synchronized (_inQ) { if (_mListeners == null) { message.incRef(); _inQ.add(message); } else { for (MessageListener l : _mListeners) l.deliver(from,this,message); } } message.decRef(); } /* ------------------------------------------------------------ */ /** * @deprecated */ public org.cometd.Listener getListener() { synchronized (_inQ) { return _listener; } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#hasMessages() */ public boolean hasMessages() { synchronized (_inQ) { return _inQ.size() > 0; } } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#isLocal() */ public boolean isLocal() { return false; } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see dojox.cometd.Client#subscribe(java.lang.String) */ private void publish(MessageImpl msg) { msg.incRef(); synchronized (_outQ) { _outQ.add(msg); if (_batch == 0 && _initialized && _push == null) { _push = new Publish(); try { send(_push); } catch (Exception e) { metaPublishFail(e,((Publish)_push).getOutboundMessages()); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -