📄 basicprovider.java
字号:
* @param iid * @param item * @param lifetime * @param provideUpCalls * @param sendLocal */ public void send(InetSocketAddress location, BitID locationID, String ns, String rid, int iid, Payload item, long lifetime, boolean provideUpCalls, boolean sendLocal) { Integer requestID = new Integer(messageID++); SendMessage message = SendMessage.allocate(requestID, ns, rid, locationID, iid, lifetime, sendLocal, item); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(message, "Send Message Request", new Object[] { "h", location, "d", locationID, "n", ns, "r", rid, "i", String.valueOf(iid), "l", String.valueOf(lifetime), "m", requestID, "u", String.valueOf(provideUpCalls), "e", String.valueOf(sendLocal), "t", Integer.toHexString(item.hashCode()) }, new Object[]{"p", item})); } boolean stopSending = false; if (provideUpCalls && sendLocal) { stopSending = routeUpCall(locationID, message, true); } if ( !stopSending) { if (location == null) { locationService.send(locationID, applicationID, message, provideUpCalls); StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.PROVIDER, StatVars.PROVIDER_SENDMESSAGE, SerializationManager.getPayloadSize(message)); } else { sendMessage(location, message, StatVars.PROVIDER_SENDMESSAGE); } } } /** * Method renew * * @param ns * @param rid * @param iid * @param lifetime * @param client */ public void renew(String ns, String rid, int iid, long lifetime, ProviderClient client) { BitID locationID = computeLocationID(ns, rid); renew(null, locationID, ns, rid, iid, lifetime, client); } /** * Method renew * * @param locationNS * @param locationRID * @param ns * @param rid * @param iid * @param lifetime * @param client */ public void renew(String locationNS, String locationRID, String ns, String rid, int iid, long lifetime, ProviderClient client) { BitID locationID = computeLocationID(locationNS, locationRID); renew(null, locationID, ns, rid, iid, lifetime, client); } /** * Method renew * * @param locationID * @param ns * @param rid * @param iid * @param lifetime * @param client */ public void renew(BitID locationID, String ns, String rid, int iid, long lifetime, ProviderClient client) { renew(null, locationID, ns, rid, iid, lifetime, client); } /** * Method renew * * * * @param location * @param locationID * @param ns * @param rid * @param iid * @param lifetime * @param client */ public void renew(InetSocketAddress location, BitID locationID, String ns, String rid, int iid, long lifetime, ProviderClient client) { Integer requestID = new Integer(messageID++); RenewMessage theMessage = RenewMessage.allocate(requestID, ns, rid, locationID, iid, lifetime); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(theMessage, "Renew Request", new Object[] { "h", location, "d", locationID, "n", ns, "r", rid, "i", String.valueOf(iid), "l", String.valueOf(lifetime), "m", requestID, "c", client }, null)); } if (location == null) { issueLookup(locationID, requestID, client, theMessage); } else { sendMessage(location, theMessage, StatVars.PROVIDER_RENEW); } } /** * Method lscan * * @param ns * @param client */ public void lscan(String ns, ProviderClient client) { DataRequest request = new DataRequest(ns, client, DataRequest.ACTION_LSCAN); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(request, "LScan Request", new Object[]{"n", ns, "c", client}, null)); } LocalNode.myTimer.schedule(0, request, this); } private void addToTreeMap(String ns, ProviderClient client, boolean request, TreeMap map) { Object previousData = map.get(ns); // Check if inserting request or removing if (request) { // No previous requests for ns, create new array and insert if (previousData == null) { ArrayList newList = new ArrayList(); newList.add(client); map.put(ns, newList); } // Multiple previous request, add to existing array if (previousData instanceof ArrayList) { ((ArrayList) previousData).add(client); } } else if (previousData != null) { ((ArrayList) previousData).remove(client); // If array is empty, remove if (((ArrayList) previousData).size() == 0) { map.remove(ns); } } } /** * Method newData * * @param ns * @param client * @param request */ public void newData(String ns, ProviderClient client, boolean request) { if (request) { DataRequest dataRequest = new DataRequest(ns, client, DataRequest.ACTION_NEWDATA); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(dataRequest, "NewData Request", new Object[]{"n", ns, "c", client}, null)); } LocalNode.myTimer.schedule(0, dataRequest, this); } else { addToTreeMap(ns, client, false, newDataCallbacks); } } /** * Method message * * @param ns * @param client * @param includeUpCalls * @param request */ public void message(String ns, ProviderClient client, boolean includeUpCalls, boolean request) { if (includeUpCalls) { if (request) { DataRequest dataRequest = new DataRequest(ns, client, DataRequest.ACTION_UPCALLS); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(dataRequest, "UpCalls Request", new Object[]{"n", ns, "c", client}, null)); } LocalNode.myTimer.schedule(0, dataRequest, this); } else { addToTreeMap(ns, client, false, messageUpCallCallbacks); } } if (request) { DataRequest dataRequest = new DataRequest(ns, client, DataRequest.ACTION_MESSAGES); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(dataRequest, "MessageS Request", new Object[]{"n", ns, "c", client}, null)); } LocalNode.myTimer.schedule(0, dataRequest, this); } else { addToTreeMap(ns, client, false, messageCallbacks); } } /** * Method getNodeID * @return */ public BitID getNodeID() { return locationService.getLocationID(); } /** * Method getNodeSocketAddress * @return */ public InetSocketAddress getNodeSocketAddress() { return listeningSocketAddress; } /** * Method getNumBits * @return */ public int getNumBits() { return idBits; } /** * Method processLscanRequest * * @param request */ protected void processLscanRequest(DataRequest request) { String ns = request.ns; ProviderClient client = request.client; Iterator items = scanStorage(ns).iterator(); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(request, "Processing LScan Request", new Object[]{"n", ns, "c", client}, null)); } while (items.hasNext()) { StoredItem item = (StoredItem) items.next(); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(request, "Processing LScan Tuple", new Object[]{"n", item.ns, "r", item.rid}, new Object[]{ "p", item.object})); } client.lscanResult(item.ns, item.rid, item.object); } client.lscanResult(null, null, null); } /** * Method processNewDataRequest * * @param request */ protected void processNewDataRequest(DataRequest request) { addToTreeMap(request.ns, request.client, true, newDataCallbacks); } /** * Method processUpcallsRequest * * @param request */ protected void processUpcallsRequest(DataRequest request) { addToTreeMap(request.ns, request.client, true, messageUpCallCallbacks); } /** * Method processMessagesRequest * * @param request */ protected void processMessagesRequest(DataRequest request) { addToTreeMap(request.ns, request.client, true, messageCallbacks); } /** * Method lookupResult * * @param responsibleNode * @param requestID * @param local */ public void lookupResult(InetSocketAddress responsibleNode, Object requestID, boolean local) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(requestID, "Lookup Returned", new Object[]{"m", requestID, "s", responsibleNode}, null)); } Object request = waitingRequests.get(requestID); // If the request was not found, do not process if (request == null) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(requestID, "Unknown Lookup", new Object[]{"m", requestID, "s", responsibleNode}, null)); } return; } // Check what type of request and process if (request instanceof GetMessage) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(request, "Sending Get Message", new Object[]{"m", requestID, "d",
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -