📄 receiver.java
字号:
* ReceiverBase.run()<br> * Receiver.receiveAsync()<br> * ReceiverBase.receivePDUFromConnection<br> * Connection.receive()</code> * * @param timeout for how long is tried to receive a PDU * @return the received PDU or null if none received for the spec. timeout * * @exception IOException exception during communication * @exception PDUException incorrect format of PDU * @exception TimeoutException rest of PDU not received for too long time * @exception UnknownCommandIdException PDU with unknown id was received * @see ReceiverBase#tryReceivePDUWithTimeout(Connection,PDU,long) */ public synchronized PDU receive(long timeout) throws UnknownCommandIdException, TimeoutException, NotSynchronousException, PDUException, IOException { PDU pdu = null; if (!asynchronous) { pdu = tryReceivePDUWithTimeout(connection,null,timeout); } return pdu; } /** * Called from session to receive a response for previously sent request. * * @param expectedPDU the template for expected PDU; the PDU returned * must have the same sequence number * @return the received PDU or null if none * @see ReceiverBase#tryReceivePDUWithTimeout(Connection,PDU,long) */ public synchronized PDU receive(PDU expectedPDU) throws UnknownCommandIdException, TimeoutException, NotSynchronousException, PDUException, IOException { PDU pdu = null; if (!asynchronous) { pdu = tryReceivePDUWithTimeout(connection,expectedPDU); } return pdu; } /** * This method tries to receive one PDU from the connection. * It is called in cycle from <code>tryReceivePDUWithTimeout</code> until * timeout expires. <code>tryReceivePDUWithTimeout</code> is called * either from <code>receiveAsync</code> as asynchronous receive on * background or from <code>receive</code> as synchronous receive. * It either gets pdu from the queue or tries to receive it from connection * using <code>receivePDUFromConnection</code> depending on * the value of the <code>receiver</code> flag. 
The method * checks if the actualy received PUD is equal to <code>expectedPDU</code>. * * @exception IOException exception during communication * @exception PDUException incorrect format of PDU * @exception TimeoutException rest of PDU not received for too long time * @exception UnknownCommandIdException PDU with unknown id was received * @see ReceiverBase#tryReceivePDUWithTimeout(Connection,PDU,long) * @see #receiveAsync() * @see ReceiverBase#run() */ protected PDU tryReceivePDU(Connection connection, PDU expectedPDU) throws UnknownCommandIdException, TimeoutException, PDUException, IOException { PDU pdu = null; if (receiver) { debug.write(DRXTXD2,"Is receiver/transciever => trying to get from queue."); synchronized (pduQueue) { if (expectedPDU == null) { // i.e. any pdu is acceptable if (!pduQueue.isEmpty()) { pdu = (PDU)pduQueue.dequeue(); } } else { pdu = (PDU)pduQueue.dequeue(expectedPDU); } if (pdu == null) { try { pduQueue.wait(getQueueWaitTimeout()); } catch (InterruptedException e) { // we don't care debug.write(DRXTX,"tryReceivePDU got interrupt waiting for queue"); } } } } else { debug.write(DRXTX,"Is transmitter only => trying to receive from connection."); pdu = receivePDUFromConnection(connection,unprocessed); if (pdu != null) { if ((expectedPDU==null) || !pdu.equals(expectedPDU)) { debug.write(DRXTX,"This is not the pdu we expect, processing"+pdu.debugString()); enqueue(pdu); pdu = null; } } } return pdu; } /** * This method receives a PDU from connection and stores it into * <code>pduQueue</code>. It's called from the <code>ReceiverBase</code>'s * p<code>process</code> method which is called in loop from * <code>ProcessingThread</code>'s <code>run</code> method. * <p> * If an exception occurs during receiving, depending on type * of the exception this method either just reports the exception to * debug & event objects or stops processing to indicate * that it isn't able to process the exception. 
The function * <code>setTermException</code> is then called with the caught exception. * * @see ReceiverBase#run() */ protected void receiveAsync() { PDU pdu = null; try { debug.write(DRXTXD2,"Receiver.receiveAsync() going to receive pdu."); pdu = receivePDUFromConnection(connection,unprocessed); // we must catch every exception as this is thread running // on the background and we don't want the thread to be terminated } catch (InvalidPDUException e) { // thrown when enough data were received but further parsing // required more than indicated by CommandLength, i.e. pdu is // corrupted or further parsing didn't find terminating zero // of a c-string i.e. pdu is corrupted // must send generic nack anyway event.write(e, "Receiver.receiveAsync(): received PDU is invalid."); PDU expdu = e.getPDU(); int seqNr = expdu == null ? 0 : expdu.getSequenceNumber(); sendGenericNack(Data.ESME_RINVMSGLEN,seqNr); } catch (UnknownCommandIdException e) { // if received unknown pdu, we must send generic nack event.write(e, "Receiver.receiveAsync(): Unknown command id."); sendGenericNack(Data.ESME_RINVCMDID,e.getSequenceNumber()); } catch (TimeoutException e) { // too long had unprocessed data debug.write(DRXTX,"Receiver.receiveAsync() too long had an uncomplete message."); } catch (PDUException e) { // something wrong with the PDU event.write(e, "Receiver.receiveAsync()"); } catch (Exception e) { // don't know what happen, let's end the show event.write(e,"Receiver.receiveAsync()"); stopProcessing(e); } if (pdu != null) { debug.write(DRXTX,"Receiver.receiveAsync(): PDU received, processing " + pdu.debugString()); if (asynchronous) { process(pdu); } else { enqueue(pdu); } } } /** * Passes the <code>pdu</code> to the <code>pduListener</code>. 
* * @param pdu the PDU to pass to the processor as an * <code>ServerPDUEvent</code> * @see Queue * @see ServerPDUEventListener */ private void process(PDU pdu) { debug.write(DRXTX,"receiver passing pdu to ServerPDUEventListener"); if (pduListener != null) { ServerPDUEvent pduReceived = new ServerPDUEvent(this,connection,pdu); pduListener.handleEvent(pduReceived); } else { debug.write(DRXTX,"async receiver doesn't have ServerPDUEventListener, "+ "discarding "+pdu.debugString()); } } /** * Puts the <code>pdu</code> into the <code>pduQueue</code>. * * @param pdu the PDU to put into the queue * @see Queue */ private void enqueue(PDU pdu) { debug.write(DRXTX,"receiver enqueuing pdu."); synchronized (pduQueue) { pduQueue.enqueue(pdu); pduQueue.notifyAll(); } } /** * Sends <code>GenericNack</code> PDU via transmitter if there is one. * The <code>GenericNack</code> is sent in case that the message is * corrupted or has unknown command id. If the sending of * <code>GenericNack</code> fails, this method calls * <code>stopProcessing</code> and thus stops the receiving * thread. * * @param commandStatus the error code * @param sequenceNumber the sequence number of received wrong PDU * @see GenericNack * @see Transmitter */ private void sendGenericNack(int commandStatus, int sequenceNumber) { if (transmitter != null) { try { GenericNack gnack = new GenericNack(commandStatus,sequenceNumber); transmitter.send(gnack); } catch (IOException gnacke) { event.write(gnacke, "Receiver.run(): IOException sending generic_nack."); } catch (Exception gnacke) { event.write(gnacke, "Receiver.run(): an exception sending generic_nack."); stopProcessing(gnacke); } } } /** * Sets queue waiting timeout. * * @param timeout the new queue timeout * @see #queueWaitTimeout */ public void setQueueWaitTimeout(long timeout) { queueWaitTimeout = timeout; } /** * Returns current queue waiting timeout. 
*
     * @return the current queue timeout
     * @see #queueWaitTimeout
     */
    public long getQueueWaitTimeout() {
        return this.queueWaitTimeout;
    }

    /**
     * Supplies the name used for the receiver's thread; overrides
     * <code>ProcessingThread</code>'s <code>getThreadName</code>.
     *
     * @return the value of <code>RECEIVER_THREAD_NAME</code>
     */
    public String getThreadName() {
        return RECEIVER_THREAD_NAME;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -