📄 receiver.java
字号:
pdu = tryReceivePDUWithTimeout(connection, null, timeout);
}
return pdu;
}
/**
* Called from session to receive a response for previously sent request.
*
* @param expectedPDU the template for expected PDU; the PDU returned
* must have the same sequence number
* @return the received PDU or null if none
* @see ReceiverBase#tryReceivePDUWithTimeout(Connection,PDU,long)
*/
public synchronized PDU receive(PDU expectedPDU)
throws UnknownCommandIdException, TimeoutException, NotSynchronousException, PDUException, IOException {
PDU pdu = null;
if (!asynchronous) {
pdu = tryReceivePDUWithTimeout(connection, expectedPDU);
}
return pdu;
}
/**
* This method tries to receive one PDU from the connection.
* It is called in cycle from <code>tryReceivePDUWithTimeout</code> until
* timeout expires. <code>tryReceivePDUWithTimeout</code> is called
* either from <code>receiveAsync</code> as asynchronous receive on
* background or from <code>receive</code> as synchronous receive.
* It either gets pdu from the queue or tries to receive it from connection
* using <code>receivePDUFromConnection</code> depending on
* the value of the <code>receiver</code> flag. The method
* checks if the actualy received PUD is equal to <code>expectedPDU</code>.
*
* @exception IOException exception during communication
* @exception PDUException incorrect format of PDU
* @exception TimeoutException rest of PDU not received for too long time
* @exception UnknownCommandIdException PDU with unknown id was received
* @see ReceiverBase#tryReceivePDUWithTimeout(Connection,PDU,long)
* @see #receiveAsync()
* @see ReceiverBase#run()
*/
protected PDU tryReceivePDU(Connection connection, PDU expectedPDU)
throws UnknownCommandIdException, TimeoutException, PDUException, IOException {
PDU pdu = null;
if (receiver) {
debug.write(DRXTXD2, "Is receiver/transciever => trying to get from queue.");
synchronized (pduQueue) {
if (expectedPDU == null) { // i.e. any pdu is acceptable
if (!pduQueue.isEmpty()) {
pdu = (PDU) pduQueue.dequeue();
}
} else {
pdu = (PDU) pduQueue.dequeue(expectedPDU);
}
if (pdu == null) {
try {
pduQueue.wait(getQueueWaitTimeout());
} catch (InterruptedException e) {
// we don't care
debug.write(DRXTX, "tryReceivePDU got interrupt waiting for queue");
}
}
}
} else {
debug.write(DRXTX, "Is transmitter only => trying to receive from connection.");
pdu = receivePDUFromConnection(connection, unprocessed);
if (pdu != null) {
if ((expectedPDU == null) || !pdu.equals(expectedPDU)) {
debug.write(DRXTX, "This is not the pdu we expect, processing" + pdu.debugString());
enqueue(pdu);
pdu = null;
}
}
}
return pdu;
}
/**
* This method receives a PDU from connection and stores it into
* <code>pduQueue</code>. It's called from the <code>ReceiverBase</code>'s
* p<code>process</code> method which is called in loop from
* <code>ProcessingThread</code>'s <code>run</code> method.
* <p>
* If an exception occurs during receiving, depending on type
* of the exception this method either just reports the exception to
* debug & event objects or stops processing to indicate
* that it isn't able to process the exception. The function
* <code>setTermException</code> is then called with the caught exception.
*
* @see ReceiverBase#run()
*/
protected void receiveAsync() {
PDU pdu = null;
try {
debug.write(DRXTXD2, "Receiver.receiveAsync() going to receive pdu.");
pdu = receivePDUFromConnection(connection, unprocessed);
// we must catch every exception as this is thread running
// on the background and we don't want the thread to be terminated
} catch (InvalidPDUException e) {
// thrown when enough data were received but further parsing
// required more than indicated by CommandLength, i.e. pdu is
// corrupted or further parsing didn't find terminating zero
// of a c-string i.e. pdu is corrupted
// must send generic nack anyway
event.write(e, "Receiver.receiveAsync(): received PDU is invalid.");
PDU expdu = e.getPDU();
int seqNr = expdu == null ? 0 : expdu.getSequenceNumber();
sendGenericNack(Data.ESME_RINVMSGLEN, seqNr);
} catch (UnknownCommandIdException e) {
// if received unknown pdu, we must send generic nack
event.write(e, "Receiver.receiveAsync(): Unknown command id.");
sendGenericNack(Data.ESME_RINVCMDID, e.getSequenceNumber());
} catch (TimeoutException e) {
// too long had unprocessed data
debug.write(DRXTX, "Receiver.receiveAsync() too long had an uncomplete message.");
} catch (PDUException e) {
// something wrong with the PDU
event.write(e, "Receiver.receiveAsync()");
} catch (Exception e) {
// don't know what happen, let's end the show
event.write(e, "Receiver.receiveAsync()");
stopProcessing(e);
}
if (pdu != null) {
debug.write(DRXTX, "Receiver.receiveAsync(): PDU received, processing " + pdu.debugString());
if (asynchronous) {
process(pdu);
} else {
enqueue(pdu);
}
}
}
/**
* Passes the <code>pdu</code> to the <code>pduListener</code>.
*
* @param pdu the PDU to pass to the processor as an
* <code>ServerPDUEvent</code>
* @see Queue
* @see ServerPDUEventListener
*/
private void process(PDU pdu) {
debug.write(DRXTX, "receiver passing pdu to ServerPDUEventListener");
if (pduListener != null) {
ServerPDUEvent pduReceived = new ServerPDUEvent(this, connection, pdu);
pduListener.handleEvent(pduReceived);
debug.write(DRXTX, "ServerPDUEventListener received pdu");
} else {
debug.write(
DRXTX,
"async receiver doesn't have ServerPDUEventListener, " + "discarding " + pdu.debugString());
}
}
/**
* Puts the <code>pdu</code> into the <code>pduQueue</code>.
*
* @param pdu the PDU to put into the queue
* @see Queue
*/
private void enqueue(PDU pdu) {
debug.write(DRXTX, "receiver enqueuing pdu.");
synchronized (pduQueue) {
pduQueue.enqueue(pdu);
pduQueue.notifyAll();
}
// HNK -- If you just received unbind_resp, don't block on
// socket in TCPIPConnection.receive(), call stopProcessing()
if (Data.UNBIND_RESP == pdu.getCommandId()) {
stopProcessing(null);
}
}
/**
* Sends <code>GenericNack</code> PDU via transmitter if there is one.
* The <code>GenericNack</code> is sent in case that the message is
* corrupted or has unknown command id. If the sending of
* <code>GenericNack</code> fails, this method calls
* <code>stopProcessing</code> and thus stops the receiving
* thread.
*
* @param commandStatus the error code
* @param sequenceNumber the sequence number of received wrong PDU
* @see GenericNack
* @see Transmitter
*/
private void sendGenericNack(int commandStatus, int sequenceNumber) {
if (transmitter != null) {
try {
GenericNack gnack = new GenericNack(commandStatus, sequenceNumber);
transmitter.send(gnack);
} catch (IOException gnacke) {
event.write(gnacke, "Receiver.run(): IOException sending generic_nack.");
} catch (Exception gnacke) {
event.write(gnacke, "Receiver.run(): an exception sending generic_nack.");
stopProcessing(gnacke);
}
}
}
/**
* Sets queue waiting timeout.
*
* @param timeout the new queue timeout
* @see #queueWaitTimeout
*/
public void setQueueWaitTimeout(long timeout) {
queueWaitTimeout = timeout;
}
/**
* Returns current queue waiting timeout.
*
* @return the current queue timeout
* @see #queueWaitTimeout
*/
public long getQueueWaitTimeout() {
return queueWaitTimeout;
}
// ProcessingThread's getThreadName override
public String getThreadName() {
return RECEIVER_THREAD_NAME;
}
}
/*
* $Log: Receiver.java,v $
* Revision 1.2 2003/12/16 14:38:31 sverkera
* Bugfix from smsforum:
* Without this code, although you just received a successful unbind_resp from the smsc, the connection object makes a blocking read call with a timeout of 60 seconds. After 60 seconds, when control returns the receiver is then shut down by the session. This is not acceptable for cases where you have a connection pool because
* an attempt to retireConnection(conn) makes an attempt to unbind which blocks for a minute and causes all other threads sharing the pool to block. This defeats the purpose of connection pooling.
*
* Revision 1.1 2003/07/23 00:28:39 sverkera
* Imported
*
*
* Old changelog:
* 13-07-01 ticp@logica.com start(), stop(), setQueueWaitTimeout() &
* getQueueWaitTimeout() made not synchronized;
* receive(long) & receive(PDU) made synchronized
* so the receiver no longer locks up
* 13-07-01 ticp@logica.com bug fixed in tryReceivePDU which caused that the PDUs
* were never removed from the queue - now dequeue(expected)
* is now used instead of find(expected)
* 13-07-01 ticp@logica.com loads of debug lines corrected; some added
* 08-08-01 ticp@logica.com added support for Session's asynchronous processing capability
* 26-09-01 ticp@logica.com debug code categorized to groups
* 01-10-01 ticp@logica.com added function getThreadName for ProcessingThread
* thread name initialisation.
* 02-10-01 ticp@logica.com instead of importing full packages only the used
* classes are iported
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -