⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ngnetwork.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
//             // end of message sending, if it is an acknowledge remove it//             // from sendlist else wait for ack.//             if (sendlist.currentMessage().not == null) {//               logmon.log(BasicLevel.DEBUG, getName() + " remove ack sent");//               sendlist.removeCurrent();//             }//           }          Message msg = sendlist.nextMessage();          if (msg == null) {            bufout = null;            // There is no more message to send, unsubscribe this channel            // for write operation.            if (logmon.isLoggable(BasicLevel.DEBUG))              logmon.log(BasicLevel.DEBUG, getName() + " write-4x:" + msg);            channel.register(selector, SelectionKey.OP_READ, this);          } else {            if (logmon.isLoggable(BasicLevel.DEBUG))              logmon.log(BasicLevel.DEBUG, getName() + " write-4:" + msg);            mos.writeMessage(msg);            if (msg.not == null) {              logmon.log(BasicLevel.DEBUG, getName() + " remove ack sent");              sendlist.removeCurrent();            }          }        }      }    }    /**     * Method called each time the channel is Readable     */    private synchronized void read() throws Exception {      int bytes = channel.read(bufin);      if (logmon.isLoggable(BasicLevel.DEBUG))        logmon.log(BasicLevel.DEBUG, getName() + " reads: " + bytes);      if (bytes == 0) return;      if (bytes < 0) {        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG, getName() + " cnx remotely closed");	close();	return;      }      bufin.flip();      while (bytes > 0) {//         logmon.log(BasicLevel.FATAL,//                    "mis.getBuffer()=" + mis.getBuffer().length);//         logmon.log(BasicLevel.FATAL,//                    "mis.getCount()=" + mis.getCount());//         logmon.log(BasicLevel.FATAL,//                    "mis.length=" + mis.length);//         logmon.log(BasicLevel.FATAL,//                    "bytes=" + bytes);        	if (mis.length ==  -1) {//	if (mis.msg ==  null) {          // Reads the message header.	  if ((mis.getCount() + bytes) < 28) {            bufin.get(mis.getBuffer(), mis.getCount(), bytes);	    mis.setCount(mis.getCount() + bytes);	    bytes = 0;	  } else {	    bufin.get(mis.getBuffer(), mis.getCount(), 28-mis.getCount());	    bytes -= 28-mis.getCount();	    mis.setCount(28);            Message msg = mis.readHeader();            if (mis.length == 28) {              if (logmon.isLoggable(BasicLevel.DEBUG))                logmon.log(BasicLevel.DEBUG,                           getName() + ", ack received #" + msg.stamp);//            logmon.log(BasicLevel.FATAL, msg.toString());              doAck(msg.stamp);              msg.free();              // Reset data structures for next messages              mis.length = -1;              mis.msg = null;              mis.setCount(0);            }//             // Reads the message length// 	  try {// 	    for (; nbread <28; nbread++) {//               bufin.get(mis.buf, nbread, 28-nbread);// 	      bytes -= 1;// 	    }//             if (logmon.isLoggable(BasicLevel.DEBUG))//               logmon.log(BasicLevel.DEBUG, getName() + " get length: " + length);// 	    // Allocates byte array for message storage// 	    array = new byte[length];// 	    nbread = 0;// 	  } catch (BufferUnderflowException exc) {// 	    break; 	  }	} else {          // The message header is read, reads the notification if any.	  if ((mis.getCount() + bytes) < (mis.length-28)) {	    bufin.get(mis.getBuffer(), mis.getCount(), bytes);	    mis.setCount(mis.getCount() + bytes);	    bytes = 0;	  } else {	    bufin.get(mis.getBuffer(), mis.getCount(), mis.length-28-mis.getCount());	    bytes -= mis.length-28-mis.getCount();	    mis.setCount(mis.length-28);                        Message msg = mis.readMessage();            //  Keep message stamp in order to acknowledge it (be careful,            // the message get a new stamp to be delivered).            int stamp = msg.getStamp();            if (logmon.isLoggable(BasicLevel.DEBUG))              logmon.log(BasicLevel.DEBUG,                         getName() + ", message received #" + stamp);//          logmon.log(BasicLevel.FATAL, msg.toString());            deliver(msg);            ack(stamp);              	    // Reset data structures for next messages	    mis.length = -1;            mis.msg = null;	    mis.setCount(0);	  }	}      }      bufin.clear();    }    /**     * Class used to read messages through a stream.     */    final class MessageInputStream extends ByteArrayInputStream {      int length = -1;      Message msg = null;      MessageInputStream() {        super(new byte[512]);        count = 0;      }      public void reset() {        super.reset();        length = -1;        msg = null;      }      byte[] getBuffer() {        return buf;      }      int getCount() {        return count;      }      void setCount(int count) {        this.count = count;      }      Message readHeader() throws Exception {        // Reads boot timestamp of source server        length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) +          ((buf[2] & 0xFF) <<  8) + ((buf[3] & 0xFF) <<  0);        msg = Message.alloc();        // Reads sender's AgentId        msg.from = new AgentId(          (short) (((buf[4] & 0xFF) <<  8) + (buf[5] & 0xFF)),          (short) (((buf[6] & 0xFF) <<  8) + (buf[7] & 0xFF)),          ((buf[8] & 0xFF) << 24) + ((buf[9] & 0xFF) << 16) +          ((buf[10] & 0xFF) <<  8) + ((buf[11] & 0xFF) <<  0));        // Reads adressee's AgentId        msg.to = new AgentId(          (short) (((buf[12] & 0xFF) <<  8) + (buf[13] & 0xFF)),          (short) (((buf[14] & 0xFF) <<  8) + (buf[15] & 0xFF)),          ((buf[16] & 0xFF) << 24) + ((buf[17] & 0xFF) << 16) +          ((buf[18] & 0xFF) <<  8) + ((buf[19] & 0xFF) <<  0));        // Reads source server id of message        msg.source = (short) (((buf[20] & 0xFF) <<  8) +                              ((buf[21] & 0xFF) <<  0));        // Reads destination server id of message        msg.dest = (short) (((buf[22] & 0xFF) <<  8) +                            ((buf[23] & 0xFF) <<  0));        // Reads stamp of message        msg.stamp = ((buf[24] & 0xFF) << 24) +          ((buf[25] & 0xFF) << 16) +          ((buf[26] & 0xFF) <<  8) +          ((buf[27] & 0xFF) <<  0);        if ((length -28) > buf.length)          buf = new byte[length -28];        count = 0;        return msg;      }      Message readMessage() throws Exception {        if (length > 28) {          // Reads notification attributes          boolean persistent = ((buf[28] & 0x01) == 0x01) ? true : false;          boolean detachable = ((buf[28] & 0x10) == 0x10) ? true : false;          pos = 1;          // Reads notification object          ObjectInputStream ois = new ObjectInputStream(this);          msg.not = (Notification) ois.readObject();          msg.not.persistent = persistent;          msg.not.detachable = detachable;          msg.not.detached = false;        } else {          msg.not = null;        }        return msg;      }    }    /**     * Removes all messages in sendList previous to the ack'ed one.     * Be careful, messages in sendList are not always in stamp order.     * Its method should not be synchronized, it scans the list from     * begin to end, and it removes always the first element. Other     * methods using sendList just adds element at the end.     */    final private void doAck(int ack) throws IOException {      Message msg = null;      try {        //  Suppress the acknowledged notification from waiting list,        // and deletes it.        msg = sendlist.removeMessage(ack);        AgentServer.getTransaction().begin();        msg.delete();        msg.free();        AgentServer.getTransaction().commit();        AgentServer.getTransaction().release();        if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG,                     getName() + ", remove msg#" + msg.getStamp());      } catch (NoSuchElementException exc) {        logmon.log(BasicLevel.WARN,                   getName() + ", can't ack, unknown msg#" + ack);      }    }    final private void ack(int stamp) throws Exception {      if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG,                     getName() + ", set ack msg#" + stamp);      Message ack = Message.alloc(AgentId.localId,                                  AgentId.localId(server.sid),                                  null);      ack.source = AgentServer.getServerId();      ack.dest = AgentServer.getServerDesc(server.sid).gateway;      ack.stamp = stamp;      send(ack);    }    void close() throws IOException {      if (logmon.isLoggable(BasicLevel.DEBUG))          logmon.log(BasicLevel.DEBUG, getName() + ", close");      try {        channel.keyFor(selector).cancel();      } catch (Exception exc) {}      try {        channel.close();      } catch (Exception exc) {        //      } finally {        channel = null;      }      nbwrite = 0;      bufout = null;    }        public String toString() {      StringBuffer strbuf = new StringBuffer();      strbuf.append('(').append(super.toString());      strbuf.append(',').append(name);      strbuf.append(',').append(channel);      strbuf.append(',').append(nbwrite);      strbuf.append(',').append(sendlist).append(')');      return strbuf.toString();    }  }  final class MessageVector {    /**     * The array buffer into which the components of the vector are     * stored. The capacity of the vector is the length of this array buffer,      * and is at least large enough to contain all the vector's elements.<p>     *     * Any array elements following the last element in the Vector are null.     */    private Message elementData[] = null;    /**     * The number of valid components in this <tt>MessageVector</tt> object.      */    private int elementCount = 0;    /**     * The actual item in this <tt>MessageVector</tt> object.     */    private int current = -1;    /**     * Constructs an empty vector with the specified initial capacity and     * capacity increment.      */    public MessageVector() {	this.elementData = new Message[20];    }    public synchronized Message nextMessage() {      logmon.log(BasicLevel.FATAL, getName() + ", nextMessage:" + toString());      if ((current +1) < elementCount)        return elementData[++current];      else        return null;    }    /**     * Returns the number of message in this vector.     *     * @return  the number of message in this vector.     */    public synchronized int size() {      return elementCount;    }    public synchronized void reset() {      current = -1;    }    /**     * Adds the specified component to the end of this vector,      * increasing its size by one. The capacity of this vector is      * increased if its size becomes greater than its capacity. <p>     *     * @param   msg   the component to be added.     */    public synchronized void addMessage(Message msg) {      logmon.log(BasicLevel.FATAL, getName() + ", addMessage:" + toString());      if ((elementCount + 1) > elementData.length) {        Message oldData[] = elementData;        elementData = new Message[elementData.length * 2];        System.arraycopy(oldData, 0, elementData, 0, elementCount);      }      elementData[elementCount++] = msg;    }    public synchronized void removeCurrent() {      logmon.log(BasicLevel.FATAL, getName() + ", removeCurrent:" + toString());      if (elementCount > (current +1)) {        System.arraycopy(elementData, current +1,                         elementData, current, elementCount - current -1);              }      elementData[elementCount-1] = null; /* to let gc do its work */      elementCount--;      current--;    }    /**     * Removes a message specified by its stamp. Only remove real message,     * this method don't touch to acknowledge message.     *     * @param   stamp   the stamp of the message to remove.     */    public synchronized Message removeMessage(int stamp) {      Message msg = null;      logmon.log(BasicLevel.FATAL, getName() + ", removeMessage:" + toString());      for (int index=0 ; index<elementCount ; index++) {        msg = elementData[index];        if ((msg.not != null) && (msg.getStamp() == stamp)) {          if (elementCount > (index +1)) {	    System.arraycopy(elementData, index +1,                             elementData, index, elementCount - index - 1);          }          elementData[elementCount-1] = null; /* to let gc do its work */          elementCount--;          // AF: To verify !!          if (index <=current) current--;                  return msg;        }      }      throw new NoSuchElementException();    }    public String toString() {      StringBuffer strbuf = new StringBuffer();      strbuf.append(super.toString());      strbuf.append(',').append(current);      strbuf.append(',').append(elementCount);      for (int i=0; i<elementCount; i++) {        strbuf.append(",(").append(elementData[i]).append(')');      }      return strbuf.toString();    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -