📄 dustdevilemulatorimpl.ucjava
字号:
/* * @(#)$Id: DustDevilEmulatorImpl.ucjava,v 1.9 2005/07/19 00:00:34 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package overlay.location.bamboo;import seda.sandStorm.api.EventHandlerIF;import seda.sandStorm.api.QueueElementIF;import seda.sandStorm.api.EventHandlerException;import seda.sandStorm.api.ConfigDataIF;import seda.sandStorm.api.SinkIF;import seda.sandStorm.api.SinkException;import java.net.InetSocketAddress;import java.net.InetAddress;import java.util.HashMap;import java.util.Date;import java.io.Reader;import bamboo.lss.DustDevil;import ostore.dispatch.Classifier;import ostore.network.NetworkMessage;import ostore.network.NetworkMessageResult;import ostore.network.NetworkLatencyReq;import ostore.network.NetworkLatencyResp;import ostore.util.NodeId;import services.LocalNode;import services.timer.TimerClient;import services.network.Payload;import services.network.udp.UDPwithAckMessenger;import services.network.udp.UDPwithAckClient;import services.network.udp.UDPClient;import util.network.serialization.SerializationManager;/** * Class DustDevilEmulatorImpl * */public class DustDevilEmulatorImpl extends DustDevil implements DustDevilEmulator { private Classifier myClassifier; private InetSocketAddress myAddress; private Integer myPort; /** * Method create_network * * @param addr * @return */ public EventHandlerIF create_network(InetSocketAddress addr) { try { ostore.util.TypeTable.register_type( "overlay.location.bamboo.NetworkMessageEmulator"); } catch (Exception e) { throw new RuntimeException( "Unable to setup DustDevil Runtime network layer"); } NetworkEmulator theNetEmulator = new NetworkEmulator(); LocalNode.myUDPMessenger.listen(myPort, theNetEmulator); return theNetEmulator; } /** * Method create_timer_cb * * @param mtClassifier * @return */ public Classifier.TimerCB create_timer_cb(Classifier mtClassifier) { return new TimerEmulator(); } /** * Constructor DustDevilEmulatorImpl */ public DustDevilEmulatorImpl() { SerializationManager.registerClass( "overlay.location.bamboo.PayloadEmulator"); ostore.util.Clock.set_cb(new ClockEmulator()); set_acore_instance(new ASyncCoreEmulator()); } /** * Method startEnvironment * * @param address * @param port * @param parameters * @throws Exception */ public void startEnvironment( InetAddress address, int port, Reader parameters) throws Exception { NodeId myNodeID = new NodeId(port, address); myPort = new Integer(port); myAddress = new InetSocketAddress(address, port); myClassifier = Classifier.getClassifier(myNodeID); myClassifier.set_timer_cb(create_timer_cb(myClassifier)); main(parameters); } /** * Class ClockEmulator * */ public class ClockEmulator implements ostore.util.Clock.ClockCB { /** * Method date * * @param nodeId * @return */ public Date date(NodeId nodeId) { return new Date((long) LocalNode.myTimer.getCurrentTime() * 1000); } } /** * Class TimerEmulator * */ public class TimerEmulator implements Classifier.TimerCB, TimerClient { private HashMap canceledTimers; /** * Constructor TimerEmulator */ public TimerEmulator() { this.canceledTimers = new HashMap(); } /** * Method schedule * * @param millis * @param event * @return */ public Object schedule(long millis, QueueElementIF event) { LocalNode.myTimer.schedule((((double) millis) / 1000), event, this); return event; } /** * Method cancel * * @param token */ public void cancel(Object token) { canceledTimers.put(token, token); } /** * Method handleClock * * @param clockData */ public void handleClock(Object clockData) { Object item = canceledTimers.remove(clockData); if (item == null) { try { myClassifier.dispatch((QueueElementIF) clockData); } catch (Exception exception) { throw new RuntimeException("Dispatch clock message failed: " + exception.getClass() + exception.getMessage()); } } } } /** * Class NetworkEmulator * */ public class NetworkEmulator implements EventHandlerIF, UDPwithAckClient, UDPClient { private int counter; private HashMap ackData; /** * Constructor NetworkEmulator */ public NetworkEmulator() { this.counter = 0; this.ackData = new HashMap(); } /** * Method handleEvent * * @param queueElementIF * @throws EventHandlerException */ public void handleEvent(QueueElementIF queueElementIF) throws EventHandlerException { if (queueElementIF instanceof NetworkMessage) { NetworkMessage msg = (NetworkMessage) queueElementIF; PayloadEmulator message = new PayloadEmulator(msg); InetSocketAddress dst = new InetSocketAddress(msg.peer.address(), msg.peer.port()); if (msg.comp_q != null) { Object compData[] = new Object[]{msg.comp_q, msg.user_data}; Integer ackKey = new Integer(counter++); ackData.put(ackKey, compData); if (LocalNode.myUDPMessenger instanceof UDPwithAckMessenger) { ((UDPwithAckMessenger) LocalNode.myUDPMessenger).send( myAddress, dst, message, this, ackKey); } else { throw new RuntimeException( "Unable to emulate network, layer does not support acks"); } } else { LocalNode.myUDPMessenger.send(myAddress, dst, message); } } else { NetworkLatencyReq req = (NetworkLatencyReq) queueElementIF; long lat = ((UDPwithAckMessenger) LocalNode.myUDPMessenger).getRoundTripLatency( myAddress, new InetSocketAddress( req.node_id.address(), req.node_id.port())); try { req.comp_q.enqueue(new NetworkLatencyResp(true, lat, req.user_data)); } catch (SinkException e) {} } } /** * Method handleEvents * * @param queueElementIFs * @throws EventHandlerException */ public void handleEvents(QueueElementIF[] queueElementIFs) throws EventHandlerException { if (queueElementIFs != null) { for (int i = 0; i < queueElementIFs.length; i++) { handleEvent(queueElementIFs[i]); } } } /** * Method init * * @param configDataIF * @throws Exception */ public void init(ConfigDataIF configDataIF) throws Exception { ostore.dispatch.Filter theFilter = new ostore.dispatch.Filter(); Class theClass = Class.forName("ostore.network.NetworkMessage"); theFilter.requireType(theClass); java.lang.reflect.Field inb = theClass.getField("inbound"); theFilter.requireValue(inb, new Boolean(false)); SinkIF mySink = configDataIF.getStage().getSink(); myClassifier.subscribe(theFilter, mySink); theFilter = new ostore.dispatch.Filter(); theClass = Class.forName("ostore.network.NetworkLatencyReq"); theFilter.requireType(theClass); myClassifier.subscribe(theFilter, mySink); } /** * Method destroy * @throws Exception */ public void destroy() throws Exception {} /** * Method handleUDPNetwork * * @param source * @param data */ public void handleUDPNetwork(InetSocketAddress source, Payload data) { PayloadEmulator message = (PayloadEmulator) data; try { NetworkMessage theMessage = message.getItem(); theMessage.inbound = true; theMessage.peer = new NodeId(source.getPort(), source.getAddress()); myClassifier.dispatch(theMessage); } catch (Exception exception) { throw new RuntimeException("Dispatch network message failed: " + exception.getClass() + exception.getMessage(), exception); } } /** * Method handleUDPNetworkAck * * @param data * @param success */ public void handleUDPNetworkAck(Object data, boolean success) { Object compData = ackData.remove(data); if (compData != null) { SinkIF comp_q = (SinkIF) ((Object[]) compData)[0]; NetworkMessageResult msg = new NetworkMessageResult(((Object[]) compData)[1], success); try { comp_q.enqueue(msg); } catch (Exception exception) { throw new RuntimeException( "Dispatch network ack message failed: " + exception.getClass() + exception.getMessage()); } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -