📄 udpccmessenger.java
字号:
/* * @(#)$Id: UdpCCMessenger.java,v 1.24 2004/10/14 19:49:04 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package runtime.services.network.udpcc;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.InetAddress;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.util.HashMap;import java.util.Hashtable;import org.apache.log4j.Logger;import runtime.schedulers.ASyncCore;import services.LocalNode;import services.Output;import services.network.Payload;import services.network.udp.UDPClient;import services.network.udp.UDPwithAckClient;import services.network.udp.UDPwithAckMessenger;import services.stats.StatCollector;import services.stats.StatVars;import util.logging.LogMessage;import util.logging.StructuredLogMessage;import util.network.serialization.DataByteBuffer;import util.network.serialization.SerializationManager;/** * Class AsyncMessenger * */public class UdpCCMessenger implements UDPwithAckMessenger, UdpCC.SendCB, UdpCC.Serializer, UdpCC.Sink, ASyncCore.TimerCB { private static Logger logger = Logger.getLogger(UdpCCMessenger.class); protected Object syncObject; protected ASyncCore networkCore; protected ASyncCore eventCore; protected InetAddress myAddress; protected int timeout, maxSize, debugLevel; protected boolean internalSerialization; protected UdpCC sender; protected Hashtable sendChannels; protected ByteArrayOutputStream byteStream; protected HashMap listenersTable, listenersClients; /** * Constructor AsyncMessenger * * @param syncObject * @param networkCore * @param eventCore * @param myAddress * @param timeout * @param internalSerialization * @param maxSize * @param sendPort * @param screenStatPeriod * @param debugLevel * @param recentlySeenSize * @param timeoutFactor * @param timeoutOffset */ public UdpCCMessenger(Object syncObject, ASyncCore networkCore, ASyncCore eventCore, InetAddress myAddress, int timeout, boolean internalSerialization, int maxSize, int sendPort, int screenStatPeriod, int debugLevel, int recentlySeenSize, double timeoutFactor, double timeoutOffset) { this.syncObject = syncObject; this.networkCore = networkCore; this.eventCore = eventCore; this.myAddress = myAddress; this.timeout = timeout; this.internalSerialization = internalSerialization; this.maxSize = maxSize; this.debugLevel = debugLevel; InetSocketAddress localSocketAddress = new InetSocketAddress(myAddress, sendPort); this.sender = new UdpCC(networkCore, localSocketAddress, this, this); sender.set_debug_level(debugLevel); UdpCC.BW_STATS_PERIOD = screenStatPeriod; UdpCC.MAX_RECENTLY_SEEN_SIZE = recentlySeenSize; UdpCC.timeout_factor = timeoutFactor; UdpCC.timeout_diff = timeoutOffset; this.sendChannels = new Hashtable(); sendChannels.put(localSocketAddress, sender); this.byteStream = new ByteArrayOutputStream(); this.listenersTable = new HashMap(); this.listenersClients = new HashMap(); } /** * Method send * Only Called from EventCore * * @param destination * @param payload */ public void send(InetSocketAddress destination, Payload payload) { send(null, destination, payload, null, null); } /** * Method send * Only Called from EventCore * * @param source * @param destination * @param payload */ public void send(InetSocketAddress source, InetSocketAddress destination, Payload payload) { send(source, destination, payload, null, null); } /** * Method send * Only Called from EventCore * * @param destination * @param payload * @param client * @param ackData */ public void send(InetSocketAddress destination, Payload payload, UDPwithAckClient client, Object ackData) { send(null, destination, payload, client, ackData); } /** * Method getRoundTripLatency * Only Called from EventCore * * @param destination * @return */ public long getRoundTripLatency(InetSocketAddress destination) { return sender.latency_mean(destination); } /** * Method getRoundTripLatency * Only Called from EventCore * * @param source * @param destination * @return */ public long getRoundTripLatency(InetSocketAddress source, InetSocketAddress destination) { UdpCC sourceSender = (UdpCC) sendChannels.get(source); if (sourceSender != null) { return sourceSender.latency_mean(destination); } else { return 0; } } /** * Method send * Only Called from EventCore * * @param source * @param destination * @param payload * @param client * @param ackData */ public void send(InetSocketAddress source, InetSocketAddress destination, Payload payload, UDPwithAckClient client, Object ackData) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(payload, "Network UDP Send Start", new Object[]{"s", source, "d", destination, "c", client}, new Object[]{ "p", payload, "a", ackData})); } AckObject ackObject = null; if (client != null) { ackObject = new AckObject(client, ackData); } // Check if there is a real message if (payload == null) { logger.error( new StructuredLogMessage( payload, "Attempt to send null message", null, null)); } // Check if it is local if ((destination.getAddress().equals(LocalNode.myIPAddress)) && (listenersTable.get(new Integer(destination.getPort())) != null)) { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(payload, "Local Send", null, null)); } StatCollector.addSample( StatVars.NETWORK_OUT, StatVars.UDP_NETWORK, StatVars.UDP_DATA_INTERNAL, SerializationManager.getPayloadSize(payload)); runEvent(destination.getPort(), destination, payload); StatCollector.addSample( StatVars.NETWORK_IN, StatVars.UDP_NETWORK, StatVars.UDP_DATA_INTERNAL, SerializationManager.getPayloadSize(payload)); } else { if (Output.debuggingEnabled) { logger.debug(new StructuredLogMessage(payload, "Network Send", null, null)); } networkCore.register_timer(0, this, new SendObject(source, destination, payload, ackObject)); } } /** * Class AckObject * */ public class AckObject { public UDPwithAckClient client; public Object ackData; /** * Constructor AckObject * * @param client * @param ackData */ public AckObject(UDPwithAckClient client, Object ackData) { this.client = client; this.ackData = ackData; } } /** * Method listen * Only Called from EventCore * * @param portNumber * @param client * @return */ public boolean listen(Integer portNumber, UDPClient client) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Listening on: ", LocalNode.myIPAddress, ":", portNumber})); } InetSocketAddress listenSocket = new InetSocketAddress(myAddress, portNumber.intValue()); UdpCC listener = (UdpCC) sendChannels.get(listenSocket); if (listener == null) { listener = new UdpCC(networkCore, listenSocket, this, this); listener.set_debug_level(debugLevel); sendChannels.put(listenSocket, listener); } else { if (listenersTable.get(portNumber) != null) { return false; } } listenersTable.put(portNumber, listener); listenersClients.put(portNumber, client); return true; } /** * Method release * Only Called from EventCore * * @param portNumber */ public void release(Integer portNumber) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{"Releasing listener on:", LocalNode.myIPAddress, ":", portNumber})); } UdpCC listener = (UdpCC) listenersTable.remove(portNumber); listenersClients.remove(portNumber); sendChannels.remove(new InetSocketAddress(myAddress, portNumber.intValue())); if (listener != null) { networkCore.register_timer(0, this, (new ReleaseObject(listener))); } } /** * Method runEvent * Only Called from EventCore * * @param localPort * @param remoteIPAddress * @param data */ protected void runEvent(int localPort, InetSocketAddress remoteIPAddress, Payload data) { UDPClient client = (UDPClient) listenersClients.get(new Integer(localPort)); runEvent(client, localPort, remoteIPAddress, data); } /** * Method runEvent * Only Called from EventCore * * @param client * @param localPort * @param remoteIPAddress * @param data */ protected void runEvent(UDPClient client, int localPort, InetSocketAddress remoteIPAddress, Payload data) { // run the event if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( data, "Preparing to Execute Callback Event", new Object[]{"c", client, "l", String.valueOf(localPort), "r", remoteIPAddress}, new Object[]{"p", data})); } if (client != null) { synchronized (syncObject) { if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( data, "Executing Callback Event", null, null)); } client.handleUDPNetwork(remoteIPAddress, data); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( data, "Callback Event Finished", null, null)); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -