resourcemanager.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 1,362 行 · 第 1/4 页
JAVA
1,362 行
// ???? try { logTransactionState(xid, TransactionState.PREPARED); } catch (Exception exception) { throw new XAException("Error processing prepare : " + exception); } return XAResource.XA_OK; } /** * Inform the resource manager to roll back work done on behalf of a * transaction branch * * @throws XAException - if there is a problem completing the call */ public synchronized Xid[] recover(int flag) throws XAException { Xid[] result = new Xid[0]; if ((flag == XAResource.TMNOFLAGS) || (flag == XAResource.TMSTARTRSCAN) || (flag == XAResource.TMENDRSCAN)) { LinkedList xids = new LinkedList(); Iterator iter = _activeTransactions.keySet().iterator(); while (iter.hasNext()) { Xid xid = (Xid) iter.next(); LinkedList list = (LinkedList) _activeTransactions.get(xid); if (list.size() > 1) { // need at least a start in the chain. Object last = list.getLast(); if ((last instanceof StateTransactionLogEntry) && (((StateTransactionLogEntry) last).getState() .isPrepared())) { xids.add(xid); } } } result = (Xid[]) xids.toArray(); } return result; } /** * Set the current transaction timeout value for this XAResource instance. * * @throws XAException - if there is a problem completing the call */ public synchronized void rollback(Xid id) throws XAException { //check the xid is not null if (id == null) { throw new XAException(XAException.XAER_NOTA); } // covert to our internal representation of an xid ExternalXid xid = new ExternalXid(id); // check to see that the xid actually exists if (!isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } // process the data in that transaction. If it was a published message // then drop it. If it was a consumed message then return it back to // the destination. try { // retrieve a list of recrods for the specified global transaction // and process them. Ignore the state records and only process the // data records, which are of type TransacitonalObjectWrapper. Object[] records = getTransactionRecords(xid, _rid); for (int index = 0; index < records.length; index++) { if (records[index] instanceof TransactionalObjectWrapper) { TransactionalObjectWrapper wrapper = (TransactionalObjectWrapper) records[index]; if (wrapper.isPublishedMessage()) { // we don't need to process these messages since the // global transaction has been rolled back. } else if (wrapper.isReceivedMessage()) { ReceivedMessageWrapper rmsg_wrapper = (ReceivedMessageWrapper) wrapper; MessageHandle handle = (MessageHandle) rmsg_wrapper.getObject(); JmsDestination dest = handle.getDestination(); DestinationCache cache = _destinations.getDestinationCache(dest); cache.returnMessageHandle(handle); } } else { // ignore since it is a state records. } } } catch (Exception exception) { throw new XAException("Failed in ResourceManager.rollback : " + exception.toString()); } finally { // and now mark the transaction as closed try { logTransactionState(xid, TransactionState.CLOSED); } catch (Exception exception) { throw new XAException( "Error processing rollback : " + exception); } } } /** * Start work on behalf of a transaction branch specified in xid If TMJOIN * is specified, the start is for joining a transaction previously seen by * the resource manager * * @throws XAException - if there is a problem completing the call */ public synchronized boolean setTransactionTimeout(int seconds) throws XAException { _txExpiryTime = seconds; return true; } // implementation of XAResource.start public synchronized void start(Xid id, int flags) throws XAException { //check the xid is not null if (id == null) { throw new XAException(XAException.XAER_NOTA); } // covert to our internal representation of an xid ExternalXid xid = new ExternalXid(id); // check that the flags are valid for this method if ((flags != XAResource.TMNOFLAGS) || (flags != XAResource.TMJOIN) || (flags != XAResource.TMRESUME)) { throw new XAException(XAException.XAER_PROTO); } switch (flags) { case XAResource.TMNOFLAGS: // check to see that the xid does not already exist if (isTransactionActive(xid)) { throw new XAException(XAException.XAER_DUPID); } // otherwise log the start of the transaction try { logTransactionState(xid, TransactionState.OPENED); } catch (Exception exception) { throw new XAException( "Error processing start : " + exception); } break; case XAResource.TMJOIN: case XAResource.TMRESUME: // joining a transaction previously seen by the resource // manager if (!isTransactionActive(xid)) { throw new XAException(XAException.XAER_PROTO); } break; } } /** * Return the resource manager identity * * @return the resource manager identity */ public String getResourceManagerId() { return _rid; } /** * Create the next {@link TransactionLog} and add it to the list of managed * transaction logs. * <p/> * The method will throw ResourceManagerException if there is a problem * completing the request. * * @throws ResourceManagerException */ protected TransactionLog createNextTransactionLog() throws ResourceManagerException { TransactionLog newlog = null; synchronized (_logs) { try { // get the last log number long last = 1; if (!_logs.isEmpty()) { last = getSequenceNumber( ((TransactionLog) _logs.last()).getName()); } // now that we have the last log number, increment it and use // it to build the name of the next log file. String name = _logDirectory + System.getProperty("file.separator") + RM_LOGFILE_PREFIX + Long.toString(++last) + RM_LOGFILE_EXTENSION; // create a transaction log and add it to the collection newlog = new TransactionLog(name, true); _logs.add(newlog); } catch (TransactionLogException exception) { throw new ResourceManagerException( "Error in createNextTransactionLog " + exception); } } return newlog; } /** * Build a list of all log files in the specified log directory * * @throws IllegalArgumentException - if the directory does not exist. */ protected void buildLogFileList() { File dir = new File(_logDirectory); if ((!dir.exists()) || (!dir.isDirectory())) { throw new IllegalArgumentException(_logDirectory + " is not a directory"); } try { File[] list = dir.listFiles(new FilenameFilter() { // implementation of FilenameFilter.accept public boolean accept(File dir, String name) { boolean result = false; if ((name.startsWith(RM_LOGFILE_PREFIX)) && (name.endsWith(RM_LOGFILE_EXTENSION))) { result = true; } return result; } }); // add the files to the list synchronized (_logs) { for (int index = 0; index < list.length; index++) { _logs.add(new TransactionLog(list[index].getPath(), false)); } } } catch (Exception exception) { // replace this with the exception strategy exception.printStackTrace(); } } /** * This method will process all the transaction logs, in the log diretory * and call recover on each of them. * * @throws ResourceManagerException - if there is a problem recovering */ private synchronized void recover() throws ResourceManagerException { try { if (!_logs.isEmpty()) { Iterator iter = _logs.iterator(); while (iter.hasNext()) { TransactionLog log = (TransactionLog) iter.next(); HashMap records = log.recover(); } } } catch (Exception exception) { throw new ResourceManagerException("Error in recover " + exception.toString()); } } /** * Retrieve the transaction log for the specified transaction id * * @param txid - the transaction identity * @return TransactionLog * @throws TransactionLogException - if there is tx log exception * @throws ResourceManagerException - if there is a resource problem. */ private TransactionLog getTransactionLog(ExternalXid txid) throws TransactionLogException, ResourceManagerException { TransactionLog log = (TransactionLog) _tridToLogCache.get(txid); if (log == null) { log = getCurrentTransactionLog(); addTridLogEntry(txid, log); } return log; } /** * Get the current transaction log. It will check the last transaction log * opened by the resource manager and determine whether there is space * enough to process another transaction. * <p/> * If there is space enough then it will return that transaction, otherwise * it will create a new transaction log for the resource * * @return TransactionLog - the transaction log to use * @throws ResourceManagerException * @throws TransactionLogException */ private TransactionLog getCurrentTransactionLog() throws TransactionLogException, ResourceManagerException { TransactionLog log = null; synchronized (_logs) { if (_logs.size() > 0) { log = (TransactionLog) _logs.last(); } if ((log == null) || (log.size() > _logFileSize)) { log = createNextTransactionLog(); } } return log; } /** * Add an entry to the trid log cache table for the specified trid and * transaction log mapping. * * @param trid - the transaction identifier * @param log - the transaction log */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?