⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bridgeunifiedmodule.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
  {    if (userName != null && password != null)      cnx = cnxFact.createConnection(userName, password);    else      cnx = cnxFact.createConnection();    cnx.setExceptionListener(this);    if (clientID != null)      cnx.setClientID(clientID);    producerSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);    producer = producerSession.createProducer(dest);    consumerSession = cnx.createSession(true, 0);  }  /**   * Sets the JMS consumer on the foreign destination.    *   * @exception JMSException  If the JMS consumer could not be created.   */  protected void setConsumer() throws JMSException  {    if (consumer != null)      return;    try {      if (dest instanceof Queue)        consumer = consumerSession.createConsumer(dest, selector);      else        consumer = consumerSession.createDurableSubscriber((Topic) dest,                                                           agentId.toString(),                                                           selector,                                                           false);    }    catch (JMSException exc) {      throw exc;    }    catch (Exception exc) {      throw new JMSException("JMS resources do not allow to create consumer: "                             + exc);    }  }  /**   * Unsets the JMS consumer.    */  protected void unsetConsumer()  {    try {      if (dest instanceof Topic)        consumerSession.unsubscribe(agentId.toString());      consumer.close();    }    catch (Exception exc) {}    consumer = null;  }  /**   * Acknowledges a message successfuly delivered to the foreign JMS server.   */  protected void acknowledge(Message message)  {    Channel.sendTo(agentId, new BridgeAckNot(message.getIdentifier()));  }  /**    * The <code>StartupDaemon</code> thread is responsible for retrieving   * the needed JMS administered objects from the JNDI server.   */  protected class StartupDaemon extends Daemon  {    /** Constructs a <code>StartupDaemon</code> thread. */    protected StartupDaemon()    {      super(agentId.toString() + ":StartupDaemon");      setDaemon(false);    }    /** The daemon's loop. */    public void run()    {      javax.naming.Context jndiCtx = null;      try {        canStop = true;        // Administered objects still to be retrieved: getting them from        // JNDI.        if (cnxFact == null || dest == null) {          if (jndiFactory == null || jndiUrl == null)            jndiCtx = new javax.naming.InitialContext();          else {            java.util.Hashtable env = new java.util.Hashtable();            env.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, jndiFactory);            env.put(javax.naming.Context.PROVIDER_URL, jndiUrl);            jndiCtx = new javax.naming.InitialContext(env);          }          cnxFact = (ConnectionFactory) jndiCtx.lookup(cnxFactName);          dest = (Destination) jndiCtx.lookup(destName);        }        try {          doConnect();        }        catch (AbstractMethodError exc) {          usable = false;          notUsableMessage = "Retrieved administered objects types not "                           + "compatible with the 'unified' communication "                           + " mode: " + exc;        }        catch (ClassCastException exc) {          usable = false;          notUsableMessage = "Retrieved administered objects types not "                           + "compatible with the chosen communication mode: "                           + exc;        }        catch (JMSSecurityException exc) {          usable = false;          notUsableMessage = "Provided user identification does not allow "                           + "to connect to the foreign JMS server: "                           + exc;        }        catch (JMSException exc) {          reconnectionDaemon.reconnect();        }        catch (Throwable exc) {          usable = false;          notUsableMessage = "" + exc;        }      }      catch (javax.naming.NameNotFoundException exc) {        usable = false;        if (cnxFact == null)          notUsableMessage = "Could not retrieve ConnectionFactory ["                             + cnxFactName                             + "] from JNDI: " + exc;        else if (dest == null)          notUsableMessage = "Could not retrieve Destination ["                             + destName                             + "] from JNDI: " + exc;      }      catch (javax.naming.NamingException exc) {        usable = false;        notUsableMessage = "Could not access JNDI: " + exc;      }      catch (ClassCastException exc) {        usable = false;        notUsableMessage = "Error while retrieving administered objects "                           + "through JNDI possibly because of missing "                           + "foreign JMS client libraries in classpath: "                           + exc;      }      catch (Exception exc) {        usable = false;        notUsableMessage = "Error while retrieving administered objects "                           + "through JNDI: "                            + exc;      }      finally {        // Closing the JNDI context.        try {          jndiCtx.close();        }        catch (Exception exc) {}        finish();      }    }    /** Shuts the daemon down. */    public void shutdown()    {}    /** Releases the daemon's resources. */    public void close()    {}  }  /**    * The <code>ReconnectionDaemon</code> thread is responsible for reconnecting   * the bridge module with the foreign JMS server in case of disconnection.   */  protected class ReconnectionDaemon extends Daemon  {    /** Number of reconnection trials of the first step. */    private int attempts1 = 30;    /** Retry interval (in milliseconds) of the first step. */    private long interval1 = 1000L;    /** Number of reconnection trials of the second step. */    private int attempts2 = 55;    /** Retry interval (in milliseconds) of the second step. */    private long interval2 = 5000L;    /** Retry interval (in milliseconds) of the third step. */    private long interval3 = 60000L;    /** Constructs a <code>ReconnectionDaemon</code> thread. */    protected ReconnectionDaemon()    {      super(agentId.toString() + ":ReconnectionDaemon");      setDaemon(false);    }    /** Notifies the daemon to start reconnecting. */    protected void reconnect() {      if (running)        return;      consumer = null;      start();    }     /** The daemon's loop. */    public void run()    {      int attempts = 0;      long interval;      Message msg;      try {        while (running) {          canStop = true;           attempts++;          if (attempts <= 30)            interval = interval1;          else if (attempts <= 55)                     interval = interval2;          else            interval = interval3;          try {            Thread.sleep(interval);            doConnect();                        // Setting the listener, if any.            if (listener)              setMessageListener();            // Starting the consumer daemon:            consumerDaemon.start();            // Sending the pending messages, if any:            while (! qout.isEmpty())              send((Message) qout.remove(0));          }          catch (Exception exc) {            continue;          }          canStop = false;          break;        }      }      finally {        finish();      }    }    /** Shuts the daemon down. */    public void shutdown()    {}    /** Releases the daemon's resources. */    public void close()    {}  }   /**    * The <code>ConsumerDaemon</code> thread allows to call   * <code>MessageConsumer.receive()</code> for requesting a foreign JMS   * message without blocking the JORAM server.   */  protected class ConsumerDaemon extends Daemon  {    /** Counter of pending "receive" requests. */    private int requests = 0;    /** Constructs a <code>ReceiverDaemon</code> thread. */    protected ConsumerDaemon()    {      super(agentId.toString() + ":ConsumerDaemon");      setDaemon(false);    }    /** Notifies the daemon of a new "receive" request. */    protected synchronized void receive()    {      requests++;      if (running)        return;      start();    }    /** The daemon's loop. */    public void run()    {      try {        Message momMessage;        BridgeDeliveryNot notif;        setConsumer();        cnx.start();        while (requests > 0 && running) {          canStop = true;           // Expecting a message:          try {            momMessage = MessageConverterModule.convert(consumer.receive());            consumerSession.commit();          }          // Conversion error: denying the message.          catch (MessageFormatException messageExc) {            consumerSession.rollback();            continue;          }          // Processing the delivery.          canStop = false;          notif = new BridgeDeliveryNot(momMessage);          Channel.sendTo(agentId, notif);          requests--;        }      }      // Connection loss?      catch (JMSException exc) {}      finally {        finish();      }    }    /** Shuts the daemon down. */    public void shutdown()    {}    /** Releases the daemon's resources. */    public void close()    {}  }  /** Deserializes a <code>BridgeUnifiedModule</code> instance. */  private void readObject(java.io.ObjectInputStream in)               throws java.io.IOException, ClassNotFoundException  {    in.defaultReadObject();    qout = new Vector();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -