📄 pubsubengine.java
字号:
/** * $RCSfile: $ * $Revision: $ * $Date: $ * * Copyright (C) 2006 Jive Software. All rights reserved. * * This software is published under the terms of the GNU Public License (GPL), * a copy of which is included in this distribution. */package org.jivesoftware.wildfire.pubsub;import org.dom4j.DocumentHelper;import org.dom4j.Element;import org.dom4j.QName;import org.jivesoftware.util.LocaleUtils;import org.jivesoftware.util.Log;import org.jivesoftware.util.StringUtils;import org.jivesoftware.wildfire.PacketRouter;import org.jivesoftware.wildfire.XMPPServer;import org.jivesoftware.wildfire.XMPPServerListener;import org.jivesoftware.wildfire.commands.AdHocCommandManager;import org.jivesoftware.wildfire.pubsub.models.AccessModel;import org.jivesoftware.wildfire.user.UserManager;import org.xmpp.forms.DataForm;import org.xmpp.forms.FormField;import org.xmpp.packet.*;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.LinkedBlockingQueue;/** * A PubSubEngine is responsible for handling packets sent to the pub-sub service. * * @author Matt Tucker */public class PubSubEngine { private PubSubService service; /** * Manager that keeps the list of ad-hoc commands and processing command requests. */ private AdHocCommandManager manager; /** * Keep a registry of the presence's show value of users that subscribed to a node of * the pubsub service and for which the node only delivers notifications for online users * or node subscriptions deliver events based on the user presence show value. Offline * users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key * is full JID of connected resource and value is show value of the last received presence. */ private Map<String, Map<String, String>> barePresences = new ConcurrentHashMap<String, Map<String, String>>(); /** * The time to elapse between each execution of the maintenance process. Default * is 2 minutes. */ private int items_task_timeout = 2 * 60 * 1000; /** * The number of items to save on each run of the maintenance process. */ private int items_batch_size = 50; /** * Queue that holds the items that need to be added to the database. */ private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>(); /** * Queue that holds the items that need to be deleted from the database. */ private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>(); /** * Task that saves or deletes published items from the database. */ private PublishedItemTask publishedItemTask; /** * Timer to save published items to the database or remove deleted or old items. */ private Timer timer = new Timer("PubSub maintenance"); /** * The packet router for the server. */ private PacketRouter router = null; public PubSubEngine(PubSubService pubSubService, PacketRouter router) { this.service = pubSubService; this.router = router; // Initialize the ad-hoc commands manager to use for this pubsub service manager = new AdHocCommandManager(); manager.addCommand(new PendingSubscriptionsCommand(service)); // Save or delete published items from the database every 2 minutes starting in // 2 minutes (default values) publishedItemTask = new PublishedItemTask(); timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout); } /** * Handles IQ packets sent to the pubsub service. Requests of disco#info and disco#items * are not being handled by the engine. Instead the service itself should handle disco packets. * * @param iq the IQ packet sent to the pubsub service. * @return true if the IQ packet was handled by the engine. */ public boolean process(IQ iq) { // Ignore IQs of type ERROR or RESULT if (IQ.Type.error == iq.getType() || IQ.Type.result == iq.getType()) { return true; } Element childElement = iq.getChildElement(); String namespace = null; if (childElement != null) { namespace = childElement.getNamespaceURI(); } if ("http://jabber.org/protocol/pubsub".equals(namespace)) { Element action = childElement.element("publish"); if (action != null) { // Entity publishes an item publishItemsToNode(iq, action); return true; } action = childElement.element("subscribe"); if (action != null) { // Entity subscribes to a node subscribeNode(iq, childElement, action); return true; } action = childElement.element("options"); if (action != null) { if (IQ.Type.get == iq.getType()) { // Subscriber requests subscription options form getSubscriptionConfiguration(iq, childElement, action); } else { // Subscriber submits completed options form configureSubscription(iq, action); } return true; } action = childElement.element("create"); if (action != null) { // Entity is requesting to create a new node createNode(iq, childElement, action); return true; } action = childElement.element("unsubscribe"); if (action != null) { // Entity unsubscribes from a node unsubscribeNode(iq, action); return true; } action = childElement.element("subscriptions"); if (action != null) { // Entity requests all current subscriptions getSubscriptions(iq, childElement); return true; } action = childElement.element("affiliations"); if (action != null) { // Entity requests all current affiliations getAffiliations(iq, childElement); return true; } action = childElement.element("items"); if (action != null) { // Subscriber requests all active items getPublishedItems(iq, action); return true; } action = childElement.element("retract"); if (action != null) { // Entity deletes an item deleteItems(iq, action); return true; } // Unknown action requested sendErrorPacket(iq, PacketError.Condition.bad_request, null); return true; } else if ("http://jabber.org/protocol/pubsub#owner".equals(namespace)) { Element action = childElement.element("configure"); if (action != null) { String nodeID = action.attributeValue("node"); if (nodeID == null) { // if user is not sysadmin then return nodeid-required error if (!service.isServiceAdmin(iq.getFrom()) || !service.isCollectionNodesSupported()) { // Configure elements must have a node attribute so answer an error Element pubsubError = DocumentHelper.createElement(QName.get( "nodeid-required", "http://jabber.org/protocol/pubsub#errors")); sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError); return true; } else { // Sysadmin is trying to configure root collection node nodeID = service.getRootCollectionNode().getNodeID(); } } if (IQ.Type.get == iq.getType()) { // Owner requests configuration form of a node getNodeConfiguration(iq, childElement, nodeID); } else { // Owner submits or cancels node configuration form configureNode(iq, action, nodeID); } return true; } action = childElement.element("default"); if (action != null) { // Owner requests default configuration options for // leaf or collection nodes getDefaultNodeConfiguration(iq, childElement, action); return true; } action = childElement.element("delete"); if (action != null) { // Owner deletes a node deleteNode(iq, action); return true; } action = childElement.element("subscriptions"); if (action != null) { if (IQ.Type.get == iq.getType()) { // Owner requests all affiliated entities getNodeSubscriptions(iq, action); } else { modifyNodeSubscriptions(iq, action); } return true; } action = childElement.element("affiliations"); if (action != null) { if (IQ.Type.get == iq.getType()) { // Owner requests all affiliated entities getNodeAffiliations(iq, action); } else { modifyNodeAffiliations(iq, action); } return true; } action = childElement.element("purge"); if (action != null) { // Owner purges items from a node purgeNode(iq, action); return true; } // Unknown action requested so return error to sender sendErrorPacket(iq, PacketError.Condition.bad_request, null); return true; } else if ("http://jabber.org/protocol/commands".equals(namespace)) { // Process ad-hoc command IQ reply = manager.process(iq); router.route(reply); return true; } return false; } /** * Handles Presence packets sent to the pubsub service. Only process available and not * available presences. * * @param presence the Presence packet sent to the pubsub service. */ public void process(Presence presence) { if (presence.isAvailable()) { JID subscriber = presence.getFrom(); Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID()); if (fullPresences == null) { synchronized (subscriber.toBareJID().intern()) { fullPresences = barePresences.get(subscriber.toBareJID()); if (fullPresences == null) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -