📄 actualtcpmessenger.java
字号:
/* * @(#)$Id: ActualTCPMessenger.java,v 1.4 2004/07/02 23:59:22 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package runtime.services.network.tcp;import execution.EndExecution;import java.io.ByteArrayOutputStream;import java.io.ObjectOutputStream;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.HashMap;import java.util.Hashtable;import java.util.Vector;import services.LocalNode;import services.network.Payload;import services.network.tcp.TCPClient;import services.network.tcp.TCPConnection;import services.network.tcp.TCPMessenger;/** * Class ActualTCPMessenger * */public class ActualTCPMessenger implements TCPMessenger { private Object syncObject; private HashMap listenersTable; private Hashtable listenersClients; private Vector listeners; private Vector newConnections; private Vector existingConnections; private HashMap connectionTable; private int objectInputBufferSize; private ActualTCPThread runner; /** * Constructor ActualTCPMessenger * * @param syncObject * @param sleepTime * @param objectInputBufferSize */ public ActualTCPMessenger(Object syncObject, long sleepTime, int objectInputBufferSize) { this.syncObject = syncObject; this.objectInputBufferSize = objectInputBufferSize; this.listenersTable = new HashMap(); this.listenersClients = new Hashtable(); this.listeners = new Vector(); this.newConnections = new Vector(); this.existingConnections = new Vector(); this.connectionTable = new HashMap(); runner = new ActualTCPThread(this, listeners, listenersClients, newConnections, existingConnections, sleepTime, objectInputBufferSize); runner.start(); } /** * Method listen * * @param portNumber * @param client * @return */ public boolean listen(Integer portNumber, TCPClient client) { // Retrieve existing listener if it exists ServerSocketChannel theListener = (ServerSocketChannel) listenersTable.get(portNumber); if (theListener == null) { try { theListener = ServerSocketChannel.open(); theListener.socket().bind( new InetSocketAddress( LocalNode.myIPAddress, portNumber.intValue())); listeners.add(theListener); listenersTable.put(portNumber, theListener); listenersClients.put(portNumber, client); return true; } catch (Exception exception) { throw new RuntimeException( "UDPMESSENGER: Unable to listen on TCP port: " + portNumber + "(" + exception.getMessage() + ")"); } } else { return false; } } /** * Method release * * @param portNumber */ public void release(Integer portNumber) { ServerSocketChannel theListener = (ServerSocketChannel) listenersTable.remove(portNumber); if (theListener != null) { listeners.remove(theListener); listenersClients.remove(portNumber); try { theListener.close(); } catch (Exception e) {} } } /** * Method connect * * @param destination * @param client * @return */ public TCPConnection connect(InetSocketAddress destination, TCPClient client) { SocketChannel theChannel; try { // Create the socket channel/socket theChannel = SocketChannel.open(); } catch (Exception e) { return null; } // Create the connection ActualTCPConnection theConnection = new ActualTCPConnection(theChannel, destination, client, objectInputBufferSize); // Add the connection to the new connections list (thread will iniate binding/opening) newConnections.add(theConnection); // Add the connection info the connectionTable to enable sharing connectionTable.put(destination, theConnection); return theConnection; } /** * Method disconnect * * @param connection */ public void disconnect(TCPConnection connection) { try { // Remove the connection from the existingConnections to stop polling for new data existingConnections.remove(connection); connectionTable.remove( ((ActualTCPConnection) connection).getLocalSocketAddress()); // Close the channel ((ActualTCPConnection) connection).getSocketChannel().close(); } catch (Exception e) {} } /** * Method send * * @param destination * @param client * @param data * @return */ public TCPConnection send(InetSocketAddress destination, TCPClient client, Payload data) { // Attempt to find existing connection TCPConnection theConnection = (TCPConnection) connectionTable.get(destination); // Create new connection if old does not exist if (theConnection == null) { theConnection = connect(destination, client); } // Send the data send(theConnection, data); return theConnection; } /** * Method send * * @param connection * @param data * @return */ public boolean send(TCPConnection connection, Payload data) { try { // Convert object into byte array ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); ObjectOutputStream outObject = new ObjectOutputStream(outBytes); outObject.writeObject(data); byte theBytes[] = outBytes.toByteArray(); // Convert byte array into ByteBuffer ByteBuffer outBuffer = ByteBuffer.allocate(theBytes.length); outBuffer.get(theBytes); ((ActualTCPConnection) connection).queueData(outBuffer); return true; } catch (Exception e) { // Problem sending return false; } } /** * Method runEvent * * @param connection * @param data */ protected void runEvent(TCPConnection connection, Payload data) { // run the event synchronized (syncObject) { try { ((ActualTCPConnection) connection).getClient().handleTCPNetwork( connection, data); } catch (Exception exception) { EndExecution.masterSync = exception; runner.end(); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -