📄 agenteventhandler.java
字号:
/* * Copyright (c) 2003, The Regents of the University of California, through * Lawrence Berkeley National Laboratory (subject to receipt of any required * approvals from the U.S. Dept. of Energy). All rights reserved. */package gov.lbl.dsd.sea.nio;import gov.lbl.dsd.sea.EventHandler;import gov.lbl.dsd.sea.event.IllegalEventException;import gov.lbl.dsd.sea.nio.event.ChannelResponse;/**Abstract base class simplifying the implementation of eventhandlers sending requests to an agent, and receiving responses from the agent.<p>Override protected <code>onXYZ</code> event handling methods for application specific behaviour.<p>For example, a typical non-blocking agent event handler accumulates partial reads along the following lines:<pre>public class MyAgentEventHandler extends AgentEventHandler { // simple variable msg size protocol: MESSAGE = [payLoadLength(4 bytes) payLoad] private static final Charset CHARSET = Charset.forName("UTF-8"); protected void onAccepted(ChannelResponse.Accepted rsp) { // prepare read buffer and register READ interest rsp.getKey().attach(new gov.lbl.dsd.sea.nio.util.ArrayByteList()); rsp.getAgent().enqueue( new ChannelRequest.Register(this.getStage(), rsp.getKey() .channel(), SelectionKey.OP_READ)); } protected void onRead(ChannelResponse.Read rsp) { ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment(); readBuffer.add(rsp.getBuffer()); rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer while (readBuffer.size() >= 4) { // message header containing payload length has arrived int payloadLength = readBuffer.asByteBuffer().getInt(0); if (4 + payloadLength > readBuffer.size()) { break; // payload not yet fully received, wait for more data } else { // we have received the entire variable length payload String payload = readBuffer.toString(4, 4 + payloadLength, CHARSET); readBuffer.remove(0, 4 + payloadLength); // remove header and payload // do something useful with payload, here we just print it System.out.println("payload=" + payload); } } } // A more efficient but less readable/understandable alternative: protected void onRead(ChannelResponse.Read rsp) { ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment(); readBuffer.add(rsp.getBuffer()); rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer int i = 0; while (i + 4 <= readBuffer.size()) { // message header containing payload length has arrived int payloadLength = readBuffer.asByteBuffer().getInt(i); if (i + 4 + payloadLength > readBuffer.size()) { // payload not yet fully received - wait for more data break; } else { i += 4; // we have received the entire variable length payload String payload = readBuffer.toString(i, i + payloadLength, CHARSET); i += payLoadLength; // do something useful with payload, here we just print it System.out.println("payload=" + payload); } } // remove fully processed headers and payloads, if any readBuffer.remove(0, i); } ...}</pre>@author whoschek@lbl.gov@author $Author: hoschek3 $@version $Revision: 1.12 $, $Date: 2004/08/04 23:24:56 $*/public abstract class AgentEventHandler extends EventHandler { private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(AgentEventHandler.class); protected AgentEventHandler() {} /** Called when an event has been received */ public void handle(Object event) { if (log.isTraceEnabled()) log.trace("handling event=" + event); if (event instanceof ChannelResponse) { if (event instanceof ChannelResponse.Closed) { // handle without suppressing exception check this.onClosed((ChannelResponse.Closed) event); } else { ChannelResponse rsp = (ChannelResponse) event; if (rsp.getException() != null) { // ignore and wait for autoclosing ChannelResponse.Closed; // handle cleanup later in onClosed(...) if (log.isDebugEnabled()) log.debug("Got exception response=" + rsp); } else if (event instanceof ChannelResponse.Read) { this.onRead((ChannelResponse.Read) event); } else if (event instanceof ChannelResponse.Write) { this.onWrite((ChannelResponse.Write) event); } else if (event instanceof ChannelResponse.Registered) { this.onRegistered((ChannelResponse.Registered) event); } else if (event instanceof ChannelResponse.Accepted) { this.onAccepted((ChannelResponse.Accepted) event); } else if (event instanceof ChannelResponse.Connected) { this.onConnected((ChannelResponse.Connected) event); } else { throw new IllegalEventException("Agent ERROR - should never happen", event, this.getStage()); } } } else { this.onApplicationEvent(event); } } /** * Called when an event other than a ChannelResponse (that is, an * application specific event) has been received; The default implementation * throws an exception. */ protected void onApplicationEvent(Object event) { throw new IllegalEventException(event, this.getStage()); } /** * Called when an accepted response has been received; that is, when the * (server) agent has accepted a new client channel (connection). */ abstract protected void onAccepted(ChannelResponse.Accepted response); /** * Called when a closed response has been received; that is, when the agent * has closed a channel (connection). */ abstract protected void onClosed(ChannelResponse.Closed response); /** * Called when a connected response has been received; that is, when the * (client) agent has connected a new channel (connection) to a remote * server. */ abstract protected void onConnected(ChannelResponse.Connected response); /** * Called when a registered response has been received; that is, when the * agent has processed a {@link ChannelRequest.Registered} request. */ abstract protected void onRegistered(ChannelResponse.Registered response); /** * Called when a read response has been received; that is, when the agent * has read data from a channel (connection). */ abstract protected void onRead(ChannelResponse.Read response); /** * Called when a write response has been received; that is, when the agent * has written data to a channel (connection). */ abstract protected void onWrite(ChannelResponse.Write response);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -