📄 xmlblasteraccess.java
字号:
/*------------------------------------------------------------------------------Name: XmlBlasterAccess.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.client;import java.io.IOException;import java.io.InputStream;import java.util.Map;import java.util.ArrayList;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.FileDumper;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_ReplaceContent;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.client.qos.ConnectReturnQos;import org.xmlBlaster.client.qos.DisconnectQos;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.dispatch.DispatchManager;import org.xmlBlaster.util.dispatch.I_PostSendListener;import org.xmlBlaster.util.dispatch.DispatchStatistic;import org.xmlBlaster.util.error.I_MsgErrorHandler;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.client.queuemsg.MsgQueueConnectEntry;import org.xmlBlaster.client.queuemsg.MsgQueueDisconnectEntry;import org.xmlBlaster.client.queuemsg.MsgQueuePublishEntry;import org.xmlBlaster.client.queuemsg.MsgQueueSubscribeEntry;import org.xmlBlaster.client.queuemsg.MsgQueueUnSubscribeEntry;import org.xmlBlaster.client.queuemsg.MsgQueueEraseEntry;import org.xmlBlaster.client.queuemsg.MsgQueueGetEntry;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.client.protocol.I_CallbackServer;import org.xmlBlaster.client.protocol.AbstractCallbackExtended;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.TopicProperty;import org.xmlBlaster.util.qos.storage.CbQueueProperty;import org.xmlBlaster.util.qos.storage.ClientQueueProperty;import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;import org.xmlBlaster.util.qos.address.CallbackAddress;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.key.GetKey;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.qos.GetQos;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.client.qos.EraseReturnQos;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;import org.xmlBlaster.jms.XBConnectionMetaData;import org.xmlBlaster.authentication.plugins.I_ClientPlugin;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;/** * This is the default implementation of the java client side remote access to xmlBlaster. * <p> * It hides a client side queue, the client side dispatcher framework for polling * or pinging the server and some more features. * </p> * <p> * The interface I_CallbackRaw/I_Callback/I_CallbackExtenden are enforced by AbstractCallbackExtended. * </p> */public /*final*/ class XmlBlasterAccess extends AbstractCallbackExtended implements I_XmlBlasterAccess, I_ConnectionStatusListener, I_PostSendListener, XmlBlasterAccessMBean{ private static Logger log = Logger.getLogger(XmlBlasterAccess.class.getName()); private String ME = "XmlBlasterAccess"; private ContextNode contextNode; /** * The cluster node id (name) to which we want to connect, needed for nicer logging, typically null * Can be set manually from outside before connect */ private String serverNodeId = null; private ConnectQos connectQos; /** The return from connect() */ private ConnectReturnQos connectReturnQos; private long jmxPublicSessionId; /** Client side queue during connection failure */ private I_Queue clientQueue; /** The dispatcher framework **/ private DispatchManager dispatchManager; /** Statistic about send/received messages, can be null if there is a DispatchManager around */ private volatile DispatchStatistic statistic; /** The object handling message delivery problems */ private I_MsgErrorHandler msgErrorHandler; /** Client side helper classes to load the authentication xml string */ private I_ClientPlugin secPlgn; /** The callback server */ private I_CallbackServer cbServer; /** Handles the registered callback interfaces for given subscriptions. */ private final UpdateDispatcher updateDispatcher; /** Used to callback the clients default update() method (as given on connect()) */ private I_Callback updateListener; /** Is not null if the client wishes to be notified about connection state changes in fail safe operation */ private I_ConnectionStateListener connectionListener; private I_PostSendListener postSendListener; /** Allow to cache updated messages for simulated synchronous access with get(). * Do behind a get() a subscribe to allow cached synchronous get() access */ private SynchronousCache synchronousCache; private boolean disconnectInProgress; private boolean connectInProgress; /** this I_XmlBlasterAccess is valid until a 'leaveServer' invocation is done.*/ private boolean isValid = true; private boolean firstWarn = true; private Timestamp sessionRefreshTimeoutHandle; /** My JMX registration */ private JmxMBeanHandle mbeanHandle; /** First call to connect() in millis */ private long startupTime; StreamingCallback streamingCb; private String storageIdPrefix; private FileDumper fileDumper; private boolean shutdown = false; /** * Create an xmlBlaster accessor. * Please don't create directly but use the factory instead: * <pre> * import org.xmlBlaster.util.Global; * ... * final Global glob = new Global(args); * final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess(); * </pre> * @param glob Your environment handle or null to use the default Global.instance() * You must use a cloned Global for each XmlBlasterAccess created. * engine.Global is not allowed here, only util.Global is supported * @exception IllegalArgumentException If engine.Global is used as parameter */ public XmlBlasterAccess(Global glob) { super((glob==null) ? Global.instance() : glob); //if (glob.wantsHelp()) { // usage(); //} if (super.glob.getNodeId() != null) { // it is a engine.Global! throw new IllegalArgumentException("XmlBlasterAccess can't be created with a engine.Global, please clone a org.xmlBlaster.util.Global to create me"); } this.updateDispatcher = new UpdateDispatcher(super.glob); } /** * Create an xmlBlaster accessor. * Please don't create directly but use the factory instead: * <pre> * final Global glob = new Global(args); * final I_XmlBlasterAccess xmlBlasterAccess = glob.getXmlBlasterAccess(); * </pre> * @param args Your command line arguments */ public XmlBlasterAccess(String[] args) { super(new Global(args, true, false)); this.updateDispatcher = new UpdateDispatcher(super.glob); } /** * @see org.xmlBlaster.client.I_XmlBlasterAccess#registerConnectionListener(I_ConnectionStateListener) */ public synchronized void registerConnectionListener(I_ConnectionStateListener connectionListener) { if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing registering connectionListener"); this.connectionListener = connectionListener; } /** * Register a listener to get notifications when a messages is successfully send from * the client side tail back queue. * Max one can be registered, any old one will be overwritten * @param postSendListener The postSendListener to set. * @return the old listener or null if no previous was registered */ public final I_PostSendListener registerPostSendListener(I_PostSendListener postSendListener) { I_PostSendListener old = this.postSendListener; this.postSendListener = postSendListener; return old; } /** * Called after a messages is send from the client side queue, but not for oneway messages. * Enforced by I_PostSendListener * @param msgQueueEntry, includes the returned QoS (e.g. PublisReturnQos) */ public final void postSend(MsgQueueEntry[] entries) { for (int i=0; i<entries.length; i++) { MsgQueueEntry msgQueueEntry = entries[i]; if (msgQueueEntry.getMethodName() == MethodName.CONNECT) { this.connectReturnQos = (ConnectReturnQos)msgQueueEntry.getReturnObj(); setContextNodeId(this.connectReturnQos.getServerInstanceId()); //break; Loop to the latest if any } } I_PostSendListener l = this.postSendListener; if (l != null) { try { l.postSend(entries); } catch (Throwable e) { e.printStackTrace(); } } } public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException exception) { I_PostSendListener l = this.postSendListener; try { if (l == null) { for (int i=0; i<entries.length; i++) { MsgUnit msgUnit = entries[i].getMsgUnit(); String fn = this.getFileDumper().dumpMessage(msgUnit.getKeyData(), msgUnit.getContent(), msgUnit.getQosData()); log.severe("Async sending of message failed for message " + msgUnit.getKeyOid() +", is dumped to " + fn + ": " + exception.getMessage()); } } else { return l.sendingFailed(entries, exception); } } catch (Throwable e) { e.printStackTrace(); for (int i=0; i<entries.length; i++) log.severe("Async sending of message failed for message " + entries[i].toXml() +"\nreason is: " + exception.getMessage()); } return false; } public FileDumper getFileDumper() throws XmlBlasterException { if (this.fileDumper == null) { synchronized (this) { if (this.fileDumper == null) { this.fileDumper = new FileDumper(this.glob); } } } return this.fileDumper; } /** */ public SynchronousCache createSynchronousCache(int size) { if (this.synchronousCache != null) return this.synchronousCache; // Is initialized already if (log.isLoggable(Level.FINER)) log.finer(getLogId()+"Initializing synchronous cache: size=" + size); this.synchronousCache = new SynchronousCache(glob, size); log.info(getLogId()+"SynchronousCache has been initialized with size="+size); return this.synchronousCache; } public void setClientErrorHandler(I_MsgErrorHandler msgErrorHandler) { this.msgErrorHandler = msgErrorHandler; } public String getConnectionQueueId() { if (this.clientQueue != null) { return this.clientQueue.getStorageId().toString(); } return ""; } /** * The unique name of this session instance. * @return Never null, for example "/xmlBlaster/node/heron/client/joe/session/-2" */ public final ContextNode getContextNode() { return this.contextNode; } public ConnectReturnQos connect(ConnectQos qos, I_StreamingCallback streamingUpdateListener, boolean withQueue) throws XmlBlasterException { if (streamingUpdateListener == null) throw new XmlBlasterException(this.glob, ErrorCode.USER_ILLEGALARGUMENT, "connect", "the streamingUpdateListener is null, you must provide one"); this.streamingCb = new StreamingCallback(this.glob, streamingUpdateListener, 0, 0, withQueue); if (withQueue) registerConnectionListener(this.streamingCb); return connect(qos, this.streamingCb); } /** * @see org.xmlBlaster.client.I_XmlBlasterAccess#connect(ConnectQos, I_Callback) */ public ConnectReturnQos connect(ConnectQos qos, I_Callback updateListener) throws XmlBlasterException { if (!this.isValid) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "connect");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -