⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 receiver.java

📁 Short Message Peer to Peer
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
			pdu = tryReceivePDUWithTimeout(connection, null, timeout);
		}
		return pdu;
	}

	/**
	 * Called from session to receive a response for previously sent request.
	 *
	 * @param expectedPDU the template for expected PDU; the PDU returned
	 *                    must have the same sequence number
	 * @return the received PDU or null if none
	 * @see ReceiverBase#tryReceivePDUWithTimeout(Connection,PDU,long)
	 */
	public synchronized PDU receive(PDU expectedPDU)
		throws UnknownCommandIdException, TimeoutException, NotSynchronousException, PDUException, IOException {
		PDU pdu = null;
		if (!asynchronous) {
			pdu = tryReceivePDUWithTimeout(connection, expectedPDU);
		}
		return pdu;
	}

	/**
	 * This method tries to receive one PDU from the connection.
	 * It is called in cycle from <code>tryReceivePDUWithTimeout</code> until
	 * timeout expires. <code>tryReceivePDUWithTimeout</code> is called
	 * either from <code>receiveAsync</code> as asynchronous receive on
	 * background or from <code>receive</code> as synchronous receive.
	 * It either gets pdu from the queue or tries to receive it from connection
	 * using <code>receivePDUFromConnection</code> depending on
	 * the value of the <code>receiver</code> flag. The method
	 * checks if the actualy received PUD is equal to <code>expectedPDU</code>.
	 *
	 * @exception IOException exception during communication
	 * @exception PDUException incorrect format of PDU
	 * @exception TimeoutException rest of PDU not received for too long time
	 * @exception UnknownCommandIdException PDU with unknown id was received
	 * @see ReceiverBase#tryReceivePDUWithTimeout(Connection,PDU,long)
	 * @see #receiveAsync()
	 * @see ReceiverBase#run()
	 */
	protected PDU tryReceivePDU(Connection connection, PDU expectedPDU)
		throws UnknownCommandIdException, TimeoutException, PDUException, IOException {
		PDU pdu = null;
		if (receiver) {
			debug.write(DRXTXD2, "Is receiver/transciever => trying to get from queue.");
			synchronized (pduQueue) {
				if (expectedPDU == null) { // i.e. any pdu is acceptable
					if (!pduQueue.isEmpty()) {
						pdu = (PDU) pduQueue.dequeue();
					}
				} else {
					pdu = (PDU) pduQueue.dequeue(expectedPDU);
				}
				if (pdu == null) {
					try {
						pduQueue.wait(getQueueWaitTimeout());
					} catch (InterruptedException e) {
						// we don't care
						debug.write(DRXTX, "tryReceivePDU got interrupt waiting for queue");
					}
				}
			}
		} else {
			debug.write(DRXTX, "Is transmitter only => trying to receive from connection.");
			pdu = receivePDUFromConnection(connection, unprocessed);
			if (pdu != null) {
				if ((expectedPDU == null) || !pdu.equals(expectedPDU)) {
					debug.write(DRXTX, "This is not the pdu we expect, processing" + pdu.debugString());
					enqueue(pdu);
					pdu = null;
				}
			}
		}
		return pdu;
	}

	/**
	 * This method receives a PDU from connection and stores it into
	 * <code>pduQueue</code>. It's called from the <code>ReceiverBase</code>'s
	 * p<code>process</code> method which is called in loop from
	 * <code>ProcessingThread</code>'s <code>run</code> method.
	 * <p>
	 * If an exception occurs during receiving, depending on type
	 * of the exception this method either just reports the exception to
	 * debug & event objects or stops processing to indicate
	 * that it isn't able to process the exception. The function
	 * <code>setTermException</code> is then called with the caught exception.
	 * 
	 * @see ReceiverBase#run()
	 */
	protected void receiveAsync() {
		PDU pdu = null;
		try {
			debug.write(DRXTXD2, "Receiver.receiveAsync() going to receive pdu.");
			pdu = receivePDUFromConnection(connection, unprocessed);
			// we must catch every exception as this is thread running
			// on the background and we don't want the thread to be terminated
		} catch (InvalidPDUException e) {
			// thrown when enough data were received but further parsing
			// required more than indicated by CommandLength, i.e. pdu is
			// corrupted or further parsing didn't find terminating zero
			// of a c-string i.e. pdu is corrupted
			// must send generic nack anyway
			event.write(e, "Receiver.receiveAsync(): received PDU is invalid.");
			PDU expdu = e.getPDU();
			int seqNr = expdu == null ? 0 : expdu.getSequenceNumber();
			sendGenericNack(Data.ESME_RINVMSGLEN, seqNr);
		} catch (UnknownCommandIdException e) {
			// if received unknown pdu, we must send generic nack
			event.write(e, "Receiver.receiveAsync(): Unknown command id.");
			sendGenericNack(Data.ESME_RINVCMDID, e.getSequenceNumber());
		} catch (TimeoutException e) {
			// too long had unprocessed data
			debug.write(DRXTX, "Receiver.receiveAsync() too long had an uncomplete message.");
		} catch (PDUException e) {
			// something wrong with the PDU
			event.write(e, "Receiver.receiveAsync()");
		} catch (Exception e) {
			// don't know what happen, let's end the show
			event.write(e, "Receiver.receiveAsync()");
			stopProcessing(e);
		}
		if (pdu != null) {
			debug.write(DRXTX, "Receiver.receiveAsync(): PDU received, processing " + pdu.debugString());
			if (asynchronous) {
				process(pdu);
			} else {
				enqueue(pdu);
			}
		}
	}

	/**
	 * Passes the <code>pdu</code> to the <code>pduListener</code>.
	 *
	 * @param pdu the PDU to pass to the processor as an
	 *            <code>ServerPDUEvent</code>
	 * @see Queue
	 * @see ServerPDUEventListener
	 */
	private void process(PDU pdu) {
		debug.write(DRXTX, "receiver passing pdu to ServerPDUEventListener");
		if (pduListener != null) {
			ServerPDUEvent pduReceived = new ServerPDUEvent(this, connection, pdu);
			pduListener.handleEvent(pduReceived);
			debug.write(DRXTX, "ServerPDUEventListener received pdu");
		} else {
			debug.write(
				DRXTX,
				"async receiver doesn't have ServerPDUEventListener, " + "discarding " + pdu.debugString());
		}
	}

	/**
	 * Puts the <code>pdu</code> into the <code>pduQueue</code>.
	 *
	 * @param pdu the PDU to put into the queue
	 * @see Queue
	 */
	private void enqueue(PDU pdu) {
		debug.write(DRXTX, "receiver enqueuing pdu.");
		synchronized (pduQueue) {
			pduQueue.enqueue(pdu);
			pduQueue.notifyAll();
		}
		// HNK -- If you just received unbind_resp, don't block on
		// socket in TCPIPConnection.receive(), call stopProcessing()
		if (Data.UNBIND_RESP == pdu.getCommandId()) {
			stopProcessing(null);
		}
	}

	/**
	 * Sends <code>GenericNack</code> PDU via transmitter if there is one.
	 * The <code>GenericNack</code> is sent in case that the message is
	 * corrupted or has unknown command id. If the sending of
	 * <code>GenericNack</code> fails, this method calls
	 * <code>stopProcessing</code> and thus stops the receiving
	 * thread.
	 *
	 * @param commandStatus the error code
	 * @param sequenceNumber the sequence number of received wrong PDU
	 * @see GenericNack
	 * @see Transmitter
	 */
	private void sendGenericNack(int commandStatus, int sequenceNumber) {
		if (transmitter != null) {
			try {
				GenericNack gnack = new GenericNack(commandStatus, sequenceNumber);
				transmitter.send(gnack);
			} catch (IOException gnacke) {
				event.write(gnacke, "Receiver.run(): IOException sending generic_nack.");
			} catch (Exception gnacke) {
				event.write(gnacke, "Receiver.run(): an exception sending generic_nack.");
				stopProcessing(gnacke);
			}
		}
	}

	/**
	 * Sets queue waiting timeout.
	 *
	 * @param timeout the new queue timeout
	 * @see #queueWaitTimeout
	 */
	public void setQueueWaitTimeout(long timeout) {
		queueWaitTimeout = timeout;
	}

	/**
	 * Returns current queue waiting timeout.
	 *
	 * @return the current queue timeout
	 * @see #queueWaitTimeout
	 */
	public long getQueueWaitTimeout() {
		return queueWaitTimeout;
	}

	// ProcessingThread's getThreadName override
	public String getThreadName() {
		return RECEIVER_THREAD_NAME;
	}

}
/*
 * $Log: Receiver.java,v $
 * Revision 1.2  2003/12/16 14:38:31  sverkera
 * Bugfix from smsforum:
 * Without this code, although you just received a successful unbind_resp from the smsc, the connection object makes a blocking read call with a timeout of 60 seconds. After 60 seconds, when control returns the receiver is then shut down by the session. This is not acceptable for cases where you have a connection pool because
 * an attempt to retireConnection(conn) makes an attempt to unbind which blocks for a minute and causes all other threads sharing the pool to block. This defeats the purpose of connection pooling.
 *
 * Revision 1.1  2003/07/23 00:28:39  sverkera
 * Imported
 *
 * 
 * Old changelog:
 * 13-07-01 ticp@logica.com start(), stop(), setQueueWaitTimeout() &
 *						    getQueueWaitTimeout() made not synchronized;
 *						    receive(long) & receive(PDU) made synchronized
 *						    so the receiver no longer locks up
 * 13-07-01 ticp@logica.com bug fixed in tryReceivePDU which caused that the PDUs
 *						    were never removed from the queue - now dequeue(expected)
 *						    is now used instead of find(expected)
 * 13-07-01 ticp@logica.com loads of debug lines corrected; some added
 * 08-08-01 ticp@logica.com added support for Session's asynchronous processing capability
 * 26-09-01 ticp@logica.com debug code categorized to groups
 * 01-10-01 ticp@logica.com added function getThreadName for ProcessingThread
 *						    thread name initialisation.
 * 02-10-01 ticp@logica.com instead of importing full packages only the used
 *						    classes are iported
 */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -