📄 srdiindex.java
字号:
/** * inserts a pkey into a map with a value of value * * @param primaryKey primary key * @param value value * @return The Map */ private Map<String, String> getIndexMap(String primaryKey, String value) { if (primaryKey == null) { return null; } if (value == null) { value = ""; } Map<String, String> map = new HashMap<String, String>(1); map.put(primaryKey, value); return map; } /** * remove entries pointing to peer id from cache * * @param pid peer id to remove */ public synchronized void remove(PeerID pid) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(" Adding " + pid + " to peer GC table"); } gcPeerTBL.add(pid); } /** * Query SrdiIndex * * @param attribute Attribute String to query on * @param value value of the attribute string * @return an enumeration of canonical paths * @param primaryKey primary key * @param threshold max number of results */ public synchronized List<PeerID> query(String primaryKey, String attribute, String value, int threshold) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("[" + indexName + "] Querying for " + threshold + " " + primaryKey + "/" + attribute + " = \'" + value + "\'"); } // return nothing if (primaryKey == null) { return Collections.emptyList(); } List<PeerID> res; // a blind query if (attribute == null) { res = query(primaryKey); } else { res = new ArrayList<PeerID>(); IndexQuery iq = Cm.getIndexQuery(value); try { srdiIndexer.search(iq, primaryKey + attribute, new SearchCallback(cacheDB, res, threshold, gcPeerTBL)); } catch (Exception ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception while searching in index", ex); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "[" + indexName + "] Returning " + res.size() + " results for " + primaryKey + "/" + attribute + " = \'" + value + "\'"); } return res; } /** * Query SrdiIndex * * @param primaryKey primary key * @return A list of Peer IDs. */ public synchronized List<PeerID> query(String primaryKey) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("[" + indexName + "] Querying for " + primaryKey); } List<PeerID> res = new ArrayList<PeerID>(); try { Map<String, NameIndexer> map = srdiIndexer.getIndexers(); for (Map.Entry<String, NameIndexer> index : map.entrySet()) { String indexName = index.getKey(); // seperate the index name from attribute if (indexName.startsWith(primaryKey)) { NameIndexer idxr = index.getValue(); idxr.query(null, new SearchCallback(cacheDB, res, Integer.MAX_VALUE, gcPeerTBL)); } } } catch (Exception ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception while searching in index", ex); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("[" + indexName + "] Returning " + res.size() + " results for " + primaryKey); } return res; } private static final class SearchCallback implements BTreeCallback { private final BTreeFiler cacheDB; private final int threshold; private final List<PeerID> results; private final Set<PeerID> excludeTable; SearchCallback(BTreeFiler cacheDB, List<PeerID> results, int threshold, Set<PeerID> excludeTable) { this.cacheDB = cacheDB; this.threshold = threshold; this.results = results; this.excludeTable = excludeTable; } /** * @inheritDoc */ public boolean indexInfo(Value val, long pos) { if (results.size() >= threshold) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("SearchCallback.indexInfo reached Threshold :" + threshold); } return false; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Found " + val); } Record record = null; try { record = cacheDB.readRecord(pos); } catch (DBException ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception while reading indexed", ex); } return false; } if (record != null) { long t0 = TimeUtils.timeNow(); SrdiIndexRecord rec = readRecord(record); if (Logging.SHOW_FINEST && LOG.isLoggable(Level.FINEST)) { LOG.finest("Got result back in : " + (TimeUtils.timeNow() - t0) + "ms."); } copyIntoList(results, rec.list, excludeTable); } return results.size() < threshold; } } private static final class GcCallback implements BTreeCallback { private final BTreeFiler cacheDB; private final Indexer idxr; private final List<Long> list; private final Set<PeerID> table; GcCallback(BTreeFiler cacheDB, Indexer idxr, List<Long> list, Set<PeerID> table) { this.cacheDB = cacheDB; this.idxr = idxr; this.list = list; this.table = table; } /** * @inheritDoc */ public boolean indexInfo(Value val, long pos) { Record record = null; synchronized (cacheDB) { try { record = cacheDB.readRecord(pos); } catch (DBException ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception while reading indexed", ex); } return false; } if (record == null) { return true; } SrdiIndexRecord rec = readRecord(record); List<Entry> res = rec.list; boolean changed = false; Iterator<Entry> eachEntry = res.iterator(); while(eachEntry.hasNext()) { Entry entry = eachEntry.next(); if (entry.isExpired() || table.contains(entry.peerid)) { changed = true; eachEntry.remove(); } } if (changed) { if (res.isEmpty()) { try { cacheDB.deleteRecord(rec.key); list.add(pos); } catch (DBException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception while deleting empty record", e); } } } else { // write it back byte[] data = getData(rec.key, res); Value recordValue = new Value(data); try { cacheDB.writeRecord(pos, recordValue); } catch (DBException ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception while writing back record", ex); } } } } } return true; } } /** * copies the content of List into a list. Expired entries are not * copied * * @param to list to copy into * @param from list to copy from * @param table table of PeerID's */ private static void copyIntoList(List<PeerID> to, List<Entry> from, Set<PeerID> table) { for (Entry entry : from) { boolean expired = entry.isExpired(); if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer("Entry peerid : " + entry.peerid + (expired ? " EXPIRED " : (" Expires at : " + entry.expiration))); } if (!to.contains(entry.peerid) && !expired) { if (!table.contains(entry.peerid)) { if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer("adding Entry :" + entry.peerid + " to list"); } to.add(entry.peerid); } else { if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer("Skipping gc marked entry :" + entry.peerid); } } } else { if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer("Skipping expired Entry :" + entry.peerid); } } } } /** * Converts a List of {@link Entry} into a byte[] * * @param key record key * @param list List to convert * @return byte [] */ private static byte[] getData(Key key, List<Entry> list) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); dos.writeUTF(key.toString()); dos.writeInt(list.size()); for (Entry anEntry : list) { dos.writeUTF(anEntry.peerid.toString()); dos.writeLong(anEntry.expiration); } dos.close(); return bos.toByteArray(); } catch (IOException ie) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Exception while reading Entry", ie); } } return null; } /** * Reads the content of a record into List * * @param record Btree Record * @return List of entries */ public static SrdiIndexRecord readRecord(Record record) { List<Entry> result = new ArrayList<Entry>(); Key key = null; if (record == null) { return new SrdiIndexRecord(null, result); } if (record.getValue().getLength() <= 0) { return new SrdiIndexRecord(null, result); } InputStream is = record.getValue().getInputStream(); try { DataInputStream ois = new DataInputStream(is); key = new Key(ois.readUTF()); int size = ois.readInt();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -