⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 actualudpthread.java

📁 High performance DB query
💻 JAVA
字号:
/* * @(#)$Id: ActualUDPThread.java,v 1.4 2004/07/02 23:59:22 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704.  Attention:  Intel License Inquiry. */package runtime.services.network.udp;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;import java.util.HashMap;import java.util.LinkedList;import java.util.Vector;import org.apache.log4j.Logger;import services.Output;import services.network.Payload;import services.stats.StatCollector;import services.stats.StatVars;import util.logging.LogMessage;/** * Class ActualUDPThread * */public class ActualUDPThread extends Thread implements Runnable {    private static Logger logger = Logger.getLogger(ActualUDPThread.class);    private ActualUDPMessenger eventProcessor;    private Vector listeners;    private LinkedList queuedData;    private DatagramChannel sendChannel;    private HashMap sendChannels;    private ByteArrayOutputStream byteStream;    private ObjectOutputStream objectStream;    private long sleepTime;    private int objectInputBufferSize;    private boolean run;    /**     * Constructor ActualUDPThread     *     * @param eventProcessor     * @param listeners     * @param sendChannels     * @param queuedData     * @param sleepTime     * @param objectInputBufferSize     */    public ActualUDPThread(ActualUDPMessenger eventProcessor, Vector listeners,                           HashMap sendChannels, LinkedList queuedData,                           long sleepTime, int objectInputBufferSize) {        this.eventProcessor = eventProcessor;        this.listeners = listeners;        this.queuedData = queuedData;        this.sleepTime = sleepTime;        this.objectInputBufferSize = objectInputBufferSize;        this.run = true;        try {            byteStream = new ByteArrayOutputStream();            sendChannel = DatagramChannel.open();            sendChannel.configureBlocking(false);        } catch (Exception exception) {            throw new RuntimeException(                "UDPTHREAD: Unable to setup outbound UDP channel: "                + exception.getClass().getName() + ": "                + exception.getMessage());        }        this.sendChannels = sendChannels;        sendChannels.put(sendChannel.socket().getLocalSocketAddress(),                         sendChannel);    }    private boolean processOutbound() {        QueuedData data;        if (queuedData.size() > 0) {            data = (QueuedData) queuedData.removeFirst();        } else {            return false;        }        try {            // Convert object to bytes            byteStream.reset();            objectStream = new ObjectOutputStream(byteStream);            objectStream.writeObject(data.getData());            byte[] bytes = byteStream.toByteArray();            // Place data in ByteBuffer            ByteBuffer buffer = ByteBuffer.allocate(bytes.length);            buffer.put(bytes);            buffer.position(0);            // Send Data            DatagramChannel theSendChannel = null;            if (data.getSource() == null) {                theSendChannel = sendChannel;            } else {                theSendChannel =                    (DatagramChannel) sendChannels.get(data.getSource());                if (theSendChannel == null) {                    theSendChannel = DatagramChannel.open();                    theSendChannel.socket().bind(data.getSource());                    sendChannels.put(data.getSource(), theSendChannel);                }            }            if (theSendChannel.send(buffer, data.getDestination()) == 0) {                throw new RuntimeException("Send Error");            }            theSendChannel.disconnect();            StatCollector.addSample(StatVars.NETWORK_OUT, StatVars.UDP_NETWORK,                                    StatVars.UDP_DATA, bytes.length);            if (data != null) {                QueuedData.free(data);            } else {                if (Output.debuggingEnabled) {                    logger.debug(new LogMessage(new Object[]{                        "Attempted to free null data object after sending ",                        String.valueOf(bytes.length)}));                }            }        } catch (Exception exception) {            throw new RuntimeException(                "UDPTHREAD: Unable to send on outbound UDP channel: "                + exception.getClass().getName() + ": "                + exception.getMessage());        }        return false;    }    private boolean channelReceive(DatagramChannel curChannel) {        ByteBuffer bytes = ByteBuffer.allocate(objectInputBufferSize);        try {            InetSocketAddress remoteSocket =                (InetSocketAddress) curChannel.receive(bytes);            if ((remoteSocket != null) && (bytes.position() != 0)) {                ObjectInputStream ois = new ObjectInputStream(                                            new ByteArrayInputStream(                                                bytes.array()));                Payload data = (Payload) ois.readObject();                eventProcessor.runEvent(curChannel.socket().getLocalPort(),                                        remoteSocket, data);                StatCollector.addSample(StatVars.NETWORK_IN,                                        StatVars.UDP_NETWORK,                                        StatVars.UDP_DATA, bytes.position());                return true;            }        } catch (Exception exception) {            throw new RuntimeException("UDPTHREAD: Unable to receive: "                                       + exception.getClass().getName()                                       + exception.getMessage());        }        return false;    }    private boolean processListeners() {        boolean skip = false;        int curIndex = 0;        DatagramChannel curChannel;        while (true) {            // Get the next new channel            synchronized (listeners) {                if (curIndex < listeners.size()) {                    curChannel =                        (DatagramChannel) listeners.elementAt(curIndex);                } else {                    return skip;                }            }            if (channelReceive(curChannel)) {                skip = true;            }            curIndex++;        }    }    /**     * Method run     */    public void run() {        while (run) {            boolean skip = false;            if (processOutbound()) {                skip = true;            }            if (processListeners()) {                skip = true;            }            if ( !skip) {                try {                    Thread.sleep(sleepTime);                } catch (InterruptedException e) {}            }        }    }    /**     * Method end     */    public void end() {        run = false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -