replyreceiver.java

来自「opennms得相关源码 请大家看看」· Java 代码 · 共 337 行

JAVA
337
字号
//// This file is part of the OpenNMS(R) Application.//// OpenNMS(R) is Copyright (C) 2002-2003 The OpenNMS Group, Inc.  All rights reserved.// OpenNMS(R) is a derivative work, containing both original code, included code and modified// code that was published under the GNU General Public License. Copyrights for modified // and included code are below.//// OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.//// Modifications://// 2003 Mar 05: Cleaned up some ICMP related code.// 2002 Nov 13: Added response time stats for ICMP requests.// // Original code base Copyright (C) 1999-2001 Oculan Corp.  All rights reserved.//// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.//// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.                                                            
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// For more information contact:
//      OpenNMS Licensing       <license@opennms.org>
//      http://www.opennms.org/
//      http://www.opennms.com/
//
// Tab Size = 8
//
// ReplyReceiver.java,v 1.1.1.1 2001/11/11 17:34:37 ben Exp
//

package org.opennms.netmgt.ping;

import java.net.DatagramPacket;

import org.apache.log4j.Category;
import org.opennms.core.fiber.PausableFiber;
import org.opennms.core.queue.FifoQueue;
import org.opennms.core.queue.FifoQueueException;
import org.opennms.core.utils.ThreadCategory;
import org.opennms.protocols.icmp.IcmpSocket;

/**
 * <p>
 * This class is designed to be a single point of receipt for all ICMP messages
 * received by an {@link org.opennms.protocols.icmp.IcmpSocket IcmpSocket}
 * instance. The class implements the
 * {@link org.opennms.core.fiber.PausableFiber PausableFiber} interface as a
 * means to control the operation of the receiver.
 * </p>
 *
 * <p>
 * Once the receiver is started it will process all received datagrams and
 * filter them based upon their ICMP type and the filter identifier used to
 * construct the receiver. All ICMP messages, except for Echo Replies, are
 * discarded by the receiver. In addition, echo replies whose identifier is
 * <em>not</em> equal to the passed filter identifier are also discarded.
 * </p>
 *
 * <p>
 * Received datagrams that pass the requirements of the receiver are added to
 * the reply queue for processing by the application. Only instances of the
 * {@link Reply Reply} class are added to the queue for processing.
 * </p>
 *
 * <p>
 * The fiber status and the worker thread reference are guarded by this
 * object's monitor; the paused flag is additionally <code>volatile</code> so
 * the worker thread can read it without locking.
 * </p>
 *
 * @author <A HREF="sowmya@opennms.org">Sowmya </A>
 * @author <A HREF="weave@oculan.com">Brian Weaver </A>
 * @author <A HREF="http://www.opennms.org/">OpenNMS </A>
 *
 */
public final class ReplyReceiver implements PausableFiber, Runnable {
    /**
     * The name prefix of every instance; the (unsigned) filter ID is appended
     * to it to form the fiber name.
     */
    private static final String NAME = "ICMPReceiver";

    /**
     * The queue to which matching replies are written.
     */
    private FifoQueue m_replyQ;

    /**
     * The ICMP socket the datagrams are read from.
     */
    private IcmpSocket m_portal;

    /**
     * The ICMP identity a reply must carry to be accepted.
     */
    private short m_filterID;

    /**
     * The paused flag. Volatile so that the worker thread sees changes made by
     * {@link #pause()} and {@link #resume()} without taking the monitor.
     */
    private volatile boolean m_paused;

    /**
     * The name of this instance.
     */
    private String m_name;

    /**
     * The underlying thread doing the work, <code>null</code> until
     * {@link #start()} has been called.
     */
    private Thread m_worker;

    /**
     * The fiber's status. Guarded by this object's monitor.
     */
    private int m_status;

    /**
     * The default constructor is marked private to prevent its use. The
     * constructor always throws an UnsupportedOperationException.
     *
     * @exception java.lang.UnsupportedOperationException
     *                Always thrown.
     */
    private ReplyReceiver() throws java.lang.UnsupportedOperationException {
        throw new java.lang.UnsupportedOperationException("invalid constructor invocation");
    }

    /**
     * <p>
     * Processes the received datagram and adds a new {@link Reply Reply}
     * instance to the reply queue. While the receiver is paused the datagram
     * is silently dropped. Otherwise the received packet must pass the
     * following criteria:
     * </p>
     *
     * <ul>
     * <li>ICMP Type == Echo Reply</li>
     * <li>ICMP Identity == Filter ID</li>
     * <li>The datagram can be decoded by {@link Reply#create Reply.create()};
     * packets it rejects with an <code>IllegalArgumentException</code> or an
     * <code>IndexOutOfBoundsException</code> are discarded.</li>
     * </ul>
     *
     * @param pkt
     *            The datagram to process.
     *
     * @throws java.lang.InterruptedException
     *             Thrown if the thread is interrupted.
     * @throws org.opennms.core.queue.FifoQueueException
     *             Thrown if a queue exception occurs adding a new reply.
     *
     */
    protected void process(DatagramPacket pkt) throws InterruptedException, FifoQueueException {
        // m_paused is volatile, so a plain read already observes the latest
        // value written by pause()/resume(); no lock is needed here.
        if (m_paused) {
            return;
        }

        Reply reply;
        try {
            reply = Reply.create(pkt);
        } catch (IllegalArgumentException iaE) {
            // Thrown by Reply.create for a datagram it does not accept:
            // discard.
            return;
        } catch (IndexOutOfBoundsException iooB) {
            // Thrown by Reply.create for a datagram whose length does not
            // fit: discard.
            return;
        }

        // Test the match criteria
        if (reply.isEchoReply() && reply.getIdentity() == m_filterID) {
            Category log = ThreadCategory.getInstance(getClass());
            m_replyQ.add(reply);
            if (log.isDebugEnabled()) {
                log.debug("process: received matching echo reply from host " + pkt.getAddress().getHostAddress());
            }
        }
    }

    /**
     * <p>
     * Constructs a ping receiver that reads datagrams from the socket and adds
     * matching replies to the queue. Each reply must be of type ICMP Echo
     * Reply and its identity must match the filterID. The receiver does
     * nothing until {@link #start()} is called.
     * </p>
     *
     * @param portal
     *            The ICMP socket
     * @param replyQ
     *            The reply queue for matching messages.
     * @param filterID
     *            The ICMP Identity for matching.
     *
     */
    public ReplyReceiver(IcmpSocket portal, FifoQueue replyQ, short filterID) {
        m_portal = portal;
        m_replyQ = replyQ;
        m_filterID = filterID;
        m_paused = false;
        // Render the 16-bit identity as an unsigned value in the name.
        m_name = NAME + (filterID < 0 ? filterID + 0x10000 : filterID);
        m_worker = null;
        m_status = START_PENDING;
    }

    /**
     * Starts the ICMP receiver. Once started the receiver reads new messages
     * from the ICMP socket and processes them. Packets that match the proper
     * criteria are added to the queue. If the receiver is already started then
     * an exception is thrown.
     *
     * @throws java.lang.IllegalStateException
     *             Thrown if the receiver has already been started.
     *
     */
    public final synchronized void start() {
        if (m_worker != null) {
            throw new IllegalStateException("The Fiber is already running or has run");
        }

        m_status = STARTING;
        m_worker = new Thread(this, m_name);
        m_worker.setDaemon(true);
        m_worker.start();
    }

    /**
     * Stops the current receiver by marking it as stop-pending and
     * interrupting the worker thread. If the receiver was never started then
     * an exception is thrown.
     *
     * @throws java.lang.IllegalStateException
     *             Thrown if the receiver was never started.
     *
     */
    public final synchronized void stop() {
        if (m_worker == null) {
            throw new IllegalStateException("The Fiber has not been started");
        }

        if (m_worker.isAlive()) {
            if (m_status != STOPPED) {
                m_status = STOP_PENDING;
            }
            m_worker.interrupt();
        } else {
            m_status = STOPPED;
        }
    }

    /**
     * Pauses the receiver. While the receiver is paused it still reads new
     * ICMP datagrams; however, all datagrams are discarded while in a paused
     * state regardless of matching criteria. Messages are still read since the
     * operating system will continue to deliver them.
     *
     * @throws java.lang.IllegalStateException
     *             Thrown if the receiver is not running.
     *
     */
    public final synchronized void pause() {
        if (m_worker == null || !m_worker.isAlive()) {
            throw new IllegalStateException("The fiber is not running");
        }

        m_paused = true;
    }

    /**
     * Resumes the receipt and processing of ICMP messages.
     *
     * @throws java.lang.IllegalStateException
     *             Thrown if the receiver is not running.
     *
     */
    public final synchronized void resume() {
        if (m_worker == null || !m_worker.isAlive()) {
            throw new IllegalStateException("The fiber is not running");
        }

        m_paused = false;
    }

    /**
     * Returns the name of this fiber.
     *
     * @return The fiber's name.
     */
    public final String getName() {
        return m_name;
    }

    /**
     * Returns the status of the fiber. A running fiber whose paused flag is
     * set is reported as <code>PAUSED</code>.
     *
     * @return The fiber's status.
     */
    public final synchronized int getStatus() {
        if (m_status == RUNNING && m_paused) {
            return PAUSED;
        }
        return m_status;
    }

    /**
     * The run() method does the actual work of reading messages from the ICMP
     * socket and handing them to {@link #process process()}. The loop ends
     * when a stop has been requested or when any exception is raised while
     * receiving or processing; in every case the status ends up as
     * <code>STOPPED</code>.
     *
     */
    public final void run() {
        synchronized (this) {
            // stop() may have been called after start() but before this
            // thread was scheduled. Honor that request instead of overwriting
            // STOP_PENDING with RUNNING, which would lose the stop.
            if (m_status == STOP_PENDING) {
                m_status = STOPPED;
                return;
            }
            m_status = RUNNING;
        }

        try {
            for (;;) {
                synchronized (this) {
                    if (m_status == STOP_PENDING) {
                        break;
                    }
                }

                process(m_portal.receive());
            }
        } catch (Exception e) {
            // Any failure terminates the receiver thread. Attach the
            // exception to the log entry so the cause is not lost.
            Category log = ThreadCategory.getInstance(getClass());
            if (log.isDebugEnabled()) {
                log.debug("run: an exception occured processing the datagram, thread exiting", e);
            }
        } finally {
            synchronized (this) {
                m_status = STOPPED;
            }
        }
    }

} // end class ReplyReceiver

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?