📄 gangliacontext.java
字号:
/*
 * GangliaContext.java
 *
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics.ganglia;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.metrics.spi.Util;

/**
 * Context for sending metrics to Ganglia.
 *
 * <p>Each metric of an emitted record is XDR-encoded into a single reusable
 * buffer and sent as one UDP datagram to every configured server.
 *
 * <p>Attributes read by {@link #init(String, ContextFactory)}:
 * <code>period</code>, <code>servers</code>, and the per-metric tables
 * <code>units</code>, <code>slope</code>, <code>tmax</code> and
 * <code>dmax</code>.
 *
 * <p>The encode buffer and its write offset are instance fields shared by all
 * calls; this class performs no synchronization of its own around them.
 */
public class GangliaContext extends AbstractMetricsContext {

  private static final String PERIOD_PROPERTY = "period";
  private static final String SERVERS_PROPERTY = "servers";
  private static final String UNITS_PROPERTY = "units";
  private static final String SLOPE_PROPERTY = "slope";
  private static final String TMAX_PROPERTY = "tmax";
  private static final String DMAX_PROPERTY = "dmax";

  private static final String DEFAULT_UNITS = "";
  private static final String DEFAULT_SLOPE = "both";
  private static final int DEFAULT_TMAX = 60;
  private static final int DEFAULT_DMAX = 0;
  private static final int DEFAULT_PORT = 8649;
  private static final int BUFFER_SIZE = 1500;  // as per libgmond.c

  /** Maps a metric value's Java class to the type name sent on the wire. */
  // Intended type: Map<Class,String>
  private static final Map typeTable = new HashMap(5);

  static {
    typeTable.put(String.class, "string");
    typeTable.put(Byte.class, "int8");
    typeTable.put(Short.class, "int16");
    typeTable.put(Integer.class, "int32");
    typeTable.put(Float.class, "float");
  }

  /** Reusable encode buffer; rewritten from position 0 for every metric. */
  private byte[] buffer = new byte[BUFFER_SIZE];

  /** Next write position in {@link #buffer}. */
  private int offset;

  // Intended type: List<SocketAddress>
  private List metricsServers;

  // Intended type of each table below: Map<String,String>, keyed by the full
  // metric name that is passed to emitMetric.
  private Map unitsTable;
  private Map slopeTable;
  private Map tmaxTable;
  private Map dmaxTable;

  /** Socket used for sending; stays null if it could not be created in init. */
  private DatagramSocket datagramSocket;

  /** Creates a new instance of GangliaContext */
  public GangliaContext() {
  }

  /**
   * Initializes this context from the factory's attributes.
   *
   * <p>If a <code>period</code> attribute is present it must parse as a
   * positive integer. The server list is parsed from <code>servers</code>
   * using {@link #DEFAULT_PORT} as the default port, the per-metric attribute
   * tables are loaded, and the datagram socket is opened.
   *
   * <p>A failure to open the socket is only reported on stderr here; a later
   * attempt to send a metric then fails with an <code>IOException</code>.
   *
   * @param contextName name of this context, passed to the superclass
   * @param factory     factory supplying the attributes, passed to the superclass
   * @throws MetricsException if <code>period</code> is set but is not a
   *                          positive integer
   */
  public void init(String contextName, ContextFactory factory) {
    super.init(contextName, factory);

    String periodStr = getAttribute(PERIOD_PROPERTY);
    if (periodStr != null) {
      int period = 0;
      try {
        period = Integer.parseInt(periodStr);
      } catch (NumberFormatException nfe) {
        // Deliberately ignored: period stays 0 and is rejected just below,
        // so malformed and non-positive values share one error path.
      }
      if (period <= 0) {
        throw new MetricsException("Invalid period: " + periodStr);
      }
      setPeriod(period);
    }

    metricsServers = Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);

    unitsTable = getAttributeTable(UNITS_PROPERTY);
    slopeTable = getAttributeTable(SLOPE_PROPERTY);
    tmaxTable = getAttributeTable(TMAX_PROPERTY);
    dmaxTable = getAttributeTable(DMAX_PROPERTY);

    try {
      datagramSocket = new DatagramSocket();
    } catch (SocketException se) {
      // Best effort: initialization continues without a socket.
      se.printStackTrace();
    }
  }

  /**
   * Emits every metric of a record as its own Ganglia message.
   *
   * <p>The full metric name is the record name, followed by each tag value,
   * followed by the metric name, all separated by '.'.
   *
   * <p>A metric whose value class has no entry in {@link #typeTable} cannot be
   * encoded; it is reported on stderr and skipped so that the remaining
   * metrics of the record are still sent.
   *
   * @param contextName name of the context (not used by this implementation)
   * @param recordName  name of the record; first component of each metric name
   * @param outRec      the record whose tags and metrics are emitted
   * @throws IOException if a metric cannot be sent
   */
  public void emitRecord(String contextName, String recordName, OutputRecord outRec)
      throws IOException {
    // metric name formed from record name and tag values
    StringBuffer nameBuf = new StringBuffer(recordName);
    Iterator tagIt = outRec.getTagNames().iterator();
    while (tagIt.hasNext()) {
      String tagName = (String) tagIt.next();
      nameBuf.append('.');
      nameBuf.append(outRec.getTag(tagName));
    }
    nameBuf.append('.');
    String namePrefix = nameBuf.toString();

    // emit each metric in turn
    Iterator metricIt = outRec.getMetricNames().iterator();
    while (metricIt.hasNext()) {
      String metricName = (String) metricIt.next();
      Object metric = outRec.getMetric(metricName);
      String type = (String) typeTable.get(metric.getClass());
      if (type == null) {
        // Without a wire type the message cannot be encoded; passing null on
        // would fail with a NullPointerException inside xdr_string.
        System.err.println("Unknown metrics type: " + metric.getClass());
        continue;
      }
      emitMetric(namePrefix + metricName, type, metric.toString());
    }
  }

  /**
   * Encodes one metric into the buffer and sends it to every configured server.
   *
   * <p>No bounds check is made while encoding: a message larger than
   * {@link #BUFFER_SIZE} bytes fails with an unchecked index exception.
   *
   * @param name  full metric name; also the lookup key for units, slope, tmax
   *              and dmax
   * @param type  wire type name, one of the values of {@link #typeTable}
   * @param value metric value rendered as a string
   * @throws IOException if no socket is available while servers are configured,
   *                     or if sending a datagram fails
   */
  private void emitMetric(String name, String type, String value) throws IOException {
    String units = getUnits(name);
    int slope = getSlope(name);
    int tmax = getTmax(name);
    int dmax = getDmax(name);

    offset = 0;
    xdr_int(0);             // metric_user_defined
    xdr_string(type);
    xdr_string(name);
    xdr_string(value);
    xdr_string(units);
    xdr_int(slope);
    xdr_int(tmax);
    xdr_int(dmax);

    Iterator socketIt = metricsServers.iterator();
    if (socketIt.hasNext() && datagramSocket == null) {
      // init could not open the socket; fail with the declared exception type
      // instead of a NullPointerException at send().
      throw new IOException("No datagram socket available to send metric " + name);
    }
    while (socketIt.hasNext()) {
      SocketAddress socketAddress = (SocketAddress) socketIt.next();
      DatagramPacket packet = new DatagramPacket(buffer, offset, socketAddress);
      datagramSocket.send(packet);
    }
  }

  /**
   * Returns the units configured for a metric, or {@link #DEFAULT_UNITS} if
   * none are configured.
   */
  private String getUnits(String metricName) {
    String result = (String) unitsTable.get(metricName);
    if (result == null) {
      result = DEFAULT_UNITS;
    }
    return result;
  }

  /**
   * Returns the numeric slope code for a metric: 0 when the configured slope
   * is "zero", otherwise 3. An unconfigured metric uses
   * {@link #DEFAULT_SLOPE} and therefore yields 3.
   */
  private int getSlope(String metricName) {
    String slopeString = (String) slopeTable.get(metricName);
    if (slopeString == null) {
      slopeString = DEFAULT_SLOPE;
    }
    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
  }

  /**
   * Returns the tmax configured for a metric, or {@link #DEFAULT_TMAX} if none
   * is configured.
   *
   * @throws NumberFormatException if the configured value is not an integer
   */
  private int getTmax(String metricName) {
    String tmaxString = (String) tmaxTable.get(metricName);
    if (tmaxString == null) {
      return DEFAULT_TMAX;
    } else {
      return Integer.parseInt(tmaxString);
    }
  }

  /**
   * Returns the dmax configured for a metric, or {@link #DEFAULT_DMAX} if none
   * is configured.
   *
   * @throws NumberFormatException if the configured value is not an integer
   */
  private int getDmax(String metricName) {
    String dmaxString = (String) dmaxTable.get(metricName);
    if (dmaxString == null) {
      return DEFAULT_DMAX;
    } else {
      return Integer.parseInt(dmaxString);
    }
  }

  /**
   * Puts a string into the buffer by first writing the size of the string
   * as an int, followed by the bytes of the string, padded if necessary to
   * a multiple of 4.
   *
   * <p>The bytes are obtained with {@link String#getBytes()}, i.e. in the
   * platform's default charset.
   */
  private void xdr_string(String s) {
    byte[] bytes = s.getBytes();
    int len = bytes.length;
    xdr_int(len);
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset += len;
    pad();
  }

  /**
   * Pads the buffer with zero bytes up to the nearest multiple of 4.
   */
  private void pad() {
    int newOffset = ((offset + 3) / 4) * 4;
    while (offset < newOffset) {
      buffer[offset++] = 0;
    }
  }

  /**
   * Puts an integer into the buffer as 4 bytes, big-endian.
   */
  private void xdr_int(int i) {
    buffer[offset++] = (byte) ((i >> 24) & 0xff);
    buffer[offset++] = (byte) ((i >> 16) & 0xff);
    buffer[offset++] = (byte) ((i >> 8) & 0xff);
    buffer[offset++] = (byte) (i & 0xff);
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -