⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 actualtcpmessenger.java

📁 High performance DB query
💻 JAVA
字号:
/*
 * @(#)$Id: ActualTCPMessenger.java,v 1.4 2004/07/02 23:59:22 huebsch Exp $
 *
 * Copyright (c) 2001-2004 Regents of the University of California.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached BERKELEY-LICENSE
 * file. If you do not find these files, copies can be found by writing to:
 * Computer Science Division, Database Group, Universite of California,
 * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License
 *
 * Copyright (c) 2003-2004 Intel Corporation. All rights reserved.
 *
 * This file is distributed under the terms in the attached INTEL-LICENSE file.
 * If you do not find these files, copies can be found by writing to:
 * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300,
 * Berkeley, CA, 94704.  Attention:  Intel License Inquiry.
 */

package runtime.services.network.tcp;

import execution.EndExecution;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Vector;
import services.LocalNode;
import services.network.Payload;
import services.network.tcp.TCPClient;
import services.network.tcp.TCPConnection;
import services.network.tcp.TCPMessenger;

/**
 * TCPMessenger implementation backed by NIO channels.
 *
 * <p>The messenger keeps the bookkeeping (which ports are being listened on,
 * which destinations already have a connection) and hands the listener and
 * connection collections to an {@link ActualTCPThread}, which is created and
 * started by the constructor.
 *
 * <p>Client callbacks are invoked through {@link #runEvent} while holding the
 * monitor of the synchronization object supplied at construction time.
 */
public class ActualTCPMessenger implements TCPMessenger {

    /** Monitor held while a client's handleTCPNetwork callback runs. */
    private final Object syncObject;

    /** Port number (Integer) -> ServerSocketChannel bound to that port. */
    private final HashMap listenersTable;

    /** Port number (Integer) -> TCPClient registered for that port; shared with the runner thread. */
    private final Hashtable listenersClients;

    /** All open ServerSocketChannels; shared with the runner thread. */
    private final Vector listeners;

    /** Connections created by connect() and not yet opened; shared with the runner thread. */
    private final Vector newConnections;

    /** Connections being polled for data; shared with the runner thread. */
    private final Vector existingConnections;

    /** Destination (InetSocketAddress) -> connection created for it, so send() can reuse it. */
    private final HashMap connectionTable;

    /** Buffer size passed to every ActualTCPConnection and to the runner thread. */
    private final int objectInputBufferSize;

    /** Background thread servicing the listeners and connections. */
    private final ActualTCPThread runner;

    /**
     * Creates the messenger and starts its background thread.
     *
     * @param syncObject            monitor to hold while delivering events to clients
     * @param sleepTime             passed through to the ActualTCPThread
     * @param objectInputBufferSize buffer size passed to the thread and to each
     *                              connection created by {@link #connect}
     */
    public ActualTCPMessenger(Object syncObject, long sleepTime,
                              int objectInputBufferSize) {
        this.syncObject = syncObject;
        this.objectInputBufferSize = objectInputBufferSize;
        this.listenersTable = new HashMap();
        this.listenersClients = new Hashtable();
        this.listeners = new Vector();
        this.newConnections = new Vector();
        this.existingConnections = new Vector();
        this.connectionTable = new HashMap();

        runner = new ActualTCPThread(this, listeners, listenersClients,
                                     newConnections, existingConnections,
                                     sleepTime, objectInputBufferSize);
        runner.start();
    }

    /**
     * Starts listening on a local TCP port on behalf of a client.
     *
     * @param portNumber port to bind on LocalNode.myIPAddress
     * @param client     client to associate with the port
     * @return {@code true} if a new listener was created, {@code false} if the
     *         port already has a listener registered with this messenger
     * @throws RuntimeException if the channel cannot be opened, bound or
     *         registered; the original exception is attached as the cause
     */
    public boolean listen(Integer portNumber, TCPClient client) {
        // Nothing to do if this messenger is already listening on the port
        if (listenersTable.get(portNumber) != null) {
            return false;
        }

        ServerSocketChannel theListener = null;
        try {
            theListener = ServerSocketChannel.open();
            theListener.socket().bind(
                new InetSocketAddress(
                    LocalNode.myIPAddress, portNumber.intValue()));

            listenersTable.put(portNumber, theListener);
            // Register the client before publishing the listener in the
            // collection shared with the runner thread, so a listener is never
            // visible there without its client.
            listenersClients.put(portNumber, client);
            listeners.add(theListener);
            return true;
        } catch (Exception exception) {
            // Undo any partial registration and release the channel so a
            // failed call leaves no half-registered or leaked listener behind.
            if (theListener != null) {
                listeners.remove(theListener);
                listenersTable.remove(portNumber);
                if (portNumber != null) {
                    // Hashtable rejects null keys
                    listenersClients.remove(portNumber);
                }
                try {
                    theListener.close();
                } catch (Exception ignored) {
                    // Best effort: the original failure is the one reported
                }
            }
            throw new RuntimeException(
                "TCPMESSENGER: Unable to listen on TCP port: " + portNumber
                + "(" + exception.getMessage() + ")", exception);
        }
    }

    /**
     * Stops listening on a port previously passed to {@link #listen}.
     * Does nothing if no listener is registered for the port.
     *
     * @param portNumber port to release
     */
    public void release(Integer portNumber) {
        ServerSocketChannel theListener =
            (ServerSocketChannel) listenersTable.remove(portNumber);

        if (theListener == null) {
            return;
        }

        listeners.remove(theListener);
        listenersClients.remove(portNumber);

        try {
            theListener.close();
        } catch (Exception ignored) {
            // Best effort: the listener is already unregistered
        }
    }

    /**
     * Creates a connection object for a destination and queues it for opening.
     * The connection is recorded under the destination so later calls to
     * {@link #send(InetSocketAddress, TCPClient, Payload)} reuse it.
     *
     * @param destination remote address to connect to
     * @param client      client to associate with the connection
     * @return the new connection, or {@code null} if the socket channel could
     *         not be opened
     */
    public TCPConnection connect(InetSocketAddress destination,
                                 TCPClient client) {
        SocketChannel theChannel;

        try {
            // Create the socket channel/socket
            theChannel = SocketChannel.open();
        } catch (Exception e) {
            return null;
        }

        // Create the connection
        ActualTCPConnection theConnection =
            new ActualTCPConnection(theChannel, destination, client,
                                    objectInputBufferSize);

        // Add the connection to the new connections list (thread will initiate binding/opening)
        newConnections.add(theConnection);

        // Add the connection into the connectionTable to enable sharing
        connectionTable.put(destination, theConnection);

        return theConnection;
    }

    /**
     * Stops servicing a connection and closes its channel. Failures while
     * closing are ignored.
     *
     * @param connection connection to close
     */
    public void disconnect(TCPConnection connection) {
        // Remove the connection from the existingConnections to stop polling for new data
        existingConnections.remove(connection);

        // connect() keys connectionTable by destination, so drop the entry by
        // value; otherwise send(destination, ...) would keep reusing a closed
        // connection.
        connectionTable.values().remove(connection);

        try {
            // Close the channel
            ((ActualTCPConnection) connection).getSocketChannel().close();
        } catch (Exception ignored) {
            // Best effort: the connection is already unregistered
        }
    }

    /**
     * Sends data to a destination, reusing the connection recorded for that
     * destination or creating one if none exists.
     *
     * @param destination remote address to send to
     * @param client      client to associate with a newly created connection
     * @param data        payload to send
     * @return the connection used, or {@code null} if a new connection was
     *         needed and could not be created
     */
    public TCPConnection send(InetSocketAddress destination, TCPClient client,
                              Payload data) {
        // Attempt to find existing connection
        TCPConnection theConnection =
            (TCPConnection) connectionTable.get(destination);

        // Create new connection if old does not exist
        if (theConnection == null) {
            theConnection = connect(destination, client);
            if (theConnection == null) {
                // Could not open a channel; nothing to send on
                return null;
            }
        }

        // Send the data
        send(theConnection, data);

        return theConnection;
    }

    /**
     * Serializes a payload and queues the bytes on a connection.
     *
     * @param connection connection to queue the data on
     * @param data       payload to serialize with Java object serialization
     * @return {@code true} if the data was serialized and queued,
     *         {@code false} if the connection is null or any step failed
     */
    public boolean send(TCPConnection connection, Payload data) {
        if (connection == null) {
            return false;
        }

        try {
            // Convert object into byte array
            ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
            ObjectOutputStream outObject = new ObjectOutputStream(outBytes);
            try {
                outObject.writeObject(data);
                // Make sure everything has reached outBytes before copying it
                outObject.flush();
            } finally {
                outObject.close();
            }
            byte[] theBytes = outBytes.toByteArray();

            // Convert byte array into ByteBuffer. The bytes must be copied INTO
            // the buffer (put); get() would overwrite theBytes with the
            // buffer's zeros and queue an all-zero message.
            ByteBuffer outBuffer = ByteBuffer.allocate(theBytes.length);
            outBuffer.put(theBytes);

            // The buffer is handed over with its position at the end, the same
            // state it had before this fix.
            // NOTE(review): confirm queueData()/the writer flips the buffer
            // before writing it to the channel.
            ((ActualTCPConnection) connection).queueData(outBuffer);

            return true;
        } catch (Exception e) {
            // Problem sending
            return false;
        }
    }

    /**
     * Delivers received data to the client of a connection while holding the
     * synchronization object's monitor. If the client throws, the exception is
     * stored in EndExecution.masterSync and the runner thread is asked to end.
     *
     * @param connection connection the data arrived on
     * @param data       payload to deliver
     */
    protected void runEvent(TCPConnection connection, Payload data) {
        // run the event
        synchronized (syncObject) {
            try {
                TCPClient theClient =
                    ((ActualTCPConnection) connection).getClient();
                theClient.handleTCPNetwork(connection, data);
            } catch (Exception exception) {
                EndExecution.masterSync = exception;
                runner.end();
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -