
📄 jmssession.java

📁 Source code of a server implementing JMS; supports multiple adapters, DB, FTP, and multiple databases
💻 JAVA

        // send all the cached messages to the server
        getJmsSessionStub().sendMessages(_messagesToSend);
        _publishCount += _messagesToSend.size();
        _messagesToSend.clear();

        // commit the session
        getJmsSessionStub().commit();
    }

    /**
     * Roll back any messages sent in this transaction.
     *
     * @throws JMSException if the transaction cannot be rolled back
     */
    public synchronized void rollback() throws JMSException {
        ensureOpen();
        ensureTransactional();

        // clear all the cached messages
        _messagesToSend.clear();

        // rollback the session
        getJmsSessionStub().rollback();
    }
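
    /*
     * Usage sketch (an illustration, not part of the original source).
     * It assumes this class is used through the standard javax.jms.Session
     * interface; 'connection' and 'queue' are hypothetical, already
     * initialised objects. Messages sent in a transacted session are cached
     * locally and only forwarded to the server on commit(); rollback()
     * discards the cache.
     *
     *     Session session =
     *         connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
     *     MessageProducer producer = session.createProducer(queue);
     *     try {
     *         producer.send(session.createTextMessage("hello"));
     *         session.commit();      // cached messages go to the server
     *     } catch (JMSException exception) {
     *         session.rollback();    // cached messages are dropped
     *     }
     */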

    /**
     * Close the session. This call will block until a receive or message
     * listener in progress has completed. A blocked message consumer receive
     * call returns <code>null</code> when this session is closed.
     *
     * @throws JMSException if the session can't be closed
     */
    public synchronized void close() throws JMSException {
        if (!_closed) {
            _closing = true;

            // signal the stub that we are preparing to close the
            // connection.
            getJmsSessionStub().beforeClose();

            // must stop first before we close
            stop();

            // wake up any blocking consumers
            notifyConsumers();

            // close each producer in turn
            Enumeration producers = getProducers();
            while (producers.hasMoreElements()) {
                JmsMessageProducer producer =
                    (JmsMessageProducer) producers.nextElement();
                producer.close();
            }

            // close each consumer in turn
            Enumeration consumers = getConsumers();
            while (consumers.hasMoreElements()) {
                JmsMessageConsumer consumer =
                    (JmsMessageConsumer) consumers.nextElement();
                consumer.close();
            }

            // deregister this with the connection
            _connection.removeSession(this);
            _connection = null;

            // clear any cached messages or acks
            _messagesToSend.clear();

            // issue a close to the remote session. This will release any
            // allocated remote resources
            getJmsSessionStub().close();
            _stub = null;

            // update the session state
            _closed = true;
            _closing = false;
        }
    }

    /**
     * Stop message delivery in this session, and restart sending messages with
     * the oldest unacknowledged message
     *
     * @throws JMSException if the session can't be recovered
     */
    public synchronized void recover() throws JMSException {
        ensureOpen();
        if (!_transacted) {
            // let the server handle the recovery
            getJmsSessionStub().recover();
        } else {
            throw new IllegalStateException(
                "Cannot recover from a transacted session");
        }
    }

    /**
     * Returns the message listener associated with the session
     *
     * @return the message listener associated with the session, or
     * <code>null</code> if no listener is registered
     * @throws JMSException if the session is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        ensureOpen();
        return _listener;
    }

    /**
     * Sets the session's message listener.
     *
     * @param listener the session's message listener
     * @throws JMSException if the session is closed
     */
    public void setMessageListener(MessageListener listener)
        throws JMSException {
        ensureOpen();
        _listener = listener;
    }

    /**
     * Iterates through the list of messages added by an
     * {@link JmsConnectionConsumer}, sending them to the registered listener
     */
    public void run() {
        try {
            while (!_messageCache.isEmpty()) {
                Message message = (Message) _messageCache.remove(0);
                _listener.onMessage(message);
            }
        } catch (Exception exception) {
            _log.error("Error in the Session.run()", exception);
        } finally {
            // Clear message cache
            _messageCache.clear();
        }
    }

    /**
     * Set the message listener for a particular consumer.
     * <p>
     * If a listener is already registered for the consumer, it will be
     * automatically overwritten
     *
     * @param listener the message listener
     * @throws JMSException if the listener can't be set
     */
    public void setMessageListener(JmsMessageConsumer listener)
        throws JMSException {
        ensureOpen();
        enableAsynchronousDelivery(listener.getClientId(),
                                   listener.getLastMessageDelivered(), true);
    }

    /**
     * Remove a message listener
     *
     * @param listener the message listener to remove
     * @throws JMSException if the listener can't be removed
     */
    public void removeMessageListener(JmsMessageConsumer listener)
        throws JMSException {

        ensureOpen();
        enableAsynchronousDelivery(listener.getClientId(),
            listener.getLastMessageDelivered(), false);
    }

    /**
     * This will start message delivery to this session. If message delivery
     * has already started then this is a no-op.
     *
     * @throws JMSException if message delivery can't be started
     */
    public void start() throws JMSException {
        ensureOpen();
        if (_stopped) {
            getJmsSessionStub().startMessageDelivery();
            _stopped = false;

            // wake up any blocking consumers
            notifyConsumers();
        }
    }

    /**
     * This will stop message delivery to this session. If message delivery
     * has already stopped then this is a no-op.
     *
     * @throws JMSException if message delivery can't be stopped
     */
    public void stop() throws JMSException {
        ensureOpen();
        if (!_stopped) {
            getJmsSessionStub().stopMessageDelivery();
            _stopped = true;

            // wake up any blocking consumers
            notifyConsumers();
        }
    }

    /**
     * Acknowledge the specified message. This is only applicable for
     * CLIENT_ACKNOWLEDGE sessions. For other session types, the request
     * is ignored.
     * <p>
     * Acking a message automatically acks all those that have come
     * before it.
     *
     * @param message the message to acknowledge
     * @throws JMSException if the message can't be acknowledged
     */
    public void acknowledgeMessage(Message message) throws JMSException {
        ensureOpen();
        if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
            MessageImpl impl = (MessageImpl) message;
            getJmsSessionStub().acknowledgeMessage(impl.getClientId(),
                                                   impl.getAckMessageID());
        }
    }
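
    /*
     * Usage sketch (an illustration, not part of the original source).
     * It assumes the standard javax.jms API, and that Message.acknowledge()
     * on a message received from this session is routed to
     * acknowledgeMessage(); 'connection' and 'queue' are hypothetical,
     * already initialised objects.
     *
     *     Session session =
     *         connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
     *     MessageConsumer consumer = session.createConsumer(queue);
     *     Message first = consumer.receive();
     *     Message second = consumer.receive();
     *
     *     // acknowledging the later message also acknowledges 'first'
     *     second.acknowledge();
     */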

    /**
     * Enable or disable asynchronous message delivery for the specified
     * client.
     *
     * @param clientId - the client identity
     * @param id - the last message delivered asynchronously
     * @param enable - <code>true</code> to enable; <code>false</code> to
     * disable
     * @throws JMSException if message delivery cannot be enabled or disabled
     */
    public void enableAsynchronousDelivery(long clientId, String id,
                                           boolean enable)
        throws JMSException {

        ensureOpen();
        getJmsSessionStub().enableAsynchronousDelivery(clientId, id, enable);
    }

    /**
     * Asynchronously deliver a message to a <code>MessageConsumer</code>
     *
     * @param message the message to deliver
     */
    public void onMessage(Message message) {
        if (message != null) {
            MessageImpl impl = (MessageImpl) message;
            impl.setJMSXRcvTimestamp(System.currentTimeMillis());

            // dispatch the message;
            execute(message);
        }
    }

    /**
     * Asynchronously deliver a set of messages to a
     * <code>MessageConsumer</code>
     *
     * @param messages the messages to deliver
     */
    public void onMessages(Vector messages) {
        while (messages.size() > 0) {
            onMessage((Message) messages.remove(0));
        }
    }

    /**
     * Inform the session that there is a message available
     * for the message consumer with the specified identity
     *
     * @param clientId the identity of the client
     */
    public void onMessageAvailable(long clientId) {
        // wake up any blocking consumers
        notifyConsumers();
    }

    /**
     * This is called to process messages asynchronously delivered by the
     * server. The session is then responsible for delivering it to the
     * appropriate registered consumer. If it cannot resolve the consumer then
     * it must log an exception
     * <p>
     * If the session has a registered listener then all messages will be
     * delivered to the session's listener instead of the individual consumer
     * message listeners.
     *
     * @param object the received message
     */
    public synchronized void execute(Object object) {
        // if the session is closed then drop the object
        if (_closed) {
            _log.error("Received a message for a closed session");
            return;
        }

        MessageImpl message = (MessageImpl) object;
        long clientId = message.getClientId();
        JmsMessageConsumer consumer =
            (JmsMessageConsumer) _consumers.get(Long.valueOf(clientId));

        // tag the session that received this message
        message.setSession(this);
        if (consumer != null) {
            // if a listener is defined for the session then send all the
            // messages to that listener, regardless of whether any consumers
            // have registered listeners. A bit confusing, but this is what
            // I believe it should do
            if (_listener != null) {
                _listener.onMessage(message);
            } else {
                // send it to the appropriate consumer
                consumer.onMessage(message);
            }
        } else {
            // consumer no longer active...so drop the message
            _log.error("Received a message for an inactive consumer");
        }
    }

    /**
     * Returns the session identifier
     *
     * @return the session identifier
     */
    public String getSessionId() {
        return _sessionId;
    }

    /**
     * Return the acknowledgement mode for the session
     *
     * @return the acknowledgement mode for the session
     */
    public int getAckMode() {
        return _ackMode;
    }

    /**
     * Fetch the next message for this client. If the session's ackMode is
     * client acknowledge then set the session for the message, otherwise
     * ack the message before returning it.
     *
     * @param clientId the consumer identifier.
     * @param wait the maximum time to wait for a message, in milliseconds.
     * If <code>-1</code>, don't wait, if <code>0</code> wait indefinitely,
