⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpccfragmessenger.java

📁 High performance DB query
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * @(#)$Id: UdpCCFragMessenger.java,v 1.6 2004/10/31 23:17:06 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704.  Attention:  Intel License Inquiry. */package runtime.services.network.udpcc;import java.net.InetAddress;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Set;import org.apache.log4j.Logger;import runtime.schedulers.ASyncCore;import services.LocalNode;import services.Output;import services.network.Payload;import services.network.udp.UDPClient;import util.logging.StructuredLogMessage;/** * Class UdpCCFragMessenger * * Each message has a 12 byte header * Bytes 1-4, int, messageID * Byte 5, Fragment number * Byte 6-8, 3 least significant bytes of an int, starting byte for this fragment * Byte 9, Total number of fragments * Byte 10-12, 3 least significant bytes of an int, total number of bytes for this message */public class UdpCCFragMessenger extends UdpCCMessenger {    private static Logger logger = Logger.getLogger(UdpCCFragMessenger.class);    protected static final int FRAGMENT_OVERHEAD = 12;    protected static final int START_INFO_MASK = 0x0FFFFFF;    protected static final int START_FRAGMENT_NUM_OFFSET = 24;    protected static final int TOTAL_INFO_MASK = 0x00FFFFF;    protected static final int TOTAL_FRAGMENT_NUM_OFFSET = 24;    protected static final Integer SIGNAL_CLEANUP = new Integer(0);    protected int mtu;    protected int messageCounter;    protected HashMap partialMessages;    protected long cleanupInterval;    protected long maxTimeBetweenFragments;    /**     * Constructor UdpCCFragMessenger     *     * @param syncObject     * @param networkCore     * @param eventCore     * @param myAddress     * @param timeout     * @param internalSerialization     * @param maxSize     * @param sendPort     * @param screenStatPeriod     * @param debugLevel     * @param recentlySeenSize     * @param mtu     * @param cleanupInterval     * @param maxTimeBetweenFragments     * @param timeoutFactor     * @param timeoutOffset     */    public UdpCCFragMessenger(Object syncObject, ASyncCore networkCore,                              ASyncCore eventCore, InetAddress myAddress,                              int timeout, boolean internalSerialization,                              int maxSize, int sendPort, int screenStatPeriod,                              int debugLevel, int recentlySeenSize, int mtu,                              long cleanupInterval,                              long maxTimeBetweenFragments,                              double timeoutFactor, double timeoutOffset) {        super(syncObject, networkCore, eventCore, myAddress, timeout,              internalSerialization, maxSize, sendPort, screenStatPeriod,              debugLevel, recentlySeenSize, timeoutFactor, timeoutOffset);        this.mtu = mtu;        this.messageCounter = 0;        this.partialMessages = new HashMap();        this.cleanupInterval = cleanupInterval;        this.maxTimeBetweenFragments = maxTimeBetweenFragments;        networkCore.register_timer(cleanupInterval, this, SIGNAL_CLEANUP);    }    /**     * Method processNetworkCoreSend     *     * @param sendObject     */    public void processNetworkCoreSend(SendObject sendObject) {        Fragment[] fragments = produceFragments(sendObject.payload);        FragmentInfo callbackData = (sendObject.ackObject == null)                                    ? null                                    : new FragmentInfo(fragments.length,                                                       sendObject.ackObject);        if (sendObject.source == null) {            for (int i = 0; i < fragments.length; i++) {                sender.send(fragments[i], sendObject.destination, timeout,                            this, callbackData);            }            if (Output.debuggingEnabled) {                logger.debug(                    new StructuredLogMessage(                        sendObject.payload, "Passed to UDPCC Default Sender",                        null, null));            }        } else {            UdpCC theSender = (UdpCC) sendChannels.get(sendObject.source);            if (theSender == null) {                theSender = new UdpCC(networkCore, sendObject.source, this,                                      this);                theSender.set_debug_level(debugLevel);                sendChannels.put(sendObject.source, theSender);            }            for (int i = 0; i < fragments.length; i++) {                theSender.send(fragments[i], sendObject.destination, timeout,                               this, callbackData);            }            if (Output.debuggingEnabled) {                logger.debug(                    new StructuredLogMessage(                        sendObject.payload,                        "Passed to UDPCC Non-Default Sender", null, null));            }        }    }    /**     * Method produceFragments     *     * @param payload     * @return     */    public Fragment[] produceFragments(Object payload) {        if (Output.debuggingEnabled) {            logger.debug(new StructuredLogMessage(payload,                                                  "Beginning Fragment Creation",                                                  null, null));        }        ByteBuffer byteBuffer =            ByteBuffer.allocate(super.serialize_size(payload));        super.serialize(payload, byteBuffer);        int totalSize = byteBuffer.position();        byte numFragments = (byte) Math.ceil(((double) totalSize)                                             / ((double) (mtu                                                 - FRAGMENT_OVERHEAD)));        int messageID = messageCounter++;        Fragment[] fragments = null;        if (numFragments > 0) {            fragments = new Fragment[numFragments];            for (byte i = 0; i < numFragments; i++) {                int startByte = i * (mtu - FRAGMENT_OVERHEAD);                int endByte = ((i + 1) * (mtu - FRAGMENT_OVERHEAD) > totalSize)                              ? startByte                                + (totalSize - (i * (mtu - FRAGMENT_OVERHEAD)))                              : startByte + mtu - FRAGMENT_OVERHEAD;                fragments[i] = new Fragment(byteBuffer, startByte,                                            endByte - startByte, totalSize, i,                                            numFragments, messageID);            }        }        if (Output.debuggingEnabled) {            logger.debug(new StructuredLogMessage(payload,                                                  "Completed Fragment Creation",                                                  new Object[]{"z",                                                               String.valueOf(                                                               totalSize),                                                               "n",                                                               String.valueOf(                                                               numFragments),                                                               "m",                                                               String.valueOf(                                                               messageID)}, null));        }        return fragments;    }    /**     * Method serialize     *     * @param msg     * @param buf     */    public void serialize(Object msg, ByteBuffer buf) {        Integer bufferID = StructuredLogMessage.getReference();        if (msg instanceof Fragment) {            Fragment fragment = (Fragment) msg;            if (Output.debuggingEnabled) {                logger.debug(                    new StructuredLogMessage(                        bufferID, "Beginning Fragment Serialization",                        new Object[]{"m",                                     String.valueOf(fragment.messageID),                                     "n",                                     String.valueOf(fragment.fragmentNum),                                     "s",                                     String.valueOf(fragment.start),                                     "z",                                     String.valueOf(                                         fragment.fragmentLength)}, null));            }            try {                buf.putInt(fragment.messageID);                int startInfo = ((fragment.start & START_INFO_MASK)                                 | (fragment.fragmentNum                                    << START_FRAGMENT_NUM_OFFSET));                int totalInfo = ((fragment.totalLength & TOTAL_INFO_MASK)                                 | (fragment.totalFragments                                    << TOTAL_FRAGMENT_NUM_OFFSET));                buf.putInt(startInfo);                buf.putInt(totalInfo);                buf.put(fragment.buffer.array(), fragment.start,                        fragment.fragmentLength);                buf.limit(buf.position());            } catch (Exception exception) {                logger.error(                    new StructuredLogMessage(                        msg, "Fragment Serialization Error", null,                        null), exception);            }        } else {            logger.error(                new StructuredLogMessage(                    bufferID, "Can not serialize non Fragment!",                    new Object[]{"t",                                 msg.getClass().getName()}, null));        }        if (Output.debuggingEnabled) {            logger.debug(                new StructuredLogMessage(                    bufferID, "Completed Fragment Serialization",                    new Object[]{"z",                                 String.valueOf(buf.limit())}, null));        }    }    /**     * Method cb     *     * @param user_data     * @param success     */    public void cb(Object user_data, boolean success) {        if (user_data != null) {            if (Output.debuggingEnabled) {                logger.debug(                    new StructuredLogMessage(                        user_data, "UDPCC Fragment Status Callback",                        new Object[]{"s",                                     String.valueOf(success)}, new Object[]{"u",                                                                            user_data}));            }            FragmentInfo fragmentInfo = (FragmentInfo) user_data;            if (fragmentInfo.ackObject != null) {                if (success) {                    fragmentInfo.posAckedFragments++;                } else {                    fragmentInfo.negAckedFragments++;                }                if (success == false) {                    eventCore.register_timer(                        0, this,                        (new AckCallbackObject(fragmentInfo.ackObject, false)));                } else {                    if (fragmentInfo.posAckedFragments                            == fragmentInfo.numFragments) {                        eventCore.register_timer(                            0, this,                            (new AckCallbackObject(                                fragmentInfo.ackObject, true)));                    }                }            }        }    }    /**     * Method deserialize     * Only Called from NetworkCore     *     * @param buf     * @return     * @throws Exception     */    public Object deserialize(ByteBuffer buf) throws Exception {        Integer bufferID = StructuredLogMessage.getReference();        int size = buf.limit() - buf.position();        int messageID = 0;        int startInfo = 0;        int totalInfo = 0;        int startByte = 0;        int totalBytes = 0;        byte fragmentNum = 0;        byte totalFragments = 0;        Fragment fragment = null;        if (Output.debuggingEnabled) {            logger.debug(                new StructuredLogMessage(                    bufferID, "Beginning Fragment Deserialization",                    new Object[]{"z",                                 String.valueOf(size)}, null));        }        try {            messageID = buf.getInt();            startInfo = buf.getInt();            totalInfo = buf.getInt();            startByte = (startInfo & START_INFO_MASK);            fragmentNum = (byte) (startInfo >> START_FRAGMENT_NUM_OFFSET);            totalBytes = (totalInfo & TOTAL_INFO_MASK);            totalFragments = (byte) (totalInfo >> TOTAL_FRAGMENT_NUM_OFFSET);            fragment = new Fragment(buf, startByte, size - FRAGMENT_OVERHEAD,                                    totalBytes, fragmentNum, totalFragments,                                    messageID);        } catch (Exception exception) {            logger.error(                new StructuredLogMessage(                    fragment, "Fragment Deserialization Error", null,                    null), exception);        }        if (Output.debuggingEnabled) {            logger.debug(                new StructuredLogMessage(                    bufferID, "Completed Fragment Deserialization",                    new Object[]{"m",                                 String.valueOf(messageID),                                 "n",                                 String.valueOf(fragmentNum)}, null));        }        return fragment;    }    /**     * Method recv

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -