📄 asynctcprawconnection.java
字号:
/* * @(#)$Id: AsyncTCPRawConnection.java,v 1.12 2004/09/15 18:05:32 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. */package runtime.services.network.tcpraw;import java.net.InetSocketAddress;import java.net.Socket;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.util.LinkedList;import org.apache.log4j.Logger;import runtime.schedulers.ASyncCore;import services.Output;import services.network.tcpraw.TCPRawClient;import services.network.tcpraw.TCPRawConnection;import util.logging.LogMessage;import util.logging.StructuredLogMessage;/** * Class AsyncTCPRawConnection * */public class AsyncTCPRawConnection implements TCPRawConnection, ASyncCore.SelectableCB { private static Logger logger = Logger.getLogger(AsyncTCPRawConnection.class); private SocketChannel socketChannel; private Socket socket; private ASyncCore core; private SelectionKey selectionKey; private TCPRawClient client; private boolean error; private LinkedList buffer; private boolean needToWrite; /** * Constructor AsyncTCPRawConnection * * @param socketChannel * @param core * @param client */ public AsyncTCPRawConnection(SocketChannel socketChannel, ASyncCore core, TCPRawClient client) { this.socketChannel = socketChannel; this.core = core; this.client = client; this.socket = socketChannel.socket(); this.error = false; this.buffer = new LinkedList(); this.needToWrite = false; try { selectionKey = core.register_selectable(socketChannel, 0, this, this); } catch (Exception e) { disconnect(true); logger.error(new LogMessage(new Object[]{ "Unable to register selector for channel ", socketChannel}), e); } channelConnectReady(); } /** * Method getRemoteSocketAddress * @return */ public InetSocketAddress getRemoteSocketAddress() { return (InetSocketAddress) socket.getRemoteSocketAddress(); } /** * Method getLocalSocketAddress * @return */ public InetSocketAddress getLocalSocketAddress() { return (InetSocketAddress) socket.getLocalSocketAddress(); } /** * Method isClosed * @return */ public boolean isClosed() { return socket.isClosed(); } /** * Method isConnected * @return */ public boolean isConnected() { return socket.isConnected(); } /** * Method isConnectionPending * @return */ public boolean isConnectionPending() { return ((isClosed() == false) && (isConnected() == false)); } /** * Method isError * @return */ public boolean isError() { return error; } /** * Method read * * @param destinationBuffer * @return */ public int read(ByteBuffer destinationBuffer) { if (isConnected() && (error == false)) { try { int bytesRead = socketChannel.read(destinationBuffer); if (bytesRead == -1) { disconnect(true); } return bytesRead; } catch (Exception e) { logger.error(new LogMessage(new Object[]{ "Unable to read from channel ", socketChannel}), e); return -1; } } else { return -1; } } /** * Method write * * @param sourceBuffer * @return */ public int write(ByteBuffer sourceBuffer) { if (error == false) { try { // If the buffer is empty, register the WRITE for selection if (isConnected()) { if ( !needToWrite) { selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); needToWrite = true; } } else { needToWrite = true; } // Add byte buffer to buffer buffer.addLast(sourceBuffer); if (sourceBuffer != null) { return sourceBuffer.limit() - sourceBuffer.position(); } else { return -1; } } catch (Exception e) { logger.error(new LogMessage(new Object[]{ "Unable to prepare write to channel ", socketChannel}), e); return -1; } } else { return -1; } } /** * Method select_cb * * @param key * @param user_data */ public void select_cb(SelectionKey key, Object user_data) { try { if (key.isConnectable()) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Selector on connect for channel ", socketChannel})); } channelConnectReady(); if ( !key.isValid()) { return; } } if ((needToWrite && key.isWritable() && (socketChannel.isConnected()))) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Selector on write for channel ", socketChannel})); } channelWriteReady(); if ( !key.isValid()) { return; } } if ((key.isReadable() && (socketChannel.isConnected()))) { if (Output.debuggingEnabled) { logger.debug(new LogMessage(new Object[]{ "Selector on read for channel ", socketChannel})); } channelReadReady(); if ( !key.isValid()) { return; } } } catch (Exception e) { logger.error(new LogMessage(new Object[]{ "Unable to process selector on channel ", socketChannel}), e); } } private void channelConnectReady() { try { if (socketChannel.finishConnect()) { selectionKey.interestOps(SelectionKey.OP_READ); if (needToWrite) { selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } else { selectionKey.interestOps(SelectionKey.OP_CONNECT); } } catch (Exception e) { disconnect(true); logger.error(new LogMessage(new Object[]{ "Unable to finish connection on channel ", socketChannel}), e); if (client != null) { Integer eventID = StructuredLogMessage.getReference(); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( eventID, "Beginning TCPRaw Error Handler", null, null)); } client.handleTCPRawNetworkError(this); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( eventID, "Completed TCPRaw Error Handler", null, null)); } } } } private void channelWriteReady() { if (error == false) { if (isConnected()) { // Write as many buffers to the socket as possible while (buffer.size() > 0) { ByteBuffer curBuffer = (ByteBuffer) buffer.removeFirst(); if (curBuffer != null) { if (curBuffer.position() != curBuffer.limit()) { try { socketChannel.write(curBuffer); } catch (Exception e) { disconnect(true); logger.error(new LogMessage(new Object[]{ "Unable to write to channel ", socketChannel}), e); return; } } // This buffer did not finish, place back on buffer and break loop if (curBuffer.position() != curBuffer.limit()) { buffer.addFirst(curBuffer); break; } } } // No more data in buffer, deregister WRITE OP if (buffer.size() == 0) { selectionKey.interestOps(SelectionKey.OP_READ); needToWrite = false; } } } } private void channelReadReady() { // Tell the client data is ready try { if ((client != null) && (error == false)) { Integer eventID = StructuredLogMessage.getReference(); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( eventID, "Beginning TCPRaw Data Handler", null, null)); } client.handleTCPRawNetwork(this); if (Output.debuggingEnabled) { logger.debug( new StructuredLogMessage( eventID, "Completed TCPRaw Data Handler", null, null)); } } } catch (Exception e) { disconnect(true); logger.error(new LogMessage(new Object[]{ "Unable to read from channel ", socketChannel}), e); } } /** * Method disconnect */ public void disconnect() { channelWriteReady(); disconnect(false); } /** * Method disconnect * * @param errorState */ public void disconnect(boolean errorState) { error = errorState; try { if (isConnected()) { selectionKey.interestOps(0); core.unregister_selectable(selectionKey); socketChannel.close(); } } catch (Exception e) { logger.error(new LogMessage(new Object[]{"Unable to close channel ", socketChannel}), e); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -