⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpstreamhandler.java

📁 opennms得相关源码 请大家看看
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                try {                    pipeIn = new PipedInputStream((PipedOutputStream) o);                } catch (IOException e) {                    log.error("An I/O exception occured construction a record reader", e);                    break MAINLOOP;                }                // signal that we got the stream                //                synchronized (o) {                    o.notify();                }            } // end synchronization on xchange            // decrement the record count if greater than zero            //            m_recsPerConn -= (m_recsPerConn > 0 ? 1 : 0);            // convert the pipe input stream into            // and actual buffered input stream            //            InputStreamReader in = new InputStreamReader(new BufferedInputStream(pipeIn));            // Unmarshall the XML document            //            Log eLog = null;            boolean doCleanup = false;            try {                eLog = (Log) Unmarshaller.unmarshal(Log.class, in);                if (log.isDebugEnabled())                    log.debug("Event record converted");            } catch (ValidationException e) {                log.debug("The XML record is not valid", new ValidationException(e.getMessage()));                doCleanup = true;            } catch (MarshalException e) {                log.debug("Could not unmarshall the XML record", new MarshalException(e.getMessage()));                doCleanup = true;            }            // clean up the data on the current pipe if necessary            //            if (doCleanup) {                // cleanup a failed record. Need to read                // the remaining bytes from the other thread                // to synchronize up. The other thread might                // be blocked writing                //                try {                    while (in.read() != -1)                        /* do nothing */;                } catch (IOException e) {                    // do nothing                }                // start from the top!                //                continue MAINLOOP;            }            // Now that we have a list of events, process them            //            Event[] events = eLog.getEvents().getEvent();            // sort the events by time            //            Arrays.sort(events, new Comparator() {                public int compare(Object o1, Object o2) {                    Event e1 = (Event) o1;                    Event e2 = (Event) o2;                    boolean e1t = (e1.getTime() != null);                    boolean e2t = (e2.getTime() != null);                    if (e1t && !e2t) {                        return 1;                    } else if (!e1t && e2t) {                        return -1;                    } else if (!e1t && !e2t) {                        return 0;                    }                    // else                    //                    DateFormat fmt = DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL);                    Date de1 = null;                    try {                        de1 = fmt.parse(e1.getTime());                    } catch (Throwable t) {                    }                    Date de2 = null;                    try {                        de2 = fmt.parse(e2.getTime());                    } catch (Throwable t) {                    }                    if (de1 != null && de2 != null)                        return (int) (de1.getTime() - de2.getTime());                    else if (de1 == null && de2 != null)                        return -1;                    else if (de1 != null && de2 == null)                        return 1;                    // else                    return 0;                }            });            // process the events            //            if (events != null && events.length != 0) {                List okEvents = new ArrayList(events.length);                // This synchronization loop will hold onto the lock                // for a while. If the handlers are going to change                // often, which is shouldn't then might want to consider                // duplicating the handlers into an array before processing                // the events.                //                // Doing the synchronization in the outer loop prevents spending                // lots of cycles doing synchronization when it should not                // normally                // be necesary.                //                synchronized (m_handlers) {                    Iterator iter = m_handlers.iterator();                    while (iter.hasNext()) {                        // get the handler and then have it process all                        // the events in the document before moving to the                        // next event handler.                        //                        EventHandler hdl = (EventHandler) iter.next();                        for (int ndx = 0; ndx < events.length; ndx++) {                            // Process the event and log any errors,                            // but don't die on these errors                            //                            try {                                if (isTracing)                                    log.debug("handling event, uei = " + events[ndx].getUei());                                // shortcut and BOTH parts MUST execute!                                //                                if (hdl.processEvent(events[ndx])) {                                    if (!okEvents.contains(events[ndx])) {                                        okEvents.add(events[ndx]);                                    }                                }                            } catch (Throwable t) {                                log.warn("An exception occured while processing an event", t);                            }                        }                    } // end iteration over handler list.                } // end synchronization                // Now process the good events and send                // a receipt message                //                boolean hasReceipt = false;                EventReceipt receipt = new EventReceipt();                Iterator iter = okEvents.iterator();                while (iter.hasNext()) {                    Event e = (Event) iter.next();                    if (e.getUuid() != null) {                        receipt.addUuid(e.getUuid());                        hasReceipt = true;                    }                }                if (hasReceipt) {                    // Transform it to XML and send it across the                    // socket all in one call                    //                    try {                        OutputStreamWriter writer = new OutputStreamWriter(new BufferedOutputStream(m_connection.getOutputStream()));                        Marshaller.marshal(receipt, writer);                        writer.flush();                        synchronized (m_handlers) {                            iter = m_handlers.iterator();                            while (iter.hasNext()) {                                // get the handler and then have it process all                                // the events in the document before moving to                                // the                                // next event handler.                                //                                EventHandler hdl = (EventHandler) iter.next();                                try {                                    hdl.receiptSent(receipt);                                } catch (Throwable t) {                                    log.warn("An exception occured while processing an event receipt", t);                                }                            } // end iteration over handler list.                        } // end synchronization                        if (isTracing) {                            log.debug("Sending Event Receipt {");                            StringWriter swriter = new StringWriter();                            try {                                Marshaller.marshal(receipt, swriter);                            } catch (Exception e) {                                log.debug("An error occured during marshalling", e);                            }                            log.debug(swriter.getBuffer().toString());                            log.debug("}");                        }                    } catch (ValidationException e) {                        log.warn("Failed to send event-receipt XML document", e);                        break MAINLOOP;                    } catch (MarshalException e) {                        log.warn("Failed to send event-receipt XML document", e);                        break MAINLOOP;                    } catch (IOException e) {                        log.warn("Failed to send event-receipt XML document", e);                        break MAINLOOP;                    }                }            } else if (isTracing) {                log.debug("The agent sent an empty event stream");            }        } // end main loop        try {            if (isTracing)                log.debug("stopping record handler");            chunker.stop();            if (isTracing)                log.debug("record handler stopped");        } catch (InterruptedException e) {            log.warn("The thread was interrupted while trying to close the record handler", e);        }        // regardless of any errors, be sure to release the socket.        //        try {            if (isTracing)                log.debug("closing connnection");            m_connection.close();            if (isTracing)                log.debug("connnection closed ");        } catch (IOException e) {            log.warn("An I/O exception occured while closing the TCP/IP connection", e);        }        if (isTracing)            log.debug("Thread exiting");    } // end run}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -