📄 writerpoolprocessor.java
字号:
curi + " " + curi.getVia()); } InetAddress a = h.getIP(); if (a == null) { throw new NullPointerException("Address is null for " + curi + " " + curi.getVia() + ". Address " + ((h.getIpFetched() == CrawlHost.IP_NEVER_LOOKED_UP)? "was never looked up.": (System.currentTimeMillis() - h.getIpFetched()) + " ms ago.")); } return h.getIP().getHostAddress(); } /** * Version of getAttributes that catches and logs exceptions * and returns null if failure to fetch the attribute. * @param name Attribute name. * @return Attribute or null. */ public Object getAttributeUnchecked(String name) { Object result = null; try { result = super.getAttribute(name); } catch (AttributeNotFoundException e) { logger.warning(e.getLocalizedMessage()); } catch (MBeanException e) { logger.warning(e.getLocalizedMessage()); } catch (ReflectionException e) { logger.warning(e.getLocalizedMessage()); } return result; } /** * Max size we want files to be (bytes). * * Default is ARCConstants.DEFAULT_MAX_ARC_FILE_SIZE. Note that ARC * files will usually be bigger than maxSize; they'll be maxSize + length * to next boundary. * @return ARC maximum size. */ public int getMaxSize() { Object obj = getAttributeUnchecked(ATTR_MAX_SIZE_BYTES); return (obj == null)? DEFAULT_MAX_FILE_SIZE: ((Integer)obj).intValue(); } public String getPrefix() { Object obj = getAttributeUnchecked(ATTR_PREFIX); return (obj == null)? WriterPoolMember.DEFAULT_PREFIX: (String)obj; } public List getOutputDirs() { Object obj = getAttributeUnchecked(ATTR_PATH); List list = (obj == null)? Arrays.asList(DEFAULT_PATH): (StringList)obj; ArrayList<File> results = new ArrayList<File>(); for (Iterator i = list.iterator(); i.hasNext();) { String path = (String)i.next(); File f = new File(path); if (!f.isAbsolute()) { f = new File(getController().getDisk(), path); } if (!f.exists()) { try { f.mkdirs(); } catch (Exception e) { e.printStackTrace(); continue; } } results.add(f); } return results; } public boolean isCompressed() { Object obj = getAttributeUnchecked(ATTR_COMPRESS); return (obj == null)? DEFAULT_COMPRESS: ((Boolean)obj).booleanValue(); } /** * @return Returns the poolMaximumActive. */ public int getPoolMaximumActive() { Object obj = getAttributeUnchecked(ATTR_POOL_MAX_ACTIVE); return (obj == null)? WriterPool.DEFAULT_MAX_ACTIVE: ((Integer)obj).intValue(); } /** * @return Returns the poolMaximumWait. */ public int getPoolMaximumWait() { Object obj = getAttributeUnchecked(ATTR_POOL_MAX_WAIT); return (obj == null)? WriterPool.DEFAULT_MAXIMUM_WAIT: ((Integer)obj).intValue(); } public String getSuffix() { Object obj = getAttributeUnchecked(ATTR_SUFFIX); String sfx = (obj == null)? WriterPoolMember.DEFAULT_SUFFIX: (String)obj; if (sfx != null && sfx.trim(). equals(WriterPoolMember.HOSTNAME_VARIABLE)) { String str = "localhost.localdomain"; try { str = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException ue) { logger.severe("Failed getHostAddress for this host: " + ue); } sfx = str; } return sfx; } public long getMaxToWrite() { Object obj = getAttributeUnchecked(ATTR_MAX_BYTES_WRITTEN); return (obj == null)? 0: ((Long)obj).longValue(); } public void crawlEnding(String sExitMessage) { this.pool.close(); } public void crawlEnded(String sExitMessage) { // sExitMessage is unused. } /* (non-Javadoc) * @see org.archive.crawler.event.CrawlStatusListener#crawlStarted(java.lang.String) */ public void crawlStarted(String message) { // TODO Auto-generated method stub } protected String getCheckpointStateFile() { return this.getClass().getName() + ".state"; } public void crawlCheckpoint(File checkpointDir) throws IOException { int serial = getSerialNo().get(); if (this.pool.getNumActive() > 0) { // If we have open active Archive files, up the serial number // so after checkpoint, we start at one past current number and // so the number we serialize, is one past current serialNo. // All this serial number manipulation should be fine in here since // we're paused checkpointing (Revisit if this assumption changes). serial = getSerialNo().incrementAndGet(); } saveCheckpointSerialNumber(checkpointDir, serial); // Close all ARCs on checkpoint. try { this.pool.close(); } finally { // Reopen on checkpoint. setupPool(new AtomicInteger(serial)); } } public void crawlPausing(String statusMessage) { // sExitMessage is unused. } public void crawlPaused(String statusMessage) { // sExitMessage is unused. } public void crawlResuming(String statusMessage) { // sExitMessage is unused. } private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { stream.defaultReadObject(); ObjectPlusFilesInputStream coistream = (ObjectPlusFilesInputStream)stream; coistream.registerFinishTask( new Runnable() { public void run() { setupPool(new AtomicInteger()); } }); } protected WriterPool getPool() { return pool; } protected void setPool(WriterPool pool) { this.pool = pool; } protected long getTotalBytesWritten() { return totalBytesWritten; } protected void setTotalBytesWritten(long totalBytesWritten) { this.totalBytesWritten = totalBytesWritten; } /** * Called out of {@link #initialTasks()} when recovering a checkpoint. * Restore state. */ protected void checkpointRecover() { int serialNo = loadCheckpointSerialNumber(); if (serialNo != -1) { getSerialNo().set(serialNo); } } /** * @return Serial number from checkpoint state file or if unreadable, -1 * (Client should check for -1). */ protected int loadCheckpointSerialNumber() { int result = -1; // If in recover mode, read in the Writer serial number saved // off when we checkpointed. File stateFile = new File(getSettingsHandler().getOrder() .getController().getCheckpointRecover().getDirectory(), getCheckpointStateFile()); if (!stateFile.exists()) { logger.info(stateFile.getAbsolutePath() + " doesn't exist so cannot restore Writer serial number."); } else { DataInputStream dis = null; try { dis = new DataInputStream(new FileInputStream(stateFile)); result = dis.readShort(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { if (dis != null) { dis.close(); } } catch (IOException e) { e.printStackTrace(); } } } return result; } protected void saveCheckpointSerialNumber(final File checkpointDir, final int serialNo) throws IOException { // Write out the current state of the ARCWriter serial number. File f = new File(checkpointDir, getCheckpointStateFile()); DataOutputStream dos = new DataOutputStream(new FileOutputStream(f)); try { dos.writeShort(serialNo); } finally { dos.close(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -