
// File: batchingrdbmsadapter.java (Java)
// Source code for a JMS server implementation; supports multiple adapters
// (DB, FTP) and multiple databases.
    // implementation of PersistenceAdapter.getNonExpiredMessages
    public Vector getNonExpiredMessages(Connection connection,
                                        JmsDestination destination)
        throws PersistenceException {
        flush();
        return _rdbms.getNonExpiredMessages(connection, destination);
    }

    /**
     * Return a connection to the database from the pool of connections. It
     * will throw a PersistenceException if it cannot retrieve a connection.
     * The client should close the connection normally, since the pool is a
     * connection event listener.
     *
     * @return Connection - a pooled connection or null
     * @exception PersistenceException - if it cannot retrieve a connection
     */
    public Connection getConnection()
        throws PersistenceException {

        return _rdbms.getConnection();
    }

    /**
     * Purge all processed messages from the database.
     *
     * @return int - the number of messages deleted
     */
    public synchronized int purgeMessages() {
        try {
            flush();
        } catch (PersistenceException exception) {
            // pass the exception itself so the stack trace is logged
            _log.error("Error in purgeMessages", exception);
        }

        return _rdbms.purgeMessages();
    }

    // implementation of EventHandler.handleEvent
    public void handleEvent(int event, Object callback, long time) {
        _rdbms.handleEvent(event, callback, time);
    }

    // implementation of EventHandler.getHandle
    public HandleIfc getHandle() {
        return null;
    }

    /**
     * Close the current piece of work and commit it to the database. This is
     * called before a query or fetch is executed on the data.
     * <p>
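     * It grabs a connection on a separate thread, applies the batched
     * transactional objects, and commits them before returning. Failures
     * inside that thread are rolled back where possible and logged; they are
     * not rethrown to the caller by the code shown here.
     *
     * @throws PersistenceException declared for callers; see the note above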
     */
    private synchronized void flush() throws PersistenceException {
        if (_batch.size() == 0) {
            return;
        }

        // need to do this in a separate thread since the current thread is
        // already associated with a Connection object
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Connection connection = null;
                try {
                    connection = _rdbms.getConnection();

                    Iterator iter = _batch.iterator();
                    while (iter.hasNext()) {
                        TransactionalObjectWrapper wrapper =
                            (TransactionalObjectWrapper) iter.next();
                        switch (wrapper._action) {
                            case TransactionalObjectWrapper.ADD_MESSAGE:
                                _rdbms.addMessage(connection,
                                    (MessageImpl) wrapper._object);
                                break;

                            case TransactionalObjectWrapper.UPDATE_MESSAGE:
                                _rdbms.updateMessage(connection,
                                    (MessageImpl) wrapper._object);
                                break;

                            case TransactionalObjectWrapper.DELETE_MESSAGE:
                                _rdbms.removeMessage(connection,
                                    (String) wrapper._object);
                                break;

                            case TransactionalObjectWrapper.ADD_HANDLE:
                                _rdbms.addMessageHandle(connection,
                                    (PersistentMessageHandle) wrapper._object);
                                break;

                            case TransactionalObjectWrapper.UPDATE_HANDLE:
                                _rdbms.updateMessageHandle(connection,
                                    (PersistentMessageHandle) wrapper._object);
                                break;

                            case TransactionalObjectWrapper.DELETE_HANDLE:
                                _rdbms.removeMessageHandle(connection,
                                    (PersistentMessageHandle) wrapper._object);
                                break;
                        }
                    }
                    connection.commit();

                    // if the commit has worked then flush the batch list
                    _batch.clear();
                    _messages.clear();
                    _handles.clear();
                } catch (PersistenceException exception) {
                    SQLHelper.rollback(connection);
                    _log.error("Failure in flush()", exception);
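                } catch (Exception exception) {
                    // roll back here as well, so a failed commit or an
                    // unchecked exception does not leave partial work behind
                    SQLHelper.rollback(connection);
                    _log.error("Failure in flush()", exception);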
                } finally {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Exception nested) {
                            _log.error("Failure in flush()", nested);
                        }
                    }
                }
            }
        });

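        // start the worker thread
        thread.start();

        // block until the worker has finished, so the batch is written (or
        // the failure logged) before the caller runs its query. Waiting here
        // means the extra thread buys no concurrency.
        try {
            thread.join();
        } catch (InterruptedException exception) {
            // restore the interrupt status so the caller can observe it
            Thread.currentThread().interrupt();
        }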
    }

    /**
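     * Add this transactional object along with the associated action to the
     * current batch job. If the batch has already reached
     * _maxStatementsToBatch entries it is flushed first. Actions on the same
     * message or handle within one batch are merged, so that at most one
     * statement per object is written.
     *
     * @param action - the action to take
     * @param object - the transactional object
     * @throws PersistenceException if the cache is inconsistent, for example
     *         when adding an object that is already batched or updating one
     *         that has been deleted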
     */
    private synchronized void addToBatch(int action, Object object)
        throws PersistenceException {
        if (_batch.size() >= _maxStatementsToBatch) {
            flush();
        }

        switch (action) {
            case TransactionalObjectWrapper.ADD_MESSAGE:
                {
                    TransactionalObjectWrapper txobj =
                        new TransactionalObjectWrapper(action, object);
                    MessageImpl message = (MessageImpl) object;
                    MessageId id = message.getMessageId();
                    if (_messages.containsKey(id)) {
                        throw new PersistenceException("Inconsistency in cache " +
                            id + " is present when it shouldn't be.");
                    }
                    _messages.put(id, txobj);
                    _batch.addLast(txobj);
                    break;
                }

            case TransactionalObjectWrapper.UPDATE_MESSAGE:
                {
                    MessageImpl message = (MessageImpl) object;
                    MessageId id = message.getMessageId();
                    TransactionalObjectWrapper txobj =
                        (TransactionalObjectWrapper) _messages.get(id);
                    TransactionalObjectWrapper newtxobj =
                        new TransactionalObjectWrapper(action, object);

                    if (txobj != null) {
                        // if the batch already contains an entry for this
                        // message then remove it and add the new version,
                        // keeping ADD_MESSAGE as the action if the message
                        // has not been written yet
                        _batch.remove(txobj);
                        if (txobj._action == TransactionalObjectWrapper.ADD_MESSAGE) {
                            newtxobj._action = TransactionalObjectWrapper.ADD_MESSAGE;
                            _batch.addLast(newtxobj);
                        } else if (txobj._action == TransactionalObjectWrapper.UPDATE_MESSAGE) {
                            _batch.addLast(newtxobj);
                        } else {
                            // could only be a delete and should never happen
                            throw new PersistenceException("Inconsistency in cache." +
                                " Cannot update a deleted message.");
                        }
                    } else {
                        _batch.addLast(newtxobj);
                    }
                    _messages.put(id, newtxobj);
                    break;
                }

            case TransactionalObjectWrapper.DELETE_MESSAGE:
                {
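                    // NOTE: this branch casts the payload to MessageImpl, while
                    // flush() casts a DELETE_MESSAGE payload to String; the two
                    // casts cannot both succeed for the same object.
                    MessageImpl message = (MessageImpl) object;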
                    MessageId id = message.getMessageId();
                    TransactionalObjectWrapper txobj =
                        (TransactionalObjectWrapper) _messages.get(id);
                    TransactionalObjectWrapper newtxobj =
                        new TransactionalObjectWrapper(action, object);

                    if (txobj != null) {
                        // if the batch already contains an entry for this
                        // message then remove it; what replaces it depends
                        // on the action that was pending
                        boolean queued = _batch.remove(txobj);
                        if (txobj._action == TransactionalObjectWrapper.ADD_MESSAGE) {
                            // if the add and the delete happened in the same batch then
                            // we don't have to commit either of the transactions.
                        } else if (txobj._action == TransactionalObjectWrapper.UPDATE_MESSAGE) {
                            // if the update and the delete happened in the same batch then
                            // only the delete needs to be committed
                            _batch.addLast(newtxobj);
                        } else if (queued) {
                            // the entry removed above was an earlier delete that
                            // was still queued, so queue the delete again
                            _batch.addLast(newtxobj);
                        } else {
                            // the earlier delete cancelled an add from this batch,
                            // so there is nothing to write
                        }
                    } else {
                        _batch.addLast(newtxobj);
                    }
                    _messages.put(id, newtxobj);
                    break;
                }

            case TransactionalObjectWrapper.ADD_HANDLE:
                {
                    TransactionalObjectWrapper txobj =
                        new TransactionalObjectWrapper(action, object);
                    PersistentMessageHandle handle = (PersistentMessageHandle) object;
                    if (_handles.containsKey(handle)) {
                        throw new PersistenceException("Inconsistency in cache " +
                            handle + " is present when it shouldn't be.");
                    }
                    _handles.put(handle, txobj);
                    _batch.addLast(txobj);
                    break;
                }

            case TransactionalObjectWrapper.UPDATE_HANDLE:
                {
                    PersistentMessageHandle handle = (PersistentMessageHandle) object;
                    TransactionalObjectWrapper txobj =
                        (TransactionalObjectWrapper) _handles.get(handle);
                    TransactionalObjectWrapper newtxobj =
                        new TransactionalObjectWrapper(action, object);

                    if (txobj != null) {
                        // if the batch already contains an entry for this
                        // handle then remove it and add the new version,
                        // keeping ADD_HANDLE as the action if the handle
                        // has not been written yet
                        _batch.remove(txobj);
                        if (txobj._action == TransactionalObjectWrapper.ADD_HANDLE) {
                            newtxobj._action = TransactionalObjectWrapper.ADD_HANDLE;
                            _batch.addLast(newtxobj);
                        } else if (txobj._action == TransactionalObjectWrapper.UPDATE_HANDLE) {
                            _batch.addLast(newtxobj);
                        } else {
                            // could only be a delete and should never happen
                            throw new PersistenceException("Inconsistency in cache." +
                                " Cannot update a deleted handle.");
                        }
                    } else {
                        _batch.addLast(newtxobj);
                    }
                    _handles.put(handle, newtxobj);
                    break;
                }

            case TransactionalObjectWrapper.DELETE_HANDLE:
                {
                    PersistentMessageHandle handle = (PersistentMessageHandle) object;
                    TransactionalObjectWrapper txobj =
                        (TransactionalObjectWrapper) _handles.get(handle);
                    TransactionalObjectWrapper newtxobj =
                        new TransactionalObjectWrapper(action, object);

                    if (txobj != null) {
                        // if the batch already contains an entry for this
                        // handle then remove it; what replaces it depends
                        // on the action that was pending
                        boolean queued = _batch.remove(txobj);
                        if (txobj._action == TransactionalObjectWrapper.ADD_HANDLE) {
                            // if the add and the delete happened in the same batch then
                            // we don't have to commit either of the transactions.
                        } else if (txobj._action == TransactionalObjectWrapper.UPDATE_HANDLE) {
                            // if the update and the delete happened in the same batch then
                            // only the delete needs to be committed
                            _batch.addLast(newtxobj);
                        } else if (queued) {
                            // the entry removed above was an earlier delete that
                            // was still queued, so queue the delete again
                            _batch.addLast(newtxobj);
                        } else {
                            // the earlier delete cancelled an add from this batch,
                            // so there is nothing to write
                        }
                    } else {
                        _batch.addLast(newtxobj);
                    }
                    _handles.put(handle, newtxobj);
                    break;
                }
        }

    }

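    // The user operations below are no-ops in this adapter: nothing is
    // stored, and the getters return null.
    public void addUser(Connection connection, User user)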
        throws PersistenceException {
    }

    public Enumeration getAllUsers(Connection connection)
        throws PersistenceException {
        return null;
    }

    public User getUser(Connection connection, User user)
        throws PersistenceException {
        return null;
    }

    public void removeUser(Connection connection, User user)
        throws PersistenceException {
    }

    public void updateUser(Connection connection, User user)
        throws PersistenceException {
    }

    /**
     * Inner class that wraps up transactional objects so that they can
     * correctly be processed in a batch transaction
     */
    private class TransactionalObjectWrapper {

        /**
         * This is the list of actions that the wrapper supports.
         */
        public static final int ADD_MESSAGE = 1;
        public static final int UPDATE_MESSAGE = 2;
        public static final int DELETE_MESSAGE = 3;
        public static final int ADD_HANDLE = 4;
        public static final int UPDATE_HANDLE = 5;
        public static final int DELETE_HANDLE = 6;

        /**
         * Action associated with the transactional object
         */
        public int _action;

        /**
         * The transactional object
         */
        public Object _object;

        /**
         * Constructor using the action and object
         */
        public TransactionalObjectWrapper(int action, Object object) {
            _action = action;
            _object = object;
        }
    }
}
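
The merging rules that addToBatch applies to messages and handles can be tried out in isolation. The following is a minimal sketch, not the adapter's code: the class BatchCoalescingSketch, its Action enum, and the record and plan methods are hypothetical names introduced for illustration, and plain String keys stand in for MessageId and PersistentMessageHandle. It assumes a single ordered map can play the role of both _batch and the lookup maps, which means a cancelled add is forgotten completely, whereas the adapter keeps the delete wrapper in its lookup map until the next flush.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the per-object merging done by addToBatch.
class BatchCoalescingSketch {

    enum Action { ADD, UPDATE, DELETE }

    // Pending action per key, kept in the order the statements would be written.
    private final Map<String, Action> pending = new LinkedHashMap<>();

    // Records an action for a key, merging it with any action already pending.
    void record(Action action, String key) {
        Action merged = merge(pending.get(key), action, key);
        // Remove and re-insert so the merged entry moves to the tail,
        // mirroring _batch.remove() followed by _batch.addLast().
        pending.remove(key);
        if (merged != null) {
            pending.put(key, merged);
        }
    }

    // Returns the action to keep for the key, or null if nothing needs writing.
    private static Action merge(Action previous, Action next, String key) {
        if (previous == null) {
            return next;
        }
        switch (next) {
            case ADD:
                throw new IllegalStateException(key + " is already in the batch");
            case UPDATE:
                if (previous == Action.DELETE) {
                    throw new IllegalStateException("cannot update deleted " + key);
                }
                return previous;      // ADD stays ADD, UPDATE stays UPDATE
            case DELETE:
                // An add followed by a delete cancels out; otherwise delete wins.
                return previous == Action.ADD ? null : Action.DELETE;
            default:
                throw new AssertionError(next);
        }
    }

    // Lists the statements a flush would issue, in order.
    List<String> plan() {
        List<String> statements = new ArrayList<>();
        for (Map.Entry<String, Action> entry : pending.entrySet()) {
            statements.add(entry.getValue() + " " + entry.getKey());
        }
        return statements;
    }

    public static void main(String[] args) {
        BatchCoalescingSketch batch = new BatchCoalescingSketch();
        batch.record(Action.ADD, "m1");
        batch.record(Action.UPDATE, "m1");   // folded into the pending ADD
        batch.record(Action.ADD, "m2");
        batch.record(Action.DELETE, "m2");   // cancels the pending ADD
        batch.record(Action.UPDATE, "m3");
        batch.record(Action.DELETE, "m3");   // replaces the pending UPDATE
        System.out.println(batch.plan());    // prints [ADD m1, DELETE m3]
    }
}
```

Six recorded actions collapse to two statements here, which is the point of batching: the database only sees the net effect of the work done since the last flush.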
