📄 bayeuxclient.java
字号:
} setJson(_json); if (!send(this,backoff)) Log.warn("Retries exhausted"); // giving up } /* ------------------------------------------------------------ */ protected void recycle() { if (_responses!=null) for (Message msg:_responses) if (msg instanceof MessageImpl) ((MessageImpl)msg).decRef(); _responses=null; } } /* ------------------------------------------------------------ */ /** * The Bayeux handshake exchange. Negotiates a client Id and initializes the * protocol. * */ protected class Handshake extends Exchange { public final static String __HANDSHAKE = "[{" + "\"channel\":\"/meta/handshake\"," + "\"version\":\"0.9\"," + "\"minimumVersion\":\"0.9\"" + "}]"; Handshake() { super("handshake"); setMessage(__HANDSHAKE); } /* ------------------------------------------------------------ */ /* * (non-Javadoc) * * @see * org.mortbay.cometd.client.BayeuxClient.Exchange#onResponseComplete() */ protected void onResponseComplete() throws IOException { super.onResponseComplete(); if (!isRunning()) return; if (_disconnecting) { Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure","expired"); metaHandshake(false,false,error); try{stop();}catch(Exception e){Log.ignore(e);} return; } if (getResponseStatus() == 200 && _responses != null && _responses.length > 0) { MessageImpl response = (MessageImpl)_responses[0]; boolean successful = response.isSuccessful(); // Get advice if there is any Map adviceField = (Map)response.get(Bayeux.ADVICE_FIELD); if (adviceField != null) _advice = new Advice(adviceField); if (successful) { _handshook = true; if (Log.isDebugEnabled()) Log.debug("Successful handshake, sending connect"); _clientId = (String)response.get(Bayeux.CLIENT_FIELD); metaHandshake(true,_handshook,response); _pull = new Connect(); send(_pull,false); } else { metaHandshake(false,false,response); _handshook = false; if (_advice != null && _advice.isReconnectNone()) throw new IOException("Handshake failed with advice reconnect=none :" + _responses[0]); else if (_advice != null && _advice.isReconnectHandshake()) { _pull = new Handshake(); if (!send(_pull,true)) throw new IOException("Handshake, retries exhausted"); } else // assume retry = reconnect? { _pull = new Connect(); if (!send(_pull,true)) throw new IOException("Connect after handshake, retries exhausted"); } } } else { Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("status",new Integer(getResponseStatus())); error.put("content",getResponseContent()); metaHandshake(false,false,error); resend(true); } recycle(); } /* ------------------------------------------------------------ */ protected void onExpire() { // super.onExpire(); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure","expired"); metaHandshake(false,false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onConnectionFailed(Throwable ex) { // super.onConnectionFailed(ex); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); ex.printStackTrace(); metaHandshake(false,false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onException(Throwable ex) { // super.onException(ex); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); metaHandshake(false,false,error); resend(true); } } /* ------------------------------------------------------------ */ /** * The Bayeux Connect exchange. Connect exchanges implement the long poll * for Bayeux. */ protected class Connect extends Exchange { String _connectString; Connect() { super("connect"); _connectString = "{" + "\"channel\":\"/meta/connect\"," + "\"clientId\":\"" + _clientId + "\"," + "\"connectionType\":\"long-polling\"" + "}"; setMessage(_connectString); } protected void onResponseComplete() throws IOException { super.onResponseComplete(); if (!isRunning()) return; if (getResponseStatus() == 200 && _responses != null && _responses.length > 0) { try { startBatch(); for (int i = 0; i < _responses.length; i++) { Message msg = _responses[i]; // get advice if there is any Map adviceField = (Map)msg.get(Bayeux.ADVICE_FIELD); if (adviceField != null) _advice = new Advice(adviceField); if (Bayeux.META_CONNECT.equals(msg.get(Bayeux.CHANNEL_FIELD))) { Boolean successful = (Boolean)msg.get(Bayeux.SUCCESSFUL_FIELD); if (successful != null && successful.booleanValue()) { metaConnect(true,msg); if (!isRunning()) break; synchronized (_outQ) { if (_disconnecting) continue; if (!isInitialized()) { setInitialized(true); { if (_outQ.size() > 0) { _push = new Publish(); send(_push); } } } } // send a Connect (ie longpoll) possibly with // delay according to interval advice _pull = new Connect(); send(_pull,false); } else { // received a failure to our connect message, // check the advice to see what to do: // reconnect: none = hard error // reconnect: handshake = send a handshake // message // reconnect: retry = send another connect, // possibly using interval setInitialized(false); metaConnect(false,msg); synchronized(_outQ) { if (!isRunning()||_disconnecting) break; } if (_advice != null && _advice.isReconnectNone()) throw new IOException("Connect failed, advice reconnect=none"); else if (_advice != null && _advice.isReconnectHandshake()) { if (Log.isDebugEnabled()) Log.debug("connect received success=false, advice is to rehandshake"); _pull = new Handshake(); send(_pull,true); } else { // assume retry = reconnect if (Log.isDebugEnabled()) Log.debug("Assuming retry=reconnect"); resend(true); } } } deliver(null,msg); } } finally { endBatch(); } } else { Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("status",getResponseStatus()); error.put("content",getResponseContent()); metaConnect(false,error); resend(true); } recycle(); } /* ------------------------------------------------------------ */ protected void onExpire() { // super.onExpire(); setInitialized(false); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure","expired"); metaConnect(false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onConnectionFailed(Throwable ex) { // super.onConnectionFailed(ex); setInitialized(false); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); metaConnect(false,error); resend(true); } /* ------------------------------------------------------------ */ protected void onException(Throwable ex) { // super.onException(ex); setInitialized(false); Message error=_msgPool.newMessage(); error.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE); error.put("failure",ex.toString()); error.put("exception",ex); metaConnect(false,error); resend(true); } } /* ------------------------------------------------------------ */ /** * Publish message exchange. Sends messages to bayeux server and handles any * messages received as a result. */ protected class Publish extends Exchange { Publish() { super("publish"); StringBuffer json = new StringBuffer(256); synchronized (json) { synchronized (_outQ) { int s=_outQ.size(); if (s == 0) return; for (int i=0;i<s;i++) { Message message = _outQ.getUnsafe(i); message.put(Bayeux.CLIENT_FIELD,_clientId); extendOut(message); json.append(i==0?'[':','); _jsonOut.append(json,message); if (message instanceof MessageImpl) ((MessageImpl)message).decRef(); } json.append(']'); _outQ.clear(); setJson(json.toString()); } } } protected Message[] getOutboundMessages() { try { return _msgPool.parse(_json); } catch (IOException e) { Log.warn("Error converting outbound messages"); if (Log.isDebugEnabled()) Log.debug(e); return null; } } /* ------------------------------------------------------------ */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -