📄 pubsubengine.java
字号:
/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2008 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution, or a commercial license
 * agreement with Jive.
 */
package org.jivesoftware.openfire.pubsub;

import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerListener;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.*;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A PubSubEngine is responsible for handling packets sent to a pub-sub service.
 *
 * @author Matt Tucker
 */
public class PubSubEngine {

    /**
     * The packet router for the server. Replies produced while processing packets
     * (for example ad-hoc command results) are sent through it.
     */
    private PacketRouter router = null;

    /**
     * Creates an engine that sends the packets it produces through the given router.
     *
     * @param router the packet router to use; it is stored as-is, without a null check.
     */
    public PubSubEngine(PacketRouter router) {
        this.router = router;
    }

    /**
     * Handles IQ packets sent to the pubsub service. Requests of disco#info and disco#items
     * are not being handled by the engine. Instead the service itself should handle disco packets.
     *
     * @param service the PubSub service this action is to be performed for.
     * @param iq the IQ packet sent to the pubsub service.
     * @return true if the IQ packet was handled by the engine.
*/ public boolean process(PubSubService service, IQ iq) { // Ignore IQs of type ERROR or RESULT if (IQ.Type.error == iq.getType() || IQ.Type.result == iq.getType()) { return true; } Element childElement = iq.getChildElement(); String namespace = null; if (childElement != null) { namespace = childElement.getNamespaceURI(); } if ("http://jabber.org/protocol/pubsub".equals(namespace)) { Element action = childElement.element("publish"); if (action != null) { // Entity publishes an item publishItemsToNode(service, iq, action); return true; } action = childElement.element("subscribe"); if (action != null) { // Entity subscribes to a node subscribeNode(service, iq, childElement, action); return true; } action = childElement.element("options"); if (action != null) { if (IQ.Type.get == iq.getType()) { // Subscriber requests subscription options form getSubscriptionConfiguration(service, iq, childElement, action); } else { // Subscriber submits completed options form configureSubscription(service, iq, action); } return true; } action = childElement.element("create"); if (action != null) { // Entity is requesting to create a new node createNode(service, iq, childElement, action); return true; } action = childElement.element("unsubscribe"); if (action != null) { // Entity unsubscribes from a node unsubscribeNode(service, iq, action); return true; } action = childElement.element("subscriptions"); if (action != null) { // Entity requests all current subscriptions getSubscriptions(service, iq, childElement); return true; } action = childElement.element("affiliations"); if (action != null) { // Entity requests all current affiliations getAffiliations(service, iq, childElement); return true; } action = childElement.element("items"); if (action != null) { // Subscriber requests all active items getPublishedItems(service, iq, action); return true; } action = childElement.element("retract"); if (action != null) { // Entity deletes an item deleteItems(service, iq, action); return true; 
} // Unknown action requested sendErrorPacket(iq, PacketError.Condition.bad_request, null); return true; } else if ("http://jabber.org/protocol/pubsub#owner".equals(namespace)) { Element action = childElement.element("configure"); if (action != null) { String nodeID = action.attributeValue("node"); if (nodeID == null) { // if user is not sysadmin then return nodeid-required error if (!service.isServiceAdmin(iq.getFrom()) || !service.isCollectionNodesSupported()) { // Configure elements must have a node attribute so answer an error Element pubsubError = DocumentHelper.createElement(QName.get( "nodeid-required", "http://jabber.org/protocol/pubsub#errors")); sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError); return true; } else { // Sysadmin is trying to configure root collection node nodeID = service.getRootCollectionNode().getNodeID(); } } if (IQ.Type.get == iq.getType()) { // Owner requests configuration form of a node getNodeConfiguration(service, iq, childElement, nodeID); } else { // Owner submits or cancels node configuration form configureNode(service, iq, action, nodeID); } return true; } action = childElement.element("default"); if (action != null) { // Owner requests default configuration options for // leaf or collection nodes getDefaultNodeConfiguration(service, iq, childElement, action); return true; } action = childElement.element("delete"); if (action != null) { // Owner deletes a node deleteNode(service, iq, action); return true; } action = childElement.element("subscriptions"); if (action != null) { if (IQ.Type.get == iq.getType()) { // Owner requests all affiliated entities getNodeSubscriptions(service, iq, action); } else { modifyNodeSubscriptions(service, iq, action); } return true; } action = childElement.element("affiliations"); if (action != null) { if (IQ.Type.get == iq.getType()) { // Owner requests all affiliated entities getNodeAffiliations(service, iq, action); } else { modifyNodeAffiliations(service, iq, action); } 
return true; } action = childElement.element("purge"); if (action != null) { // Owner purges items from a node purgeNode(service, iq, action); return true; } // Unknown action requested so return error to sender sendErrorPacket(iq, PacketError.Condition.bad_request, null); return true; } else if ("http://jabber.org/protocol/commands".equals(namespace)) { // Process ad-hoc command IQ reply = service.getManager().process(iq); router.route(reply); return true; } return false; } /** * Handles Presence packets sent to the pubsub service. Only process available and not * available presences. * * @param service the PubSub service this action is to be performed for. * @param presence the Presence packet sent to the pubsub service. */ public void process(PubSubService service, Presence presence) { if (presence.isAvailable()) { JID subscriber = presence.getFrom(); Map<String, String> fullPresences = service.getBarePresences().get(subscriber.toBareJID()); if (fullPresences == null) { synchronized (subscriber.toBareJID().intern()) { fullPresences = service.getBarePresences().get(subscriber.toBareJID()); if (fullPresences == null) { fullPresences = new ConcurrentHashMap<String, String>(); service.getBarePresences().put(subscriber.toBareJID(), fullPresences); } } } Presence.Show show = presence.getShow(); fullPresences.put(subscriber.toString(), show == null ? "online" : show.name()); } else if (presence.getType() == Presence.Type.unavailable) { JID subscriber = presence.getFrom(); Map<String, String> fullPresences = service.getBarePresences().get(subscriber.toBareJID()); if (fullPresences != null) { fullPresences.remove(subscriber.toString()); if (fullPresences.isEmpty()) { service.getBarePresences().remove(subscriber.toBareJID()); } } } } /** * Handles Message packets sent to the pubsub service. 
Messages may be of type error * when an event notification was sent to a subscriber whose address is no longer available.<p> * * Answers to authorization requests sent to node owners to approve pending subscriptions * will also be processed by this method. * * @param service the PubSub service this action is to be performed for. * @param message the Message packet sent to the pubsub service. */ public void process(PubSubService service, Message message) { if (message.getType() == Message.Type.error) { // Process Messages of type error to identify possible subscribers that no longer exist if (message.getError().getType() == PacketError.Type.cancel) { // TODO Assuming that owner is the bare JID (as defined in the JEP). This can be replaced with an explicit owner specified in the packet JID owner = new JID(message.getFrom().toBareJID()); // Terminate the subscription of the entity to all nodes hosted at the service cancelAllSubscriptions(service, owner); } else if (message.getError().getType() == PacketError.Type.auth) { // TODO Queue the message to be sent again later (will retry a few times and // will be discarded when the retry limit is reached) } } else if (message.getType() == Message.Type.normal) { // Check that this is an answer to an authorization request DataForm authForm = (DataForm) message.getExtension("x", "jabber:x:data");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -