📄 statsengine.java
字号:
/**
* The task which samples statistics and persists them to the database.
*
* @author Alexander Wenckus
*/
private class SampleTask extends TimerTask {
private long lastSampleTime = 0;
/**
 * Samples every statistic returned by statsManager.getAllStatistics() and stores the
 * sampled values in the RRD database of the corresponding stat definition.
 * <p>
 * The run is skipped entirely when this node is not the senior cluster member, or when
 * getLastMinute() has not advanced past the time remembered from the previous run.
 * The results of the GetStatistics cluster task are added to the locally sampled values
 * before they are written. An IOException or RrdException raised while handling one
 * statistic is logged and the loop carries on with the next statistic.
 */
public void run() {
if (!ClusterManager.isSeniorClusterMember()) {
// Do not create new samples of the statistics since we are not the senior cluster member
return;
}
long newTime = getLastMinute();
// Skip this run when the time has not moved on since the previous run. The very first
// run (lastSampleTime still 0) is never skipped.
if (lastSampleTime != 0 && newTime <= lastSampleTime) {
Log.warn("Sample task not run because less then a second has passed since last " +
"sample.");
return;
}
lastSampleTime = newTime;
// Gather sample statistics from remote cluster nodes
Collection<Object> remoteSamples = CacheFactory.doSynchronousClusterTask(new GetStatistics(), false);
// Datasource names that have already been written during this run; used below to avoid
// sampling the members of a multi-stat group more than once.
List<String> sampledStats = new ArrayList<String>();
for (Map.Entry<String, Statistic> statisticEntry : statsManager.getAllStatistics()) {
String key = statisticEntry.getKey();
StatDefinition def = createDefintion(key);
// Check to see if this stat belongs to a multi-stat and if that multi-stat group
// has been completely defined
String group = statsManager.getMultistatGroup(key);
StatDefinition [] definitions;
if (group != null) {
definitions = checkAndCreateGroup(group, def, false);
// Skip when no definitions were returned for the group, or when this stat was already
// sampled earlier in this run as part of its group.
if (definitions == null || sampledStats.contains(def.getDatasourceName())) {
continue;
}
}
else {
// A stat without a group is sampled on its own.
definitions = new StatDefinition[]{def};
}
RrdDb db = null;
try {
// Re-read the time for every statistic; the value read at the top of run() is replaced.
newTime = getLastMinute();
// A definition that has not been sampled yet is only primed: the statistic is sampled,
// the value is discarded and nothing is written to the database in this run.
if (def.lastSampleTime <= 0) {
for(StatDefinition definition : definitions) {
definition.lastSampleTime = newTime;
// It is possible that this plugin and thus the StatsEngine didn't
// start when Openfire started so we want to put the stats in a known
// state for proper sampling.
sampleStat(key, definition);
}
// The finally block below still runs; db is null at this point so nothing is closed.
continue;
}
db = new RrdDb(def.getDbPath(), false);
// We want to double check the last sample time recorded in the db so as to
// prevent the log files from being inundated if more than one instance of
// Openfire is updating the same database, or if a task is taking a long time
// to complete.
// NOTE(review): only a warning is logged here; the sample is still created and updated
// below. Presumably the database then rejects the update - confirm whether this branch
// was meant to skip the statistic.
if(newTime <= db.getLastArchiveUpdateTime()) {
Log.warn("Sample time of " + newTime + " for statistic " + key + " is " +
"invalid.");
}
Sample sample = db.createSample(newTime);
if (Log.isDebugEnabled()) {
Log.debug("Stat: " + db.getPath() + ". Last sample: " + db.getLastUpdateTime() +
". New sample: " + sample.getTime());
}
for (StatDefinition definition : definitions) {
// Get a statistic sample of this JVM
double statSample = sampleStat(key, definition);
// Add up samples of remote cluster nodes
for (Object nodeResult : remoteSamples) {
// Unchecked cast: each cluster task result is treated as a Map<String, Double>.
Map<String, Double> nodeSamples = (Map<String, Double>) nodeResult;
// NOTE(review): the remote value is looked up with the outer loop's key for every
// definition of a group, while the local value above is sampled per definition -
// confirm this is intended for multi-stat groups.
Double remoteSample = nodeSamples.get(key);
if (remoteSample != null) {
statSample += remoteSample;
}
}
// Update sample with values
sample.setValue(definition.getDatasourceName(), statSample);
sampledStats.add(definition.getDatasourceName());
// Remember the latest time and value on the definition itself.
definition.lastSampleTime = newTime;
definition.lastSample = statSample;
}
// Write the values of all definitions handled above in a single update.
sample.update();
}
catch (IOException e) {
Log.error("Error sampling for statistic " + key, e);
}
catch (RrdException e) {
Log.error("Error sampling for statistic " + key, e);
}
finally {
// Always close the database if it was opened for this statistic.
if (db != null) {
try {
db.close();
}
catch (IOException e) {
Log.error("Error releasing db resource", e);
}
}
}
}
}
/**
 * Samples the statistic of the given definition and profiles the call: when sampling
 * takes half a second (500 ms) or longer, a warning is logged.
 *
 * @param statKey the key related to the statistic; only used in the warning message.
 * @param definition the statistic definition for the stat to be sampled.
 * @return the sample.
 */
private double sampleStat(String statKey, StatDefinition definition) {
    long start = System.currentTimeMillis();
    double sample = definition.getStatistic().sample();
    long elapsedMillis = System.currentTimeMillis() - start;
    // The threshold is half a second, so the message must say so; it previously claimed
    // "longer than a second", which contradicted both the check and the documentation.
    if (elapsedMillis >= 500) {
        Log.warn("Stat " + statKey + " took longer than half a second to sample.");
    }
    return sample;
}
}
/**
* Class to process all information retrieved from the stats databases. It also retains
* any meta information related to these databases.
*
* @author Alexander Wenckus
*/
private class DefaultStatDefinition extends StatDefinition {
private String consolidationFunction;
DefaultStatDefinition(String dbPath, String datasourceName, Statistic stat) {
super(dbPath, datasourceName, stat);
this.consolidationFunction = determineConsolidationFun(stat.getStatType());
}
/**
 * Maps a statistic type to the consolidation function used for fetching its data:
 * statistics of type count use CF_LAST, every other type uses CF_AVERAGE.
 *
 * @param type the type of the statistic.
 * @return the name of the consolidation function.
 */
private String determineConsolidationFun(Statistic.Type type) {
    String function = ConsolFuns.CF_AVERAGE;
    switch (type) {
        case count:
            function = ConsolFuns.CF_LAST;
            break;
        default:
            break;
    }
    return function;
}
/**
 * Fetches the data between the two times using this definition's consolidation function.
 *
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @return the fetched values, or null when the fetch failed.
 */
public double[][] getData(long startTime, long endTime) {
    // A non-positive data point count makes fetchData issue the request without an
    // explicit resolution.
    final int noDataPoints = -1;
    return fetchData(consolidationFunction, startTime, endTime, noDataPoints);
}
/**
 * Fetches the data between the two times for the requested number of data points.
 * Exactly 60 data points are fetched with this definition's own consolidation function;
 * any other count is fetched as an average.
 *
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @param dataPoints the number of data points requested.
 * @return the fetched values, or null when the fetch failed.
 */
public double[][] getData(long startTime, long endTime, int dataPoints) {
    String function;
    if (dataPoints == 60) {
        // 60 is the greatest number of data points, so the definition's own function applies.
        function = consolidationFunction;
    }
    else {
        function = ConsolFuns.CF_AVERAGE;
    }
    return fetchData(function, startTime, endTime, dataPoints);
}
/**
 * Returns the time recorded for the most recent sample of this definition.
 *
 * @return the current value of the lastSampleTime field.
 */
public long getLastSampleTime() {
    return this.lastSampleTime;
}
/**
 * Returns the value recorded for the most recent sample of this definition.
 *
 * @return the current value of the lastSample field.
 */
public double getLastSample() {
    return this.lastSample;
}
/**
 * Returns the maximum values between the two times, requesting a single data point.
 *
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @return the result of the three-argument getMax called with one data point.
 */
public double[] getMax(long startTime, long endTime) {
    final int singleDataPoint = 1;
    return getMax(startTime, endTime, singleDataPoint);
}
/**
 * Finds the largest value in the array, ignoring NaN entries.
 * <p>
 * The search starts from the first real value rather than from 0, so arrays that hold
 * only negative values yield their true maximum.
 *
 * @param doubles the values to inspect; may be null or empty.
 * @return the largest non-NaN value, or 0 when there is no such value.
 */
private double discoverMax(double[] doubles) {
    if (doubles == null) {
        return 0;
    }
    boolean found = false;
    double max = 0;
    for (double d : doubles) {
        if (Double.isNaN(d)) {
            // NaN carries no value and must not take part in the comparison.
            continue;
        }
        if (!found || d > max) {
            max = d;
            found = true;
        }
    }
    return max;
}
/**
 * Opens the RRD database of this definition, runs a fetch request and returns its values.
 * The database is closed again before the method returns.
 *
 * @param function the consolidation function to fetch with.
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @param dataPoints when greater than zero, the request is made with the resolution
 *        computed by getResolution; otherwise no resolution is passed.
 * @return the fetched values, or null when an IOException or RrdException occurred
 *         (the exception is logged).
 */
private double[][] fetchData(String function, long startTime, long endTime, int dataPoints) {
    double[][] values = null;
    RrdDb rrd = null;
    try {
        rrd = new RrdDb(getDbPath(), true);
        FetchData fetched;
        if (dataPoints <= 0) {
            fetched = rrd.createFetchRequest(function, startTime, endTime).fetchData();
        }
        else {
            long resolution = getResolution(startTime, endTime, dataPoints);
            fetched = rrd.createFetchRequest(function, startTime, endTime, resolution)
                    .fetchData();
        }
        values = fetched.getValues();
    }
    catch (IOException e) {
        Log.error("Error initializing Rrdb", e);
    }
    catch (RrdException e) {
        Log.error("Error initializing Rrdb", e);
    }
    finally {
        // Release the database whether or not the fetch succeeded.
        if (rrd != null) {
            try {
                rrd.close();
            }
            catch (IOException e) {
                Log.error("Unable to release Rrdb resources",e);
            }
        }
    }
    return values;
}
/**
 * Computes the resolution passed to a fetch request: the length of the time range
 * divided by (dataPoints * 60), never less than 1.
 * <p>
 * The divisor is computed in long arithmetic so a large data point count cannot overflow,
 * and a non-positive data point count no longer divides by zero.
 *
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @param dataPoints the number of data points requested.
 * @return the resolution, at least 1.
 */
private long getResolution(long startTime, long endTime, int dataPoints) {
    if (dataPoints <= 0) {
        return 1L;
    }
    long divisor = dataPoints * 60L;
    // Short ranges would otherwise yield 0 (or a negative value for a reversed range).
    // Presumably the fetch request rejects non-positive resolutions - confirm against the
    // RRD library in use.
    return Math.max(1L, (endTime - startTime) / divisor);
}
/**
 * Returns the minimum values between the two times, requesting a single data point.
 *
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @return the result of the three-argument getMin called with one data point.
 */
public double[] getMin(long startTime, long endTime) {
    final int singleDataPoint = 1;
    return getMin(startTime, endTime, singleDataPoint);
}
/**
 * Fetches the data between the two times with this definition's consolidation function
 * and returns the minimum of each fetched row.
 *
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @param dataPoints the number of data points requested.
 * @return one minimum per row of fetched data (presumably one row per datasource -
 *         confirm), or a single 0 when the fetch failed.
 */
public double[] getMin(long startTime, long endTime, int dataPoints) {
    double[][] rows = fetchData(consolidationFunction, startTime, endTime, dataPoints);
    if (rows == null) {
        return new double[] { 0 };
    }
    double[] minimums = new double[rows.length];
    int index = 0;
    for (double[] row : rows) {
        minimums[index++] = discoverMin(row);
    }
    return minimums;
}
/**
 * Fetches the data between the two times with this definition's consolidation function
 * and returns the maximum of each fetched row.
 *
 * @param startTime start of the requested range.
 * @param endTime end of the requested range.
 * @param dataPoints the number of data points requested.
 * @return one maximum per row of fetched data (presumably one row per datasource -
 *         confirm), or a single 0 when the fetch failed.
 */
public double[] getMax(long startTime, long endTime, int dataPoints) {
    double[][] rows = fetchData(consolidationFunction, startTime, endTime, dataPoints);
    if (rows == null) {
        return new double[] { 0 };
    }
    double[] maximums = new double[rows.length];
    int index = 0;
    for (double[] row : rows) {
        maximums[index++] = discoverMax(row);
    }
    return maximums;
}
/**
 * Finds the smallest value in the array, ignoring NaN entries.
 * <p>
 * A NaN in the first position no longer poisons the result (every comparison against
 * NaN is false, so the old code returned NaN even when real values followed), and an
 * empty array no longer raises an ArrayIndexOutOfBoundsException.
 *
 * @param doubles the values to inspect; may be null or empty.
 * @return the smallest non-NaN value, or 0 when there is no such value.
 */
private double discoverMin(double[] doubles) {
    if (doubles == null) {
        return 0;
    }
    boolean found = false;
    double min = 0;
    for (double d : doubles) {
        if (Double.isNaN(d)) {
            // NaN carries no value and must not take part in the comparison.
            continue;
        }
        if (!found || d < min) {
            min = d;
            found = true;
        }
    }
    return min;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -