⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 agenteventhandler.java

📁 sea是一个基于seda模式的实现。这个设计模式将系统分为很多stage。每个stage分布不同的任务(基于线程池)。通过任务流的方式提高系统的效率。
💻 JAVA
字号:
/* * Copyright (c) 2003, The Regents of the University of California, through * Lawrence Berkeley National Laboratory (subject to receipt of any required * approvals from the U.S. Dept. of Energy). All rights reserved. */package gov.lbl.dsd.sea.nio;import gov.lbl.dsd.sea.EventHandler;import gov.lbl.dsd.sea.event.IllegalEventException;import gov.lbl.dsd.sea.nio.event.ChannelResponse;/**Abstract base class simplifying the implementation of eventhandlers sending requests to an agent, and receiving responses from the agent.<p>Override protected <code>onXYZ</code> event handling methods for application specific behaviour.<p>For example, a typical non-blocking agent event handler accumulates partial reads along the following lines:<pre>public class MyAgentEventHandler extends AgentEventHandler {	// simple variable msg size protocol: MESSAGE = [payLoadLength(4 bytes) payLoad]	private static final Charset CHARSET = Charset.forName("UTF-8");		protected void onAccepted(ChannelResponse.Accepted rsp) {		// prepare read buffer and register READ interest		rsp.getKey().attach(new gov.lbl.dsd.sea.nio.util.ArrayByteList());		rsp.getAgent().enqueue(				new ChannelRequest.Register(this.getStage(), rsp.getKey()						.channel(), SelectionKey.OP_READ));	}		protected void onRead(ChannelResponse.Read rsp) {		ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment();		readBuffer.add(rsp.getBuffer());		rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer		while (readBuffer.size() >= 4) {			// message header containing payload length has arrived			int payloadLength = readBuffer.asByteBuffer().getInt(0);			if (4 + payloadLength > readBuffer.size()) {				break; // payload not yet fully received, wait for more data			}			else {				// we have received the entire variable length payload				String payload = readBuffer.toString(4, 4 + payloadLength, CHARSET);				readBuffer.remove(0, 4 + payloadLength); // remove header and payload								// do something useful with payload, here we just print it				System.out.println("payload=" + payload);			}		}	}		// A more efficient but less readable/understandable alternative:	protected void onRead(ChannelResponse.Read rsp) {		ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment();		readBuffer.add(rsp.getBuffer());		rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer		int i = 0;		while (i + 4 <= readBuffer.size()) {			// message header containing payload length has arrived			int payloadLength = readBuffer.asByteBuffer().getInt(i);			if (i + 4 + payloadLength > readBuffer.size()) {				// payload not yet fully received - wait for more data				break; 			}			else {				i += 4;				// we have received the entire variable length payload				String payload = readBuffer.toString(i, i + payloadLength, CHARSET);				i += payLoadLength;								// do something useful with payload, here we just print it				System.out.println("payload=" + payload);			}		}		// remove fully processed headers and payloads, if any		readBuffer.remove(0, i); 	}		...}</pre>@author whoschek@lbl.gov@author $Author: hoschek3 $@version $Revision: 1.12 $, $Date: 2004/08/04 23:24:56 $*/public abstract class AgentEventHandler extends EventHandler {	private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(AgentEventHandler.class);	protected AgentEventHandler() {}		/** Called when an event has been received */	public void handle(Object event) {		if (log.isTraceEnabled()) log.trace("handling event=" + event);		if (event instanceof ChannelResponse) {			if (event instanceof ChannelResponse.Closed) { 				// handle without suppressing exception check				this.onClosed((ChannelResponse.Closed) event);			} else {								ChannelResponse rsp = (ChannelResponse) event;				if (rsp.getException() != null) {					// ignore and wait for autoclosing ChannelResponse.Closed;					// handle cleanup later in onClosed(...)					if (log.isDebugEnabled()) log.debug("Got exception response=" + rsp);				} else if (event instanceof ChannelResponse.Read) {					this.onRead((ChannelResponse.Read) event);				} else if (event instanceof ChannelResponse.Write) {					this.onWrite((ChannelResponse.Write) event);				} else if (event instanceof ChannelResponse.Registered) {					this.onRegistered((ChannelResponse.Registered) event);				} else if (event instanceof ChannelResponse.Accepted) {					this.onAccepted((ChannelResponse.Accepted) event);				} else if (event instanceof ChannelResponse.Connected) {					this.onConnected((ChannelResponse.Connected) event);				} else {					throw new IllegalEventException("Agent ERROR - should never happen", event, this.getStage());				}			}		} else {			this.onApplicationEvent(event);		}	}	/**	 * Called when an event other than a ChannelResponse (that is, an	 * application specific event) has been received; The default implementation	 * throws an exception.	 */	protected void onApplicationEvent(Object event) {		throw new IllegalEventException(event, this.getStage());	}	/**	 * Called when an accepted response has been received; that is, when the	 * (server) agent has accepted a new client channel (connection).	 */	abstract protected void onAccepted(ChannelResponse.Accepted response);	/**	 * Called when a closed response has been received; that is, when the agent	 * has closed a channel (connection).	 */	abstract protected void onClosed(ChannelResponse.Closed response);	/**	 * Called when a connected response has been received; that is, when the	 * (client) agent has connected a new channel (connection) to a remote	 * server.	 */	abstract protected void onConnected(ChannelResponse.Connected response);	/**	 * Called when a registered response has been received; that is, when the	 * agent has processed a {@link ChannelRequest.Registered} request.	 */	abstract protected void onRegistered(ChannelResponse.Registered response);	/**	 * Called when a read response has been received; that is, when the agent	 * has read data from a channel (connection).	 */	abstract protected void onRead(ChannelResponse.Read response);	/**	 * Called when a write response has been received; that is, when the agent	 * has written data to a channel (connection).	 */	abstract protected void onWrite(ChannelResponse.Write response);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -