📄 momeventengine.java
字号:
PublishKey pubKey = new PublishKey(this.glob, oid); MsgUnit msg = new MsgUnit(this.glob, pubKey.toXml(), message, qos); return this.con.publish(msg).toXml(); } public boolean registerAlertListener(I_Update update, Map attrs) throws Exception { if (this.eventHandler != null) return false; this.eventHandler = update; return true; } public void shutdown() { log.fine("Closing xmlBlaster connection"); if (this.con != null && this.shutdownMom) { this.con.disconnect(null); this.con = null; this.glob = null; } } /** * @see org.xmlBlaster.contrib.I_ChangePublisher#getJmsSession() */ public XBSession getJmsSession() { return new XBSession(this.glob, XBSession.AUTO_ACKNOWLEDGE, false); } /** * Compresses the message if needed. * @param buffer The buffer to compress * @param props The properties to update with the compressed flag (uncompressed size) * @param compressSizeLimit The limit for compression. If less than one no compression * is done. If the size of the buffer is less than this limit it is not compressed either. * @return the compressed buffer or the input buffer if no compression was needed. * @deprecated */ public static byte[] compressOLD(byte[] buffer, Map props, int compressSizeLimit, String zipType) { if (compressSizeLimit < 1L) return buffer; if (buffer.length < compressSizeLimit) return buffer; int uncompressedLength = buffer.length; // check if not already compressed if (props != null && props.containsKey(DbWatcherConstants._COMPRESSION_TYPE)) { log.fine("The message is already compressed, will not compress it"); return buffer; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] ret = null; try { GZIPOutputStream zippedStream = new GZIPOutputStream(baos); ObjectOutputStream objectOutputStream = new ObjectOutputStream(zippedStream); objectOutputStream.writeObject(buffer); objectOutputStream.flush(); zippedStream.finish(); ret = baos.toByteArray(); objectOutputStream.close(); if (ret.length >= uncompressedLength) { log.fine("The compressed size is bigger than the original. Will not compress since it does not make sense"); return buffer; } if (props != null) { props.put(DbWatcherConstants._UNCOMPRESSED_SIZE, "" + uncompressedLength); props.put(DbWatcherConstants._COMPRESSION_TYPE, DbWatcherConstants.COMPRESSION_TYPE_GZIP); } return ret; } catch(IOException ex) { log.severe("An exception occured when compressing: '" + ex.getMessage() + "' will not compress"); return buffer; } } /** * Compresses the message if needed. * @param buffer The buffer to compress * @param props The properties to update with the compressed flag (uncompressed size) * @param compressSizeLimit The limit for compression. If less than one no compression * is done. If the size of the buffer is less than this limit it is not compressed either. * @return the compressed buffer or the input buffer if no compression was needed. */ public static byte[] compress(byte[] buffer, Map props, int compressSizeLimit, String zipType) { if (compressSizeLimit < 1L) return buffer; if (buffer.length < compressSizeLimit) return buffer; int uncompressedLength = buffer.length; // check if not already compressed if (props != null && props.containsKey(DbWatcherConstants._COMPRESSION_TYPE)) { log.fine("The message is already compressed, will not compress it"); return buffer; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] ret = null; try { GZIPOutputStream zippedStream = new GZIPOutputStream(baos); zippedStream.write(buffer); zippedStream.finish(); ret = baos.toByteArray(); if (ret.length >= uncompressedLength) { log.fine("The compressed size is bigger than the original. Will not compress since it does not make sense"); return buffer; } if (props != null) { props.put(DbWatcherConstants._UNCOMPRESSED_SIZE, "" + uncompressedLength); props.put(DbWatcherConstants._COMPRESSION_TYPE, DbWatcherConstants.COMPRESSION_TYPE_GZIP); } return ret; } catch(IOException ex) { log.severe("An exception occured when compressing: '" + ex.getMessage() + "' will not compress"); return buffer; } } private final static String dumpProps(Map clientProperties) { if (clientProperties == null) return ""; Iterator iter = clientProperties.values().iterator(); StringBuffer buf = new StringBuffer(512); while (iter.hasNext()) { ClientProperty prop = (ClientProperty)iter.next(); buf.append(prop.toXml()).append("\n"); } return buf.toString(); } /** * * @param buffer * @param clientProperties * @deprecated you should use the one with InputStream instead since less memory hungry (for big messages) * @return */ public static byte[] decompressXX(byte[] buffer, Map clientProperties) { if (clientProperties == null) return buffer; Object obj = clientProperties.get(DbWatcherConstants._COMPRESSION_TYPE); if (obj == null) { log.fine("The client property '" + DbWatcherConstants._COMPRESSION_TYPE + "' was not found. Will not expand"); return buffer; } if (obj instanceof String) obj = new ClientProperty(DbWatcherConstants._COMPRESSION_TYPE, null, null, (String)obj); ClientProperty prop = (ClientProperty)obj; String compressionType = prop.getStringValue().trim(); if (DbWatcherConstants.COMPRESSION_TYPE_GZIP.equals(compressionType)) { obj = clientProperties.get(DbWatcherConstants._UNCOMPRESSED_SIZE); if (obj == null) { log.severe("Can not expand message since no uncompressed size defined (will return it unexpanded)"); return buffer; } if (obj instanceof String) obj = new ClientProperty(DbWatcherConstants._UNCOMPRESSED_SIZE, null, null, (String)obj); prop = (ClientProperty)obj; ByteArrayInputStream bais = new ByteArrayInputStream(buffer); byte[] ret = null; try { GZIPInputStream zippedStream = new GZIPInputStream(bais); ObjectInputStream objectInputStream = new ObjectInputStream(zippedStream); ret = (byte[])objectInputStream.readObject(); objectInputStream.close(); // in case we cascade it still works fine clientProperties.remove(DbWatcherConstants._COMPRESSION_TYPE); clientProperties.remove(DbWatcherConstants._UNCOMPRESSED_SIZE); return ret; } catch(IOException ex) { log.severe("An IOException occured when uncompressing. Will not expand: " + ex.getMessage() + ": props='" + dumpProps(clientProperties) + "' and content '" + new String(buffer) + "'"); if (log.isLoggable(Level.FINE)) ex.printStackTrace(); return buffer; } catch(ClassNotFoundException ex) { log.severe("A ClassCastException occured when uncompressing. Will not expand: " + ex.getMessage()); if (log.isLoggable(Level.FINE)) ex.printStackTrace(); return buffer; } } else { log.warning("The compression type '" + compressionType + "' is unknown: will not decompress"); return buffer; } } public static InputStream decompress(InputStream is, Map clientProperties) { if (clientProperties == null) return is; Object obj = clientProperties.get(DbWatcherConstants._COMPRESSION_TYPE); if (obj == null) { log.fine("The client property '" + DbWatcherConstants._COMPRESSION_TYPE + "' was not found. Will not expand"); return is; } if (obj instanceof String) obj = new ClientProperty(DbWatcherConstants._COMPRESSION_TYPE, null, null, (String)obj); ClientProperty prop = (ClientProperty)obj; String compressionType = prop.getStringValue().trim(); if (DbWatcherConstants.COMPRESSION_TYPE_GZIP.equals(compressionType)) { obj = clientProperties.get(DbWatcherConstants._UNCOMPRESSED_SIZE); if (obj == null) { log.severe("Can not expand message since no uncompressed size defined (will return it unexpanded)"); return is; } if (obj instanceof String) obj = new ClientProperty(DbWatcherConstants._UNCOMPRESSED_SIZE, null, null, (String)obj); prop = (ClientProperty)obj; try { GZIPInputStream ret = new GZIPInputStream(is); clientProperties.remove(DbWatcherConstants._COMPRESSION_TYPE); clientProperties.remove(DbWatcherConstants._UNCOMPRESSED_SIZE); return ret; } catch (IOException ex) { log.severe("An Exception occured when trying to decompress the stream, probably not gzipped"); ex.printStackTrace(); return is; } } return is; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -