pipeclientconnectionimpl.java
来自「This is a resource based on j2me embedde」· Java 代码 · 共 332 行
JAVA
332 行
/* * * * Copyright 1990-2008 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License version * 2 only, as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License version 2 for more details (a copy is * included at /legal/license.txt). * * You should have received a copy of the GNU General Public License * version 2 along with this work; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa * Clara, CA 95054 or visit www.sun.com if you need additional * information or have any questions. */package com.sun.midp.io.j2me.pipe;import com.sun.midp.io.j2me.pipe.serviceProtocol.PipeServiceProtocol;import com.sun.midp.io.ConnectionBaseAdapter;import com.sun.midp.io.pipe.PipeConnection;import com.sun.midp.links.ClosedLinkException;import com.sun.midp.links.Link;import com.sun.midp.links.LinkMessage;import java.io.IOException;import java.io.InputStream;import java.io.InterruptedIOException;import javax.microedition.io.Connection;import com.sun.midp.security.SecurityToken;import java.util.Vector;import javax.microedition.io.Connector;/** * Implementation of PipeConnection interface. Uses Links as bearer. Uses * com.sun.midp.io.j2me.pipe.serviceProtocol.* for organazing messaging over * bearer. */class PipeClientConnectionImpl extends ConnectionBaseAdapter implements PipeConnection { private static final boolean DEBUG = false; private static final String CLOSE_OUTPUT_COMMAND = "closeOutputStream"; private PipeServiceProtocol pipe; private SecurityToken token; private Object suiteId; private String serverName; private String version; private Link sendLink; private Link receiveLink; private Thread receiverThread; private Receiver receiver; private Vector receiveQueue = new Vector(1); private int topReceivedMsgOffset; private IOException receiveStatus = new IOException(); private int receiveQueueByteCount; private boolean receivedEOF; PipeClientConnectionImpl(SecurityToken token, PipeServiceProtocol pipe) { this.pipe = pipe; this.token = token; serverName = pipe.getServerName(); version = pipe.getServerVersionRequested(); } PipeClientConnectionImpl(Object suiteId, String serverName, String version, SecurityToken token) { this.token = token; this.suiteId = suiteId; this.serverName = serverName; this.version = version; } void establishTransfer(int mode) throws IOException { receiveLink = pipe.getInboundLink(); sendLink = pipe.getOutboundLink(); initStreamConnection(mode); if (mode == Connector.READ || mode == Connector.READ_WRITE) { receiver = new Receiver(); receiverThread = new Thread(receiver); receiveStatus = null; receiverThread.start(); } } public InputStream openInputStream() throws IOException { InputStream is = super.openInputStream(); return is; } protected void notifyClosedInput() { if (DEBUG) debugPrint("input closed"); super.notifyClosedInput(); receiveLink.close(); if (receiveStatus == null) { // input was closed by application, not because of receiver failure receiveStatus = new IOException(); } } protected void notifyClosedOutput() { if (DEBUG) debugPrint("output closed"); super.notifyClosedOutput(); sendLink.close(); } void establish(int mode) throws IOException { pipe = PipeServiceProtocol.getService(token); pipe.bindClient(serverName, version); establishTransfer(mode); } public Connection openPrim(String name, int mode, boolean timeouts) throws IOException { throw new IOException("This method should not be called because it should not exist. Please refactor"); } protected void disconnect() throws IOException { if (DEBUG) debugPrint("disconnected"); synchronized (receiver) { receiver.notify(); } } public int available() throws IOException { if (DEBUG) debugPrint("available " + receiveQueueByteCount + " bytes"); return receiveQueueByteCount; } protected synchronized int readBytes(byte[] b, int off, int len) throws IOException { if (DEBUG) debugPrint("readBytes len=" + len + ", can read " + (iStreams > 0)); if (iStreams == 0) { if (DEBUG) debugPrint("readBytes input closed. isEOF " + receivedEOF + ", status " + receiveStatus); if (receivedEOF) return -1; throw receiveStatus; } int originalOffset = off; synchronized (receiver) { while (len > 0) { if (receiveQueue.size() == 0) { // need more bytes, check if receiver is running if (receiveStatus == null && !receivedEOF) { // fine, now wait for data to come if (DEBUG) debugPrint("readBytes: waiting for Receiver"); try { receiver.wait(); } catch (InterruptedException ex) { try { closeInputStream(); } catch (IOException iOException) { // ignore } throw new InterruptedIOException(ex.toString()); } } // check receiver status once more finish processing if no more // data could be obtained if (receiveStatus != null || receivedEOF) { if (DEBUG) debugPrint("readBytes: Receiver finshed with " + receiveStatus); // we've got receiver stopped and no more data. close input try { closeInputStream(); } catch (IOException iOException) { // ignore } if (receivedEOF) { break; } else { throw receiveStatus; } } } // get bytes from receive queue byte[] bufferedMsg = (byte[]) receiveQueue.elementAt(0); int chunkSize = bufferedMsg.length - topReceivedMsgOffset; if (DEBUG) debugPrint("readBytes: fetching data. " + chunkSize + " bytes remain in next message"); if (chunkSize >= len) { chunkSize = len; } System.arraycopy(bufferedMsg, topReceivedMsgOffset, b, off, chunkSize); topReceivedMsgOffset += chunkSize; if (topReceivedMsgOffset == bufferedMsg.length) { receiveQueue.removeElementAt(0); topReceivedMsgOffset = 0; } if (DEBUG) debugPrint("readBytes len=" + len + ", read " + chunkSize); len -= chunkSize; off += chunkSize; receiveQueueByteCount -= chunkSize; } } if (DEBUG) debugPrint("readBytes: read " + (off - originalOffset) + " bytes"); int bytesRead = off - originalOffset; return bytesRead == 0 ? -1 : bytesRead; } private void debugPrint(String msg) { System.out.println("[pipe client conn " + Integer.toHexString(hashCode()) + "] " + msg); } protected int writeBytes(byte[] b, int off, int len) throws IOException { if (DEBUG) debugPrint("writeBytes len=" + len + " can write " + (oStreams > 0)); if (oStreams == 0) throw new IOException(); if (len == 0) return 0; LinkMessage lm = LinkMessage.newDataMessage(b, off, len); if (DEBUG) debugPrint("writeBytes: sending message"); sendLink.send(lm); if (DEBUG) debugPrint("writeBytes: wrote " + len + " bytes"); return len; } protected void closeOutputStream() throws IOException { if (sendLink.isOpen()) { try { LinkMessage lm = LinkMessage.newStringMessage(CLOSE_OUTPUT_COMMAND); sendLink.send(lm); } catch (IOException ex) { // ignore } } super.closeOutputStream(); } public String getRequestedServerVersion() { return version; } public String getServerName() { return serverName; } private class Receiver implements Runnable { public void run() { while (!receivedEOF && receiveStatus == null) { LinkMessage lm; byte[] data = null; try { if (DEBUG) debugPrint("Receiver waiting"); lm = receiveLink.receive(); if (DEBUG) debugPrint("Receiver got message"); if (lm.containsString()) { String command = lm.extractString(); if (CLOSE_OUTPUT_COMMAND.equals(command)) { receivedEOF = true; } else { // we should never get here but for the sake of consistency let's handle // this case receiveStatus = new IOException("Unsupported: " + command); } } else { data = lm.extractData(); } } catch (IOException iOException) { if (DEBUG) debugPrint("Receiver got exception " + iOException); receiveStatus = iOException; if (receiveStatus instanceof ClosedLinkException) receivedEOF = true; } synchronized (this) { if (data != null) { receiveQueue.addElement(data); receiveQueueByteCount += data.length; } notify(); } } if (DEBUG) debugPrint("Receiver finished"); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?