📄 ngnetwork.java
字号:
//         // end of message sending, if it is an acknowledge remove it
//         // from sendlist else wait for ack.
//         if (sendlist.currentMessage().not == null) {
//           logmon.log(BasicLevel.DEBUG, getName() + " remove ack sent");
//           sendlist.removeCurrent();
//         }
//       }
          // NOTE(review): the statements down to the three closing braces are
          // the tail of a method whose beginning lies outside this excerpt
          // (presumably the handler called when the channel is writable) --
          // confirm against the full file.

          // Picks the next message waiting in the send list, if any.
          Message msg = sendlist.nextMessage();
          if (msg == null) {
            bufout = null;
            // There is no more message to send, unsubscribe this channel
            // for write operation (it is registered again for OP_READ only).
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + " write-4x:" + msg);
            channel.register(selector, SelectionKey.OP_READ, this);
          } else {
            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG, getName() + " write-4:" + msg);
            mos.writeMessage(msg);
            // A message carrying no notification is an acknowledge: nobody
            // will acknowledge it in turn, so it leaves the send list as soon
            // as it has been handed to the output stream.
            if (msg.not == null) {
              logmon.log(BasicLevel.DEBUG, getName() + " remove ack sent");
              sendlist.removeCurrent();
            }
          }
        }
      }
    }

    /**
     * Method called each time the channel is Readable.
     * <p>
     * Reads whatever is available from the channel into <code>bufin</code>,
     * then feeds these bytes to the message input stream <code>mis</code>.
     * A message may arrive split over several calls, and a single call may
     * bring several messages, so the decoding state is kept in
     * <code>mis</code> between calls:
     * <ul>
     * <li><code>mis.length == -1</code>: the 28 bytes header of the current
     *     message is not complete yet, <code>mis.getCount()</code> is the
     *     number of header bytes already stored;</li>
     * <li>otherwise the header has been decoded and
     *     <code>mis.getCount()</code> is the number of body bytes already
     *     stored, out of <code>mis.length-28</code>.</li>
     * </ul>
     * A message whose length is exactly 28 has no body: it is an acknowledge
     * and is handled by <code>doAck</code>. Any other message is delivered
     * then acknowledged. If the channel reports end of stream the session
     * is closed.
     *
     * @exception Exception
     *   any error raised while reading the channel, decoding, delivering or
     *   acknowledging a message.
     */
    private synchronized void read() throws Exception {
      int bytes = channel.read(bufin);

      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + " reads: " + bytes);

      // Nothing available for now.
      if (bytes == 0) return;

      // A negative count means the channel reached end of stream.
      if (bytes < 0) {
        if (logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, getName() + " cnx remotely closed");
        close();
        return;
      }

      // Switches bufin from filling to draining; 'bytes' counts what remains
      // to be drained from it.
      bufin.flip();

      while (bytes > 0) {
        if (mis.length == -1) {
          // Reads the message header.
          if ((mis.getCount() + bytes) < 28) {
            // Still not enough bytes for a whole header: stores them and
            // waits for the next call.
            bufin.get(mis.getBuffer(), mis.getCount(), bytes);
            mis.setCount(mis.getCount() + bytes);
            bytes = 0;
          } else {
            // Completes the header. The order of these three statements
            // matters: both the copy and the decrement use the count as it
            // was before being forced to 28.
            bufin.get(mis.getBuffer(), mis.getCount(), 28-mis.getCount());
            bytes -= 28-mis.getCount();
            mis.setCount(28);

            // Decodes the header; this sets mis.length and resets the count
            // so that the body is stored from the start of the buffer.
            Message msg = mis.readHeader();

            if (mis.length == 28) {
              // Header only: this is an acknowledge for a message previously
              // sent, identified by its stamp.
              if (logmon.isLoggable(BasicLevel.DEBUG))
                logmon.log(BasicLevel.DEBUG,
                           getName() + ", ack received #" + msg.stamp);
              doAck(msg.stamp);
              msg.free();

              // Reset data structures for next messages
              mis.length = -1;
              mis.msg = null;
              mis.setCount(0);
            }
          }
        } else {
          // The message header is read, reads the notification if any.
          if ((mis.getCount() + bytes) < (mis.length-28)) {
            // The body is still incomplete: stores what is available.
            bufin.get(mis.getBuffer(), mis.getCount(), bytes);
            mis.setCount(mis.getCount() + bytes);
            bytes = 0;
          } else {
            // Completes the body; as for the header, the count is only
            // updated once the copy and the decrement have used it.
            bufin.get(mis.getBuffer(), mis.getCount(), mis.length-28-mis.getCount());
            bytes -= mis.length-28-mis.getCount();
            mis.setCount(mis.length-28);

            Message msg = mis.readMessage();

            // Keep message stamp in order to acknowledge it (be careful,
            // the message get a new stamp to be delivered).
            int stamp = msg.getStamp();

            if (logmon.isLoggable(BasicLevel.DEBUG))
              logmon.log(BasicLevel.DEBUG,
                         getName() + ", message received #" + stamp);
            deliver(msg);
            ack(stamp);

            // Reset data structures for next messages
            mis.length = -1;
            mis.msg = null;
            mis.setCount(0);
          }
        }
      }
      // Everything has been drained, makes bufin ready for the next read.
      bufin.clear();
    }

    /**
     * Class used to read messages through a stream.
*/ final class MessageInputStream extends ByteArrayInputStream { int length = -1; Message msg = null; MessageInputStream() { super(new byte[512]); count = 0; } public void reset() { super.reset(); length = -1; msg = null; } byte[] getBuffer() { return buf; } int getCount() { return count; } void setCount(int count) { this.count = count; } Message readHeader() throws Exception { // Reads boot timestamp of source server length = ((buf[0] & 0xFF) << 24) + ((buf[1] & 0xFF) << 16) + ((buf[2] & 0xFF) << 8) + ((buf[3] & 0xFF) << 0); msg = Message.alloc(); // Reads sender's AgentId msg.from = new AgentId( (short) (((buf[4] & 0xFF) << 8) + (buf[5] & 0xFF)), (short) (((buf[6] & 0xFF) << 8) + (buf[7] & 0xFF)), ((buf[8] & 0xFF) << 24) + ((buf[9] & 0xFF) << 16) + ((buf[10] & 0xFF) << 8) + ((buf[11] & 0xFF) << 0)); // Reads adressee's AgentId msg.to = new AgentId( (short) (((buf[12] & 0xFF) << 8) + (buf[13] & 0xFF)), (short) (((buf[14] & 0xFF) << 8) + (buf[15] & 0xFF)), ((buf[16] & 0xFF) << 24) + ((buf[17] & 0xFF) << 16) + ((buf[18] & 0xFF) << 8) + ((buf[19] & 0xFF) << 0)); // Reads source server id of message msg.source = (short) (((buf[20] & 0xFF) << 8) + ((buf[21] & 0xFF) << 0)); // Reads destination server id of message msg.dest = (short) (((buf[22] & 0xFF) << 8) + ((buf[23] & 0xFF) << 0)); // Reads stamp of message msg.stamp = ((buf[24] & 0xFF) << 24) + ((buf[25] & 0xFF) << 16) + ((buf[26] & 0xFF) << 8) + ((buf[27] & 0xFF) << 0); if ((length -28) > buf.length) buf = new byte[length -28]; count = 0; return msg; } Message readMessage() throws Exception { if (length > 28) { // Reads notification attributes boolean persistent = ((buf[28] & 0x01) == 0x01) ? true : false; boolean detachable = ((buf[28] & 0x10) == 0x10) ? 
true : false; pos = 1; // Reads notification object ObjectInputStream ois = new ObjectInputStream(this); msg.not = (Notification) ois.readObject(); msg.not.persistent = persistent; msg.not.detachable = detachable; msg.not.detached = false; } else { msg.not = null; } return msg; } } /** * Removes all messages in sendList previous to the ack'ed one. * Be careful, messages in sendList are not always in stamp order. * Its method should not be synchronized, it scans the list from * begin to end, and it removes always the first element. Other * methods using sendList just adds element at the end. */ final private void doAck(int ack) throws IOException { Message msg = null; try { // Suppress the acknowledged notification from waiting list, // and deletes it. msg = sendlist.removeMessage(ack); AgentServer.getTransaction().begin(); msg.delete(); msg.free(); AgentServer.getTransaction().commit(); AgentServer.getTransaction().release(); if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", remove msg#" + msg.getStamp()); } catch (NoSuchElementException exc) { logmon.log(BasicLevel.WARN, getName() + ", can't ack, unknown msg#" + ack); } } final private void ack(int stamp) throws Exception { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", set ack msg#" + stamp); Message ack = Message.alloc(AgentId.localId, AgentId.localId(server.sid), null); ack.source = AgentServer.getServerId(); ack.dest = AgentServer.getServerDesc(server.sid).gateway; ack.stamp = stamp; send(ack); } void close() throws IOException { if (logmon.isLoggable(BasicLevel.DEBUG)) logmon.log(BasicLevel.DEBUG, getName() + ", close"); try { channel.keyFor(selector).cancel(); } catch (Exception exc) {} try { channel.close(); } catch (Exception exc) { // } finally { channel = null; } nbwrite = 0; bufout = null; } public String toString() { StringBuffer strbuf = new StringBuffer(); strbuf.append('(').append(super.toString()); 
strbuf.append(',').append(name);
      strbuf.append(',').append(channel);
      strbuf.append(',').append(nbwrite);
      strbuf.append(',').append(sendlist).append(')');
      return strbuf.toString();
    }
  }

  /**
   * Growable list of messages with a cursor on the "current" message.
   * Messages are appended at the end; <code>nextMessage</code> moves the
   * cursor forward, and the removal methods keep it consistent with the
   * remaining elements. All the methods changing the list are synchronized.
   */
  final class MessageVector {
    /**
     * The array buffer into which the components of the vector are
     * stored. The capacity of the vector is the length of this array buffer,
     * and is at least large enough to contain all the vector's elements.<p>
     *
     * Any array elements following the last element in the Vector are null.
     */
    private Message elementData[] = null;

    /**
     * The number of valid components in this <tt>MessageVector</tt> object.
     */
    private int elementCount = 0;

    /**
     * The actual item in this <tt>MessageVector</tt> object, -1 when the
     * cursor is before the first message.
     */
    private int current = -1;

    /**
     * Constructs an empty vector with an initial capacity of 20 messages.
     */
    public MessageVector() {
      this.elementData = new Message[20];
    }

    /**
     * Traces the state of the vector at the beginning of an operation.
     * Dumping the whole vector is costly, so it is only done when the debug
     * level is enabled, as for the other traces of this file.
     *
     * @param op the name of the traced operation.
     */
    private void trace(String op) {
      if (logmon.isLoggable(BasicLevel.DEBUG))
        logmon.log(BasicLevel.DEBUG, getName() + ", " + op + ":" + toString());
    }

    /**
     * Moves the cursor to the next message and returns it.
     *
     * @return the new current message, or null if the cursor is already on
     *         the last message (the cursor is then left unchanged).
     */
    public synchronized Message nextMessage() {
      trace("nextMessage");

      if ((current +1) < elementCount)
        return elementData[++current];
      else
        return null;
    }

    /**
     * Returns the number of message in this vector.
     *
     * @return  the number of message in this vector.
     */
    public synchronized int size() {
      return elementCount;
    }

    /**
     * Moves the cursor back before the first message.
     */
    public synchronized void reset() {
      current = -1;
    }

    /**
     * Adds the specified component to the end of this vector,
     * increasing its size by one. The capacity of this vector is
     * increased if its size becomes greater than its capacity. <p>
     *
     * @param msg the component to be added.
     */
    public synchronized void addMessage(Message msg) {
      trace("addMessage");

      if ((elementCount + 1) > elementData.length) {
        // The buffer is full, doubles its capacity.
        Message oldData[] = elementData;
        elementData = new Message[elementData.length * 2];
        System.arraycopy(oldData, 0, elementData, 0, elementCount);
      }
      elementData[elementCount++] = msg;
    }

    /**
     * Removes the current message and moves the cursor back by one, so that
     * the next call to <code>nextMessage</code> returns the message that
     * followed the removed one. There must be a current message, i.e. this
     * method is to be called after <code>nextMessage</code> returned a
     * message; otherwise the array accesses below fail.
     */
    public synchronized void removeCurrent() {
      trace("removeCurrent");

      if (elementCount > (current +1)) {
        // Shifts the messages following the current one.
        System.arraycopy(elementData, current +1,
                         elementData, current, elementCount - current -1);
      }
      elementData[elementCount-1] = null; /* to let gc do its work */
      elementCount--;
      current--;
    }

    /**
     * Removes a message specified by its stamp. Only remove real message,
     * this method don't touch to acknowledge message.
     *
     * @param stamp the stamp of the message to remove.
     * @return the removed message.
     * @exception NoSuchElementException
     *   if no message carrying a notification has this stamp.
     */
    public synchronized Message removeMessage(int stamp) {
      Message msg = null;

      trace("removeMessage");

      for (int index=0 ; index<elementCount ; index++) {
        msg = elementData[index];
        // Acknowledges (messages without notification) are skipped.
        if ((msg.not != null) && (msg.getStamp() == stamp)) {
          if (elementCount > (index +1)) {
            // Shifts the messages following the removed one.
            System.arraycopy(elementData, index +1,
                             elementData, index, elementCount - index - 1);
          }
          elementData[elementCount-1] = null; /* to let gc do its work */
          elementCount--;
          // AF: To verify !!
          // The messages from the removed one have moved down by one: if
          // the cursor was on or after it, it follows them.
          if (index <=current) current--;

          return msg;
        }
      }

      throw new NoSuchElementException();
    }

    /**
     * Returns a string describing this vector: cursor, size and messages.
     */
    public String toString() {
      StringBuffer strbuf = new StringBuffer();

      strbuf.append(super.toString());
      strbuf.append(',').append(current);
      strbuf.append(',').append(elementCount);
      for (int i=0; i<elementCount; i++) {
        strbuf.append(",(").append(elementData[i]).append(')');
      }

      return strbuf.toString();
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -