📄 bamboo.java
字号:
routerStageConfig += "location_cache_size " + locationCacheSize + "\n"; } routerStageConfig += "</initargs>\n </Router-" + dustDevilHostName + ">\n"; String routerCallbackStageConfig = "<RouterCallback>\n class bamboo.router.RouterCallbackInterface\n <initargs>\n debug_level 0\n </initargs>\n </RouterCallback>\n"; StringBuffer parameterBuffer = new StringBuffer(2000); parameterBuffer.append("<sandstorm>\n"); parameterBuffer.append(globalConfig); parameterBuffer.append("<stages>\n"); parameterBuffer.append(networkStageConfig); parameterBuffer.append(routerStageConfig); parameterBuffer.append(routerCallbackStageConfig); parameterBuffer.append("</stages>\n"); parameterBuffer.append("</sandstorm>"); String parameters = new String(parameterBuffer); Reader configFile = new StringReader(parameters); try { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(null, "Bamboo Start and Join", new Object[]{"n", String.valueOf( numGateways)}, null)); } bambooRuntime.startEnvironment(LocalNode.myIPAddress, dustDevilPort, configFile); bambooSubsystem.bind(this, dustDevilPort, LocalNode.myIPAddress, 1); } catch (Exception exception) { logger.error( new LogMessage(new Object[]{"Unable to start/bind to Bamboo"}), exception); throw new RuntimeException( "Unable to start/bind to Bamboo/DustDevil runtime"); } LocalNode.myUDPMessenger.listen(new Integer(port), this); } /** * No-op in this implementation: the method body is empty. */ public void leave() {} /** * Registers a client. The client is added to localClients, and two entries are stored under the boxed applicationID: the lookup return address in applications and the client itself in applicationsCallback. * * @param client client to add; later retrieved by application ID in processMessageUpCall * @param applicationID key under which the address and the client are stored * @param lookupReturnAddress address stored for this application; processLookupRequest passes it into the LookupResponse it builds */ public void registerClient(LocationServiceClient client, long applicationID, InetSocketAddress lookupReturnAddress) { localClients.add(client); Long appID = new Long(applicationID); applications.put(appID, lookupReturnAddress); applicationsCallback.put(appID, client); } /** * Delegates to init(). */ public void reset() { init(); } /** * Rejects any attempt to disable route maintenance. Passing true has no effect. * * @param doRouteMaintenance must be true * @throws RuntimeException if doRouteMaintenance is false */ public void setRouteMaintenance(boolean doRouteMaintenance) { if 
(doRouteMaintenance == false) { throw new RuntimeException("Can not disable Route Maintenance"); } } /** * Handles an expired lookup timeout by sending the lookup again. A Lookup is allocated from the timed-out item, a new timeout is registered for it under its ID with this object as handler, a NETWORK_OUT sample (payload size plus BAMBOO_BYTE_OVERHEAD) and a LOOKUP_TIMEOUTS sample are recorded, and the new Lookup is sent through bambooSubsystem. * * @param item the timed-out entry; it is cast to Lookup without a type check * @param currentTime not used by this method */ public void handleTimeout(Object item, double currentTime) { Lookup message = Lookup.allocate(((Lookup) item)); timeoutManager.addTimeout(message, new Integer(message.getID()), timeout, this); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_LOOKUP, SerializationManager.getPayloadSize(message) + BAMBOO_BYTE_OVERHEAD); StatCollector.addSample(StatVars.MISC_A, StatVars.LOCATION_SERVICE, StatVars.LOOKUP_TIMEOUTS, 1); if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(message.getSearchID(), "Lookup Timeout and Resend", null, new Object[]{"p", item})); } bambooSubsystem.sendMessage(message.getSearchID(), message.getApplicationID(), false, iterative, message); } /** * Handles a payload received over UDP. Only LookupResponse payloads are acted on: a NETWORK_IN sample is recorded and the response is passed to processLookupResponse. Any other payload type is ignored without logging. * * @param source address of the sender; the response is flagged as local when it equals the address field * @param data received payload */ public void handleUDPNetwork(InetSocketAddress source, Payload data) { if (data instanceof LookupResponse) { StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_LOOKUPRESPONSE, SerializationManager.getPayloadSize(data) + BAMBOO_BYTE_OVERHEAD); processLookupResponse((LookupResponse) data, source.equals(address)); } } /** * Calls locationMapChange() on every client held in localClients. */ public void handleLeafsetChange() { // after graceful leave call all global clients that leave has occurred Iterator clientIterator = localClients.iterator(); while (clientIterator.hasNext()) { LocationServiceClient client = (LocationServiceClient) clientIterator.next(); client.locationMapChange(); } } /** * Up-call for a payload seen at this node. A Lookup has its hop count incremented by one, NETWORK_IN and NETWORK_OUT samples recorded, and false is returned. A Message has its hop count incremented and a NETWORK_IN sample recorded; when getProvideUpCalls() is true the registered client is consulted through processMessageUpCall, and a NETWORK_OUT sample is recorded only when that call returns false. NOTE(review): no NETWORK_OUT sample is recorded for a Message whose up-calls are disabled; confirm this is intended. * * @param message payload being handled * @param local passed through unchanged to the client callback * @return the result of processMessageUpCall for a Message with up-calls enabled, false in every other case; the local variable is named stopSending, so true presumably stops further forwarding (confirm with the caller) */ public boolean handleMessageUpCall(Payload message, boolean local) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(message, "Message Received", null, new Object[]{"p", message})); } if (message instanceof Lookup) { ((Lookup) 
message).updateHopCount((byte) 1); StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_LOOKUPMIDWAY, SerializationManager.getPayloadSize(message) + BAMBOO_BYTE_OVERHEAD); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_LOOKUPMIDWAY, SerializationManager.getPayloadSize(message) + BAMBOO_BYTE_OVERHEAD); return false; } if (message instanceof Message) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( message, "Message Begin Processing", new Object[]{"m", String.valueOf( ((Message) message).getID()), "d", ((Message) message).getDestination()}, null)); } ((Message) message).updateHopCount((byte) 1); StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_MESSAGEMIDWAY, SerializationManager.getPayloadSize(message) + BAMBOO_BYTE_OVERHEAD); boolean processUpCalls = ((Message) message).getProvideUpCalls(); if (processUpCalls) { boolean stopSending = processMessageUpCall(((Message) message).getDestination(), (Message) message, local); if (stopSending == false) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( message, "Completed Message Processing", null, null)); } StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_MESSAGEMIDWAY, SerializationManager.getPayloadSize(message) + BAMBOO_BYTE_OVERHEAD); } return stopSending; } } return false; } /** * Handles a payload delivered to this node. A Lookup has its hop count incremented by one, NETWORK_IN and HOP_COUNT samples recorded, and is answered through processLookupRequest. A Message gets the same bookkeeping and is handed to processMessageUpCall with a null destination; the value returned by that call is discarded. Any other payload type is logged as an error together with its class name. * * @param message delivered payload * @param local passed through to the client callback for a Message; not used for a Lookup */ public void handleMessageDelivery(Payload message, boolean local) { if (message instanceof Lookup) { ((Lookup) message).updateHopCount((byte) 1); StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_LOOKUP, SerializationManager.getPayloadSize(message) + BAMBOO_BYTE_OVERHEAD); StatCollector.addSample(StatVars.MISC_A, StatVars.LOCATION_SERVICE, StatVars.HOP_COUNT, ((Lookup) message).getHopCount()); processLookupRequest((Lookup) message); return; 
} if (message instanceof Message) { ((Message) message).updateHopCount((byte) 1); StatCollector.addSample(StatVars.NETWORK_IN, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_MESSAGE, SerializationManager.getPayloadSize(message) + BAMBOO_BYTE_OVERHEAD); StatCollector.addSample(StatVars.MISC_A, StatVars.LOCATION_SERVICE, StatVars.HOP_COUNT, ((Message) message).getHopCount()); processMessageUpCall(null, (Message) message, local); return; } logger.error(new StructuredLogMessage(message, "Invalid Message Type", new Object[]{"m", message.getClass().getName()}, null)); } /** * Looks up in applicationsCallback the client stored under the application ID of the message and, if one is found, returns the result of its routeUpCall(destination, payload, local). Returns false when no client is found. */ private boolean processMessageUpCall(BitID destination, Message message, boolean local) { LocationServiceClient client = (LocationServiceClient) applicationsCallback.get( new Long(message.getApplicationID())); if (client != null) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( message, "Executing Message Callback", null, null)); } return client.routeUpCall(destination, message.getMessage(), local); } return false; } /** * Answers a lookup. Builds a LookupResponse from the lookup ID, the address field and the entry stored in applications under the application ID of the lookup, records a NETWORK_OUT sample, and sends the response over UDP to the source socket address of the lookup. NOTE(review): the stored entry is not checked before use, so an unregistered application ID presumably yields a null answer (confirm the behaviour of the applications map). NOTE(review): unlike the other samples visible in this file, this one does not add BAMBOO_BYTE_OVERHEAD. */ private void processLookupRequest(Lookup message) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(message, "Responding to Lookup", null, null)); } LookupResponse response = LookupResponse.allocate( message.getID(), address, (InetSocketAddress) applications.get( new Long( message.getApplicationID()))); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.LOCATION_SERVICE, StatVars.BAMBOO_LOOKUPRESPONSE, SerializationManager.getPayloadSize(response)); LocalNode.myUDPMessenger.send(message.getSourceSocketAddress(), response); } /** * Completes a pending lookup. Removes the client and the request ID stored under the ID of the response, removes the matching timeout when timeout is greater than zero, and, if a client was found, calls its lookupResult with the answer, the request ID and the local flag. A response with no matching client triggers no callback and no log entry. */ private void processLookupResponse(LookupResponse message, boolean local) { Integer lookupID = new Integer(message.getID()); LocationServiceClient client = (LocationServiceClient) lookupRequests.remove(lookupID); Object requestID = lookupRequestsID.remove(lookupID); if (timeout > 0) { timeoutManager.removeTimeout(lookupID); } if (client != null) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( message, 
"Executing Lookup Callback", new Object[]{"d", lookupID, "c", client, "i", requestID}, null)); } client.lookupResult(message.getAnswer(), requestID, local); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -