queuingrrdstrategy.java
来自「opennms得相关源码 请大家看看」· Java 代码 · 共 954 行 · 第 1/3 页
JAVA
954 行
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2005 The OpenNMS Group, Inc. All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// Jul 8, 2004: Created this file.//// Original code base Copyright (C) 1999-2001 Oculan Corp. All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details. //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.// // For more information contact: // OpenNMS Licensing <license@opennms.org>// http://www.opennms.org/// http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.rrd;import java.io.File;import java.io.IOException;import java.io.InputStream;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Map;import java.util.Set;import org.apache.log4j.Category;import org.jrobin.core.Util;/** * Provides queueing implementation of RrdStrategy. * * In order to provide a more scalable collector. We created a queuing * RrdStrategy that enabled the system to amortize the high cost of opening an * round robin database across multiple updates. 
 * * This RrdStrategy implementation enqueues the create and update operations on * a per file basis and maintains a set of threads that process enqueued work * file by file. * * If the I/O system can keep up with the collection threads while performing * only a single update per file then eventually all the data is processed and * the threads sleep until there is more work to do. * * If the I/O system is initially slower than the collection threads then * work will enqueue here and the write threads will get behind. As this happens * each file will eventually have more than a single update enqueued and * therefore the number of updates pushed thru the system will increase because * more than one will be output per 'open'. Eventually, the I/O system and the * collection system will balance out. When this happens all data will be * collected but will not be output to the rrd files until the next time the * file is processed by the write threads. * * As another performance-improving strategy, the queue distinguishes between * files with significant vs. insignificant updates. Files with only insignificant * updates are put at the lowest priority and are only written when the highest * priority updates have been written. * * This implementation delegates all the actual writing to another RrdStrategy * implementation. * * System properties affecting the operation: * * org.opennms.rrd.queuing.writethreads: (default 2) The number of rrd write * threads that process the queue * * org.opennms.rrd.queuing.queuecreates: (default true) indicates whether rrd * file creates should be queued or processed synchronously * * org.opennms.rrd.queuing.maxInsigUpdateSeconds: (default 0) the number of * seconds over which all files with only insignificant updates should be promoted * onto the significant list. This is to ensure they don't stay unprocessed * forever. Zero means no promotion. 
* * org.opennms.rrd.queuing.modulus: (default 10000) the number of updates the * get enqueued between statistics output * * org.opennms.rrd.queuing.category: (default "UNCATEGORIZED") the log category * to place the statistics output in * * * * TODO: Promote files when ZeroUpdate operations can't be merged. This may be a * collection miss which we want to push thru. It should also help with memory. * * TODO: Set an upper bound on enqueued operations * * TODO: Provide an event that will write data for a particular file... Say * right before we try to graph it. */class QueuingRrdStrategy implements RrdStrategy, Runnable { RrdStrategy m_delegate; static final int UPDATE = 0; static final int CREATE = 1; static final int WRITE_THREADS = RrdConfig.getProperty("org.opennms.rrd.queuing.writethreads", 2); private static final boolean QUEUE_CREATES = RrdConfig.getProperty("org.opennms.rrd.queuing.queuecreates", true); private static final boolean PRIORITIZE_SIGS = RrdConfig.getProperty("org.opennms.rrd.queuing.prioritizeSignificatUpdates", true); private static final long INSIG_HIGH_WATER_MARK = RrdConfig.getProperty("org.opennms.rrd.queuing.inSigHighWaterMark", 0L); private static final long SIG_HIGH_WATER_MARK = RrdConfig.getProperty("org.opennms.rrd.queuing.sigHighWaterMark", 0L); private static final long QUEUE_HIGH_WATER_MARK = RrdConfig.getProperty("org.opennms.rrd.queuing.queueHighWaterMark", 0L); private static final long MODULUS = RrdConfig.getProperty("org.opennms.rrd.queuing.modulus", 10000L); private static final String LOG4J_CATEGORY = RrdConfig.getProperty("org.opennms.rrd.queuing.category", "UNCATEGORIZED"); private static final long MAX_INSIG_UPDATE_SECONDS = RrdConfig.getProperty("org.opennms.rrd.queuing.maxInsigUpdateSeconds", 0L); private static final long WRITE_THREAD_SLEEP_TIME = RrdConfig.getProperty("org.opennms.rrd.queuing.writethread.sleepTime", 50L); private static final long WRITE_THREAD_EXIT_DELAY = 
RrdConfig.getProperty("org.opennms.rrd.queuing.writethread.exitDelay", 60000L); LinkedList filesWithSignificantWork = new LinkedList(); LinkedList filesWithInsignificantWork = new LinkedList(); Map pendingFileOperations = new HashMap(); Map fileAssignments = new HashMap(); Set reservedFiles = new HashSet(); long totalOperationsPending = 0; long enqueuedOperations = 0; long dequeuedOperations = 0; long significantOpsEnqueued = 0; long significantOpsDequeued = 0; long significantOpsCompleted = 0; long dequeuedItems = 0; long createsCompleted = 0; long updatesCompleted = 0; long errors = 0; int threadsRunning = 0; long updateStart = 0; long promotionCount = 0; long lastLap = System.currentTimeMillis(); long lastStatsTime = 0; long lastEnqueued = 0; long lastDequeued = 0; long lastSignificantEnqueued = 0; long lastSignificantDequeued = 0; long lastSignificantCompleted = 0; long lastDequeuedItems = 0; long lastOpsPending = 0; /** * This is the base class for an enqueueable operation */ static abstract class Operation { String fileName; int type; Object data; boolean significant; Operation(String fileName, int type, Object data, boolean significant) { this.fileName = fileName; this.type = type; this.data = data; this.significant = significant; } int getCount() { return 1; } String getFileName() { return this.fileName; } int getType() { return this.type; } Object getData() { return this.data; } boolean isSignificant() { return significant; } void addToPendingList(LinkedList pendingOperations) { pendingOperations.add(this); } abstract Object process(Object rrd) throws Exception; } /** * This class represents an operation to create an rrd file */ public class CreateOperation extends Operation { CreateOperation(String fileName, Object rrdDef) { super(fileName, CREATE, rrdDef, true); } Object process(Object rrd) throws Exception { // if the rrd is already open we are confused if (rrd != null) { System.err.println("WHAT! 
rrd open but not created?"); m_delegate.closeFile(rrd); rrd = null; } // create the file m_delegate.createFile(getData()); // keep stats ++createsCompleted; // return the file return rrd; } } /** * Represents an update to a rrd file. */ public class UpdateOperation extends Operation { UpdateOperation(String fileName, String data) { super(fileName, UPDATE, data, true); } UpdateOperation(String fileName, String data, boolean significant) { super(fileName, UPDATE, data, significant); } Object process(Object rrd) throws Exception { // open the file if we need to if (rrd == null) rrd = m_delegate.openFile(getFileName()); String update = (String) getData(); try { // process the update m_delegate.updateFile(rrd, update); } catch (Exception e) { throw new Exception("Error processing update for file " + getFileName() + ": " + update, e); } // keep stats if (++updatesCompleted % MODULUS == 0) { printStats(); } // return the open rrd for further processing return rrd; } } /** * Represents an update whose value is 0. These operations can be merged * together and take up less memory */ public class ZeroUpdateOperation extends UpdateOperation {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?