📄 session.java
字号:
throws JMSException { for (int j = 0; j < consumers.size(); j++) { MessageConsumer cons = (MessageConsumer) consumers.elementAt(j); if (agentId.equals(cons.dest.agentId)) { throw new JMSException( "Consumers still exist for this temp queue."); } } } /** * Called here and by sub-classes. */ protected void addProducer(MessageProducer mp) { producers.addElement(mp); } /** * Called by MessageProducer. */ synchronized void closeProducer(MessageProducer mp) { producers.removeElement(mp); } /** * Called by Queue browser. */ synchronized void closeBrowser(QueueBrowser qb) { browsers.removeElement(qb); } /** * Called by MessageConsumer */ synchronized MessageConsumerListener addMessageListener( MessageConsumerListener mcl) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.addMessageListener(" + mcl + ')'); checkClosed(); checkThreadOfControl(); checkSessionMode(SessionMode.LISTENER); mcl.start(); if (status == Status.START && listenerCount == 0) { doStart(); } listenerCount++; return mcl; } /** * Called by MessageConsumer. The thread of control and the status * must be checked if the call results from a setMessageListener * but not from a close. */ void removeMessageListener( MessageConsumerListener mcl, boolean check) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.removeMessageListener(" + mcl + ',' + check + ')'); if (check) { checkClosed(); checkThreadOfControl(); } // This may block if a message listener // is currently receiving a message (onMessage is called) // so we have to be out of the synchronized block. mcl.close(); synchronized (this) { listenerCount--; if (status == Status.START && listenerCount == 0) { try { repliesIn.stop(); } catch (InterruptedException iE) { } // All the message listeners have been closed // so we can call doStop() in a synchronized // block. No deadlock possible. 
doStop(); } } } /** * Called by MessageConsumerListener (demultiplexer thread * from RequestMultiplexer) in order to distribute messages * to a message consumer. * Not synchronized because a concurrent close * can be done. * * @exception */ void pushMessages(SingleSessionConsumer consumerListener, ConsumerMessages messages) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.pushMessages(" + consumerListener + ',' + messages + ')'); repliesIn.push( new MessageListenerContext( consumerListener, messages)); } /** * Called by ConnectionConsumer in order * to distribute a message through the * method run(). * (session mode is APP_SERVER) */ void onMessage(org.objectweb.joram.shared.messages.Message momMsg) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Session.onMessage(" + momMsg + ')'); repliesIn.push(momMsg); } /** * Called by: * - method run (application server thread) synchronized */ private void ackMessage(String targetName, String msgId, boolean queueMode) throws JMSException { ConsumerAckRequest ack = new ConsumerAckRequest( targetName, queueMode); ack.addId(msgId); mtpx.sendRequest(ack); } /** * Called by: * - method run (application server thread) synchronized * - method onMessage (SessionDaemon thread) not synchronized * but no concurrent call except a close which first stops * SessionDaemon. */ private void denyMessage(String targetName, String msgId, boolean queueMode) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.denyMessage(" + targetName + ',' + msgId + ',' + queueMode + ')'); ConsumerDenyRequest cdr = new ConsumerDenyRequest( targetName, msgId, queueMode); if (queueMode) { requestor.request(cdr); } else { mtpx.sendRequest(cdr, null); } } /** * Called by SessionDaemon. * Not synchronized but no concurrent call except * a close which first stops SessionDaemon. 
*/ private void onMessages(MessageListenerContext ctx) throws JMSException { Vector msgs = ctx.messages.getMessages(); for (int i = 0; i < msgs.size(); i++) { onMessage( (org.objectweb.joram.shared.messages.Message)msgs.elementAt(i), ctx.consumerListener); } } /** * Called by onMessage() */ private Message prepareMessage( org.objectweb.joram.shared.messages.Message momMsg, String targetName, boolean queueMode) throws JMSException { if (! autoAck) { prepareAck(targetName, momMsg.getIdentifier(), queueMode); } Message msg; try { return Message.wrapMomMessage(this, momMsg); } catch (JMSException jE) { // Catching a JMSException means that the building of the Joram // message went wrong: denying the message: if (autoAck) { denyMessage(targetName, momMsg.getIdentifier(), queueMode); } return null; } } /** * Called by onMessages() */ void onMessage( org.objectweb.joram.shared.messages.Message momMsg, MessageConsumerListener consumerListener) throws JMSException { Message msg = prepareMessage( momMsg, consumerListener.getTargetName(), consumerListener.getQueueMode()); if (msg == null) return; try { if (messageListener == null) { // Standard JMS (MessageConsumer) consumerListener.onMessage(msg, acknowledgeMode); } else { // ASF (ConnectionConsumer) consumerListener.onMessage(msg, messageListener, acknowledgeMode); } } catch (JMSException exc) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "", exc); if (autoAck || consumerListener.isClosed()) { denyMessage(consumerListener.getTargetName(), momMsg.getIdentifier(), consumerListener.getQueueMode()); } return; } if (recover) { // The session has been recovered by the // listener thread. if (autoAck) { denyMessage(consumerListener.getTargetName(), momMsg.getIdentifier(), consumerListener.getQueueMode()); } else { doRecover(); recover = false; } } else { if (autoAck) { consumerListener.ack( momMsg.getIdentifier(), acknowledgeMode); } } } /** * Called by MessageProducer. 
*/ synchronized void send(Destination dest, javax.jms.Message message, int deliveryMode, int priority, long timeToLive, boolean timestampDisabled) throws JMSException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log( BasicLevel.DEBUG, "Session.send(" + dest + ',' + message + ',' + deliveryMode + ',' + priority + ',' + timeToLive + ',' + timestampDisabled + ')'); checkClosed(); checkThreadOfControl(); // Updating the message property fields: String msgID = cnx.nextMessageId(); message.setJMSMessageID(msgID); message.setJMSDeliveryMode(deliveryMode); message.setJMSDestination(dest); if (timeToLive == 0) { message.setJMSExpiration(0); } else { message.setJMSExpiration(System.currentTimeMillis() + timeToLive); } message.setJMSPriority(priority); if (! timestampDisabled) { message.setJMSTimestamp(System.currentTimeMillis()); } org.objectweb.joram.shared.messages.Message momMsg = null; if (message instanceof org.objectweb.joram.client.jms.Message) { // If the message to send is a proprietary one, getting the MOM message // it wraps: momMsg = ((Message) message).getMomMessage(); } else if (message instanceof javax.jms.Message) { // If the message to send is a non proprietary JMS message, building // a proprietary message and then getting the MOM message it wraps: try { Message joramMessage = Message.convertJMSMessage(message); momMsg = joramMessage.getMomMessage(); } catch (JMSException jE) { MessageFormatException mE = new MessageFormatException("Message to" + " send is" + " invalid."); mE.setLinkedException(jE); throw mE; } } else { // If not, building a new request and sending it: MessageFormatException mE = new MessageFormatException("Message to" + " send is" + " invalid."); throw mE; } if (transacted) { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Buffering the message."); // If the session is transacted, keeping the request for later delivery: prepareSend( dest, (org.objectweb.joram.shared.messages.Message) momMsg.clone()); } else { 
ProducerMessages pM = new ProducerMessages(dest.getName(), (org.objectweb.joram.shared.messages.Message) momMsg.clone()); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Sending " + momMsg); if (asyncSend || (! momMsg.getPersistent())) { // Asynchronous sending pM.setAsyncSend(true); mtpx.sendRequest(pM); } else { requestor.request(pM); } } } /** * Called by MessageConsumer. The requestor raises an * exception if it is called during another request. * This cannot happen as a session is monothreaded. * A concurrent close first aborts the current request * so it releases the requestor for a subsequent use. */ synchronized AbstractJmsReply syncRequest( AbstractJmsRequest request) throws JMSException { return requestor.request(request); } final Connection getConnection() { return cnx; } final String getId() { return ident; } final RequestMultiplexer getRequestMultiplexer() { return mtpx; } public final boolean isAutoAck() { return autoAck; } private void activateMessageInput() throws JMSException { for (int i = 0; i < consumers.size(); i++) { MessageConsumer cons = (MessageConsumer) consumers.elementAt(i); cons.activateMessageInput(); } passiveMsgInput = false; } private void passivateMessageInput() throws JMSException { for (int i = 0; i < consumers.size(); i++) { MessageConsumer cons = (MessageConsumer) consumers.elementAt(i); cons.passivateMessageInput(); } passiveMsgInput = true; } /** * Set asyncSend for this Session. * * @param b */ public void setAsyncSend(boolean b) { asyncSend = b; } /** * Set queueMessageReadMax for this Session. 
*
   * @param i  the value stored in <code>queueMessageReadMax</code>.
   */
  public void setQueueMessageReadMax(int i) {
    queueMessageReadMax = i;
  }

  /** Returns the current <code>queueMessageReadMax</code> value. */
  public final int getQueueMessageReadMax() {
    return queueMessageReadMax;
  }

  /** Returns the current <code>topicAckBufferMax</code> value. */
  public final int getTopicAckBufferMax() {
    return topicAckBufferMax;
  }

  /**
   * Set topicAckBufferMax for this Session.
   *
   * @param i  the value stored in <code>topicAckBufferMax</code>.
   */
  public void setTopicAckBufferMax(int i) {
    topicAckBufferMax = i;
  }

  /** Returns the current <code>topicActivationThreshold</code> value. */
  public final int getTopicActivationThreshold() {
    return topicActivationThreshold;
  }

  /**
   * Set topicActivationThreshold for this Session.
   *
   * @param i  the value stored in <code>topicActivationThreshold</code>.
   */
  public void setTopicActivationThreshold(int i) {
    topicActivationThreshold = i;
  }

  /** Returns the current <code>topicPassivationThreshold</code> value. */
  public final int getTopicPassivationThreshold() {
    return topicPassivationThreshold;
  }

  /**
   * Set topicPassivationThreshold for this Session.
   *
   * @param i  the value stored in <code>topicPassivationThreshold</code>.
   */
  public void setTopicPassivationThreshold(int i) {
    topicPassivationThreshold = i;
  }

  /**
   * The <code>SessionCloseTask</code> class is used by non-XA transacted
   * sessions for taking care of closing them if they tend to be pending,
   * and if a transaction timer has been set.
   */
  private class SessionCloseTask extends TimerTask {
    /** Delay handed to the multiplexer when the task is scheduled. */
    private final long txPendingTimer;

    SessionCloseTask(long txPendingTimer) {
      this.txPendingTimer = txPendingTimer;
    }

    /** Method called when the timer expires, actually closing the session. */
    public void run() {
      try {
        if (logger.isLoggable(BasicLevel.WARN)) {
          logger.log(BasicLevel.WARN, "Session closed "
                     + "because of pending transaction");
        }
        close();
      } catch (Exception e) {
        // Nothing can be rethrown from a timer task, but a failed close
        // is reported instead of being dropped silently.
        if (logger.isLoggable(BasicLevel.WARN)) {
          logger.log(BasicLevel.WARN,
                     "Closing the pending session failed", e);
        }
      }
    }

    /**
     * Schedules this task on the request multiplexer with the configured
     * delay. A scheduling failure is logged and does not propagate.
     */
    public void start() {
      try {
        mtpx.schedule(this, txPendingTimer);
      } catch (Exception e) {
        if (logger.isLoggable(BasicLevel.WARN)) {
          logger.log(BasicLevel.WARN,
                     "Scheduling the session close task failed", e);
        }
      }
    }
  }

  /**
   * This thread controls the session in mode LISTENER.
*/
  private class SessionDaemon extends fr.dyade.aaa.util.Daemon {

    /** Names the daemon after the enclosing connection and session. */
    SessionDaemon() {
      super("Connection#" + cnx + " - Session#" + ident);
    }

    /**
     * Takes listener contexts from <code>repliesIn</code> one at a time and
     * hands each one to <code>onMessages</code>, for as long as
     * <code>running</code> holds. The loop ends when waiting on the queue
     * is interrupted.
     */
    public void run() {
      while (running) {
        // NOTE(review): canStop is presumably the inherited Daemon flag
        // telling whether the thread may be stopped right now - confirm.
        // It is raised only while waiting for the next context.
        canStop = true;

        MessageListenerContext pending;
        try {
          pending = (MessageListenerContext) repliesIn.get();
          repliesIn.pop();
        } catch (InterruptedException interrupted) {
          if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "", interrupted);
          }
          return;
        }

        canStop = false;

        try {
          onMessages(pending);
        } catch (JMSException failure) {
          // A failed delivery is traced; the loop carries on with the
          // next context.
          if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "", failure);
          }
        }
      }
    }

    /** Returns the <code>thread</code> field of this daemon. */
    Thread getThread() {
      return thread;
    }

    /** Nothing to do. */
    protected void shutdown() {
    }

    /** Nothing to do. */
    protected void close() {
    }
  }

  /**
   * Pairs a message consumer with the set of messages it has to consume.
   */
  private static class MessageListenerContext {
    /** The consumer the messages are intended for. */
    SingleSessionConsumer consumerListener;

    /** The messages to consume. */
    ConsumerMessages messages;

    MessageListenerContext(
      SingleSessionConsumer consumerListener,
      ConsumerMessages messages) {
      this.consumerListener = consumerListener;
      this.messages = messages;
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -