📄 srdiindex.java
字号:
*@param record Btree Record *@return ArrayList of entries */ public static SrdiIndexRecord readRecord(Record record) { ArrayList result = new ArrayList(); Key key = null; if (record == null) { return new SrdiIndexRecord(null, result); } if (record.getValue().getLength() <= 0) { return new SrdiIndexRecord(null, result); } InputStream is = record.getValue().getInputStream(); try { DataInputStream ois = new DataInputStream(is); key = new Key(ois.readUTF()); int size = ois.readInt(); for (int i = 0; i < size; i++) { try { String idstr = ois.readUTF(); PeerID pid = (PeerID) IDFactory.fromURI(new URI(idstr)); long exp = ois.readLong(); Entry entry = new Entry(pid, exp); result.add(entry); } catch (URISyntaxException badID) { continue; } } ois.close(); } catch (EOFException eofe) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Empty record", eofe); } } catch (IOException ie) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception while reading Entry", ie); } } return new SrdiIndexRecord(key, result); } /** * Empties the index completely. * The entries are abandonned to the GC. 
*/ public synchronized void clear() { // FIXME changing the behavior a bit // instead of dropping all srdi entries, we let them expire // if that is not a desired behavior the indexer could be dropped // simply close it, and remove all index db created try { srdiIndexer.close(); cacheDB.close(); } catch (Exception e) { // bad bits we are done if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("failed to close index", e); } } } /** * Garbage Collect expired entries */ public synchronized void garbageCollect() { try { Map map = srdiIndexer.getIndexers(); Iterator it = map.keySet().iterator(); List list = new ArrayList(); while (it.hasNext()) { String indexName = (String) it.next(); NameIndexer idxr = (NameIndexer) map.get(indexName); idxr.query(null, new GcCallback(cacheDB, srdiIndexer, list, gcPeerTBL)); srdiIndexer.purge(list); } gcPeerTBL.clear(); } catch (Exception ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failure during SRDI Garbage Collect", ex); } } } /** * Remove expired entries from an ArrayList * *@param list The ArrayLsit *@return ArrayList without any expired entries */ private static ArrayList removeExpired(ArrayList list) { for (int i = 0; i < list.size(); i++) { Entry entry = (Entry) list.get(i); boolean expired = isExpired(entry.expiration); if (expired) { list.remove(i); i--; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Entry peerid :" + entry.peerid + " Expires at :" + entry.expiration); LOG.debug("Entry expired " + expired); } } } return list; } private static boolean isExpired(long expiration) { return (expiration < System.currentTimeMillis()); } /** * stop the current running thread */ public synchronized void stop() { stop = true; // wakeup and die try { Thread temp = gcThread; if (temp != null) { synchronized (temp) { temp.notify(); } } } catch (Exception ignored) {// ignored } // Stop the database try { srdiIndexer.close(); cacheDB.close(); gcPeerTBL.clear(); } catch (Exception ex) { if (LOG.isEnabledFor(Level.ERROR)) { 
LOG.error("Unable to stop the Srdi Indexer", ex); } } } /** * {@inheritDoc} * * <p/>Periodic thread for GC */ public void run() { try { while (!stop) { try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting for " + interval + "ms before garbage collection"); } synchronized (gcThread) { gcThread.wait(interval); } } catch (InterruptedException woken) { // The only reason we are woken is to stop. Thread.interrupted(); continue; } if (stop) { break; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Garbage collection started"); } garbageCollect(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Garbage collection completed"); } } } catch (Throwable all) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all); } } finally { synchronized (this) { gcThread = null; } } } /** * Flushes the Srdi directory for a specified group * this method should only be called before initialization of a given group * calling this method on a running group would have undefined results * *@param group group context */ public static void clearSrdi(PeerGroup group) { if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Clearing SRDI for " + group.getPeerGroupName()); } try { String pgdir = null; if (group == null) { pgdir = "srdi-index"; } else { pgdir = group.getPeerGroupID().getUniqueValue().toString(); } File rootDir = new File( new File( new File(group.getStoreHome()), "cm" ), pgdir); rootDir = new File(rootDir, "srdi"); if (rootDir.exists()) { // remove it along with it's content String[] list = rootDir.list(); for (int i = 0; i < list.length; i++) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Removing : " + list[i]); } File file = new File(rootDir, list[i]); if (!file.delete()) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Unable to delete the file"); } } } rootDir.delete(); } } catch (Throwable t) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Unable to clear Srdi", t); } } } /** * An entry in the index tables. 
*/
    public final static class Entry {

        /** Peer this entry points to. */
        public PeerID peerid;

        /** Expiration of this entry. */
        public long expiration;

        /**
         * Peer Pointer reference
         *
         *@param peerid PeerID for this entry
         *@param expiration the expiration for this entry
         */
        public Entry(PeerID peerid, long expiration) {
            this.peerid = peerid;
            this.expiration = expiration;
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Two entries are equal when their peer IDs are equal; the
         * expiration is not taken into account.
         */
        public boolean equals(Object obj) {
            if (obj instanceof Entry) {
                return (peerid.equals(((Entry) obj).peerid));
            }
            return false;
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Derived from the peer ID only, consistent with
         * {@link #equals(Object)}.
         */
        public int hashCode() {
            return peerid.hashCode();
        }
    }

    /**
     * an SrdiIndexRecord wrapper
     *
     * <p/>Pairs a record key with the list of entries stored under it. The
     * key may be null: records are created with a null key when there is
     * nothing to read.
     */
    public final static class SrdiIndexRecord {

        /** Record key; may be null. */
        public Key key;

        /** Record entries. */
        public ArrayList list;

        /**
         * SrdiIndex record
         *
         *@param key record key
         *@param list record entries
         */
        public SrdiIndexRecord(Key key, ArrayList list) {
            this.key = key;
            this.list = list;
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Two records are equal when their keys are equal; two records
         * that both have a null key are considered equal. The entry list is
         * not taken into account.
         */
        public boolean equals(Object obj) {
            if (obj instanceof SrdiIndexRecord) {
                Key otherKey = ((SrdiIndexRecord) obj).key;

                // Null-safe comparison: key-less records are legitimate.
                return (key == null) ? (otherKey == null) : key.equals(otherKey);
            }
            return false;
        }

        /**
         * {@inheritDoc}
         *
         * <p/>Derived from the key only; a record without a key hashes to 0.
         */
        public int hashCode() {
            return (key == null) ? 0 : key.hashCode();
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -