📄 basicprovider.java
字号:
logger.error(new LogMessage(new Object[]{ "Hash collision on item ", theItem.ns, ".", theItem.rid, ".", String.valueOf(theItem.iid), ", item was NOT added to LSCAN ", prefix})); } } } // Pass back an iterator to the array return items; } private void cleanupStorage() { // lscan automatically cleans up old entries in search range, so a complete lscan would clean up all old enties scanStorage(""); LocalNode.myTimer.schedule((cleanupPeriod / 1000), SIGNAL_CLEANUP, this); } private void checkForNewDataCallbacks(StoredItem theItem, boolean renew) { // Retrieve part of the call back tree ArrayList clientArrays = new ArrayList(); String commonID = theItem.ns + delimiter + theItem.rid; Iterator entries = newDataCallbacks.headMap(commonID).entrySet().iterator(); while (entries.hasNext()) { // Check if the client's request actually matches the new item Map.Entry theEntry = (Map.Entry) entries.next(); if (commonID.startsWith((String) theEntry.getKey())) { clientArrays.add(theEntry.getValue()); } } for (int i = 0; i < clientArrays.size(); i++) { // Call the client ArrayList clients = (ArrayList) clientArrays.get(i); for (int j = 0; j < clients.size(); j++) { ((ProviderClient) clients.get(j)).newDataResult(theItem.ns, theItem.rid, theItem.object, renew); } } } private String addItemToStorage(String ns, String rid, int iid, Payload item, long lifetime, BitID locationID) { // Create storage container and insert into storage double expiration = LocalNode.myTimer.getCurrentTime() + lifetime; BitID storageID = computeStorageID(ns, rid, iid); StoredItem theItem = StoredItem.allocate(ns, rid, iid, item, expiration, locationID, storageID); storageManager.store(storageID, theItem); // Add entry to the item directory String commonID = ns + delimiter + rid + delimiter + iid; storageDirectory.put(commonID, storageID); // Check for newData requests checkForNewDataCallbacks(theItem, false); return commonID; } private void sendMessage(InetSocketAddress responsibleNode, ProviderMessage theMessage, int statgroup) { StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.PROVIDER, statgroup, SerializationManager.getPayloadSize(theMessage)); LocalNode.myUDPMessenger.send(listeningSocketAddress, responsibleNode, theMessage); } private void processGetMessage(InetSocketAddress remoteAddr, GetMessage theMessage) { GetResponseMessage resultMessage; String requestItemID = theMessage.getNS() + delimiter + theMessage.getRID() + delimiter; StoredItemCollection results = scanStorage(requestItemID); if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Item located ", requestItemID, " from node ", remoteAddr})); } resultMessage = GetResponseMessage.allocate(theMessage.getID(), theMessage.getNS(), theMessage.getRID(), theMessage.getLocationID(), true, results); if (remoteAddr != null) { sendMessage(remoteAddr, resultMessage, StatVars.PROVIDER_GETRESPONSE); } else { processGetResponseMessage(resultMessage); } } private void processGetResponseMessage(GetResponseMessage theMessage) { GetMessage theRequest = (GetMessage) waitingRequests.remove(theMessage.getID()); if (theRequest != null) { ProviderClient client = (ProviderClient) requestClients.remove(theMessage.getID()); if (theMessage.getResult()) { // Get was processed, return results to client StoredItemCollection collection = (StoredItemCollection) theMessage.getItem(); Payload items[] = new Payload[collection.size()]; for (int i = 0; i < collection.size(); i++) { items[i] = collection.getItem(i).object; } client.getResult(theMessage.getNS(), theMessage.getRID(), items); } else { // Error, resubmit get(theMessage.getNS(), theMessage.getRID(), client); } GetMessage.free(theRequest); } GetResponseMessage.free(theMessage); } private void processPutMessage(InetSocketAddress remoteAddr, PutMessage theMessage) { // Check if the message is really for this node else send failure response BitID locationID = theMessage.getLocationID(); // should be computed on demand, but for efficiency stored // Send success response if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(theMessage, "Processed Put", new Object[] { "n", theMessage.getNS(), "r", theMessage.getRID(), "i", String.valueOf(theMessage.getIID()), "p", Integer.toHexString(theMessage.getItem().hashCode()), "l", String.valueOf(theMessage.getLifetime()), "d", locationID, "s", remoteAddr, "m", theMessage.getID(), }, new Object[]{"c", theMessage.getItem()})); } addItemToStorage(theMessage.getNS(), theMessage.getRID(), theMessage.getIID(), theMessage.getItem(), theMessage.getLifetime(), locationID); PutResponseMessage resultMessage = PutResponseMessage.allocate(theMessage.getID(), theMessage.getNS(), theMessage.getRID(), locationID, theMessage.getIID(), true); if (remoteAddr != null) { sendMessage(remoteAddr, resultMessage, StatVars.PROVIDER_PUTRESPONSE); } else { processPutResponseMessage(resultMessage); } } private void processPutResponseMessage(PutResponseMessage theMessage) { PutMessage theRequest = (PutMessage) waitingRequests.remove(theMessage.getID()); if (theRequest != null) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Recevied PutResponse:", theMessage.getID(), " NS: ", theRequest.getNS(), " RID: ", theRequest.getRID()})); } ProviderClient client = (ProviderClient) requestClients.remove(theMessage.getID()); if ( !(theMessage.getResult())) { // Error, resubmit put(theRequest.getNS(), theRequest.getRID(), theRequest.getIID(), theRequest.getItem(), theRequest.getLifetime(), client); } else { // May be null if due to locationMapChange if (client != null) { client.putResult(theRequest.getNS(), theRequest.getRID(), theRequest.getIID(), true); } PutMessage.free(theRequest); } } PutResponseMessage.free(theMessage); } private void processRenewMessage(InetSocketAddress remoteAddr, RenewMessage theMessage) { // Check if the message is really for this node else send failure response BitID locationID = theMessage.getLocationID(); // should be computed on demand, but for efficiency stored // Find existing item BitID storageID = computeStorageID(theMessage.getNS(), theMessage.getRID(), theMessage.getIID()); StoredItem theItem = (StoredItem) storageManager.retrieve(storageID); RenewResponseMessage resultMessage; if (theItem != null) { // Update expiration theItem.expiration = LocalNode.myTimer.getCurrentTime() + theMessage.getLifetime(); // Check for newData requests checkForNewDataCallbacks(theItem, true); // Send success response resultMessage = RenewResponseMessage.allocate(theMessage.getID(), theMessage.getNS(), theMessage.getRID(), locationID, theMessage.getIID(), true); } else { // Item was not found, send failure response resultMessage = RenewResponseMessage.allocate(theMessage.getID(), theMessage.getNS(), theMessage.getRID(), locationID, theMessage.getIID(), false); } // Send the message or process locally if appropriate if (remoteAddr != null) { sendMessage(remoteAddr, resultMessage, StatVars.PROVIDER_RENEWRESPONSE); } else { processRenewResponseMessage(resultMessage); } } private void processRenewResponseMessage(RenewResponseMessage theMessage) { RenewMessage theRequest = (RenewMessage) waitingRequests.remove(theMessage.getID()); if (theRequest != null) { ProviderClient client = (ProviderClient) requestClients.remove(theMessage.getID()); if (theMessage.getResult()) { client.renewalResult(theMessage.getNS(), theMessage.getRID(), theMessage.getIID(), true); } else { client.renewalResult(theMessage.getNS(), theMessage.getRID(), theMessage.getIID(), false); } RenewMessage.free(theRequest); } RenewResponseMessage.free(theMessage); } /** Removes all data and state */ public void reset() { // reinitialize data structures requestClients = new HashMap(); waitingRequests = new HashMap(); newDataCallbacks = new TreeMap(); messageCallbacks = new TreeMap(); messageUpCallCallbacks = new TreeMap(); storageDirectory = new TreeMap(); // reregister for location map changes locationService.registerClient(this, applicationID, listeningSocketAddress); } /** * Method guid_to_string * * @param i * @return */ public static String guid_to_string(BigInteger i) { // Print only the high-order 8 hexedecimal digits. String result = i.toString(16); while (result.length() < 40) { result = "0" + result; } result = result.substring(0, 8); return result; } /** * Method handleClock * * @param clockData */ public void handleClock(Object clockData) { if (clockData instanceof DataRequest) { DataRequest request = (DataRequest) clockData; if (request.action == DataRequest.ACTION_LSCAN) { processLscanRequest((DataRequest) clockData); } if (request.action == DataRequest.ACTION_NEWDATA) { processNewDataRequest((DataRequest) clockData); } if (request.action == DataRequest.ACTION_UPCALLS) { processUpcallsRequest((DataRequest) clockData); } if (request.action == DataRequest.ACTION_MESSAGES) { processMessagesRequest((DataRequest) clockData); } } if (clockData == SIGNAL_CLEANUP) { cleanupStorage(); } } private class DataRequest { private static final int ACTION_LSCAN = 0; private static final int ACTION_NEWDATA = 1; private static final int ACTION_UPCALLS = 2; private static final int ACTION_MESSAGES = 3; public String ns; public ProviderClient client; public int action; /** * Constructor DataRequest * * @param ns * @param client * @param action */ public DataRequest(String ns, ProviderClient client, int action) { this.ns = ns; this.client = client; this.action = action; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -