📄 abstractbayeux.java
字号:
// ========================================================================// Copyright 2006 Mort Bay Consulting Pty. Ltd.// ------------------------------------------------------------------------// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at// http://www.apache.org/licenses/LICENSE-2.0// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.//========================================================================package org.mortbay.cometd;import java.io.IOException;import java.security.SecureRandom;import java.util.ArrayList;import java.util.Collection;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Random;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CopyOnWriteArrayList;import javax.servlet.ServletContext;import javax.servlet.http.HttpServletRequest;import org.cometd.Bayeux;import org.cometd.BayeuxListener;import org.cometd.Channel;import org.cometd.ChannelBayeuxListener;import org.cometd.Client;import org.cometd.ClientBayeuxListener;import org.cometd.Extension;import org.cometd.Message;import org.cometd.SecurityPolicy;import org.mortbay.util.LazyList;import org.mortbay.util.ajax.JSON;/* ------------------------------------------------------------ *//** * @author gregw * @author aabeling: added JSONP transport * */public abstract class AbstractBayeux extends MessagePool implements Bayeux{ public static final ChannelId META_ID=new ChannelId(META); public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT); public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT); public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT); public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE); public static final ChannelId META_PING_ID=new ChannelId(META_PING); public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS); public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE); public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE); private HashMap<String,Handler> _handlers=new HashMap<String,Handler>(); private ChannelImpl _root = new ChannelImpl("/",this); private ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>(); protected SecurityPolicy _securityPolicy=new DefaultPolicy(); protected JSON.Literal _advice; protected JSON.Literal _multiFrameAdvice; protected int _adviceVersion=0; protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}"); protected int _logLevel; protected long _timeout=240000; protected long _interval=0; protected long _maxInterval=30000; protected boolean _initialized; protected ConcurrentHashMap<String, List<String>> _browser2client=new ConcurrentHashMap<String, List<String>>(); protected int _multiFrameInterval=-1; protected boolean _requestAvailable; protected ThreadLocal<HttpServletRequest> _request = new ThreadLocal<HttpServletRequest>(); transient ServletContext _context; transient Random _random; transient ConcurrentHashMap<String, ChannelId> _channelIdCache; protected Handler _publishHandler; protected Handler _metaPublishHandler; protected int _maxClientQueue=-1; protected Extension[] _extensions; protected JSON.Literal _transports=new JSON.Literal("[\""+Bayeux.TRANSPORT_LONG_POLL+ "\",\""+Bayeux.TRANSPORT_CALLBACK_POLL+"\"]"); protected JSON.Literal _replyExt = new JSON.Literal("{\"ack\":\"true\"}"); protected List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>(); protected List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>(); /* ------------------------------------------------------------ */ /** * @param context. * The logLevel init parameter is used to set the logging to * 0=none, 1=info, 2=debug */ protected AbstractBayeux() { _publishHandler=new PublishHandler(); _metaPublishHandler=new MetaPublishHandler(); _handlers.put(META_HANDSHAKE,new HandshakeHandler()); _handlers.put(META_CONNECT,new ConnectHandler()); _handlers.put(META_DISCONNECT,new DisconnectHandler()); _handlers.put(META_SUBSCRIBE,new SubscribeHandler()); _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler()); _handlers.put(META_PING,new PingHandler()); setTimeout(getTimeout()); } /* ------------------------------------------------------------ */ public void addExtension(Extension ext) { _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class); } /* ------------------------------------------------------------ */ /** * @param id * @return */ public ChannelImpl getChannel(ChannelId id) { return _root.getChild(id); } /* ------------------------------------------------------------ */ public ChannelImpl getChannel(String id) { ChannelId cid=getChannelId(id); if (cid.depth()==0) return null; return _root.getChild(cid); } /* ------------------------------------------------------------ */ public Channel getChannel(String id, boolean create) { synchronized(this) { ChannelImpl channel=getChannel(id); if (channel==null && create) { channel=new ChannelImpl(id,this); _root.addChild(channel); if (isLogInfo()) logInfo("newChannel: "+channel); } return channel; } } /* ------------------------------------------------------------ */ public ChannelId getChannelId(String id) { ChannelId cid = _channelIdCache.get(id); if (cid==null) { // TODO shrink cache! cid=new ChannelId(id); _channelIdCache.put(id,cid); } return cid; } /* ------------------------------------------------------------ */ /* (non-Javadoc) * @see org.mortbay.cometd.Bx#getClient(java.lang.String) */ public Client getClient(String client_id) { synchronized(this) { if (client_id==null) return null; Client client = _clients.get(client_id); return client; } } /* ------------------------------------------------------------ */ public Set<String> getClientIDs() { return _clients.keySet(); } /* ------------------------------------------------------------ */ /** * @return The maximum time in ms to wait between polls before timing out a client */ public long getMaxInterval() { return _maxInterval; } /* ------------------------------------------------------------ */ /** * @return the logLevel. 0=none, 1=info, 2=debug */ public int getLogLevel() { return _logLevel; } /* ------------------------------------------------------------ */ /* (non-Javadoc) * @see org.mortbay.cometd.Bx#getSecurityPolicy() */ public SecurityPolicy getSecurityPolicy() { return _securityPolicy; } /* ------------------------------------------------------------ */ public long getTimeout() { return _timeout; } /* ------------------------------------------------------------ */ public long getInterval() { return _interval; } /* ------------------------------------------------------------ */ /** * @return true if published messages are directly delivered to subscribers. False if * a new message is to be created that holds only supported fields. */ public boolean isDirectDeliver() { return false; } /* ------------------------------------------------------------ */ /** * @deprecated * @param directDeliver true if published messages are directly delivered to subscribers. False if * a new message is to be created that holds only supported fields. */ public void setDirectDeliver(boolean directDeliver) { _context.log("directDeliver is deprecated"); } /* ------------------------------------------------------------ */ /** Handle a Bayeux message. * This is normally only called by the bayeux servlet or a test harness. * @param client The client if known * @param transport The transport to use for the message * @param message The bayeux message. */ public String handle(ClientImpl client, Transport transport, Message message) throws IOException { String channel_id=message.getChannel(); Handler handler=(Handler)_handlers.get(channel_id); if (handler!=null) { message=extendRcvMeta(client,message); handler.handle(client,transport,message); _metaPublishHandler.handle(client,transport,message); } else if (channel_id.startsWith(META_SLASH)) { message=extendRcvMeta(client,message); _metaPublishHandler.handle(client,transport,message); } else { // non meta channel handler=_publishHandler; message=extendRcv(client,message); handler.handle(client,transport,message); } return channel_id; } /* ------------------------------------------------------------ */ public boolean hasChannel(String id) { ChannelId cid=getChannelId(id); return _root.getChild(cid)!=null; } /* ------------------------------------------------------------ */ public boolean isInitialized() { return _initialized; } /* ------------------------------------------------------------ */ /** * @return the commented * @deprecated */ public boolean isJSONCommented() { return false; } /* ------------------------------------------------------------ */ public boolean isLogDebug() { return _logLevel>1; } /* ------------------------------------------------------------ */ public boolean isLogInfo() { return _logLevel>0; } /* ------------------------------------------------------------ */ public void logDebug(String message) { if (_logLevel>1) _context.log(message); } /* ------------------------------------------------------------ */ public void logDebug(String message, Throwable th) { if (_logLevel>1) _context.log(message,th); } /* ------------------------------------------------------------ */ public void logWarn(String message, Throwable th) { _context.log(message+": "+th.toString()); } /* ------------------------------------------------------------ */ public void logWarn(String message) { _context.log(message); } /* ------------------------------------------------------------ */ public void logInfo(String message) { if (_logLevel>0) _context.log(message); } /* ------------------------------------------------------------ */ public Client newClient(String idPrefix) { ClientImpl client = new ClientImpl(this,idPrefix); return client; } /* ------------------------------------------------------------ */ public abstract ClientImpl newRemoteClient(); /* ------------------------------------------------------------ */ /** Create new transport object for a bayeux message * @param client The client * @param message the bayeux message * @return the negotiated transport. */ public Transport newTransport(ClientImpl client, Map<?,?> message) { if (isLogDebug()) logDebug("newTransport: client="+client+",message="+message); Transport result=null; try { String type=client==null?null:client.getConnectionType(); if (type==null) type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD); if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type) || type==null) { String jsonp=(String)message.get(Bayeux.JSONP_PARAMETER); if(jsonp!=null) result=new JSONPTransport(jsonp); else result=new JSONTransport(); } else result=new JSONTransport(); } catch (Exception e) { throw new RuntimeException(e); } if (isLogDebug()) logDebug("newTransport: result="+result); return result; } /* ------------------------------------------------------------ */ /** Publish data to a channel. * Creates a message and delivers it to the root channel. * @param to * @param from * @param data * @param msgId */ protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy) { final MessageImpl message = newMessage(); message.put(CHANNEL_FIELD,to.toString()); if (msgId==null) { long id=message.hashCode() ^(to==null?0:to.hashCode()) ^(from==null?0:from.hashCode()); id=id<0?-id:id; message.put(ID_FIELD,Long.toString(id,36)); } else message.put(ID_FIELD,msgId); message.put(DATA_FIELD,data); message.setLazy(lazy); final Message m=extendSendBayeux(from,message); if (m!=null) _root.doDelivery(to,from,m); if (m instanceof MessageImpl) ((MessageImpl)m).decRef(); } /* ------------------------------------------------------------ */ public boolean removeChannel(ChannelImpl channel) { boolean removed = _root.doRemove(channel); if (removed) for (ChannelBayeuxListener l : _channelListeners) l.channelRemoved(channel); return removed; } /* ------------------------------------------------------------ */ public void addChannel(ChannelImpl channel) { for (ChannelBayeuxListener l : _channelListeners) l.channelAdded(channel); } /* ------------------------------------------------------------ */ protected String newClientId(long variation, String idPrefix) { if (idPrefix==null) return Long.toString(getRandom(),36)+Long.toString(variation,36); else return idPrefix+"_"+Long.toString(getRandom(),36); } /* ------------------------------------------------------------ */ protected void addClient(ClientImpl client,String idPrefix) { while(true) { String id = newClientId(client.hashCode(),idPrefix); client.setId(id); ClientImpl other = _clients.putIfAbsent(id,client);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -