queuingrrdstrategy.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 954 行 · 第 1/3 页

JAVA
954
字号
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2005 The OpenNMS Group, Inc.  All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// Jul 8, 2004: Created this file.//// Original code base Copyright (C) 1999-2001 Oculan Corp.  All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.                                                            //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.//       // For more information contact: //      OpenNMS Licensing       <license@opennms.org>//      http://www.opennms.org///      http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.rrd;import java.io.File;import java.io.IOException;import java.io.InputStream;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Map;import java.util.Set;import org.apache.log4j.Category;import org.jrobin.core.Util;/** * Provides queueing implementation of RrdStrategy. *  * In order to provide a more scalable collector. 
 * We created a queuing RrdStrategy that enabled the system to amortize the
 * high cost of opening a round robin database across multiple updates.
 * 
 * This RrdStrategy implementation enqueues the create and update operations on
 * a per file basis and maintains a set of threads that process enqueued work
 * file by file.
 * 
 * If the I/O system can keep up with the collection threads while performing
 * only a single update per file then eventually all the data is processed and
 * the threads sleep until there is more work to do.
 * 
 * If the I/O system is initially slower than the collection threads then
 * work will enqueue here and the write threads will get behind. As this happens
 * each file will eventually have more than a single update enqueued and
 * therefore the number of updates pushed through the system will increase
 * because more than one will be output per 'open'. Eventually, the I/O system
 * and the collection system will balance out. When this happens all data will
 * be collected but will not be output to the rrd files until the next time the
 * file is processed by the write threads.
 * 
 * As another performance-improving strategy, the queue distinguishes between
 * files with significant vs insignificant updates. Files with only
 * insignificant updates are put at the lowest priority and are only written
 * when the highest priority updates have been written.
 * 
 * This implementation delegates all the actual writing to another RrdStrategy
 * implementation.
 * 
 * System properties affecting the operation:
 * 
 * org.opennms.rrd.queuing.writethreads: (default 2) The number of rrd write
 * threads that process the queue
 * 
 * org.opennms.rrd.queuing.queuecreates: (default true) indicates whether rrd
 * file creates should be queued or processed synchronously. Note that the key
 * read by the code is all lower case ("queuecreates").
 * 
 * org.opennms.rrd.queuing.maxInsigUpdateSeconds: (default 0) the number of
 * seconds over which all files with only insignificant updates should be
 * promoted onto the significant list.
This is to ensure they don't stay unprocessed * forever. Zero means not promotion. *  * org.opennms.rrd.queuing.modulus: (default 10000) the number of updates the * get enqueued between statistics output *  * org.opennms.rrd.queuing.category: (default "UNCATEGORIZED") the log category * to place the statistics output in *  *  *  * TODO: Promote files when ZeroUpdate operations can't be merged. This may be a * collection miss which we want to push thru. It should also help with memory. *  * TODO: Set an upper bound on enqueued operations *  * TODO: Provide an event that will write data for a particular file... Say * right before we try to graph it. */class QueuingRrdStrategy implements RrdStrategy, Runnable {    RrdStrategy m_delegate;    static final int UPDATE = 0;    static final int CREATE = 1;    static final int WRITE_THREADS = RrdConfig.getProperty("org.opennms.rrd.queuing.writethreads", 2);    private static final boolean QUEUE_CREATES = RrdConfig.getProperty("org.opennms.rrd.queuing.queuecreates", true);        private static final boolean PRIORITIZE_SIGS = RrdConfig.getProperty("org.opennms.rrd.queuing.prioritizeSignificatUpdates", true);    private static final long INSIG_HIGH_WATER_MARK = RrdConfig.getProperty("org.opennms.rrd.queuing.inSigHighWaterMark", 0L);    private static final long SIG_HIGH_WATER_MARK = RrdConfig.getProperty("org.opennms.rrd.queuing.sigHighWaterMark", 0L);    private static final long QUEUE_HIGH_WATER_MARK = RrdConfig.getProperty("org.opennms.rrd.queuing.queueHighWaterMark", 0L);    private static final long MODULUS = RrdConfig.getProperty("org.opennms.rrd.queuing.modulus", 10000L);    private static final String LOG4J_CATEGORY = RrdConfig.getProperty("org.opennms.rrd.queuing.category", "UNCATEGORIZED");    private static final long MAX_INSIG_UPDATE_SECONDS = RrdConfig.getProperty("org.opennms.rrd.queuing.maxInsigUpdateSeconds", 0L);    private static final long WRITE_THREAD_SLEEP_TIME = 
RrdConfig.getProperty("org.opennms.rrd.queuing.writethread.sleepTime", 50L);    private static final long WRITE_THREAD_EXIT_DELAY = RrdConfig.getProperty("org.opennms.rrd.queuing.writethread.exitDelay", 60000L);        LinkedList filesWithSignificantWork = new LinkedList();    LinkedList filesWithInsignificantWork = new LinkedList();    Map pendingFileOperations = new HashMap();    Map fileAssignments = new HashMap();    Set reservedFiles = new HashSet();    long totalOperationsPending = 0;    long enqueuedOperations = 0;    long dequeuedOperations = 0;    long significantOpsEnqueued = 0;    long significantOpsDequeued = 0;    long significantOpsCompleted = 0;    long dequeuedItems = 0;    long createsCompleted = 0;    long updatesCompleted = 0;    long errors = 0;    int threadsRunning = 0;    long updateStart = 0;    long promotionCount = 0;    long lastLap = System.currentTimeMillis();    long lastStatsTime = 0;    long lastEnqueued = 0;    long lastDequeued = 0;    long lastSignificantEnqueued = 0;    long lastSignificantDequeued = 0;    long lastSignificantCompleted = 0;    long lastDequeuedItems = 0;    long lastOpsPending = 0;    /**     * This is the base class for an enqueueable operation     */    static abstract class Operation {        String fileName;        int type;        Object data;        boolean significant;        Operation(String fileName, int type, Object data, boolean significant) {            this.fileName = fileName;            this.type = type;            this.data = data;            this.significant = significant;        }        int getCount() {            return 1;        }        String getFileName() {            return this.fileName;        }        int getType() {            return this.type;        }        Object getData() {            return this.data;        }        boolean isSignificant() {            return significant;        }        void addToPendingList(LinkedList pendingOperations) {            
pendingOperations.add(this);        }        abstract Object process(Object rrd) throws Exception;            }    /**     * This class represents an operation to create an rrd file     */    public class CreateOperation extends Operation {        CreateOperation(String fileName, Object rrdDef) {            super(fileName, CREATE, rrdDef, true);        }        Object process(Object rrd) throws Exception {            // if the rrd is already open we are confused            if (rrd != null) {                System.err.println("WHAT! rrd open but not created?");                m_delegate.closeFile(rrd);                rrd = null;            }            // create the file            m_delegate.createFile(getData());            // keep stats            ++createsCompleted;            // return the file            return rrd;        }    }    /**     * Represents an update to a rrd file.     */    public class UpdateOperation extends Operation {        UpdateOperation(String fileName, String data) {            super(fileName, UPDATE, data, true);        }        UpdateOperation(String fileName, String data, boolean significant) {            super(fileName, UPDATE, data, significant);        }        Object process(Object rrd) throws Exception {            // open the file if we need to            if (rrd == null)                rrd = m_delegate.openFile(getFileName());            String update = (String) getData();            try {                // process the update                m_delegate.updateFile(rrd, update);            } catch (Exception e) {                throw new Exception("Error processing update for file " + getFileName() + ": " + update, e);            }            // keep stats            if (++updatesCompleted % MODULUS == 0) {                printStats();            }            // return the open rrd for further processing            return rrd;        }    }    /**     * Represents an update whose value is 0. 
These operations can be merged     * together and take up less memory     */    public class ZeroUpdateOperation extends UpdateOperation {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?