📄 connection.java
字号:
} else { if (packetQueue.size() > 0) return ((SMPPPacket)packetQueue.remove(0)); else return (readNextPacketInternal()); } } /** Read the next packet from the SMSC link. Internal version...handles * special case packets like bind responses and unbind request and * responses. * @return The read SMPP packet, or null if the connection timed out. * @throws java.io.IOException If an I/O error occurs while reading from * the network connection. */ private SMPPPacket readNextPacketInternal() throws java.io.IOException, SMPPProtocolException { try { SMPPPacket pak = null; int id = -1, st = -1; this.buf = link.read(this.buf); id = SMPPIO.bytesToInt(this.buf, 4, 4); pak = PacketFactory.newInstance(id); if (pak != null) { pak.readFrom(this.buf, 0); if (logger.isDebugEnabled()) { StringBuffer b = new StringBuffer("Packet Received: "); int l = pak.getLength(); int s = pak.getCommandStatus(); int n = pak.getSequenceNum(); b.append("id:").append(Integer.toHexString(id)); b.append(" len:").append(Integer.toString(l)); b.append(" st:").append(Integer.toString(s)); b.append(" sq:").append(Integer.toString(n)); logger.debug(b.toString()); } // Special case handling for certain packet types.. st = pak.getCommandStatus(); switch (pak.getCommandId()) { case SMPPPacket.BIND_TRANSMITTER_RESP: case SMPPPacket.BIND_RECEIVER_RESP: case SMPPPacket.BIND_TRANSCEIVER_RESP: handleBindResp((BindResp)pak); break; case SMPPPacket.UNBIND_RESP: handleUnbindResp((UnbindResp)pak); break; case SMPPPacket.UNBIND: handleUnbind((Unbind)pak); break; } if (st == 0) { // Fix up the alphabet for this connection type if the // packet needs it. DCS value 0 means the alphabet is in the // default encoding of the SMSC, which varies depending on // implementation. if (defaultAlphabet != null && pak.getDataCoding() == 0) pak.setAlphabet(defaultAlphabet); } } return (pak); } catch (BadCommandIDException x) { throw new SMPPProtocolException("Unrecognised command received", x); } } /** Handle an incoming bind response packet. Method is called by a few * methods in this class that read from the incoming connection. */ private void handleBindResp(BindResp resp) { int st = resp.getCommandStatus(); if (state == BINDING && st == 0) setState(BOUND); else if (st != 0) setState(UNBOUND); // Read the version of the protocol supported at the SMSC. Number n = (Number)resp.getOptionalParameter(Tag.SC_INTERFACE_VERSION); if (n != null) { SMPPVersion smscVersion = SMPPVersion.getVersion(n.intValue()); if (logger.isDebugEnabled()) { logger.debug("SMSC reports its supported SMPP version as " + smscVersion.toString()); } // Downgrade this connection's version if the SMSC's version is // lower. if (smscVersion.isOlder(this.interfaceVersion)) { logger.info("Downgrading this connection's SMPP version to " + smscVersion.toString()); setInterfaceVersion(smscVersion); } } else { // Spec requires us to assume the SMSC does not support optional // parameters this.supportOptionalParams = false; logger.warn("Disabling optional parameter support as no sc_interface_version parameter was received"); } // Set the link timeout try { long linkTimeout = APIConfig.getInstance().getLong(APIConfig.LINK_TIMEOUT); link.setTimeout(linkTimeout); if (logger.isDebugEnabled()) { logger.debug("Set the link timeout to " + linkTimeout); } } catch (PropertyNotFoundException x) { if (logger.isDebugEnabled()) { logger.debug("No link timeout specified in configuration"); } } catch (java.lang.UnsupportedOperationException x) { logger.warn("Configuration specified a link timeout but the link implementation does not support it"); } } /** Handle an incoming unbind packet. */ private void handleUnbind(Unbind req) { logger.info("SMSC requested unbind"); setState(UNBINDING); } /** Handle an incoming unbind response packet. */ private void handleUnbindResp(UnbindResp resp) { if (state == UNBINDING && resp.getCommandStatus() == 0) { logger.info("Successfully unbound"); setState(UNBOUND); } } /** Set the event dispatcher for this connection object. Before using the * new event dispatcher, this method will call * {@link EventDispatcher#init} to initialise the dispatcher. It will then * iterate through all the observers registered with the current event * dispatcher and register them with the new one. * <p>It is not a particularly good idea to set the event dispatcher after * communications have begun. However, the observer copy is guarded against * multi-threaded access to the current event dispatcher. During the copy, * however, events will continue to be delievered via the current * dispatcher. Only <b>after</b> the copy is complete will the new event * dispatcher become the active one and events begin being delivered by * it.</p> * <p>The caller of this method can be sure that, once this method returns, * all new events will be handled by the new event dispatcher. However, * there may be events that occurred before, or during the operation of, the * call to this method which will be delivered by the old dispatcher. Once * the new event dispatcher is in place, the {@link EventDispatcher#destroy} * method will be called on the old dispatcher.</p> */ public void setEventDispatcher(EventDispatcher eventDispatcher) { if (eventDispatcher == null) throw new NullPointerException("Event dispatcher cannot be null"); eventDispatcher.init(); // Copy all current observers to the new event dispatcher.. synchronized (this.eventDispatcher) { Iterator iter = this.eventDispatcher.observerIterator(); while (iter.hasNext()) eventDispatcher.addObserver((ConnectionObserver)iter.next()); } EventDispatcher old = this.eventDispatcher; // ..and swap out the old dispatcher. this.eventDispatcher = eventDispatcher; // Clean up the old dispatcher. old.destroy(); } /** Add a connection observer to receive SMPP events from this connection. * If this connection is not using asynchronous communication, this method * call has no effect. * @param ob the ConnectionObserver implementation to add. */ public void addObserver(ConnectionObserver ob) { if (eventDispatcher != null) eventDispatcher.addObserver(ob); } /** Remove a connection observer from this Connection. */ public void removeObserver(ConnectionObserver ob) { if (eventDispatcher != null) eventDispatcher.removeObserver(ob); } /** Listener thread method for asynchronous communication. */ public void run() { SMPPPacket pak = null; int smppEx = 0, id = 0, st = 0; SMPPEvent exitEvent = null; int tooManyIOEx = 5; logger.info("Receiver thread started"); APIConfig cfg = APIConfig.getInstance(); try { tooManyIOEx = cfg.getInt(APIConfig.TOO_MANY_IO_EXCEPTIONS); } catch (PropertyNotFoundException x) { // just stick with the default logger.debug("Didn't find I/O exception config. Using default of " + tooManyIOEx); } eventDispatcher.notifyObservers(this, new ReceiverStartEvent(this)); try { while (state != UNBOUND) { try { pak = readNextPacketInternal(); if (pak == null) { // XXX Send an event to the application?? continue; } } catch (SocketTimeoutException x) { if (state == BINDING) { // bind timeout has expired logger.error("Socket timeout in BINDING state."); exitEvent = new ReceiverExitEvent(this, null, state); ((ReceiverExitEvent)exitEvent).setReason(ReceiverExitEvent.BIND_TIMEOUT); setState(UNBOUND); } else { // is it okay to ignore this ?? logger.info("Ignoring SocketTimeoutException"); } continue; } catch (EOFException x) { // The network connection has disappeared! Wah! logger.error("EOFException received in daemon thread.", x); // Will be caught by the general handler lower in this // method. throw x; } catch (IOException x) { logger.warn("I/O Exception caught", x); ReceiverExceptionEvent ev = new ReceiverExceptionEvent(this, x, state); eventDispatcher.notifyObservers(this, ev); smppEx++; if (smppEx > tooManyIOEx) { logger.warn("Too many IOExceptions in receiver thread", x); throw x; } continue; } // Reset smppEx back to zero if we reach here, as packet // reception was successful. smppEx = 0; id = pak.getCommandId(); st = pak.getCommandStatus(); // Handle special case packets.. switch (id) { case SMPPPacket.DELIVER_SM: if (ackDeliverSm) ackDeliverSm((DeliverSM)pak); break; case SMPPPacket.ENQUIRE_LINK: if (ackQryLinks) ackEnquireLink((EnquireLink)pak); break; } // Tell all the observers about the new packet logger.info("Notifying observers of packet received"); eventDispatcher.notifyObservers(this, pak); } // end while // Notify observers that the thread is exiting with no error.. exitEvent = new ReceiverExitEvent(this, null, state); } catch (Exception x) { logger.error("Fatal exception in receiver thread", x); exitEvent = new ReceiverExitEvent(this, x, state); setState(UNBOUND); } finally { // make sure other code doesn't try to restart the rcvThread.. rcvThread = null; } if (exitEvent != null) eventDispatcher.notifyObservers(this, exitEvent); // Clean up the event dispatcher. eventDispatcher.destroy(); } /** * @deprecated #ackEnquireLink */ public void ackLinkQuery(EnquireLink el) throws java.io.IOException { ackEnquireLink(el); } /** Get a new instance of an SMPP packet. The packet will be initialised so * that it uses the same SMPP version as this connection and it's sequence * number will be initialised to using this connection's sequence numbering * scheme. * @param commandId the SMPP command ID of the packet to retrieve. * @return a subclass of {@link ie.omk.smpp.message.SMPPPacket} * corresponding to SMPP command <code>commandId</code>. * @throws ie.omk.smpp.BadCommandIDException if the command ID is not * recognised. * @throws ie.omk.smpp.NotSupportedException if the Connection is currently * using an SMPP version which does not support SMPP command * <code>commandId</code>. */ public SMPPPacket newInstance(int commandId) throws BadCommandIDException, VersionException { if (!this.interfaceVersion.isSupported(commandId)) throw new VersionException("Command is not supported in this SMPP version"); SMPPPacket response = PacketFactory.newInstance(commandId); response.setVersion(this.interfaceVersion); if (this.seqNumScheme != null) response.setSequenceNum(this.seqNumScheme.nextNumber()); if (defaultAlphabet != null) response.setAlphabet(defaultAlphabet, 0); return (response); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -