📄 xbsession.java
字号:
/*------------------------------------------------------------------------------Name: XBSession.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.jms;import java.io.InputStream;import java.io.Serializable;import java.util.HashMap;import java.util.Map;import javax.jms.BytesMessage;import javax.jms.Destination;import javax.jms.ExceptionListener;import javax.jms.IllegalStateException;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Queue;import javax.jms.QueueBrowser;import javax.jms.StreamMessage;import javax.jms.TemporaryQueue;import javax.jms.TemporaryTopic;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.Session;import javax.jms.TopicSubscriber;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.client.I_Callback;import org.xmlBlaster.client.I_XmlBlasterAccess;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.DisconnectQos;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.client.qos.UpdateQos;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.I_ReplaceContent;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import EDU.oswego.cs.dl.util.concurrent.Channel;import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;/** * XBSession * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> * */public class XBSession extends Thread implements Session, I_Callback { final static int MODE_UNSET = 0; final static int MODE_ASYNC = 1; final static int MODE_SYNC = 2; private final static String ME = "XBSession"; protected Global global; private static Logger log = Logger.getLogger(XBSession.class.getName()); XBConnection connection; protected int ackMode; protected final boolean noLocalDefault = false; // is this conform to jms ? protected final boolean durableDefault = false; protected final String msgSelectorDefault = null; protected MessageListener msgListener; protected HashMap durableSubscriptionMap; protected boolean open; protected boolean transacted; protected int syncMode = MODE_UNSET; protected long updateTimeout = 60000L; protected Map consumerMap; // TODO REMOVE THIS LATER (we bypass a check to be allowed to publish from several threads // but this is not permitted by the JMS specification since a session must be single-threaded. private boolean tmpBypassCheckSet; /** * Set in the constructor it can be changed in the update methods * (also of the message consumers) only. */ protected Thread controlThread; // protected ConnectQos connectQos; protected String sessionName; private I_StatusChangeListener statusChangeListener; protected ExceptionListener exceptionListener; protected boolean connectionActivated; protected Channel channel; private boolean started; /** * This constructor extracts the global from the ConnectQos. Note that * you need to clone the ConnectQos before passing it to this constructor. * global contained in the connectQos. * * @param connectQos is the connectQos to be used for this session. It is an own instance and can not be null. */ XBSession(XBConnection connection, int ackMode, boolean transacted) { this.connection = connection; this.global = this.connection.getConnectQos().getData().getGlobal(); postConstructor(ackMode, transacted); } private final void postConstructor(int ackMode, boolean transacted) { if (log.isLoggable(Level.FINER)) log.finer("constructor"); this.ackMode = ackMode; this.durableSubscriptionMap = new HashMap(); this.open = true; this.transacted = transacted; this.controlThread = Thread.currentThread(); this.channel = new LinkedQueue(); this.consumerMap = new HashMap(); } /** * This constructor is used if you want to pass a Global which has already * done some work (connected) on the I_XmlBlasterAccess. Caution, you will not * be able to connect and disconnect if you use this constructor. * * @param global * @param ackMode * @param transacted */ public XBSession(Global global, int ackMode, boolean transacted) { this.global = global; postConstructor(ackMode, transacted); this.syncMode = MODE_ASYNC; } /** * registeres the listener about status changes. In general this listener is the * owner XBConnection since it needs to be notified everytime one of its sessions * is closing. Care must be used when invoking since this is not synchronized. * @param statusChangeListener */ void setStatusChangeListener(I_StatusChangeListener statusChangeListener) { if (log.isLoggable(Level.FINER)) log.finer("setStatusChangeListener"); if (statusChangeListener == null) { this.statusChangeListener = null; } else { if (statusChangeListener == this.statusChangeListener) return; //synchronized(statusChangeListener) { this.statusChangeListener = statusChangeListener; //} } } /** * Activates or deactivates the dispatcher associated to this session. * It does it only if the Session is in ASYNC Mode. * @param doActivate */ final synchronized void activateDispatcher(boolean doActivate) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("activateDispatcher '" + doActivate + "'"); // only activate if already in asyc mode, i.e. if there is at least // one msgListener associated to this session boolean realDoActivate = (doActivate && (this.syncMode == MODE_ASYNC)); if ( (doActivate && this.syncMode == MODE_ASYNC) || !doActivate) { String oid = "__cmd:" + this.sessionName + "/?dispatcherActive=" + doActivate; PublishQos qos = new PublishQos(this.global); PublishKey key = new PublishKey(this.global, oid); this.global.getXmlBlasterAccess().publish(new MsgUnit(key, (byte[])null, qos)); this.connectionActivated = realDoActivate; } } /** * * @return a string containing the complete session name. * @throws JMSException */ String connect() throws JMSException { if (log.isLoggable(Level.FINER)) log.finer("connect"); try { I_XmlBlasterAccess accessor = this.global.getXmlBlasterAccess(); if (!accessor.isConnected()) this.sessionName = accessor.connect(this.connection.getConnectQos(), this).getSessionName().getRelativeName(); else this.sessionName = accessor.getConnectReturnQos().getSessionName().getRelativeName(); // activateDispatcher(false); return this.sessionName; } catch (XmlBlasterException ex) { throw new XBException(ex, ME + ".connect"); } } protected final void checkIfOpen(String methodName) throws IllegalStateException { if (!this.open) throw new IllegalStateException(ME + " the session has been closed, operation '" + methodName + "' not permitted"); } protected final void checkIfTransacted(String methodName) throws IllegalStateException { if (!this.transacted) throw new IllegalStateException(ME, "the session is not transacted, operation '" + methodName + "' not permitted"); } final void checkControlThread() throws JMSException { if (!this.tmpBypassCheckSet) { log.warning("Publishes will be done by temporarly bypassing the control thread check"); this.tmpBypassCheckSet = true; } if (true) return; if (this.controlThread == null || this.controlThread == Thread.currentThread()) return; throw new JMSException(ME, "the session must be used within the same thread"); } public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { return createDurableSubscriber(topic, name, this.msgSelectorDefault, this.noLocalDefault); } public TopicSubscriber createDurableSubscriber(Topic topic, String name, String msgSelector, boolean noLocal) throws JMSException { if (log.isLoggable(Level.FINER)) log.finer("createDurableSubscriber '" + name + "' msgSelector=" + msgSelector + "' noLocal='" + noLocal + "'"); checkIfOpen("createDurableSubscriber"); checkControlThread(); TopicSubscriber sub = new XBTopicSubscriber(this, topic, msgSelector, noLocal); this.durableSubscriptionMap.put(name, sub); return sub; } /** * It disconnects from xmlBlaster and deregisters from its XBConnection */ public void close() throws JMSException { if (log.isLoggable(Level.FINER)) log.finer("close"); if (this.statusChangeListener != null) this.statusChangeListener.statusPreChanged(this.sessionName, I_StatusChangeListener.RUNNING, I_StatusChangeListener.CLOSED); try { synchronized (this) { try { this.open = false; Object[] keys = this.consumerMap.keySet().toArray(); if (log.isLoggable(Level.FINE)) log.fine("close: going to close '" + keys.length + "' consumers too"); for (int i=0; i < keys.length; i++) { MessageConsumer consumer = (MessageConsumer)this.consumerMap.get(keys[i]); if (consumer != null) consumer.close(); } this.consumerMap.clear(); this.global.getXmlBlasterAccess().disconnect(new DisconnectQos(this.global)); } finally { // to avoid thread leak if (this.syncMode == MODE_ASYNC) { if (log.isLoggable(Level.FINE)) log.fine("close: shutting down the running thread by sending a null message to its channel"); XBMsgEvent event = new XBMsgEvent(null, null); try { this.channel.put(event); } catch (InterruptedException ex) { ex.printStackTrace(); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -