⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transactionmanager.java

📁 jPOS is a Java® platform-based, mission-critical, ISO-8583 based financial transaction library/framework
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * jPOS Project [http://jpos.org] * Copyright (C) 2000-2008 Alejandro P. Revilla * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program.  If not, see <http://www.gnu.org/licenses/>. */package org.jpos.transaction;import java.io.Serializable;import java.util.Map;import java.util.List;import java.util.HashMap;import java.util.ArrayList;import java.util.Iterator;import java.util.TimerTask;import java.util.StringTokenizer;import org.jpos.util.NameRegistrar;import org.jdom.Element;import org.jpos.space.Space;import org.jpos.space.LocalSpace;import org.jpos.space.JDBMSpace;import org.jpos.space.SpaceFactory;import org.jpos.space.SpaceUtil;import org.jpos.q2.QFactory;import org.jpos.q2.QBeanSupport;import org.jpos.core.Configuration;import org.jpos.core.ConfigurationException;import org.jpos.util.Logger;import org.jpos.util.LogEvent;import org.jpos.util.DefaultTimer;public class TransactionManager     extends QBeanSupport     implements Runnable, TransactionConstants, TransactionManagerMBean{    Space sp;    Space psp;    String queue;    String tailLock;    Map groups;    Thread[] threads;    int activeSessions;    boolean debug;    boolean doRecover;    long head, tail, lastGC;    long retryInterval = 5000L;    long pauseTimeout  = 300*60*1000L;  // five minutes    RetryTask retryTask = null;    public static final String  HEAD       = "$HEAD";    public static final String  TAIL       = "$TAIL";    public static final 
String  CONTEXT    = "$CONTEXT.";    public static final String  STATE      = "$STATE.";    public static final String  GROUPS     = "$GROUPS.";    public static final String  TAILLOCK   = "$TAILLOCK";    public static final String  RETRY_QUEUE = "$RETRY_QUEUE";    public static final String  LAST_RETRY = "$LAST_RETRY";    public static final Integer PREPARING  = new Integer (0);    public static final Integer COMMITTING = new Integer (1);    public static final Integer DONE       = new Integer (2);    public static final String  DEFAULT_GROUP = "";    public static final long    MAX_PARTICIPANTS = 1000;  // loop prevention    public void initService () throws ConfigurationException {        queue = cfg.get ("queue", null);        if (queue == null)            throw new ConfigurationException ("queue property not specified");        sp   = SpaceFactory.getSpace (cfg.get ("space"));        psp  = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString()));        tail = initCounter (TAIL, cfg.getLong ("initial-tail", 1));        head = Math.max (initCounter (HEAD, tail), tail);        initTailLock ();        groups = new HashMap();        initParticipants (getPersist());    }    public void startService () throws Exception {        NameRegistrar.register (getName (), this);        recover ();        int sessions = cfg.getInt ("sessions", 1);        threads = new Thread[sessions];        for (int i=0; i<sessions; i++) {            Thread t = new Thread (this);            t.setName (getName() + "-" + i);            t.setDaemon (false);            t.start ();            threads[i] = t;        }        if (psp.rdp (RETRY_QUEUE) != null)            checkRetryTask();    }    public void stopService () throws Exception {        NameRegistrar.unregister (getName ());        long sessions = cfg.getLong ("sessions", 1);        for (int i=0; i<sessions; i++) {            sp.out (queue, this, 60*1000);        }        for (int i=0; i<sessions; i++) {            try {  
              threads[i].join (60*1000);            } catch (InterruptedException e) {                getLog().warn ("Session " +i +" does not response - attempting to interrupt");                threads[i].interrupt();            }            threads[i] = null;        }    }    public void queue (Serializable context) {        sp.out (queue, context);    }    public void push (Serializable context) {        sp.push (queue, context);    }    public String getQueueName() {        return queue;    }    public Space getSpace() {        return this.sp;    }    public Space getPersistentSpace() {        return this.psp;    }    public void run () {        long id = 0;        List members = null;        Iterator iter = null;        PausedTransaction pt;        boolean abort = false;        LogEvent evt = null;        String threadName = Thread.currentThread().getName();        getLog().info (threadName + " start");        long startTime = 0L;        synchronized (HEAD) {             activeSessions++;        }        while (running()) {            try {                Object obj = sp.in (queue);                if (obj == this)                    continue;   // stopService ``hack''                if (!(obj instanceof Serializable)) {                    getLog().error (                        "non serializable '" + obj.getClass().getName()                       + "' on queue '" + queue + "'"                    );                    continue;                }                if (obj instanceof Pausable) {                    Pausable pausable = (Pausable) obj;                    pt = pausable.getPausedTransaction();                    if (pt != null) {                        pt.cancelExpirationMonitor();                        id      = pt.id();                        members = pt.members();                        iter    = pt.iterator();                        abort   = pt.isAborting();                    }                } else                     pt = null;                
if (pt == null) {                    abort = false;                    id = nextId ();                    members = new ArrayList ();                    iter = getParticipants (DEFAULT_GROUP).iterator();                }                if (debug) {                    evt = getLog().createLogEvent ("debug",                        Thread.currentThread().getName()                         + ":" + Long.toString(id) +                        (pt != null ? " [resuming]" : "")                    );                    startTime = System.currentTimeMillis();                }                Serializable context = (Serializable) obj;                snapshot (id, context, PREPARING);                int action = prepare (id, context, members, iter, abort, evt);                switch (action) {                    case PAUSE:                        break;                    case PREPARED:                        setState (id, COMMITTING);                        commit (id, context, members, false, evt);                        break;                    case ABORTED:                        abort (id, context, members, false, evt);                        break;                    case RETRY:                        psp.out (RETRY_QUEUE, context);                        checkRetryTask();                        break;                    case NO_JOIN:                        break;                }                if ((action & PAUSE) == 0) {                    snapshot (id, null, DONE);                    if (id == tail) {                        checkTail ();                    }                }            } catch (Throwable t) {                if (evt == null)                    getLog().fatal (t); // should never happen                else                    evt.addMessage (t);            } finally {                if (evt != null) {                    evt.addMessage ("elapsed time: "                         + (System.currentTimeMillis() - startTime) + "ms"                    );          
          evt.addMessage ("head=" + head + ", tail=" + tail);                    Logger.log (evt);                    evt = null;                }            }        }        getLog().info (threadName + " stop");        synchronized (HEAD) {            activeSessions--;        }        getLog().info ("stop " + Thread.currentThread() + ", active sessions=" + activeSessions);    }    public long getTail () {        return tail;    }    public long getHead () {        return head;    }    public void setConfiguration (Configuration cfg)         throws ConfigurationException     {        super.setConfiguration (cfg);        debug = cfg.getBoolean ("debug");        doRecover = cfg.getBoolean ("recover", true);        retryInterval = cfg.getLong ("retry-interval", retryInterval);        pauseTimeout  = cfg.getLong ("pause-timeout", pauseTimeout);    }    protected void commit         (long id, Serializable context, List members, boolean recover, LogEvent evt)     {        Iterator iter = members.iterator();        while (iter.hasNext ()) {            TransactionParticipant p = (TransactionParticipant) iter.next();            if (recover && p instanceof ContextRecovery) {                context = ((ContextRecovery) p).recover (id, context, true);                if (evt != null)                    evt.addMessage (" commit-recover: " + p.getClass().getName());            }            commit (p, id, context);            if (evt != null)                evt.addMessage ("         commit: " + p.getClass().getName());        }    }    protected void abort         (long id, Serializable context, List members, boolean recover, LogEvent evt)     {        Iterator iter = members.iterator();        while (iter.hasNext ()) {            TransactionParticipant p = (TransactionParticipant) iter.next();            if (recover && p instanceof ContextRecovery) {                context = ((ContextRecovery) p).recover (id, context, false);                if (evt != null)                    
evt.addMessage ("  abort-recover: " + p.getClass().getName());            }            abort (p, id, context);            if (evt != null)                evt.addMessage ("          abort: " + p.getClass().getName());        }    }    protected int prepareForAbort        (TransactionParticipant p, long id, Serializable context)     {        try {            if (p instanceof AbortParticipant)                return ((AbortParticipant)p).prepareForAbort (id, context);        } catch (Throwable t) {            getLog().warn ("PREPARE-FOR-ABORT: " + Long.toString (id), t);        }        return ABORTED | NO_JOIN;    }    protected int prepare         (TransactionParticipant p, long id, Serializable context)     {        try {            return p.prepare (id, context);        } catch (Throwable t) {            getLog().warn ("PREPARE: " + Long.toString (id), t);        }        return ABORTED;    }    protected void commit         (TransactionParticipant p, long id, Serializable context)     {        try {            p.commit (id, context);        } catch (Throwable t) {            getLog().warn ("COMMIT: " + Long.toString (id), t);        }    }    protected void abort         (TransactionParticipant p, long id, Serializable context)     {        try {            p.abort (id, context);        } catch (Throwable t) {            getLog().warn ("ABORT: " + Long.toString (id), t);        }    }    protected int prepare (long id, Serializable context, List members, Iterator iter, boolean abort, LogEvent evt) {        boolean retry = false;        boolean pause = false;        for (int i=0; iter.hasNext (); i++) {            int action = 0;            if (i > MAX_PARTICIPANTS) {                getLog().warn (                    "loop detected - transaction " +id + " aborted."                
);                return ABORTED;            }            TransactionParticipant p = (TransactionParticipant) iter.next();            if (abort) {                action = prepareForAbort (p, id, context);                if (evt != null && (p instanceof AbortParticipant))                    evt.addMessage ("prepareForAbort: " + p.getClass().getName());            } else {                action = prepare (p, id, context);                abort  = (action & PREPARED) == ABORTED;                retry  = (action & RETRY) == RETRY;                pause  = (action & PAUSE) == PAUSE;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -