📄 actualudpthread.java
字号:
/* * @(#)$Id: ActualUDPThread.java,v 1.4 2004/07/02 23:59:22 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package runtime.services.network.udp;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;import java.util.HashMap;import java.util.LinkedList;import java.util.Vector;import org.apache.log4j.Logger;import services.Output;import services.network.Payload;import services.stats.StatCollector;import services.stats.StatVars;import util.logging.LogMessage;/** * Class ActualUDPThread * */public class ActualUDPThread extends Thread implements Runnable { private static Logger logger = Logger.getLogger(ActualUDPThread.class); private ActualUDPMessenger eventProcessor; private Vector listeners; private LinkedList queuedData; private DatagramChannel sendChannel; private HashMap sendChannels; private ByteArrayOutputStream byteStream; private ObjectOutputStream objectStream; private long sleepTime; private int objectInputBufferSize; private boolean run; /** * Constructor ActualUDPThread * * @param eventProcessor * @param listeners * @param sendChannels * @param queuedData * @param sleepTime * @param objectInputBufferSize */ public ActualUDPThread(ActualUDPMessenger eventProcessor, Vector listeners, HashMap sendChannels, LinkedList queuedData, long sleepTime, int objectInputBufferSize) { this.eventProcessor = eventProcessor; this.listeners = listeners; this.queuedData = queuedData; this.sleepTime = sleepTime; this.objectInputBufferSize = objectInputBufferSize; this.run = true; try { byteStream = new ByteArrayOutputStream(); sendChannel = DatagramChannel.open(); sendChannel.configureBlocking(false); } catch (Exception exception) { throw new RuntimeException( "UDPTHREAD: Unable to setup outbound UDP channel: " + exception.getClass().getName() + ": " + exception.getMessage()); } this.sendChannels = sendChannels; sendChannels.put(sendChannel.socket().getLocalSocketAddress(), sendChannel); } private boolean processOutbound() { QueuedData data; if (queuedData.size() > 0) { data = (QueuedData) queuedData.removeFirst(); } else { return false; } try { // Convert object to bytes byteStream.reset(); objectStream = new ObjectOutputStream(byteStream); objectStream.writeObject(data.getData()); byte[] bytes = byteStream.toByteArray(); // Place data in ByteBuffer ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.position(0); // Send Data DatagramChannel theSendChannel = null; if (data.getSource() == null) { theSendChannel = sendChannel; } else { theSendChannel = (DatagramChannel) sendChannels.get(data.getSource()); if (theSendChannel == null) { theSendChannel = DatagramChannel.open(); theSendChannel.socket().bind(data.getSource()); sendChannels.put(data.getSource(), theSendChannel); } } if (theSendChannel.send(buffer, data.getDestination()) == 0) { throw new RuntimeException("Send Error"); } theSendChannel.disconnect(); StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.UDP_NETWORK, StatVars.UDP_DATA, bytes.length); if (data != null) { QueuedData.free(data); } else { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Attempted to free null data object after sending ", String.valueOf(bytes.length)})); } } } catch (Exception exception) { throw new RuntimeException( "UDPTHREAD: Unable to send on outbound UDP channel: " + exception.getClass().getName() + ": " + exception.getMessage()); } return false; } private boolean channelReceive(DatagramChannel curChannel) { ByteBuffer bytes = ByteBuffer.allocate(objectInputBufferSize); try { InetSocketAddress remoteSocket = (InetSocketAddress) curChannel.receive(bytes); if ((remoteSocket != null) && (bytes.position() != 0)) { ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( bytes.array())); Payload data = (Payload) ois.readObject(); eventProcessor.runEvent(curChannel.socket().getLocalPort(), remoteSocket, data); StatCollector.addSample(StatVars.NETWORK_IN, StatVars.UDP_NETWORK, StatVars.UDP_DATA, bytes.position()); return true; } } catch (Exception exception) { throw new RuntimeException("UDPTHREAD: Unable to receive: " + exception.getClass().getName() + exception.getMessage()); } return false; } private boolean processListeners() { boolean skip = false; int curIndex = 0; DatagramChannel curChannel; while (true) { // Get the next new channel synchronized (listeners) { if (curIndex < listeners.size()) { curChannel = (DatagramChannel) listeners.elementAt(curIndex); } else { return skip; } } if (channelReceive(curChannel)) { skip = true; } curIndex++; } } /** * Method run */ public void run() { while (run) { boolean skip = false; if (processOutbound()) { skip = true; } if (processListeners()) { skip = true; } if ( !skip) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) {} } } } /** * Method end */ public void end() { run = false; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -