memoryqueueservicefilter.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 395 行
JAVA
395 行
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT. See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * * Free Software Foundation, Inc. * 59 Temple Place, Suite 330 * Boston, MA 02111-1307 USA * * @author Scott Ferguson */package com.caucho.hemp.broker;import com.caucho.bam.BamError;import com.caucho.bam.BamService;import com.caucho.bam.BamStream;import com.caucho.config.ConfigException;import com.caucho.hmtp.*;import com.caucho.server.resin.*;import com.caucho.server.util.*;import com.caucho.util.*;import java.util.*;import java.util.concurrent.*;import java.util.logging.*;import java.lang.ref.*;import java.io.Serializable;/** * Queue of hmtp packets */public class MemoryQueueServiceFilter implements BamService, Runnable{ private static final Logger log = Logger.getLogger(MemoryQueueServiceFilter.class.getName()); private static final L10N L = new L10N(MemoryQueueServiceFilter.class); private final Executor _executor = ScheduledThreadPool.getLocal(); private final ClassLoader _loader = Thread.currentThread().getContextClassLoader(); private final BamService _service; private final BamStream _brokerStream; private int _threadSemaphore; private Packet []_queue = new Packet[32]; private int _head; private int _tail; public MemoryQueueServiceFilter(BamService service, BamStream brokerStream, int threadMax) { if (service == null) throw new NullPointerException(); if (brokerStream == null) throw new NullPointerException(); _service = service; _brokerStream = brokerStream; _threadSemaphore = threadMax; } /** * Returns the service's jid */ public String getJid() { return _service.getJid(); } /** * Returns the service's jid */ public void setJid(String jid) { } /** * Sends a message */ public void message(String to, String from, Serializable value) { enqueue(new Message(to, from, value)); } /** * Sends a message */ public void messageError(String to, String from, Serializable value, BamError error) { enqueue(new MessageError(to, from, value, error)); } /** * Query an entity */ public boolean queryGet(long id, String to, String from, Serializable query) { enqueue(new QueryGet(id, to, from, query)); return true; } /** * Query an entity */ public boolean querySet(long id, String to, String from, Serializable query) { enqueue(new QuerySet(id, to, from, query)); return true; } /** * Query an entity */ public void queryResult(long id, String to, String from, Serializable value) { enqueue(new QueryResult(id, to, from, value)); } /** * Query an entity */ public void queryError(long id, String to, String from, Serializable query, BamError error) { enqueue(new QueryError(id, to, from, query, error)); } /** * Presence */ public void presence(String to, String from, Serializable data) { enqueue(new Presence(to, from, data)); } /** * Presence unavailable */ public void presenceUnavailable(String to, String from, Serializable data) { enqueue(new PresenceUnavailable(to, from, data)); } /** * Presence probe */ public void presenceProbe(String to, String from, Serializable data) { enqueue(new PresenceProbe(to, from, data)); } /** * Presence subscribe */ public void presenceSubscribe(String to, String from, Serializable data) { enqueue(new PresenceSubscribe(to, from, data)); } /** * Presence subscribed */ public void presenceSubscribed(String to, String from, Serializable data) { enqueue(new PresenceSubscribed(to, from, data)); } /** * Presence unsubscribe */ public void presenceUnsubscribe(String to, String from, Serializable data) { enqueue(new PresenceUnsubscribe(to, from, data)); } /** * Presence unsubscribed */ public void presenceUnsubscribed(String to, String from, Serializable data) { enqueue(new PresenceUnsubscribed(to, from, data)); } /** * Presence error */ public void presenceError(String to, String from, Serializable data, BamError error) { enqueue(new PresenceError(to, from, data, error)); } public BamStream getAgentStream() { return _service.getAgentStream(); } /** * Requests that an agent with the given jid be started. */ public boolean startAgent(String jid) { return _service.startAgent(jid); } /** * Requests that an agent with the given jid be stopped. */ public boolean stopAgent(String jid) { return _service.stopAgent(jid); } /** * Called when an agent logs in */ public void onAgentStart(String jid) { _service.onAgentStart(jid); } /** * Called when an agent logs out */ public void onAgentStop(String jid) { _service.onAgentStop(jid); } /** * Returns a filter for outbound calls, i.e. filtering messages to the agent. */ public BamStream getAgentFilter(BamStream stream) { return _service.getAgentFilter(stream); } /** * Returns a filter for inbound calls, i.e. filtering messages to the broker * from the agent. */ public BamStream getBrokerFilter(BamStream stream) { return _service.getBrokerFilter(stream); } protected void enqueue(Packet packet) { if (log.isLoggable(Level.FINER)) { int size = (_head - _tail + _queue.length) % _queue.length; log.finer(this + " enqueue(" + size + ") " + packet); } boolean isStartThread = false; synchronized (this) { int next = (_head + 1) % _queue.length; if (next == _tail) { Packet []extQueue = new Packet[_queue.length + 32]; int i = 0; for (int tail = _tail; tail != _head; tail = (tail + 1) % _queue.length) { extQueue[i++] = _queue[tail]; } _queue = extQueue; _tail = 0; _head = i; next = _head + 1; } _queue[_head] = packet; _head = next; if (_threadSemaphore > 0) { _threadSemaphore--; isStartThread = true; } } if (isStartThread) { _executor.execute(this); } } protected Packet dequeue() { synchronized (this) { if (_head == _tail) { _threadSemaphore++; return null; } Packet packet = _queue[_tail]; _tail = (_tail + 1) % _queue.length; return packet; } } public void run() { Packet packet; Thread.currentThread().setContextClassLoader(_loader); while ((packet = dequeue()) != null) { boolean isValid = false; try { if (log.isLoggable(Level.FINER)) log.finer(this + " dequeue " + packet); packet.dispatch(getAgentStream(), _brokerStream); isValid = true; } catch (Exception e) { log.log(Level.WARNING, e.toString(), e); isValid = true; } finally { // fix semaphore in case of thread death if (! isValid) { synchronized (this) { _threadSemaphore++; } } } } } @Override public String toString() { return getClass().getSimpleName() + "[" + _service + "]"; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?