📄 tcpstreamhandler.java
字号:
try { pipeIn = new PipedInputStream((PipedOutputStream) o); } catch (IOException e) { log.error("An I/O exception occured construction a record reader", e); break MAINLOOP; } // signal that we got the stream // synchronized (o) { o.notify(); } } // end synchronization on xchange // decrement the record count if greater than zero // m_recsPerConn -= (m_recsPerConn > 0 ? 1 : 0); // convert the pipe input stream into // and actual buffered input stream // InputStreamReader in = new InputStreamReader(new BufferedInputStream(pipeIn)); // Unmarshall the XML document // Log eLog = null; boolean doCleanup = false; try { eLog = (Log) Unmarshaller.unmarshal(Log.class, in); if (log.isDebugEnabled()) log.debug("Event record converted"); } catch (ValidationException e) { log.debug("The XML record is not valid", new ValidationException(e.getMessage())); doCleanup = true; } catch (MarshalException e) { log.debug("Could not unmarshall the XML record", new MarshalException(e.getMessage())); doCleanup = true; } // clean up the data on the current pipe if necessary // if (doCleanup) { // cleanup a failed record. Need to read // the remaining bytes from the other thread // to synchronize up. The other thread might // be blocked writing // try { while (in.read() != -1) /* do nothing */; } catch (IOException e) { // do nothing } // start from the top! // continue MAINLOOP; } // Now that we have a list of events, process them // Event[] events = eLog.getEvents().getEvent(); // sort the events by time // Arrays.sort(events, new Comparator() { public int compare(Object o1, Object o2) { Event e1 = (Event) o1; Event e2 = (Event) o2; boolean e1t = (e1.getTime() != null); boolean e2t = (e2.getTime() != null); if (e1t && !e2t) { return 1; } else if (!e1t && e2t) { return -1; } else if (!e1t && !e2t) { return 0; } // else // DateFormat fmt = DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL); Date de1 = null; try { de1 = fmt.parse(e1.getTime()); } catch (Throwable t) { } Date de2 = null; try { de2 = fmt.parse(e2.getTime()); } catch (Throwable t) { } if (de1 != null && de2 != null) return (int) (de1.getTime() - de2.getTime()); else if (de1 == null && de2 != null) return -1; else if (de1 != null && de2 == null) return 1; // else return 0; } }); // process the events // if (events != null && events.length != 0) { List okEvents = new ArrayList(events.length); // This synchronization loop will hold onto the lock // for a while. If the handlers are going to change // often, which is shouldn't then might want to consider // duplicating the handlers into an array before processing // the events. // // Doing the synchronization in the outer loop prevents spending // lots of cycles doing synchronization when it should not // normally // be necesary. // synchronized (m_handlers) { Iterator iter = m_handlers.iterator(); while (iter.hasNext()) { // get the handler and then have it process all // the events in the document before moving to the // next event handler. // EventHandler hdl = (EventHandler) iter.next(); for (int ndx = 0; ndx < events.length; ndx++) { // Process the event and log any errors, // but don't die on these errors // try { if (isTracing) log.debug("handling event, uei = " + events[ndx].getUei()); // shortcut and BOTH parts MUST execute! // if (hdl.processEvent(events[ndx])) { if (!okEvents.contains(events[ndx])) { okEvents.add(events[ndx]); } } } catch (Throwable t) { log.warn("An exception occured while processing an event", t); } } } // end iteration over handler list. } // end synchronization // Now process the good events and send // a receipt message // boolean hasReceipt = false; EventReceipt receipt = new EventReceipt(); Iterator iter = okEvents.iterator(); while (iter.hasNext()) { Event e = (Event) iter.next(); if (e.getUuid() != null) { receipt.addUuid(e.getUuid()); hasReceipt = true; } } if (hasReceipt) { // Transform it to XML and send it across the // socket all in one call // try { OutputStreamWriter writer = new OutputStreamWriter(new BufferedOutputStream(m_connection.getOutputStream())); Marshaller.marshal(receipt, writer); writer.flush(); synchronized (m_handlers) { iter = m_handlers.iterator(); while (iter.hasNext()) { // get the handler and then have it process all // the events in the document before moving to // the // next event handler. // EventHandler hdl = (EventHandler) iter.next(); try { hdl.receiptSent(receipt); } catch (Throwable t) { log.warn("An exception occured while processing an event receipt", t); } } // end iteration over handler list. } // end synchronization if (isTracing) { log.debug("Sending Event Receipt {"); StringWriter swriter = new StringWriter(); try { Marshaller.marshal(receipt, swriter); } catch (Exception e) { log.debug("An error occured during marshalling", e); } log.debug(swriter.getBuffer().toString()); log.debug("}"); } } catch (ValidationException e) { log.warn("Failed to send event-receipt XML document", e); break MAINLOOP; } catch (MarshalException e) { log.warn("Failed to send event-receipt XML document", e); break MAINLOOP; } catch (IOException e) { log.warn("Failed to send event-receipt XML document", e); break MAINLOOP; } } } else if (isTracing) { log.debug("The agent sent an empty event stream"); } } // end main loop try { if (isTracing) log.debug("stopping record handler"); chunker.stop(); if (isTracing) log.debug("record handler stopped"); } catch (InterruptedException e) { log.warn("The thread was interrupted while trying to close the record handler", e); } // regardless of any errors, be sure to release the socket. // try { if (isTracing) log.debug("closing connnection"); m_connection.close(); if (isTracing) log.debug("connnection closed "); } catch (IOException e) { log.warn("An I/O exception occured while closing the TCP/IP connection", e); } if (isTracing) log.debug("Thread exiting"); } // end run}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -