pingmanager.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 704 行 · 第 1/2 页

JAVA
704
字号
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc.  All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2003 Jan 31: Cleaned up some unused imports.//// Original code base Copyright (C) 1999-2001 Oculan Corp.  All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.                                                            //// You should have received a copy of the GNU General Public License// along with this program; if not, write to the Free Software// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.//       // For more information contact: //      OpenNMS Licensing       <license@opennms.org>//      http://www.opennms.org///      http://www.opennms.com///// Tab Size = 8//package org.opennms.netmgt.discovery;import java.io.IOException;import java.net.DatagramPacket;import java.net.InetAddress;import java.net.NoRouteToHostException;import org.apache.log4j.Category;import org.opennms.core.concurrent.QuantumSemaphore;import org.opennms.core.fiber.ExtendedStatusFiber;import org.opennms.core.fiber.PausableFiber;import org.opennms.core.queue.FifoQueue;import org.opennms.core.queue.FifoQueueImpl;import org.opennms.core.utils.ThreadCategory;import org.opennms.netmgt.ping.Packet;import org.opennms.netmgt.ping.Reply;import org.opennms.netmgt.ping.ReplyReceiver;import org.opennms.protocols.icmp.IcmpSocket;final class PingManager implements Runnable, PausableFiber {    private static final long TID_CONST_KEY = 0x0110deadbeef0000L;    private Pinger[] m_pingers;    private QuantumSemaphore m_semaphore;    private IcmpSocket m_socket;    private FifoQueue m_replyQ;    private FifoQueue m_discoveredQ;    private ReplyReceiver m_icmpReceiver;    private int m_status;    private short m_filterId;    private Thread m_worker;    static final class Pinger implements Runnable, PausableFiber, ExtendedStatusFiber {        /**         * The semaphore that each poller must acquire before they can send a         * packet         */        private final QuantumSemaphore m_semaphore;        /**         * The socket used to send datagrams.         */        private final IcmpSocket m_socket;        /**         * The queue of inbound addresses to poll.         */        private final FifoQueue m_addressQ;        /**         * The value stored in the icmp identifier field.         */        private final short m_icmpId;        /**         * The identifier for the fiber. This is stored in the ping packet.         */        private final long m_fiberId;        /**         * The normal fiber's status.         */        private int m_status;        /**         * The extended status.         */        private int m_xstatus;        /**         * The current target address.         */        private InetAddress m_target;        /**         * True if the <code>signal</code> method is invoked. Used to indicate         * a responsive address.         */        private boolean m_signaled;        /**         * The thread that is executing the <code>run</code> method on behalf         * of the fiber.         */        private Thread m_worker;        /**         * This is the constant value that represents an idle pinger thread.         */        final static int IDLE = 0;        /**         * This is the constant value that represents a polling pinger thread.         */        final static int POLLING = 1;        /**         * <p>         * This method does the actual polling of the encapsulated address using         * the timeout and retry values. The polling will continue until either         * all the retries are exhausted or the pinger is signaled that the         * address responded.         * </p>         *          * <p>         * <strong>NOTICE: The instance lock must be heald prior to calling this         * method. </strong> This method uses the         * {@link java.lang.Object#wait wait}method and lock ownership is         * required to invoke the <code>wait</code> method.         * </p>         *          * @param addr         *            The address and information for polling.         *          * @throws java.io.IOException         *             Thrown if an error occurs sending the ICMP information.         * @throws java.lang.InterruptedException         *             Thrown if the thread is interrupted.         */        private/* synchronized */boolean poll(IPPollAddress addr) throws IOException, InterruptedException {            Category log = ThreadCategory.getInstance(getClass());            for (int tries = 0; !m_signaled && tries <= addr.getRetries(); tries++) {                // build a packet                //                Packet pingPkt = new Packet(m_fiberId);                pingPkt.setIdentity(m_icmpId);                pingPkt.computeChecksum();                // convert it to                byte[] buf = pingPkt.toBytes();                DatagramPacket sendPkt = new DatagramPacket(buf, buf.length, addr.getAddress(), 0);                buf = null;                pingPkt = null;                // Aquire a right to send                //                boolean x = m_semaphore.acquire();                long expire = System.currentTimeMillis() + addr.getTimeout();                while (m_status != STOP_PENDING && !m_signaled) {                    long wtime = expire - System.currentTimeMillis();                    if (wtime <= 0)                        break;                    if (log.isDebugEnabled())                        log.debug("poll: try#: " + tries + " - sending ping to address " + addr.getAddress().getHostAddress());                    m_socket.send(sendPkt);                    this.wait(wtime);                }            }            return m_signaled;        }        /**         * Constructs a new instance of a pinging fiber.         *          * @param socket         *            The ICMP socket         * @param semaphore         *            The control semaphore         * @param addrQ         *            The input address queue.         * @param filterId         *            The ICMP id for filtering datagrams         * @param tid         *            The Thread ID stored in the Packet.         *          */        Pinger(IcmpSocket socket, QuantumSemaphore semaphore, FifoQueue addrQ, short filterId, long tid) {            m_socket = socket;            m_semaphore = semaphore;            m_addressQ = addrQ;            m_icmpId = filterId;            m_fiberId = tid;            m_status = START_PENDING;            m_xstatus = IDLE;            m_signaled = false;            m_target = null;            m_worker = null;        }        /**         * Starts the current fiber. If the fiber has already been started,         * regardless of it's current state, then an IllegalStateException is         * thrown.         *          * @throws java.lang.IllegalStateException         *             Thrown if the fiber has already been started.         *          */        public synchronized void start() {            if (m_worker != null)                throw new IllegalStateException("The fiber is running or has already run");            m_worker = new Thread(this, getName());            m_worker.start();            m_status = STARTING;        }        /**         * Attempts to stop the current polling cycle as quickly as possbile. If         * the fiber has never been started then an         * <code>IllegalStateExceptio</code> is generated.         *          * @throws java.lang.IllegalStateException         *             Thrown if the fiber has never been started.         */        public synchronized void stop() {            if (m_worker == null)                throw new IllegalStateException("The fiber has never run");            m_status = STOP_PENDING;            m_worker.interrupt();            notifyAll();        }        /**         * <p>         * Pauses the current fiber. This does not take effect immediently, but         * after the current polling cycle has completed.         * </p>         *          * <p>         * To determine if the fiber has finished its current polling cycle         * check for a status equal to <code>PAUSED</code> and the extended         * status equal to <code>IDLE</code>.         * </p>         *          */        public synchronized void pause() {            if (m_worker == null || m_worker.isAlive() == false)                throw new IllegalStateException("The fiber is not running");            m_status = PAUSED;            notifyAll();        }        /**         * Resumes a currently paused fiber.         */        public synchronized void resume() {            if (m_worker == null || m_worker.isAlive() == false)                throw new IllegalStateException("The fiber is not running");            m_status = RUNNING;            notifyAll();        }        /**         * Returns the name of the fiber.         *          * @return The name of the Fiber.         */        public String getName() {            return "PingManager.Pinger-" + (m_fiberId ^ TID_CONST_KEY);        }        /**         * Returns the current status of the pinging thread.         *          * @return The status of the Fiber.         */        public synchronized int getStatus() {            if (m_worker != null && !m_worker.isAlive())                m_status = STOPPED;            return m_status;        }        /**         * Returns the extends status of the thread. This will always be one of         * two values: POLLING or IDLE.         *          * @return The current polling status.         *          */        public synchronized int getExtendedStatus() {            return m_xstatus;        }        /**         * <p>         * The main method that does the work for the pinging thread. This         * method reads {@link IPPollAddress IPPollAddress}instances from the         * input queue and then polls the target using the information.         * </p>         *          * <p>         * While a poll is in process the extended status of the fiber will         * return {@link #POLLING POLLING}otherwise it should be         * {@link #IDLE IDLE}.         * </p>         *          * <p>         * If an error occurs then the thread will exit and set the status to         * {@link org.opennms.core.fiber.Fiber#STOPPED STOPPED}.         * </p>         *          */        public void run() {            Category log = ThreadCategory.getInstance(getClass());

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?