📄 actualtcpthread.java
字号:
/* * @(#)$Id: ActualTCPThread.java,v 1.3 2004/07/02 23:59:22 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package runtime.services.network.tcp;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Hashtable;import java.util.Vector;import services.network.tcp.TCPClient;/** * Class ActualTCPThread * */public class ActualTCPThread extends Thread implements Runnable { private ActualTCPMessenger eventProcessor; private Hashtable listenersClients; private Vector listeners; private Vector newConnections; private Vector existingConnections; private long sleepTime; private int objectInputBufferSize; private boolean run; /** * Constructor ActualTCPThread * * @param eventProcessor * @param listeners * @param listenersClients * @param newConnections * @param existingConnections * @param sleepTime * @param objectInputBufferSize */ public ActualTCPThread(ActualTCPMessenger eventProcessor, Vector listeners, Hashtable listenersClients, Vector newConnections, Vector existingConnections, long sleepTime, int objectInputBufferSize) { this.eventProcessor = eventProcessor; this.listeners = listeners; this.listenersClients = listenersClients; this.newConnections = newConnections; this.existingConnections = existingConnections; this.sleepTime = sleepTime; this.objectInputBufferSize = objectInputBufferSize; this.run = true; } private void removeNewConnection(ActualTCPConnection connection) { synchronized (newConnections) { int index = newConnections.indexOf(connection); newConnections.remove(index); } } private void removeExistingConnection(ActualTCPConnection connection) { synchronized (existingConnections) { int index = existingConnections.indexOf(connection); existingConnections.remove(index); } } private void connectionReady(ActualTCPConnection connection, boolean removeOld) { if (removeOld) { removeNewConnection(connection); } existingConnections.add(connection); } private void connectionNotReady(ActualTCPConnection connection, boolean removeOld) { if (removeOld) { removeExistingConnection(connection); } newConnections.add(connection); } private void connectionCheckConnect(ActualTCPConnection curConnection) { try { if (curConnection.getSocketChannel().finishConnect()) { connectionReady(curConnection, true); } } catch (Exception e) { curConnection.setErrorFlag(true); } } private void connectionStartConnect(ActualTCPConnection curConnection) { try { // Make the channel non-blocking curConnection.getSocketChannel().configureBlocking(false); // Initiate connect if (curConnection.getSocketChannel().connect( curConnection.getRemoteSocketAddress())) { // Connected already, so move to connection list connectionReady(curConnection, true); } } catch (Exception exception) { throw new RuntimeException("TCPTHREAD: Unable to start connecting: " + exception.getMessage()); } } /** Start the connection process (bind/open) and then move to existingConnections when completed */ private void processNewConnections() { int curIndex = 0; ActualTCPConnection curConnection; while (true) { // Get the next new connection synchronized (newConnections) { if (curIndex < newConnections.size()) { curConnection = (ActualTCPConnection) newConnections.elementAt( curIndex); } else { return; } } // Check to make sure it is not closed if ( !(curConnection.isClosed())) { // Check if it is connected, if so move to existing list if (curConnection.isConnected()) { connectionReady(curConnection, true); // Do not increment index since this connection was removed, shifting remaining ones up by one curIndex = curIndex; continue; } // Check if it is in the process of connection, call finishConnect to check status if (curConnection.isConnectionPending()) { connectionCheckConnect(curConnection); // Move to next connection curIndex++; continue; } // Connection is open, but connect is not started connectionStartConnect(curConnection); // Move to next connection curIndex++; } else { // Connection was closed, remove from list removeNewConnection(curConnection); } } } private void connectionSend(ActualTCPConnection curConnection) { ByteBuffer sendData = curConnection.dequeueData(); if (sendData != null) { // Move data onto the socket out-bound buffer try { curConnection.getSocketChannel().write(sendData); } catch (Exception e) { connectionNotReady(curConnection, true); } // If the buffer was not completely emptied, requeue if (sendData.remaining() > 0) { curConnection.requeueData(sendData); } } } private boolean connectionReceive(ActualTCPConnection curConnection) { try {} catch (Exception e) { connectionNotReady(curConnection, true); } return false; } /** Send any data on the queue, Check for new data and perform callback when necessary */ private boolean processExistingConnections() { boolean skip = false; int curIndex = 0; ActualTCPConnection curConnection; while (true) { // Get the next new connection synchronized (existingConnections) { if (curIndex < existingConnections.size()) { curConnection = (ActualTCPConnection) existingConnections.elementAt( curIndex); } else { return skip; } } connectionSend(curConnection); // If data was received, set return value, skip, to true to force immediate re-polling if (connectionReceive(curConnection)) { skip = true; } } } private void connectionAccept(ServerSocketChannel curChannel) { try { SocketChannel newChannel = curChannel.accept(); if (newChannel != null) { newChannel.configureBlocking(false); TCPClient theClient = (TCPClient) listenersClients.get( new Integer(newChannel.socket().getLocalPort())); ActualTCPConnection newConnection = new ActualTCPConnection( newChannel, (InetSocketAddress) newChannel.socket().getRemoteSocketAddress(), theClient, objectInputBufferSize); connectionNotReady(newConnection, false); } } catch (Exception exception) { throw new RuntimeException( "TCPTHREAD: Unable to accept TCP connection: " + exception.getMessage()); } } /** Check if there are new connections */ private void processListeners() { int curIndex = 0; ServerSocketChannel curChannel; while (true) { // Get the next new connection synchronized (listeners) { if (curIndex < listeners.size()) { curChannel = (ServerSocketChannel) listeners.elementAt(curIndex); } else { return; } } connectionAccept(curChannel); } } /** * Method run */ public void run() { while (run) { boolean skip = false; processNewConnections(); // If data was received, force immediate re-polling if (processExistingConnections()) { skip = true; } processListeners(); if ( !skip) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) {} } } } /** * Method end */ public void end() { run = false; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -