📄 bayeuxclient.java
字号:
/* * (non-Javadoc) * * @see * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete() */ protected void onResponseComplete() throws IOException { super.onResponseComplete(); try { synchronized (_outQ) { startBatch(); _push = null; } if (getResponseStatus() == 200 && _responses != null && _responses.length > 0) { for (int i = 0; i < _responses.length; i++) { MessageImpl msg = (MessageImpl)_responses[i]; deliver(null,msg); if (Bayeux.META_DISCONNECT.equals(msg.getChannel())&&msg.isSuccessful()) { if (isStarted()) { try{stop();}catch(Exception e){Log.ignore(e);} } break; } } } else { Log.warn("Publish, error=" + getResponseStatus()); } } finally { endBatch(); } recycle(); } /* ------------------------------------------------------------ */ protected void onExpire() { super.onExpire(); metaPublishFail(null,this.getOutboundMessages()); if (_disconnecting) { try{stop();}catch(Exception e){Log.ignore(e);} } } /* ------------------------------------------------------------ */ protected void onConnectionFailed(Throwable ex) { super.onConnectionFailed(ex); metaPublishFail(ex,this.getOutboundMessages()); if (_disconnecting) { try{stop();}catch(Exception e){Log.ignore(e);} } } /* ------------------------------------------------------------ */ protected void onException(Throwable ex) { super.onException(ex); metaPublishFail(ex,this.getOutboundMessages()); if (_disconnecting) { try{stop();}catch(Exception e){Log.ignore(e);} } } } /* ------------------------------------------------------------ */ public void addListener(ClientListener listener) { synchronized (_inQ) { boolean added=false; if (listener instanceof MessageListener) { added=true; if (_mListeners == null) _mListeners = new ArrayList<MessageListener>(); _mListeners.add((MessageListener)listener); } if (listener instanceof RemoveListener) { added=true; if (_rListeners == null) _rListeners = new ArrayList<RemoveListener>(); _rListeners.add((RemoveListener)listener); } if (!added) throw new IllegalArgumentException(); } } /* ------------------------------------------------------------ */ public void removeListener(ClientListener listener) { synchronized (_inQ) { if (listener instanceof MessageListener) { if (_mListeners != null) _mListeners.remove((MessageListener)listener); } if (listener instanceof RemoveListener) { if (_rListeners != null) _rListeners.remove((RemoveListener)listener); } } } /* ------------------------------------------------------------ */ public int getMaxQueue() { return -1; } /* ------------------------------------------------------------ */ public Queue<Message> getQueue() { return _inQ; } /* ------------------------------------------------------------ */ public void setMaxQueue(int max) { if (max != -1) throw new UnsupportedOperationException(); } /* ------------------------------------------------------------ */ /** * Send the exchange, possibly using a backoff. * * @param exchange * @param backoff * if true, use backoff algorithm to send * @return */ protected boolean send(final Exchange exchange, final boolean backoff) { long interval = (_advice != null?_advice.getInterval():0); if (backoff) { int retries = exchange.getBackoffRetries(); if (Log.isDebugEnabled()) Log.debug("Send with backoff, retries=" + retries + " for " + exchange); if (retries >= _backoffMaxRetries) return false; exchange.incBackoffRetries(); interval += (retries * _backoffInterval); } if (interval > 0) { TimerTask task = new TimerTask() { public void run() { try { send(exchange); } catch (IOException e) { Log.warn("Delayed send, retry: "+e); Log.debug(e); send(exchange,true); } } }; if (Log.isDebugEnabled()) Log.debug("Delay " + interval + " send of " + exchange); _timer.schedule(task,interval); } else { try { send(exchange); } catch (IOException e) { Log.warn("Send, retry on fail: "+e); Log.debug(e); return send(exchange,true); } } return true; } /* ------------------------------------------------------------ */ /** * Send the exchange. * * @param exchange * @throws IOException */ protected void send(HttpExchange exchange) throws IOException { exchange.reset(); // ensure at start state customize(exchange); if (Log.isDebugEnabled()) Log.debug("Send: using any connection=" + exchange); _httpClient.send(exchange); // use any connection } /* ------------------------------------------------------------ */ /** * False when we have received a success=false message in response to a * Connect, or we have had an exception when sending or receiving a Connect. * * True when handshake and then connect has happened. * * @param b */ protected void setInitialized(boolean b) { synchronized (_outQ) { _initialized = b; } } /* ------------------------------------------------------------ */ protected boolean isInitialized() { return _initialized; } /* ------------------------------------------------------------ */ /** * Called with the results of a /meta/connect message * @param success connect was returned with this status */ protected void metaConnect(boolean success, Message message) { if (!success) Log.warn(this.toString()+" "+message.toString()); } /* ------------------------------------------------------------ */ /** * Called with the results of a /meta/handshake message * @param success connect was returned with this status * @param reestablish the client was previously connected. */ protected void metaHandshake(boolean success, boolean reestablish, Message message) { if (!success) Log.warn(this.toString()+" "+message.toString()); } /* ------------------------------------------------------------ */ /** * Called with the results of a failed publish */ protected void metaPublishFail(Throwable e, Message[] messages) { Log.warn(this.toString()+": "+e); Log.debug(e); } /* ------------------------------------------------------------ */ /** Called to extend outbound string messages. * Some messages are sent as preformatted JSON strings (eg handshake * and connect messages). This extendOut method is a variation of the * {@link #extendOut(Message)} method to efficiently cater for these * preformatted strings. * <p> * This method calls the {@link Extension}s added by {@link #addExtension(Extension)} * * @param msg * @return the extended message */ protected String extendOut(String msg) { if (_extensions==null) return msg; try { Message[] messages = _msgPool.parse(msg); for (int i=0; i<messages.length; i++) extendOut(messages[i]); if (messages.length==1 && msg.charAt(0)=='{') return _msgPool.getMsgJSON().toJSON(messages[0]); return _msgPool.getMsgJSON().toJSON(messages); } catch(IOException e) { Log.warn(e); return msg; } } /* ------------------------------------------------------------ */ /** Called to extend outbound messages * <p> * This method calls the {@link Extension}s added by {@link #addExtension(Extension)} * */ protected void extendOut(Message message) { if (_extensions!=null) { Message m = message; if (m.getChannel().startsWith(Bayeux.META_SLASH)) for (int i=0;m!=null && i<_extensions.length;i++) m=_extensions[i].sendMeta(this,m); else for (int i=0;m!=null && i<_extensions.length;i++) m=_extensions[i].send(this,m); if (message!=m) { message.clear(); if (m!=null) for (Map.Entry<String,Object> entry:m.entrySet()) message.put(entry.getKey(),entry.getValue()); } } } /* ------------------------------------------------------------ */ /** Called to extend inbound messages * <p> * This method calls the {@link Extension}s added by {@link #addExtension(Extension)} * */ protected void extendIn(Message message) { if (_extensions!=null) { Message m = message; if (m.getChannel().startsWith(Bayeux.META_SLASH)) for (int i=_extensions.length;m!=null && i-->0;) m=_extensions[i].rcvMeta(this,m); else for (int i=_extensions.length;m!=null && i-->0;) m=_extensions[i].rcv(this,m); if (message!=m) { message.clear(); if (m!=null) for (Map.Entry<String,Object> entry:m.entrySet()) message.put(entry.getKey(),entry.getValue()); } } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -