pingmanager.java
来自「opennms得相关源码 请大家看看」· Java 代码 · 共 704 行 · 第 1/2 页
JAVA
704 行
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc. All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2003 Jan 31: Cleaned up some unused imports.//// Original code base Copyright (C) 1999-2001 Oculan Corp. All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details. //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.// // For more information contact: // OpenNMS Licensing <license@opennms.org>// http://www.opennms.org/// http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.discovery;import java.io.IOException;import java.net.DatagramPacket;import java.net.InetAddress;import java.net.NoRouteToHostException;import org.apache.log4j.Category;import org.opennms.core.concurrent.QuantumSemaphore;import org.opennms.core.fiber.ExtendedStatusFiber;import org.opennms.core.fiber.PausableFiber;import org.opennms.core.queue.FifoQueue;import org.opennms.core.queue.FifoQueueImpl;import org.opennms.core.utils.ThreadCategory;import org.opennms.netmgt.ping.Packet;import org.opennms.netmgt.ping.Reply;import org.opennms.netmgt.ping.ReplyReceiver;import org.opennms.protocols.icmp.IcmpSocket;final class PingManager implements Runnable, PausableFiber { private static final long TID_CONST_KEY = 0x0110deadbeef0000L; private Pinger[] m_pingers; private QuantumSemaphore m_semaphore; private IcmpSocket m_socket; private FifoQueue m_replyQ; private FifoQueue m_discoveredQ; private ReplyReceiver m_icmpReceiver; private int m_status; private short m_filterId; private Thread m_worker; static final class Pinger implements Runnable, PausableFiber, ExtendedStatusFiber { /** * The semaphore that each poller must acquire before they can send a * packet */ private final QuantumSemaphore m_semaphore; /** * The socket used to send datagrams. */ private final IcmpSocket m_socket; /** * The queue of inbound addresses to poll. */ private final FifoQueue m_addressQ; /** * The value stored in the icmp identifier field. */ private final short m_icmpId; /** * The identifier for the fiber. This is stored in the ping packet. */ private final long m_fiberId; /** * The normal fiber's status. */ private int m_status; /** * The extended status. */ private int m_xstatus; /** * The current target address. */ private InetAddress m_target; /** * True if the <code>signal</code> method is invoked. Used to indicate * a responsive address. */ private boolean m_signaled; /** * The thread that is executing the <code>run</code> method on behalf * of the fiber. */ private Thread m_worker; /** * This is the constant value that represents an idle pinger thread. */ final static int IDLE = 0; /** * This is the constant value that represents a polling pinger thread. */ final static int POLLING = 1; /** * <p> * This method does the actual polling of the encapsulated address using * the timeout and retry values. The polling will continue until either * all the retries are exhausted or the pinger is signaled that the * address responded. * </p> * * <p> * <strong>NOTICE: The instance lock must be heald prior to calling this * method. </strong> This method uses the * {@link java.lang.Object#wait wait}method and lock ownership is * required to invoke the <code>wait</code> method. * </p> * * @param addr * The address and information for polling. * * @throws java.io.IOException * Thrown if an error occurs sending the ICMP information. * @throws java.lang.InterruptedException * Thrown if the thread is interrupted. */ private/* synchronized */boolean poll(IPPollAddress addr) throws IOException, InterruptedException { Category log = ThreadCategory.getInstance(getClass()); for (int tries = 0; !m_signaled && tries <= addr.getRetries(); tries++) { // build a packet // Packet pingPkt = new Packet(m_fiberId); pingPkt.setIdentity(m_icmpId); pingPkt.computeChecksum(); // convert it to byte[] buf = pingPkt.toBytes(); DatagramPacket sendPkt = new DatagramPacket(buf, buf.length, addr.getAddress(), 0); buf = null; pingPkt = null; // Aquire a right to send // boolean x = m_semaphore.acquire(); long expire = System.currentTimeMillis() + addr.getTimeout(); while (m_status != STOP_PENDING && !m_signaled) { long wtime = expire - System.currentTimeMillis(); if (wtime <= 0) break; if (log.isDebugEnabled()) log.debug("poll: try#: " + tries + " - sending ping to address " + addr.getAddress().getHostAddress()); m_socket.send(sendPkt); this.wait(wtime); } } return m_signaled; } /** * Constructs a new instance of a pinging fiber. * * @param socket * The ICMP socket * @param semaphore * The control semaphore * @param addrQ * The input address queue. * @param filterId * The ICMP id for filtering datagrams * @param tid * The Thread ID stored in the Packet. * */ Pinger(IcmpSocket socket, QuantumSemaphore semaphore, FifoQueue addrQ, short filterId, long tid) { m_socket = socket; m_semaphore = semaphore; m_addressQ = addrQ; m_icmpId = filterId; m_fiberId = tid; m_status = START_PENDING; m_xstatus = IDLE; m_signaled = false; m_target = null; m_worker = null; } /** * Starts the current fiber. If the fiber has already been started, * regardless of it's current state, then an IllegalStateException is * thrown. * * @throws java.lang.IllegalStateException * Thrown if the fiber has already been started. * */ public synchronized void start() { if (m_worker != null) throw new IllegalStateException("The fiber is running or has already run"); m_worker = new Thread(this, getName()); m_worker.start(); m_status = STARTING; } /** * Attempts to stop the current polling cycle as quickly as possbile. If * the fiber has never been started then an * <code>IllegalStateExceptio</code> is generated. * * @throws java.lang.IllegalStateException * Thrown if the fiber has never been started. */ public synchronized void stop() { if (m_worker == null) throw new IllegalStateException("The fiber has never run"); m_status = STOP_PENDING; m_worker.interrupt(); notifyAll(); } /** * <p> * Pauses the current fiber. This does not take effect immediently, but * after the current polling cycle has completed. * </p> * * <p> * To determine if the fiber has finished its current polling cycle * check for a status equal to <code>PAUSED</code> and the extended * status equal to <code>IDLE</code>. * </p> * */ public synchronized void pause() { if (m_worker == null || m_worker.isAlive() == false) throw new IllegalStateException("The fiber is not running"); m_status = PAUSED; notifyAll(); } /** * Resumes a currently paused fiber. */ public synchronized void resume() { if (m_worker == null || m_worker.isAlive() == false) throw new IllegalStateException("The fiber is not running"); m_status = RUNNING; notifyAll(); } /** * Returns the name of the fiber. * * @return The name of the Fiber. */ public String getName() { return "PingManager.Pinger-" + (m_fiberId ^ TID_CONST_KEY); } /** * Returns the current status of the pinging thread. * * @return The status of the Fiber. */ public synchronized int getStatus() { if (m_worker != null && !m_worker.isAlive()) m_status = STOPPED; return m_status; } /** * Returns the extends status of the thread. This will always be one of * two values: POLLING or IDLE. * * @return The current polling status. * */ public synchronized int getExtendedStatus() { return m_xstatus; } /** * <p> * The main method that does the work for the pinging thread. This * method reads {@link IPPollAddress IPPollAddress}instances from the * input queue and then polls the target using the information. * </p> * * <p> * While a poll is in process the extended status of the fiber will * return {@link #POLLING POLLING}otherwise it should be * {@link #IDLE IDLE}. * </p> * * <p> * If an error occurs then the thread will exit and set the status to * {@link org.opennms.core.fiber.Fiber#STOPPED STOPPED}. * </p> * */ public void run() { Category log = ThreadCategory.getInstance(getClass());
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?