⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 asynctcprawconnection.java

📁 High performance DB query
💻 JAVA
字号:
/* * @(#)$Id: AsyncTCPRawConnection.java,v 1.12 2004/09/15 18:05:32 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704.  Attention:  Intel License Inquiry. */package runtime.services.network.tcpraw;import java.net.InetSocketAddress;import java.net.Socket;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.LinkedList;import org.apache.log4j.Logger;import runtime.schedulers.ASyncCore;import services.Output;import services.network.tcpraw.TCPRawClient;import services.network.tcpraw.TCPRawConnection;import util.logging.LogMessage;import util.logging.StructuredLogMessage;/** * Class AsyncTCPRawConnection * */public class AsyncTCPRawConnection        implements TCPRawConnection, ASyncCore.SelectableCB {    private static Logger logger =        Logger.getLogger(AsyncTCPRawConnection.class);    private SocketChannel socketChannel;    private Socket socket;    private ASyncCore core;    private SelectionKey selectionKey;    private TCPRawClient client;    private boolean error;    private LinkedList buffer;    private boolean needToWrite;    /**     * Constructor AsyncTCPRawConnection     *     * @param socketChannel     * @param core     * @param client     */    public AsyncTCPRawConnection(SocketChannel socketChannel, ASyncCore core,                                 TCPRawClient client) {        this.socketChannel = socketChannel;        this.core = core;        this.client = client;        this.socket = socketChannel.socket();        this.error = false;        this.buffer = new LinkedList();        this.needToWrite = false;        try {            selectionKey = core.register_selectable(socketChannel, 0, this,                                                    this);        } catch (Exception e) {            disconnect(true);            logger.error(new LogMessage(new Object[]{                "Unable to register selector for channel ",                socketChannel}), e);        }        channelConnectReady();    }    /**     * Method getRemoteSocketAddress     * @return     */    public InetSocketAddress getRemoteSocketAddress() {        return (InetSocketAddress) socket.getRemoteSocketAddress();    }    /**     * Method getLocalSocketAddress     * @return     */    public InetSocketAddress getLocalSocketAddress() {        return (InetSocketAddress) socket.getLocalSocketAddress();    }    /**     * Method isClosed     * @return     */    public boolean isClosed() {        return socket.isClosed();    }    /**     * Method isConnected     * @return     */    public boolean isConnected() {        return socket.isConnected();    }    /**     * Method isConnectionPending     * @return     */    public boolean isConnectionPending() {        return ((isClosed() == false) && (isConnected() == false));    }    /**     * Method isError     * @return     */    public boolean isError() {        return error;    }    /**     * Method read     *     * @param destinationBuffer     * @return     */    public int read(ByteBuffer destinationBuffer) {        if (isConnected() && (error == false)) {            try {                int bytesRead = socketChannel.read(destinationBuffer);                if (bytesRead == -1) {                    disconnect(true);                }                return bytesRead;            } catch (Exception e) {                logger.error(new LogMessage(new Object[]{                    "Unable to read from channel ",                    socketChannel}), e);                return -1;            }        } else {            return -1;        }    }    /**     * Method write     *     * @param sourceBuffer     * @return     */    public int write(ByteBuffer sourceBuffer) {        if (error == false) {            try {                // If the buffer is empty, register the WRITE for selection                if (isConnected()) {                    if ( !needToWrite) {                        selectionKey.interestOps(SelectionKey.OP_READ                                                 | SelectionKey.OP_WRITE);                        needToWrite = true;                    }                } else {                    needToWrite = true;                }                // Add byte buffer to buffer                buffer.addLast(sourceBuffer);                if (sourceBuffer != null) {                    return sourceBuffer.limit() - sourceBuffer.position();                } else {                    return -1;                }            } catch (Exception e) {                logger.error(new LogMessage(new Object[]{                    "Unable to prepare write to channel ",                    socketChannel}), e);                return -1;            }        } else {            return -1;        }    }    /**     * Method select_cb     *     * @param key     * @param user_data     */    public void select_cb(SelectionKey key, Object user_data) {        try {            if (key.isConnectable()) {                if (Output.debuggingEnabled) {                    logger.debug(new LogMessage(new Object[]{                        "Selector on connect for channel ",                        socketChannel}));                }                channelConnectReady();                if ( !key.isValid()) {                    return;                }            }            if ((needToWrite && key.isWritable()                    && (socketChannel.isConnected()))) {                if (Output.debuggingEnabled) {                    logger.debug(new LogMessage(new Object[]{                        "Selector on write for channel ",                        socketChannel}));                }                channelWriteReady();                if ( !key.isValid()) {                    return;                }            }            if ((key.isReadable() && (socketChannel.isConnected()))) {                if (Output.debuggingEnabled) {                    logger.debug(new LogMessage(new Object[]{                        "Selector on read for channel ",                        socketChannel}));                }                channelReadReady();                if ( !key.isValid()) {                    return;                }            }        } catch (Exception e) {            logger.error(new LogMessage(new Object[]{                "Unable to process selector on channel ",                socketChannel}), e);        }    }    private void channelConnectReady() {        try {            if (socketChannel.finishConnect()) {                selectionKey.interestOps(SelectionKey.OP_READ);                if (needToWrite) {                    selectionKey.interestOps(SelectionKey.OP_READ                                             | SelectionKey.OP_WRITE);                }            } else {                selectionKey.interestOps(SelectionKey.OP_CONNECT);            }        } catch (Exception e) {            disconnect(true);            logger.error(new LogMessage(new Object[]{                "Unable to finish connection on channel ",                socketChannel}), e);            if (client != null) {                Integer eventID = StructuredLogMessage.getReference();                if (Output.debuggingEnabled) {                    logger.debug(                        new StructuredLogMessage(                            eventID, "Beginning TCPRaw Error Handler", null,                            null));                }                client.handleTCPRawNetworkError(this);                if (Output.debuggingEnabled) {                    logger.debug(                        new StructuredLogMessage(                            eventID, "Completed TCPRaw Error Handler", null,                            null));                }            }        }    }    private void channelWriteReady() {        if (error == false) {            if (isConnected()) {                // Write as many buffers to the socket as possible                while (buffer.size() > 0) {                    ByteBuffer curBuffer = (ByteBuffer) buffer.removeFirst();                    if (curBuffer != null) {                        if (curBuffer.position() != curBuffer.limit()) {                            try {                                socketChannel.write(curBuffer);                            } catch (Exception e) {                                disconnect(true);                                logger.error(new LogMessage(new Object[]{                                    "Unable to write to channel ",                                    socketChannel}), e);                                return;                            }                        }                        // This buffer did not finish, place back on buffer and break loop                        if (curBuffer.position() != curBuffer.limit()) {                            buffer.addFirst(curBuffer);                            break;                        }                    }                }                // No more data in buffer, deregister WRITE OP                if (buffer.size() == 0) {                    selectionKey.interestOps(SelectionKey.OP_READ);                    needToWrite = false;                }            }        }    }    private void channelReadReady() {        // Tell the client data is ready        try {            if ((client != null) && (error == false)) {                Integer eventID = StructuredLogMessage.getReference();                if (Output.debuggingEnabled) {                    logger.debug(                        new StructuredLogMessage(                            eventID, "Beginning TCPRaw Data Handler", null,                            null));                }                client.handleTCPRawNetwork(this);                if (Output.debuggingEnabled) {                    logger.debug(                        new StructuredLogMessage(                            eventID, "Completed TCPRaw Data Handler", null,                            null));                }            }        } catch (Exception e) {            disconnect(true);            logger.error(new LogMessage(new Object[]{                "Unable to read from channel ",                socketChannel}), e);        }    }    /**     * Method disconnect     */    public void disconnect() {        channelWriteReady();        disconnect(false);    }    /**     * Method disconnect     *     * @param errorState     */    public void disconnect(boolean errorState) {        error = errorState;        try {            if (isConnected()) {                selectionKey.interestOps(0);                core.unregister_selectable(selectionKey);                socketChannel.close();            }        } catch (Exception e) {            logger.error(new LogMessage(new Object[]{"Unable to close channel ",                                                     socketChannel}), e);        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -