📄 publisher.java
字号:
/** * If an exception occurs it means it could not publish the entry * @throws XmlBlasterException */ public void shutdown() throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME+": shutdown"); timeout.removeTimeoutListener(this.timeoutHandle); this.isActive = false; this.forceShutdown = true; // in case doPublish is looping due to an exception synchronized (this) { if (replSource != null) { replSource.setEngine(null); replSource = null; replSourceEngine = null; publisher.shutdown(); publisher = null; } this.isShutdown = false; this.access.disconnect(new DisconnectQos(this.global)); this.global.shutdown(); } } /** * Fail-safe sending files. * @return Comman separated list of send file names */ public synchronized void publish() { while (true) { try { doPublish(); break; } catch (XmlBlasterException ex) { log.severe(ME+": publish: exception " + ex.getMessage()); try { Thread.sleep(this.pollInterval); } catch (Exception e) {} } if (this.forceShutdown) break; } } /** * Create a comma separated list of file names. * @param infos * @param max Max file names to collect * @return */ public String toString(FileInfo[] infos, int max) { StringBuffer sb = new StringBuffer(); if (max <= 0) max = infos.length; if (max > infos.length) max = infos.length; for (int i=0; i<max; i++) { if (i>0) sb.append(","); sb.append(infos[i].getRelativeName()); } return sb.toString(); } private String preparePubQos(String origQos) throws XmlBlasterException { if (replSourceEngine == null || origQos == null) return origQos; MsgQosData msgQosData = global.getMsgQosFactory().readObject(origQos); MsgQosData preparedMsgQosData = replSourceEngine.preparePubQos(msgQosData); return preparedMsgQosData.toXml(); } /** * Publish file or files to xmlBlaster. * @return An empty string if nothing was sent, is never null * @throws XmlBlasterException */ private FileInfo[] doPublish() throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME+": doPublish"); Set entries = this.directoryManager.getEntries(); if (entries == null || entries.size() < 1) return new FileInfo[0]; FileInfo[] infos = (FileInfo[])entries.toArray(new FileInfo[entries.size()]); for (int i=0; i < infos.length; i++) { if (this.maximumFileSize <= 0L || infos[i].getSize() <= this.maximumFileSize) { if (infos[i].getSize() > maximumChunkSize) { // log.warning("Not implemented yet: the size '" + infos[i].getSize() + "' is bigger than the maximum chunk size (" + maximumChunkSize + ")"); InputStream is = directoryManager.getContentStream(infos[i]); Global glob = access.getGlobal(); MsgKeyData keyData = glob.getMsgKeyFactory().readObject(publishKey); MsgQosData qosData = glob.getMsgQosFactory().readObject(publishQos); qosData.addClientProperty(ContribConstants.FILENAME_ATTR, infos[i].getRelativeName()); qosData.addClientProperty(ContribConstants.FILE_DATE, infos[i].getTimestamp()); qosData.addClientProperty(Constants.addJmsPrefix(XBConnectionMetaData.JMSX_MAX_CHUNK_SIZE, log), maximumChunkSize); access.publishStream(is, keyData, qosData, maximumChunkSize, replSourceEngine); if (log.isLoggable(Level.FINE)) log.fine(ME+": Successfully published file " + infos[i].getRelativeName() + " with size=" +infos[i].getSize()); } else if (infos[i].getSize() > Integer.MAX_VALUE) { log.severe(ME+": doPublish: sizes bigger than '" + Integer.MAX_VALUE + "' are currently not implemented"); } else { byte[] content = this.directoryManager.getContent(infos[i]); String pubQos = preparePubQos(this.publishQos); MsgUnit msgUnit = new MsgUnit(this.publishKey, content, pubQos); msgUnit.getQosData().addClientProperty(ContribConstants.FILENAME_ATTR, infos[i].getRelativeName()); msgUnit.getQosData().addClientProperty(ContribConstants.FILE_DATE, infos[i].getTimestamp()); this.access.publish(msgUnit); if (log.isLoggable(Level.FINE)) log.fine(ME+": Successfully published file " + infos[i].getRelativeName() + " with size=" +infos[i].getSize()); } while (true) { // must repeat until it works or until shut down try { boolean success = true; this.directoryManager.deleteOrMoveEntry(infos[i].getName(), success); break; } catch (XmlBlasterException ex) { log.severe(ME+": Moving " + infos[i].getName() + " failed, we try again without further publishing (please fix manually): " + ex.getMessage()); try { Thread.sleep(this.pollInterval); } catch (Exception e){} } if (this.forceShutdown) break; } } else { // delete or move to 'discarded' log.warning(ME+": doPublish: the file '" + infos[i].getName() + "' is too long (" + infos[i].getSize() + "'): I will remove it without publishing"); boolean success = false; try { this.directoryManager.deleteOrMoveEntry(infos[i].getName(), success); } catch (XmlBlasterException ex) { log.warning(ME+": doPublish: could not handle file '" + infos[i].getName() + "' which was too big: check file and directories permissions and fix it manually: I will continue working anyway. " + ex.getMessage()); } } } return infos; } /** * @see org.xmlBlaster.util.I_Timeout#timeout(java.lang.Object) */ public void timeout(Object userData) { try { if (log.isLoggable(Level.FINER)) log.finer(ME+": timeout"); publish(); } catch (Throwable ex) { ex.printStackTrace(); log.severe(ME+": timeout: " + ex.getMessage()); } finally { if (this.pollInterval >= 0) this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null); } } public void activate() throws Exception { if (!this.isActive) { this.isActive = true; if (this.pollInterval >= 0) this.timeoutHandle = timeout.addTimeoutListener(this, this.pollInterval, null); } } /* (non-Javadoc) * @see org.xmlBlaster.util.admin.I_AdminService#deActivate() */ public void deActivate() { timeout.removeTimeoutListener(this.timeoutHandle); this.timeoutHandle = null; this.isActive = false; } /* (non-Javadoc) * @see org.xmlBlaster.util.admin.I_AdminService#isActive() */ public boolean isActive() { return this.isActive; } public String triggerScan() { try { //this.timeoutHandle = timeout.addTimeoutListener(this, 0, null); // Hack: I need to call it twice to be effective, why? (Marcel 2006-01) for (int i=0; i<2; i++) { FileInfo[] infos = doPublish(); if (infos.length > 0) { return "Published matching files '" + toString(infos, 10) + "'"; } } if (this.delaySinceLastFileChange > 0) return "No matching file found to publish, note that it may take delaySinceLastFileChange=" + this.delaySinceLastFileChange + " millis until the file is sent."; else return "No matching file found to publish."; } catch (XmlBlasterException e) { throw new IllegalArgumentException(e.getMessage()); } } /** * @return Returns the directoryName. */ public String getDirectoryName() { return this.directoryName; } /** * @param directoryName The directoryName to set. */ public void setDirectoryName(String directoryName) { this.directoryName = directoryName; reCreateDirectoryManager(); } /** * @return Returns the fileFilter. */ public String getFileFilter() { return this.fileFilter; } /** * @param fileFilter The fileFilter to set. */ public void setFileFilter(String fileFilter) { this.fileFilter = fileFilter; reCreateDirectoryManager(); } /** * @return Returns the filterType. */ public String getFilterType() { return this.filterType; } /** * @param filterType The filterType to set. */ public void setFilterType(String filterType) { this.filterType = filterType; reCreateDirectoryManager(); } /** * @return Returns the maximumFileSize. */ public long getMaximumFileSize() { return this.maximumFileSize; } /** * @param maximumFileSize The maximumFileSize to set. */ public void setMaximumFileSize(long maximumFileSize) { this.maximumFileSize = maximumFileSize; } /** * @return Returns the pollInterval. */ public long getPollInterval() { return this.pollInterval; } /** * @param pollInterval The pollInterval to set. */ public void setPollInterval(long pollInterval) { this.pollInterval = pollInterval; if (this.pollInterval < 0) deActivate(); } /** * @return Returns the copyOnMove. */ public boolean isCopyOnMove() { return this.copyOnMove; } /** * @param copyOnMove The copyOnMove to set. */ public void setCopyOnMove(boolean copyOnMove) { this.copyOnMove = copyOnMove; reCreateDirectoryManager(); } /** * @return Returns the delaySinceLastFileChange. */ public long getDelaySinceLastFileChange() { return this.delaySinceLastFileChange; } /** * @param delaySinceLastFileChange The delaySinceLastFileChange to set. */ public void setDelaySinceLastFileChange(long delaySinceLastFileChange) { this.delaySinceLastFileChange = delaySinceLastFileChange; reCreateDirectoryManager(); } /** * @return Returns the discarded. */ public String getDiscarded() { return this.discarded; } /** * @param discarded The discarded to set. */ public void setDiscarded(String discarded) { this.discarded = discarded; reCreateDirectoryManager(); } /** * @return Returns the lockExtention. */ public String getLockExtention() { return this.lockExtention; } /** * @param lockExtention The lockExtention to set. */ public void setLockExtention(String lockExtention) { this.lockExtention = lockExtention; reCreateDirectoryManager(); } /** * @return Returns the sent. */ public String getSent() { return this.sent; } /** * @param sent The sent to set. */ public void setSent(String sent) { this.sent = sent; reCreateDirectoryManager(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -