📄 transactionmanager.java
字号:
if (evt != null) { evt.addMessage (" prepare: " + p.getClass().getName() + (abort ? " ABORTED" : "") + (retry ? " RETRY" : "") + (pause ? " PAUSE" : "") + ((action & READONLY) == READONLY ? " READONLY" : "") + ((action & NO_JOIN) == NO_JOIN ? " NO_JOIN" : "")); } } if ((action & READONLY) == 0) { snapshot (id, context); } if ((action & NO_JOIN) == 0) { members.add (p); } if (p instanceof GroupSelector) { String groupName = null; try { groupName = ((GroupSelector)p).select (id, context); } catch (Exception e) { if (evt != null) evt.addMessage (" groupSelector " + p + " - " + e.getMessage()); else getLog().error (" groupSelector: " + p + " - " + e.getMessage()); } if (evt != null) evt.addMessage (" groupSelector: " + groupName); if (groupName != null) { StringTokenizer st = new StringTokenizer (groupName, " ,"); List participants = new ArrayList(); while (st.hasMoreTokens ()) { String grp = st.nextToken(); addGroup (id, grp); participants.addAll (getParticipants (grp)); } while (iter.hasNext()) participants.add (iter.next()); iter = participants.iterator(); continue; } } if (pause) { if (context instanceof Pausable) { Pausable pausable = (Pausable) context; TimerTask expirationMonitor = new PausedMonitor (pausable); PausedTransaction pt = new PausedTransaction ( this, id, members, iter, abort, expirationMonitor ); pausable.setPausedTransaction (pt); long t = pausable.getTimeout(); if (t == 0) t = pauseTimeout; if (t > 0) { synchronized (context) { if (!pt.isResumed()) { DefaultTimer.getTimer().schedule ( expirationMonitor, t ); } } } } else { throw new RuntimeException ("Unable to PAUSE transaction - Context is not Pausable"); } return PAUSE; } } return members.size() == 0 ? NO_JOIN : (abort ? (retry ? RETRY : ABORTED) : PREPARED); } protected List getParticipants (String groupName) { List participants = (List) groups.get (groupName); if (participants == null) participants = new ArrayList(); return participants; } protected List getParticipants (long id) { List participants = getParticipants (DEFAULT_GROUP); String key = getKey(GROUPS, id); String grp = null; while ( (grp = (String) psp.inp (key)) != null) { participants.addAll (getParticipants (grp)); } return participants; } protected void initParticipants (Element config) throws ConfigurationException { groups.put (DEFAULT_GROUP, initGroup (config)); Iterator iter = config.getChildren ("group").iterator(); while (iter.hasNext()) { Element e = (Element) iter.next(); String name = e.getAttributeValue ("name"); if (name == null) throw new ConfigurationException ("missing group name"); if (groups.get (name) != null) { throw new ConfigurationException ( "Group '" + name + "' already defined" ); } groups.put (name, initGroup (e)); } } protected ArrayList initGroup (Element e) throws ConfigurationException { ArrayList group = new ArrayList (); Iterator iter = e.getChildren ("participant").iterator(); while (iter.hasNext()) { group.add (createParticipant ((Element) iter.next())); } return group; } public TransactionParticipant createParticipant (Element e) throws ConfigurationException { QFactory factory = getFactory(); TransactionParticipant participant = (TransactionParticipant) factory.newInstance (e.getAttributeValue ("class") ); factory.setLogger (participant, e); factory.invoke (participant, "setTransactionManager", this, TransactionManager.class); factory.setConfiguration (participant, e); return participant; } public int getOutstandingTransactions() { if (sp instanceof LocalSpace) return ((LocalSpace)sp).size(queue); return -1; } protected String getKey (String prefix, long id) { StringBuffer sb = new StringBuffer (getName()); sb.append ('.'); sb.append (prefix); sb.append (Long.toString (id)); return sb.toString (); } protected long initCounter (String name, long defValue) { Long L = (Long) psp.rdp (name); if (L == null) { L = new Long (defValue); psp.out (name, L); } return L.longValue(); } protected void commitOff (Space sp) { if (sp instanceof JDBMSpace) { ((JDBMSpace) sp).setAutoCommit (false); } } protected void commitOn (Space sp) { if (sp instanceof JDBMSpace) { JDBMSpace jsp = (JDBMSpace) sp; jsp.commit (); jsp.setAutoCommit (true); } } protected void syncTail () { synchronized (psp) { commitOff (psp); psp.inp (TAIL); psp.out (TAIL, new Long (tail)); commitOn (psp); } } protected void initTailLock () { tailLock = TAILLOCK + "." + Integer.toString (this.hashCode()); SpaceUtil.wipe (sp, tailLock); sp.out (tailLock, TAILLOCK); } protected void checkTail () { Object lock = sp.in (tailLock); while (tailDone()) { // if (debug) { // getLog().debug ("tailDone " + tail); // } tail++; } syncTail (); sp.out (tailLock, lock); } protected boolean tailDone () { String stateKey = getKey (STATE, tail); Integer state = (Integer) psp.rdp (stateKey); if (DONE.equals (psp.rdp (stateKey))) { purge (tail); return true; } return false; } protected long nextId () { long h; synchronized (psp) { commitOff (psp); psp.in (HEAD); h = head; psp.out (HEAD, new Long (++head)); commitOn (psp); } return h; } protected void snapshot (long id, Serializable context) { snapshot (id, context, null); } protected void snapshot (long id, Serializable context, Integer status) { String contextKey = getKey (CONTEXT, id); synchronized (psp) { commitOff (psp); while (psp.inp (contextKey) != null) ; if (context != null) psp.out (contextKey, context); if (status != null) { String stateKey = getKey (STATE, id); while (psp.inp (stateKey) != null) ; psp.out (stateKey, status); } commitOn (psp); } } protected void setState (long id, Integer state) { String stateKey = getKey (STATE, id); synchronized (psp) { commitOff (psp); while (psp.inp (stateKey) != null) ; if (state!= null) psp.out (stateKey, state); commitOn (psp); } } protected void addGroup (long id, String groupName) { if (groupName != null) psp.out (getKey (GROUPS, id), groupName); } protected void purge (long id) { String stateKey = getKey (STATE, id); String contextKey = getKey (CONTEXT, id); String groupsKey = getKey (GROUPS, id); synchronized (psp) { commitOff (psp); while (psp.inp (stateKey) != null) ; while (psp.inp (contextKey) != null) ; while (psp.inp (groupsKey) != null) ; commitOn (psp); } } protected void recover () { if (doRecover) { if (tail < head) { getLog().info ("recover - tail=" +tail+", head="+head); } while (tail < head) { recover (tail++); } } else tail = head; syncTail (); } protected void recover (long id) { LogEvent evt = getLog().createLogEvent ("recover"); evt.addMessage ("<id>" + id + "</id>"); try { String stateKey = getKey (STATE, id); String contextKey = getKey (CONTEXT, id); Integer state = (Integer) psp.rdp (stateKey); if (state == null) { evt.addMessage ("unknown stateKey " + stateKey); SpaceUtil.wipe (psp, contextKey); // just in case ... return; } Serializable context = (Serializable) psp.rdp (contextKey); if (context != null) evt.addMessage (context); if (DONE.equals (state)) { evt.addMessage ("<done/>"); } else if (COMMITTING.equals (state)) { commit (id, context, getParticipants (id), true, evt); } else if (PREPARING.equals (state)) { abort (id, context, getParticipants (id), true, evt); } purge (id); } finally { Logger.log (evt); } } protected synchronized void checkRetryTask () { if (retryTask == null) { retryTask = new RetryTask(); new Thread(retryTask).start(); } } public class PausedMonitor extends TimerTask { Pausable context; public PausedMonitor (Pausable context) { super(); this.context = context; } public void run() { cancel(); context.getPausedTransaction().forceAbort(); context.resume(); } } public class RetryTask implements Runnable { public void run() { Thread.currentThread().setName (getName()+"retry-task"); while (running()) { for (Object context; (context = psp.rdp (RETRY_QUEUE)) != null;) { sp.out (queue, context); psp.inp (RETRY_QUEUE); } try { Thread.sleep (retryInterval); } catch (InterruptedException e) { } } } } public void setDebug (boolean debug) { this.debug = debug; } public boolean getDebug() { return debug; } public int getActiveSessions() { return activeSessions; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -