⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jtlsinputstream.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      int seqN = JTlsUtil.getSequenceNumber(elt);
      if (seqN == sequenceNumber) {
	// remove element from vector
	inputElements.removeElementAt(i);
	return elt;
      }
    }
    return null;
  }

  // We are called from TlsManager.processIncomingMessage(..);
  synchronized public void queueIncomingMessage(Message msg)
  {
    // Get message sequence number and note if an ACK
    // We can process acks because we do not queue them.
    int i = 0;
    int ackSeqn = 0;
    int msgSeqn = 0;

    Enumeration e = ((MessageImpl) msg).getElementsInFifoOrder();
    MessageElement elt = null;

    while (e.hasMoreElements()) {
      elt = (MessageElement)e.nextElement();
      ackSeqn = rcvdACK(elt);

      // See if we have an ACK
      if (ackSeqn != 0) {

	// See if this is a SACK
	byte[] data = elt.getBytesOffset();
	String sackStr = new String(data);
	String sackList = null;

	if (sackStr.indexOf(SACKKEY) != -1) {
	  // Extract SAC List
	  sackList = new String(data, SACKKEY.length(),
				data.length - SACKKEY.length());
	}

	if (LOG.isEnabledFor(Priority.INFO)) {
	  LOG.info("SACKLIST = " + sackList);
	}

	// take care of the ACK here;
	conn.jout.ackReceived(ackSeqn, sackList);
	msg = null;

	// wake up reader
	notifyAll();
	return;
      }

      // OK look for jxta message
      try {
	msgSeqn = JTlsUtil.getSequenceNumber(elt);
	if (msgSeqn != 0) {
	  // see if this is a duplicate
	  if (msgSeqn < mrrSequenceNumber) {

	    if (LOG.isEnabledFor(Priority.INFO)) {
	      LOG.info("RCVD DUPLICATE MSG: Discard seq# " + msgSeqn);
	    }

	    // force discard.
	    msgSeqn = -1;

	  }
	  break;
	}
      } catch (NumberFormatException n) {
	// Not a TLS element
	continue;
      }
    }

    // Make sure it is for us
    if (msgSeqn <= 0) {

      if (LOG.isEnabledFor(Priority.INFO) && msgSeqn == 0) {
	 LOG.info("TLS!! Received DUPLICATE or NON-TLS msg");
      }

      // jump start GC
      msg = null;

      // run other threads
      notifyAll();

      // Do not enqueue
      return;
    }

    // OK we must inqueue:
    //    Wait until someone dequeues if we are at the size limit
    while (inputQueue.size() >= MAXQUEUESIZE) {
      try {

	if (LOG.isEnabledFor(Priority.INFO) && (i++ % 10) == 0) {
	  LOG.info("NotifyAll(): enqueue WAIT, size = " + inputQueue.size());
	}

	// Wake up dequeuers
	notifyAll();

	wait();

      } catch (InterruptedException iex) {
	// just continue
      }
    }

    if (LOG.isEnabledFor(Priority.INFO)) {
      LOG.info("QUEUE incoming message:");
    }

    // This is a FIFO - Vector is synchronized
    //   1. Do not add duplicate messages
    //   2. Store in increasing sequence nos.
    i = inputQueue.size();
    int qIndex = -1;
    boolean duplicate = false;

    for (int j = 0; j < i; j++) {
      IQElt iq = (IQElt)inputQueue.elementAt(j);
      if (msgSeqn < iq.seqnum) {
	qIndex = j;
	break;
      } else if (msgSeqn == iq.seqnum) {
	duplicate = true;
	break;
      }
    }

    // Keep non-duplicates
    //  Note: sequenceNumber + 1 is our next expected
    //  sequence number.
    if (!duplicate && msgSeqn > mrrSequenceNumber) {

      IQElt iq = new IQElt();
      iq.seqnum = msgSeqn;
      iq.msg = msg;
      iq.elt = elt;
      iq.ackd = false;

      // append to input queue if qIndex == -1
      if (qIndex == -1) qIndex = inputQueue.size();

     //PDA requirement 18.02.2002
     // Vector.add -> Vector.insertElementAt
     // inputQueue.add(qIndex, iq);
     inputQueue.insertElementAt(iq, qIndex);

      if (LOG.isEnabledFor(Priority.INFO)) {
	 LOG.info("Enqueued msg with seq# " + msgSeqn +
		  " At index " + qIndex);
      }

    } else {
      // just toss
      msg = null;

      if (LOG.isEnabledFor(Priority.INFO)) {
	 LOG.info("TLS!! Discarded duplicate msg, seq# = " + msgSeqn);
      }

    }

    // Let dequeuers read
    notifyAll();

    if (LOG.isEnabledFor(Priority.INFO) &&
	SSLDebug.getDebug(SSLDebug.DEBUG_JXTA)) {
      LOG.info("NotifyAll(): N TLS Records queued = " + inputQueue.size());
    }

  }

  // Dequeue is done in local_read(..) just below.
  // Called from synchronized local_read(..)
  private IQElt dequeueMessage()
  {
    IQElt iQ = null;

    // Wait for incoming message here
    int wct = 0;
    while (inputQueue.size() == 0) {
      try {

	wct += 1;
	// Wake up enqueuer thread
	notifyAll();

	wait();

      } catch (InterruptedException e) {
	  // just continue
      }
    }

    if (inputQueue.size() > 0) {
      iQ = (IQElt)inputQueue.elementAt(0); // FIFO
      inputQueue.removeElementAt(0);

      if (LOG.isEnabledFor(Priority.INFO)) {
	LOG.info("DEQUEUED a TLS Record to process, seq# = " + iQ.seqnum);
	LOG.info("DEQUEUE waited " + wct + " times on empty input queue");
      }

    }

    // Wake up enqueue thread(s)
    notifyAll();

    return iQ;
  }

  // 1) If JTlsRecord is empty, then
  // read raw JXTA MSG element into the record.
  // 2) Return requested bytes to our callers.
  private MessageElement currentElt = null;
  synchronized private int local_read(byte[] a, int offset, int length)
    throws IOException
  {
    if (jtrec.size == 0 || jtrec.nextByte == jtrec.size) {
      // reset the record
      jtrec.resetRecord();	// GC as necessary(tlsRecord byte[])

      sequenceNumber += 1;	// next msg sequence number

      // Does our vector contain the required element with
      // this sequenceNumber?
      currentElt =  getVectorElement();

      // no, try and dequeue another message
      if (currentElt == null) {

	// Get the next element
	try {

	  // Place the next message on our InputStream in
	  // jmsg.
	  while (currentElt == null) {
	    IQElt iQ = null;

	    // read another message
	    // !! Here we will just GET the next message.
	    if ((iQ = dequeueMessage()) == null) {
	      continue;
	    } else {
	      // Return elt with expected sequence number
	      // or null. If null, read elements have been added to our
	      // inputElements queue.
	      currentElt = validateElement(iQ);

	      // Free all elements from jmsg. They have been queued
	      // in the inputElements Vector as noted above.
	      JTlsUtil.removeElements(iQ.msg);
	    }
	  }

	} catch (Exception e) {

	  if (LOG.isEnabledFor(Priority.INFO))
	    LOG.info("TLSInputStream exception: " + e);

	  return -1;
	}
      }
      // Send an ACK for the message that contained this TLS Record.
      mrrSequenceNumber = sequenceNumber;

      sendACK(0);

      // Get the length of the TLS Record
      jtrec.size = currentElt.getLength();

      if (tlsInputDebug) {
	if (LOG.isEnabledFor(Priority.INFO)) {
	  LOG.info("local_read: seq#" + sequenceNumber +
		   ", bytes = " + jtrec.size + "\n");
	}
      }

      // get byte array
      jtrec.tlsRecord = currentElt.getBytesOffset();

      if (LOG.isEnabledFor(Priority.INFO)) {

	LOG.info("Rcvd TLS Record[" + jtrec.size + " bytes]" +
		 " Seq# = " + sequenceNumber);

	// hex dump first 5 records
	if (sequenceNumber < 6) {
	  String hex = JTlsUtil.toHex(jtrec.tlsRecord, 0, jtrec.size);
	  LOG.info("HexDump:\n" + hex);
	}
      }

    }
    // return the requested TLS Record data
    // These calls should NEVER ask for more data than is in the
    // received TLS Record.

    int left = jtrec.size - jtrec.nextByte;
    int copyLen = (left < length ? left : length);

    System.arraycopy(jtrec.tlsRecord, jtrec.nextByte, a, offset, copyLen);
    jtrec.nextByte += copyLen;

    if (tlsInputDebug) {

      if (LOG.isEnabledFor(Priority.INFO)) {
	LOG.info("\nRequested " + length + ", Read " + copyLen + " bytes");
      }

    }

    return copyLen;
  }

  // Here we read the TLS Record data from the incoming JXTA message.
  // (We will really have a full jxta message available.)
  //
  // TLS  Record input only calls the following  methods.
  // They are called from SSLRecord.decode(SSLConn, Inputstream);
  //
  /*
   * Read a byte from the TLS Record which was received in
   * a JXTA Message.
   *
   * returns -1 on end of stream
   */
  public int read()
    throws java.io.IOException
  {
    byte[] a = new byte[1];
    int i = local_read(a, 0, 1);

    if (i != -1) {

      if (LOG.isEnabledFor(Priority.INFO) &&
	  SSLDebug.getDebug(SSLDebug.DEBUG_JXTA)) {
	LOG.info("Read() " + i);
      }

      i = (int)(a[0] & 0xFF);	// The byte

    }

    return i;
  }

  // Give requested bytes to TLS.
  public int read(byte[] a, int offset, int length)
    throws java.io.IOException {
      int i = local_read(a, offset, length);

      if (LOG.isEnabledFor(Priority.INFO)  &&
	  SSLDebug.getDebug(SSLDebug.DEBUG_JXTA)) {
	LOG.info("Read(byte[], int, " + length + "), bytes read = " + i);
      }

      return i;
  }

  // Read into a byte array
  public int read(byte[] a)
    throws java.io.IOException {
      int i = local_read(a, 0, a.length);
      return i;
  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -