📄 transactionmanager.java
字号:
/*
 * jPOS Project [http://jpos.org]
 * Copyright (C) 2000-2008 Alejandro P. Revilla
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package org.jpos.transaction;

import java.io.Serializable;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TimerTask;
import java.util.StringTokenizer;
import org.jpos.util.NameRegistrar;
import org.jdom.Element;
import org.jpos.space.Space;
import org.jpos.space.LocalSpace;
import org.jpos.space.JDBMSpace;
import org.jpos.space.SpaceFactory;
import org.jpos.space.SpaceUtil;
import org.jpos.q2.QFactory;
import org.jpos.q2.QBeanSupport;
import org.jpos.core.Configuration;
import org.jpos.core.ConfigurationException;
import org.jpos.util.Logger;
import org.jpos.util.LogEvent;
import org.jpos.util.DefaultTimer;

/**
 * Service bean whose session threads take contexts from a Space queue and
 * drive each one through the prepare / commit / abort calls of a list of
 * {@link TransactionParticipant}s (see {@link #run()}).
 */
public class TransactionManager
    extends QBeanSupport
    implements Runnable, TransactionConstants, TransactionManagerMBean{
    /** Space the transaction queue lives in (configuration property "space"). */
    Space sp;
    /** Space obtained from the "persistent-space" property; the retry queue is written here. */
    Space psp;
    /** Name of the queue in {@link #sp} that session threads read from (property "queue"). */
    String queue;
    // NOTE(review): not used in the visible part of the file; presumably set by initTailLock() - confirm
    String tailLock;
    // NOTE(review): created empty in initService(); presumably maps a group name to its participants - confirm
    Map groups;
    /** Session threads created by startService(). */
    Thread[] threads;
    /** Number of session threads currently inside run(). */
    int activeSessions;
    /** When true, run() builds a "debug" LogEvent per transaction (property "debug"). */
    boolean debug;
    /** Value of the "recover" property (defaults to true). */
    boolean doRecover;
    long head, tail, lastGC;
    /** Default for the "retry-interval" property. */
    long retryInterval = 5000L;
    // NOTE(review): this comment originally read "five minutes", but 300*60*1000
    // is 18,000,000 - i.e. 300 minutes (five hours) if the unit is milliseconds.
    // Confirm which of the two was intended before relying on this default.
    long pauseTimeout = 300*60*1000L;
    RetryTask retryTask = null;

    /** Name passed to initCounter() for the head counter; also used as a monitor object in run(). */
    public static final String HEAD = "$HEAD";
    /** Name passed to initCounter() for the tail counter. */
    public static final String TAIL = "$TAIL";
    // presumably key prefixes (note the trailing '.') - confirm against snapshot()/setState()
    public static final String CONTEXT = "$CONTEXT.";
    public static final String STATE = "$STATE.";
public static final String GROUPS = "$GROUPS."; public static final String TAILLOCK = "$TAILLOCK"; public static final String RETRY_QUEUE = "$RETRY_QUEUE"; public static final String LAST_RETRY = "$LAST_RETRY"; public static final Integer PREPARING = new Integer (0); public static final Integer COMMITTING = new Integer (1); public static final Integer DONE = new Integer (2); public static final String DEFAULT_GROUP = ""; public static final long MAX_PARTICIPANTS = 1000; // loop prevention public void initService () throws ConfigurationException { queue = cfg.get ("queue", null); if (queue == null) throw new ConfigurationException ("queue property not specified"); sp = SpaceFactory.getSpace (cfg.get ("space")); psp = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString())); tail = initCounter (TAIL, cfg.getLong ("initial-tail", 1)); head = Math.max (initCounter (HEAD, tail), tail); initTailLock (); groups = new HashMap(); initParticipants (getPersist()); } public void startService () throws Exception { NameRegistrar.register (getName (), this); recover (); int sessions = cfg.getInt ("sessions", 1); threads = new Thread[sessions]; for (int i=0; i<sessions; i++) { Thread t = new Thread (this); t.setName (getName() + "-" + i); t.setDaemon (false); t.start (); threads[i] = t; } if (psp.rdp (RETRY_QUEUE) != null) checkRetryTask(); } public void stopService () throws Exception { NameRegistrar.unregister (getName ()); long sessions = cfg.getLong ("sessions", 1); for (int i=0; i<sessions; i++) { sp.out (queue, this, 60*1000); } for (int i=0; i<sessions; i++) { try { threads[i].join (60*1000); } catch (InterruptedException e) { getLog().warn ("Session " +i +" does not response - attempting to interrupt"); threads[i].interrupt(); } threads[i] = null; } } public void queue (Serializable context) { sp.out (queue, context); } public void push (Serializable context) { sp.push (queue, context); } public String getQueueName() { return queue; } public Space getSpace() { 
return this.sp;
    }

    /** @return the space obtained from the "persistent-space" property */
    public Space getPersistentSpace() {
        return this.psp;
    }

    /**
     * Session thread main loop.
     *
     * While running() holds, takes the next object from the queue and either
     * starts a new transaction (fresh id, empty member list, participants of
     * the default group) or resumes a paused one (id, members, iterator and
     * abort flag come from the PausedTransaction). The transaction is then
     * prepared and, depending on the outcome, committed, aborted, queued for
     * retry or left paused.
     *
     * Note that id, members, iter and abort are declared outside the loop and
     * are overwritten on every iteration that gets past the type checks.
     */
    public void run () {
        long id = 0;
        List members = null;
        Iterator iter = null;
        PausedTransaction pt;
        boolean abort = false;
        LogEvent evt = null;
        String threadName = Thread.currentThread().getName();
        getLog().info (threadName + " start");
        long startTime = 0L;
        // NOTE(review): HEAD is a public static String constant, so this
        // monitor is shared by every TransactionManager instance, while
        // activeSessions is a per-instance field.
        synchronized (HEAD) {
            activeSessions++;
        }
        while (running()) {
            try {
                Object obj = sp.in (queue);
                if (obj == this)
                    continue; // stopService ``hack''
                if (!(obj instanceof Serializable)) {
                    getLog().error (
                        "non serializable '" + obj.getClass().getName() +
                        "' on queue '" + queue + "'"
                    );
                    continue;
                }
                // A Pausable context may carry a previously paused transaction;
                // if so, pick up where it stopped.
                if (obj instanceof Pausable) {
                    Pausable pausable = (Pausable) obj;
                    pt = pausable.getPausedTransaction();
                    if (pt != null) {
                        pt.cancelExpirationMonitor();
                        id = pt.id();
                        members = pt.members();
                        iter = pt.iterator();
                        abort = pt.isAborting();
                    }
                } else
                    pt = null;

                // Not resuming: start a brand new transaction.
                if (pt == null) {
                    abort = false;
                    id = nextId ();
                    members = new ArrayList ();
                    iter = getParticipants (DEFAULT_GROUP).iterator();
                }
                if (debug) {
                    evt = getLog().createLogEvent ("debug",
                        Thread.currentThread().getName() + ":" + Long.toString(id) +
                        (pt != null ? 
                         " [resuming]" : "")
                    );
                    startTime = System.currentTimeMillis();
                }
                Serializable context = (Serializable) obj;
                snapshot (id, context, PREPARING);
                int action = prepare (id, context, members, iter, abort, evt);
                switch (action) {
                    case PAUSE:
                        break;
                    case PREPARED:
                        setState (id, COMMITTING);
                        commit (id, context, members, false, evt);
                        break;
                    case ABORTED:
                        abort (id, context, members, false, evt);
                        break;
                    case RETRY:
                        psp.out (RETRY_QUEUE, context);
                        checkRetryTask();
                        break;
                    case NO_JOIN:
                        break;
                }
                // Anything that did not pause is finished: mark it DONE and,
                // if it was the oldest outstanding id, let checkTail() run.
                if ((action & PAUSE) == 0) {
                    snapshot (id, null, DONE);
                    if (id == tail) {
                        checkTail ();
                    }
                }
            } catch (Throwable t) {
                if (evt == null)
                    getLog().fatal (t); // should never happen
                else
                    evt.addMessage (t);
            } finally {
                // Flush the per-transaction debug event (only exists when debug is on).
                if (evt != null) {
                    evt.addMessage ("elapsed time: " +
                        (System.currentTimeMillis() - startTime) + "ms"
                    );
                    evt.addMessage ("head=" + head + ", tail=" + tail);
                    Logger.log (evt);
                    evt = null;
                }
            }
        }
        getLog().info (threadName + " stop");
        synchronized (HEAD) {
            activeSessions--;
        }
        // activeSessions is read here outside the synchronized block
        getLog().info ("stop " + Thread.currentThread() + ", active sessions=" + activeSessions);
    }

    /** @return current tail counter */
    public long getTail () {
        return tail;
    }

    /** @return current head counter */
    public long getHead () {
        return head;
    }

    /**
     * Applies the configuration: "debug", "recover" (default true),
     * "retry-interval" and "pause-timeout" (both default to the current
     * field values).
     *
     * @param cfg configuration to apply
     * @throws ConfigurationException declared by the signature; propagated from the superclass call
     */
    public void setConfiguration (Configuration cfg) throws ConfigurationException {
        super.setConfiguration (cfg);
        debug = cfg.getBoolean ("debug");
        doRecover = cfg.getBoolean ("recover", true);
        retryInterval = cfg.getLong ("retry-interval", retryInterval);
        pauseTimeout = cfg.getLong ("pause-timeout", pauseTimeout);
    }

    /**
     * Commits the transaction on every member, in list order.
     *
     * @param id transaction id
     * @param context transaction context; replaced by the value returned from
     *        ContextRecovery.recover() when recovering
     * @param members participants to commit (raw List of TransactionParticipant)
     * @param recover when true, members implementing ContextRecovery are asked
     *        to recover the context (with commit flag true) before their commit
     * @param evt optional debug event; a line is added per step when non-null
     */
    protected void commit
        (long id, Serializable context, List members, boolean recover, LogEvent evt)
    {
        Iterator iter = members.iterator();
        while (iter.hasNext ()) {
            TransactionParticipant p = (TransactionParticipant) iter.next();
            if (recover && p instanceof ContextRecovery) {
                context = ((ContextRecovery) p).recover (id, context, true);
                if (evt != null)
                    evt.addMessage (" commit-recover: " + p.getClass().getName());
            }
            commit (p, id, context);
            if (evt != null)
                evt.addMessage (" commit: " + p.getClass().getName());
        }
    }

    /**
     * Aborts the transaction on every member, in list order; the abort-side
     * counterpart of commit(long, Serializable, List, boolean, LogEvent),
     * calling ContextRecovery.recover() with commit flag false when recovering.
     */
    protected void 
abort (long id, Serializable context, List members, boolean recover, LogEvent evt) { Iterator iter = members.iterator(); while (iter.hasNext ()) { TransactionParticipant p = (TransactionParticipant) iter.next(); if (recover && p instanceof ContextRecovery) { context = ((ContextRecovery) p).recover (id, context, false); if (evt != null) evt.addMessage (" abort-recover: " + p.getClass().getName()); } abort (p, id, context); if (evt != null) evt.addMessage (" abort: " + p.getClass().getName()); } } protected int prepareForAbort (TransactionParticipant p, long id, Serializable context) { try { if (p instanceof AbortParticipant) return ((AbortParticipant)p).prepareForAbort (id, context); } catch (Throwable t) { getLog().warn ("PREPARE-FOR-ABORT: " + Long.toString (id), t); } return ABORTED | NO_JOIN; } protected int prepare (TransactionParticipant p, long id, Serializable context) { try { return p.prepare (id, context); } catch (Throwable t) { getLog().warn ("PREPARE: " + Long.toString (id), t); } return ABORTED; } protected void commit (TransactionParticipant p, long id, Serializable context) { try { p.commit (id, context); } catch (Throwable t) { getLog().warn ("COMMIT: " + Long.toString (id), t); } } protected void abort (TransactionParticipant p, long id, Serializable context) { try { p.abort (id, context); } catch (Throwable t) { getLog().warn ("ABORT: " + Long.toString (id), t); } } protected int prepare (long id, Serializable context, List members, Iterator iter, boolean abort, LogEvent evt) { boolean retry = false; boolean pause = false; for (int i=0; iter.hasNext (); i++) { int action = 0; if (i > MAX_PARTICIPANTS) { getLog().warn ( "loop detected - transaction " +id + " aborted." 
); return ABORTED; } TransactionParticipant p = (TransactionParticipant) iter.next(); if (abort) { action = prepareForAbort (p, id, context); if (evt != null && (p instanceof AbortParticipant)) evt.addMessage ("prepareForAbort: " + p.getClass().getName()); } else { action = prepare (p, id, context); abort = (action & PREPARED) == ABORTED; retry = (action & RETRY) == RETRY; pause = (action & PAUSE) == PAUSE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -