📄 basicprovider.java
字号:
responsibleNode}, null)); } sendMessage(responsibleNode, (GetMessage) request, StatVars.PROVIDER_GET); return; } if (request instanceof PutMessage) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(request, "Sending Put Message", new Object[]{"m", requestID, "d", responsibleNode}, null)); } sendMessage(responsibleNode, (PutMessage) request, StatVars.PROVIDER_PUT); return; } if (request instanceof RenewMessage) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(request, "Sending Renew Message", new Object[]{"m", requestID, "d", responsibleNode}, null)); } sendMessage(responsibleNode, (RenewMessage) request, StatVars.PROVIDER_RENEW); return; } } /** * Method routeUpCall * * @param destination * @param message * @param local * @return */ public boolean routeUpCall(BitID destination, Payload message, boolean local) { SendMessage theMessage = (SendMessage) message; StatCollector.addSample(StatVars.NETWORK_IN, StatVars.PROVIDER, StatVars.PROVIDER_SENDMESSAGE, SerializationManager.getPayloadSize(message)); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(theMessage, "Processing Route UpCall", new Object[]{"d", (destination == null) ? "null" : destination.toString(), "m", theMessage.getID(), "t", Integer.toHexString( theMessage.getItem().hashCode())}, new Object[]{ "p", theMessage.getItem()})); } // Retrieve part of the call back tree ArrayList clients = new ArrayList(); String commonID = theMessage.getNS() + delimiter + theMessage.getRID(); Iterator entries; if (destination == null) { addItemToStorage(theMessage.getNS(), theMessage.getRID(), theMessage.getIID(), theMessage.getItem(), theMessage.getLifetime(), theMessage.getLocationID()); entries = messageCallbacks.headMap(commonID).entrySet().iterator(); } else { entries = messageUpCallCallbacks.headMap(commonID).entrySet().iterator(); } while (entries.hasNext()) { // Check if the client's request actually matches the new item Map.Entry theEntry = (Map.Entry) entries.next(); if (commonID.startsWith((String) theEntry.getKey())) { ArrayList clientArray = (ArrayList) theEntry.getValue(); for (int i = 0; i < clientArray.size(); i++) { clients.add(clientArray.get(i)); } } } boolean stopSending = false; for (int i = 0; i < clients.size(); i++) { // Call the client boolean stopLocalSending = ((ProviderClient) clients.get( i)).messageResult( theMessage.getNS(), theMessage.getRID(), theMessage.getItem(), (destination != null), local); if (stopLocalSending) { stopSending = true; } } // Message has been changed, do not process further, the client handled the next step if (stopSending) { if ((storeUpCalls) && (destination != null)) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(theMessage.getItem(), "Adding to Storage", new Object[]{"c", commonID}, null)); } addItemToStorage(theMessage.getNS(), theMessage.getRID(), theMessage.getIID(), theMessage.getItem(), theMessage.getLifetime(), theMessage.getLocationID()); } return true; } StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.PROVIDER, StatVars.PROVIDER_SENDMESSAGE, SerializationManager.getPayloadSize(message)); return false; } /** * Method locationMapChange */ public void locationMapChange() { if (rehashStorageOnChange) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( this, "Location Map Changed - Beginning Data Movement", null, null)); } // Retrieve current time to calculate remaining lifetime for items that must move double curTime = LocalNode.myTimer.getCurrentTime(); // Scan through all items in storage Iterator items = scanStorage("").iterator(); while (items.hasNext()) { StoredItem theItem = (StoredItem) items.next(); if (removeImmediately) { storageManager.remove(theItem.storageID); StoredItem.free(theItem); } put(theItem.locationID, theItem.ns, theItem.rid, theItem.iid, theItem.object, (long) (theItem.expiration - curTime), null); } if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(this, "Finished Data Movement", null, null)); } } else { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( this, "Location Map Changed - No Data Movement", null, null)); } } } /** * Method handleUDPNetwork * * @param source * @param payload */ public void handleUDPNetwork(InetSocketAddress source, Payload payload) { if (payload instanceof GetMessage) { StatCollector.addSample( StatVars.NETWORK_IN, StatVars.PROVIDER, StatVars.PROVIDER_GET, SerializationManager.getPayloadSize(payload)); processGetMessage(source, (GetMessage) payload); } if (payload instanceof GetResponseMessage) { StatCollector.addSample( StatVars.NETWORK_IN, StatVars.PROVIDER, StatVars.PROVIDER_GETRESPONSE, SerializationManager.getPayloadSize(payload)); processGetResponseMessage((GetResponseMessage) payload); } if (payload instanceof PutMessage) { StatCollector.addSample( StatVars.NETWORK_IN, StatVars.PROVIDER, StatVars.PROVIDER_PUT, SerializationManager.getPayloadSize(payload)); processPutMessage(source, (PutMessage) payload); } if (payload instanceof PutResponseMessage) { StatCollector.addSample( StatVars.NETWORK_IN, StatVars.PROVIDER, StatVars.PROVIDER_PUTRESPONSE, SerializationManager.getPayloadSize(payload)); processPutResponseMessage((PutResponseMessage) payload); } if (payload instanceof RenewMessage) { StatCollector.addSample( StatVars.NETWORK_IN, StatVars.PROVIDER, StatVars.PROVIDER_RENEW, SerializationManager.getPayloadSize(payload)); processRenewMessage(source, (RenewMessage) payload); } if (payload instanceof RenewResponseMessage) { StatCollector.addSample( StatVars.NETWORK_IN, StatVars.PROVIDER, StatVars.PROVIDER_RENEWRESPONSE, SerializationManager.getPayloadSize(payload)); processRenewResponseMessage((RenewResponseMessage) payload); } if (payload instanceof SendMessage) { routeUpCall(((SendMessage) payload).getLocationID(), payload, false); } } private String computeHighString(String ns) { String newStr = ns.concat(lastCharString); return newStr; } private BitID computeStorageID(String ns, String rid, int iid) { BitID theID = new BitID(); // Hash the ns+rid+iid... bits (idBits - 1) to 0 String commonID = ns + delimiter + rid + delimiter + iid; BitID.editHashBitID(theID, commonID, 0, idBits); return theID; } private int nsBits(String ns) { int lastSlash = ns.lastIndexOf(nsSizeDelimiter); if (lastSlash > 0) { String nsBitStr = ns.substring(lastSlash + 1); try { return Integer.parseInt(nsBitStr); } catch (Exception e) { return defaultNSBits; } } return defaultNSBits; } private BitID computeLocationID(String ns, String rid) { int nsBits = nsBits(ns); BitID theID = new BitID(); // Detemine namespace size // Hash the ns bits (idBits - 1) to (idBits - nsBits) BitID.editHashBitID(theID, ns, (idBits - nsBits), nsBits); // Hash the ns+rid... bits (idBits - nsBits - 1) to 0 String partialID; if (ns.indexOf(nsIgnoreChar) == 0) { partialID = rid; } else { partialID = ns + delimiter + rid; } BitID.editHashBitID(theID, partialID, 0, (idBits - nsBits)); return theID; } private void issueLookup(BitID locationID, Integer requestID, ProviderClient theClient, ProviderMessage theMessage) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(requestID, "Issuing Lookup", new Object[]{"d", locationID.toString()}, null)); } requestClients.put(requestID, theClient); waitingRequests.put(requestID, theMessage); locationService.lookup(locationID, applicationID, this, requestID); } private StoredItemCollection scanStorage(String prefix) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Starting LSCAN of: ", prefix})); } // Retrieve the current time to check expirations with double curTime = LocalNode.myTimer.getCurrentTime(); // Retrieve the portion of the directory that maps from ns to ns+1 (includes all sub ns'es) String highString = computeHighString(prefix); SortedMap results = storageDirectory.subMap(prefix, highString); // Iterator over the sids (bit ids) anding the actual item to an array Iterator storageIDs = results.values().iterator(); StoredItemCollection items = new StoredItemCollection(); while (storageIDs.hasNext()) { BitID itemID = (BitID) storageIDs.next(); StoredItem theItem = (StoredItem) storageManager.retrieve(itemID); if (theItem == null) { logger.warn(new StructuredLogMessage(this, "Bad Stored Item", new Object[]{"I", itemID}, null)); continue; } // Check if the item has expired, if so remove it, else add it result array if (theItem.expiration < curTime) { storageIDs.remove(); storageManager.remove(itemID); StoredItem.free(theItem); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Item ", theItem.ns, ".", theItem.rid, ".", String.valueOf( theItem.iid), " expired at ", String.valueOf( theItem.expiration)})); } } else { // Check if this item is correct, i.e. no hash collisions String itemCommonID = theItem.ns + delimiter + theItem.rid + delimiter + theItem.iid; if (itemCommonID.regionMatches(true, 0, prefix, 0, prefix.length())) { items.addItem(theItem); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Item ", theItem.ns, ".", theItem.rid, ".", String.valueOf( theItem.iid), " was added to LSCAN ", prefix})); } } else if (Output.debuggingEnabled) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -