📄 srdiindex.java
字号:
} } catch (DBException de) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed to retrieve SrdiIndex record", de); } } // if record is null, readRecord returns an empty list return readRecord(record).list; } /** * inserts a pkey into a map with a value of value.toUpperCase() * *@param primaryKey primary key *@param value value *@return The Map */ private Map getIndexMap(String primaryKey, String value) { if (primaryKey == null) { return null; } if (value == null) { value = ""; } Map map = new HashMap(1); map.put(primaryKey, value.toUpperCase()); return map; } /** * remove entries pointing to peer id from cache * *@param pid peer id to remove */ public synchronized void remove(PeerID pid) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug(" Adding " + pid + " to peer GC table"); } gcPeerTBL.add(pid); } /** * Query SrdiIndex * * @param attribute Attribute String to query on * @param value value of the attribute string * @return an enumeration of canonical paths */ public synchronized Vector query(String primaryKey, String attribute, String value, int threshold) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + indexName + "] Querying for " + threshold + " " + primaryKey + "/" + attribute + " = '" + value + "'"); } // return nothing if (primaryKey == null) { return new Vector(); } Vector res; // a blind query if (attribute == null) { res = query(primaryKey); } else { res = new Vector(); IndexQuery iq = Cm.getIndexQuery(value); try { srdiIndexer.search(iq, primaryKey + attribute, new SearchCallback(cacheDB, res, threshold, gcPeerTBL)); } catch (Exception ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception while searching in index", ex); } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + indexName + "] Returning " + res.size() + " results for " + primaryKey + "/" + attribute + " = '" + value + "'"); } return res; } /** * Query SrdiIndex * *@param primaryKey primary key *@return an enumeration of peerids */ public synchronized Vector query(String primaryKey) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + indexName + "] Querying for " + primaryKey); } Vector res = new Vector(); try { Map map = srdiIndexer.getIndexers(); Iterator it = map.keySet().iterator(); while (it != null && it.hasNext()) { String indexName = (String) it.next(); // seperate the index name from attribute if (indexName.startsWith(primaryKey)) { NameIndexer idxr = (NameIndexer) map.get(indexName); idxr.query(null, new SearchCallback(cacheDB, res, Integer.MAX_VALUE, gcPeerTBL)); } } } catch (Exception ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception while searching in index", ex); } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + indexName + "] Returning " + res.size() + " results for " + primaryKey); } return res; } private static final class SearchCallback implements BTreeCallback { private BTreeFiler cacheDB = null; private int threshold; private Vector results; private Set table; SearchCallback(BTreeFiler cacheDB, Vector results, int threshold, Set table) { this.cacheDB = cacheDB; this.threshold = threshold; this.results = results; this.table = table; } /** * @inheritDoc */ public boolean indexInfo(Value val, long pos) { if (results.size() >= threshold) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("SearchCallback.indexInfo reached Threshold :" + threshold); } return false; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Found " + val.toString()); } Record record = null; try { record = cacheDB.readRecord(pos); } catch (DBException ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception while reading indexed", ex); } return false; } if (record == null) { return true; } long t0 = System.currentTimeMillis(); SrdiIndexRecord rec = readRecord(record); ArrayList res = rec.list; // if (LOG.isEnabledFor(Level.DEBUG)) { // LOG.debug("Got result back in : " + (System.currentTimeMillis() - t0) + "ms."); // } copyIntoVector(results, res, table); return true; } } private static final class GcCallback implements BTreeCallback { private BTreeFiler cacheDB = null; private Indexer idxr = null; private List list; private Set table; GcCallback(BTreeFiler cacheDB, Indexer idxr, List list, Set table) { this.cacheDB = cacheDB; this.idxr = idxr; this.list = list; this.table = table; } /** * @inheritDoc */ public boolean indexInfo(Value val, long pos) { Record record = null; synchronized(cacheDB) { try { record = cacheDB.readRecord(pos); } catch (DBException ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception while reading indexed", ex); } return false; } if (record == null) { return true; } SrdiIndexRecord rec = readRecord(record); ArrayList res = rec.list; boolean changed = false; for (int i = 0; i < res.size(); i++) { Entry entry = (Entry) res.get(i); if (isExpired(entry.expiration) || table.contains(entry.peerid)) { res.remove(i); changed = true; } } if (changed) { if (res.size() == 0) { try { cacheDB.deleteRecord(rec.key); list.add(new Long(pos)); } catch (DBException e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception while deleting empty record", e); } } } else { // write it back byte[] data = getData(rec.key, res); Value recordValue = new Value(data); try { cacheDB.writeRecord(pos, recordValue); } catch (DBException ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception while writing back record", ex); } } } } } return true; } } /** * copies the content of ArrayList into a vector expired entries are not * copied * *@param to Vector to copy into *@param from ArrayList to copy from */ private static void copyIntoVector(Vector to, ArrayList from, Set table) { for (int i = 0; i < from.size(); i++) { Entry entry = (Entry) from.get(i); boolean expired = isExpired(entry.expiration); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Entry peerid : " + entry.peerid + " Expires at : " + entry.expiration); LOG.debug("Entry expired " + expired); } if (!to.contains(entry.peerid) && !expired) { if (!table.contains(entry.peerid)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("adding Entry :" + entry.peerid + " to list"); } to.add(entry.peerid); } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Skipping gc marked entry :" + entry.peerid); } } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Skipping expired Entry :" + entry.peerid); } } } } /** * Converts a List of {@link Entry} into a byte[] * *@param key record key *@param list ArrayList to convert *@return byte [] */ private static byte[] getData(Key key, List list) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); dos.writeUTF(key.toString()); dos.writeInt(list.size()); Iterator eachEntry = list.iterator(); while (eachEntry.hasNext()) { Entry anEntry = (Entry) eachEntry.next(); dos.writeUTF(anEntry.peerid.toString()); dos.writeLong(anEntry.expiration); } dos.close(); return bos.toByteArray(); } catch (IOException ie) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Exception while reading Entry", ie); } } return null; } /** * Reads the content of a record into ArrayList *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -