📄 xbsession.java
字号:
} } } finally { if (this.statusChangeListener != null) this.statusChangeListener.statusPostChanged(this.sessionName, I_StatusChangeListener.RUNNING, I_StatusChangeListener.CLOSED); } } public void commit() throws JMSException { checkIfOpen("commit"); checkIfTransacted("commit"); checkControlThread(); log.warning("transacted sessions not implemented"); } public BytesMessage createBytesMessage() throws JMSException { checkIfOpen("createBytesMessage"); checkControlThread(); return new XBBytesMessage(this, null); } public MapMessage createMapMessage() throws JMSException { checkIfOpen("createMapMessage"); checkControlThread(); return new XBMapMessage(this, null); } public Message createMessage() throws JMSException { checkIfOpen("createMessage"); checkControlThread(); return new XBMessage(this, null, XBMessage.DEFAULT_TYPE); } public ObjectMessage createObjectMessage() throws JMSException { checkIfOpen("createObjectMessage"); checkControlThread(); return new XBObjectMessage(this, null); } public ObjectMessage createObjectMessage(Serializable content) throws JMSException { checkIfOpen("createObjectMessage"); checkControlThread(); ObjectMessage msg = createObjectMessage(); msg.setObject(content); return msg; } public StreamMessage createStreamMessage() throws JMSException { checkIfOpen("createStreamMessage"); checkControlThread(); return new XBStreamMessage(this, null); } public XBStreamingMessage createStreamingMessage(I_ReplaceContent replaceContent) throws JMSException { checkIfOpen("createStreamingMessage"); checkControlThread(); final InputStream inputStream = null; return new XBStreamingMessage(this, inputStream, replaceContent); // return new XBTextMessage(this, null, null, null); } public TextMessage createTextMessage() throws JMSException { checkIfOpen("createTextMessage"); checkControlThread(); return new XBTextMessage(this, null); } public TextMessage createTextMessage(String text) throws JMSException { checkIfOpen("createTextMessage"); checkControlThread(); return new XBTextMessage(this, text.getBytes()); } public MessageListener getMessageListener() throws JMSException { checkIfOpen("getMessageListener"); checkControlThread(); return this.msgListener; } public boolean getTransacted() throws JMSException { checkIfOpen("getTransacted"); checkControlThread(); return this.transacted; } /* (non-Javadoc) * @see javax.jms.Session#recover() */ public void recover() throws JMSException { checkIfOpen("recover"); checkIfTransacted("recover"); checkControlThread(); log.warning("transacted sessions not implemented yet"); } /* (non-Javadoc) * @see javax.jms.Session#rollback() */ public void rollback() throws JMSException { checkIfOpen("rollback"); checkIfTransacted("rollback"); checkControlThread(); log.warning("transacted sessions not implemented yet"); } public void run() { if (this.started) { log.info("run: session was already started, not doing anything"); return; } this.started = true; log.info("run: session started"); this.controlThread = Thread.currentThread(); while (true) { try { XBMsgEvent msgEvent = (XBMsgEvent)this.channel.take(); Message msg = msgEvent.getMessage(); if (msg == null) { log.info("Shutting down the running thread since close event received"); break; } msgEvent.getListener().onMessage(msg); // TODO notify the update thread waiting for ACK if (log.isLoggable(Level.FINE)) log.fine("run: msg='" + msg.getJMSMessageID() + "' ack='" + this.ackMode + "'"); if (this.ackMode == Session.DUPS_OK_ACKNOWLEDGE) msg.acknowledge(); } catch (InterruptedException ex) { log.severe("run InterruptedException occured " + ex.getMessage()); ex.printStackTrace(); } catch (Throwable ex) { log.severe("run a Throwable occured " + ex.getMessage()); ex.printStackTrace(); } } } public void setMessageListener(MessageListener msgListener) throws JMSException { checkIfOpen("setMessageListener"); checkControlThread(); log.warning("setMessageListener not implemented"); if (log.isLoggable(Level.FINE)) Thread.dumpStack(); this.msgListener = msgListener; } public Queue createQueue(String queueName) throws JMSException { checkIfOpen("createQueue"); checkControlThread(); return new XBDestination(null, queueName); } public QueueBrowser createBrowser(Queue queue) throws JMSException { checkIfOpen("createBrowser"); checkControlThread(); return new XBQueueBrowser(queue, null); } public QueueBrowser createBrowser(Queue queue, String msgSelector) throws JMSException { checkIfOpen("createBrowser"); checkControlThread(); return new XBQueueBrowser(queue, msgSelector); } public TemporaryQueue createTemporaryQueue() throws JMSException { checkIfOpen("createTemporaryQueue"); checkControlThread(); return new XBTemporaryQueue(); } public int getAcknowledgeMode() throws JMSException { checkIfOpen("getAcknowledgeMode"); return this.ackMode; } public MessageProducer createProducer(Destination destination) throws JMSException { checkIfOpen("createProducer"); checkControlThread(); return new XBMessageProducer(this, destination); } /** * For each consumer created, an own xmlBlaster subscription is done since * the msgSelector (i.e. in xmlBlaster the mime plugin) could be different from * one consumer to another. This is done in the constructor of the MessageConsumer and */ public MessageConsumer createConsumer(Destination destination) throws JMSException { checkIfOpen("createConsumer"); checkControlThread(); return new XBMessageConsumer(this, destination, this.msgSelectorDefault, this.noLocalDefault); } public MessageConsumer createConsumer(Destination destination, String msgSelector) throws JMSException { checkIfOpen("createConsumer"); checkControlThread(); return new XBMessageConsumer(this, destination, msgSelector, this.noLocalDefault); } public MessageConsumer createConsumer(Destination destination, String msgSelector, boolean noLocal) throws JMSException { checkIfOpen("createConsumer"); checkControlThread(); return new XBMessageConsumer(this, destination, msgSelector, noLocal); } public Topic createTopic(String name) throws JMSException { checkIfOpen("createTopic"); checkControlThread(); return new XBDestination(name, null, false); } public void unsubscribe(String subName) throws JMSException { if (log.isLoggable(Level.FINER)) log.finer("unsubscribe '" + subName + "'"); checkIfOpen("unsubscribe"); checkControlThread(); try { TopicSubscriber sub = (TopicSubscriber)this.durableSubscriptionMap.remove(subName); if (sub == null) throw new JMSException(ME, "unsubscribe '" + subName + "'failed because the topic has not been found in this session"); sub.close(); } catch (Exception ex) { throw new JMSException(ME, "unsubscribe '" + subName + "'failed. Cause: " + ex.getMessage()); } } public TemporaryTopic createTemporaryTopic() throws JMSException { checkIfOpen("createTemporaryTopic"); checkControlThread(); return new XBTemporaryTopic(); } public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException { log.severe("update: should never be invoked ... '" + new String(content) + "' '" + updateQos.toXml() + "'"); return "OK"; } /** * @return Returns the asyncMode. */ int getSyncMode() { return this.syncMode; } /** * @param asyncMode The asyncMode to set. This method starts the * runner thread if not running yet in case async is set. */ void setSyncMode(int asyncMode) { this.syncMode = asyncMode; if (asyncMode == XBSession.MODE_ASYNC && !this.started) start(); } void setControlThread(Thread controlThread) { this.controlThread = controlThread; } long getUpdateTimeout() { return this.updateTimeout; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -