⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 actualtcpthread.java

📁 High performance DB query
💻 JAVA
字号:
/* * @(#)$Id: ActualTCPThread.java,v 1.3 2004/07/02 23:59:22 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704.  Attention:  Intel License Inquiry. */package runtime.services.network.tcp;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Hashtable;import java.util.Vector;import services.network.tcp.TCPClient;/** * Class ActualTCPThread * */public class ActualTCPThread extends Thread implements Runnable {    private ActualTCPMessenger eventProcessor;    private Hashtable listenersClients;    private Vector listeners;    private Vector newConnections;    private Vector existingConnections;    private long sleepTime;    private int objectInputBufferSize;    private boolean run;    /**     * Constructor ActualTCPThread     *     * @param eventProcessor     * @param listeners     * @param listenersClients     * @param newConnections     * @param existingConnections     * @param sleepTime     * @param objectInputBufferSize     */    public ActualTCPThread(ActualTCPMessenger eventProcessor, Vector listeners,                           Hashtable listenersClients, Vector newConnections,                           Vector existingConnections, long sleepTime,                           int objectInputBufferSize) {        this.eventProcessor = eventProcessor;        this.listeners = listeners;        this.listenersClients = listenersClients;        this.newConnections = newConnections;        this.existingConnections = existingConnections;        this.sleepTime = sleepTime;        this.objectInputBufferSize = objectInputBufferSize;        this.run = true;    }    private void removeNewConnection(ActualTCPConnection connection) {        synchronized (newConnections) {            int index = newConnections.indexOf(connection);            newConnections.remove(index);        }    }    private void removeExistingConnection(ActualTCPConnection connection) {        synchronized (existingConnections) {            int index = existingConnections.indexOf(connection);            existingConnections.remove(index);        }    }    private void connectionReady(ActualTCPConnection connection,                                 boolean removeOld) {        if (removeOld) {            removeNewConnection(connection);        }        existingConnections.add(connection);    }    private void connectionNotReady(ActualTCPConnection connection,                                    boolean removeOld) {        if (removeOld) {            removeExistingConnection(connection);        }        newConnections.add(connection);    }    private void connectionCheckConnect(ActualTCPConnection curConnection) {        try {            if (curConnection.getSocketChannel().finishConnect()) {                connectionReady(curConnection, true);            }        } catch (Exception e) {            curConnection.setErrorFlag(true);        }    }    private void connectionStartConnect(ActualTCPConnection curConnection) {        try {            // Make the channel non-blocking            curConnection.getSocketChannel().configureBlocking(false);            // Initiate connect            if (curConnection.getSocketChannel().connect(                    curConnection.getRemoteSocketAddress())) {                // Connected already, so move to connection list                connectionReady(curConnection, true);            }        } catch (Exception exception) {            throw new RuntimeException("TCPTHREAD: Unable to start connecting: "                                       + exception.getMessage());        }    }    /** Start the connection process (bind/open) and then move to existingConnections when completed */    private void processNewConnections() {        int curIndex = 0;        ActualTCPConnection curConnection;        while (true) {            // Get the next new connection            synchronized (newConnections) {                if (curIndex < newConnections.size()) {                    curConnection =                        (ActualTCPConnection) newConnections.elementAt(                            curIndex);                } else {                    return;                }            }            // Check to make sure it is not closed            if ( !(curConnection.isClosed())) {                // Check if it is connected, if so move to existing list                if (curConnection.isConnected()) {                    connectionReady(curConnection, true);                    // Do not increment index since this connection was removed, shifting remaining ones up by one                    curIndex = curIndex;                    continue;                }                // Check if it is in the process of connection, call finishConnect to check status                if (curConnection.isConnectionPending()) {                    connectionCheckConnect(curConnection);                    // Move to next connection                    curIndex++;                    continue;                }                // Connection is open, but connect is not started                connectionStartConnect(curConnection);                // Move to next connection                curIndex++;            } else {                // Connection was closed, remove from list                removeNewConnection(curConnection);            }        }    }    private void connectionSend(ActualTCPConnection curConnection) {        ByteBuffer sendData = curConnection.dequeueData();        if (sendData != null) {            // Move data onto the socket out-bound buffer            try {                curConnection.getSocketChannel().write(sendData);            } catch (Exception e) {                connectionNotReady(curConnection, true);            }            // If the buffer was not completely emptied, requeue            if (sendData.remaining() > 0) {                curConnection.requeueData(sendData);            }        }    }    private boolean connectionReceive(ActualTCPConnection curConnection) {        try {}        catch (Exception e) {            connectionNotReady(curConnection, true);        }        return false;    }    /** Send any data on the queue, Check for new data and perform callback when necessary */    private boolean processExistingConnections() {        boolean skip = false;        int curIndex = 0;        ActualTCPConnection curConnection;        while (true) {            // Get the next new connection            synchronized (existingConnections) {                if (curIndex < existingConnections.size()) {                    curConnection =                        (ActualTCPConnection) existingConnections.elementAt(                            curIndex);                } else {                    return skip;                }            }            connectionSend(curConnection);            // If data was received, set return value, skip, to true to force immediate re-polling            if (connectionReceive(curConnection)) {                skip = true;            }        }    }    private void connectionAccept(ServerSocketChannel curChannel) {        try {            SocketChannel newChannel = curChannel.accept();            if (newChannel != null) {                newChannel.configureBlocking(false);                TCPClient theClient =                    (TCPClient) listenersClients.get(                        new Integer(newChannel.socket().getLocalPort()));                ActualTCPConnection newConnection =                    new ActualTCPConnection(                        newChannel,                        (InetSocketAddress) newChannel.socket().getRemoteSocketAddress(),                        theClient, objectInputBufferSize);                connectionNotReady(newConnection, false);            }        } catch (Exception exception) {            throw new RuntimeException(                "TCPTHREAD: Unable to accept TCP connection: "                + exception.getMessage());        }    }    /** Check if there are new connections */    private void processListeners() {        int curIndex = 0;        ServerSocketChannel curChannel;        while (true) {            // Get the next new connection            synchronized (listeners) {                if (curIndex < listeners.size()) {                    curChannel =                        (ServerSocketChannel) listeners.elementAt(curIndex);                } else {                    return;                }            }            connectionAccept(curChannel);        }    }    /**     * Method run     */    public void run() {        while (run) {            boolean skip = false;            processNewConnections();            // If data was received, force immediate re-polling            if (processExistingConnections()) {                skip = true;            }            processListeners();            if ( !skip) {                try {                    Thread.sleep(sleepTime);                } catch (InterruptedException e) {}            }        }    }    /**     * Method end     */    public void end() {        run = false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -