📄 pubsubengine.java

📁 Instant messaging server based on the Jabber protocol
💻 JAVA
📖 Page 1 of 5
/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2006 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.wildfire.pubsub;

import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.XMPPServerListener;
import org.jivesoftware.wildfire.commands.AdHocCommandManager;
import org.jivesoftware.wildfire.pubsub.models.AccessModel;
import org.jivesoftware.wildfire.user.UserManager;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.*;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A PubSubEngine is responsible for handling packets sent to the pub-sub service.
 *
 * @author Matt Tucker
 */
public class PubSubEngine {

    private PubSubService service;
    /**
     * Manager that keeps the list of ad-hoc commands and processing command requests.
     */
    private AdHocCommandManager manager;
    /**
     * Keep a registry of the presence's show value of users that subscribed to a node of
     * the pubsub service and for which the node only delivers notifications for online users
     * or node subscriptions deliver events based on the user presence show value. Offline
     * users will not have an entry in the map. Note: Key-> bare JID and Value-> Map whose key
     * is full JID of connected resource and value is show value of the last received presence.
     */
    private Map<String, Map<String, String>> barePresences =
            new ConcurrentHashMap<String, Map<String, String>>();
    /**
     * The time to elapse between each execution of the maintenance process. Default
     * is 2 minutes.
     */
    private int items_task_timeout = 2 * 60 * 1000;
    /**
     * The number of items to save on each run of the maintenance process.
     */
    private int items_batch_size = 50;
    /**
     * Queue that holds the items that need to be added to the database.
     */
    private Queue<PublishedItem> itemsToAdd = new LinkedBlockingQueue<PublishedItem>();
    /**
     * Queue that holds the items that need to be deleted from the database.
     */
    private Queue<PublishedItem> itemsToDelete = new LinkedBlockingQueue<PublishedItem>();
    /**
     * Task that saves or deletes published items from the database.
     */
    private PublishedItemTask publishedItemTask;
    /**
     * Timer to save published items to the database or remove deleted or old items.
     */
    private Timer timer = new Timer("PubSub maintenance");
    /**
     * The packet router for the server.
     */
    private PacketRouter router = null;

    public PubSubEngine(PubSubService pubSubService, PacketRouter router) {
        this.service = pubSubService;
        this.router = router;
        // Initialize the ad-hoc commands manager to use for this pubsub service
        manager = new AdHocCommandManager();
        manager.addCommand(new PendingSubscriptionsCommand(service));
        // Save or delete published items from the database every 2 minutes starting in
        // 2 minutes (default values)
        publishedItemTask = new PublishedItemTask();
        timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
    }

    /**
     * Handles IQ packets sent to the pubsub service. Requests of disco#info and disco#items
     * are not being handled by the engine. Instead the service itself should handle disco packets.
     *
     * @param iq the IQ packet sent to the pubsub service.
     * @return true if the IQ packet was handled by the engine.
     */
    public boolean process(IQ iq) {
        // Ignore IQs of type ERROR or RESULT
        if (IQ.Type.error == iq.getType() || IQ.Type.result == iq.getType()) {
            return true;
        }
        Element childElement = iq.getChildElement();
        String namespace = null;
        if (childElement != null) {
            namespace = childElement.getNamespaceURI();
        }
        if ("http://jabber.org/protocol/pubsub".equals(namespace)) {
            Element action = childElement.element("publish");
            if (action != null) {
                // Entity publishes an item
                publishItemsToNode(iq, action);
                return true;
            }
            action = childElement.element("subscribe");
            if (action != null) {
                // Entity subscribes to a node
                subscribeNode(iq, childElement, action);
                return true;
            }
            action = childElement.element("options");
            if (action != null) {
                if (IQ.Type.get == iq.getType()) {
                    // Subscriber requests subscription options form
                    getSubscriptionConfiguration(iq, childElement, action);
                }
                else {
                    // Subscriber submits completed options form
                    configureSubscription(iq, action);
                }
                return true;
            }
            action = childElement.element("create");
            if (action != null) {
                // Entity is requesting to create a new node
                createNode(iq, childElement, action);
                return true;
            }
            action = childElement.element("unsubscribe");
            if (action != null) {
                // Entity unsubscribes from a node
                unsubscribeNode(iq, action);
                return true;
            }
            action = childElement.element("subscriptions");
            if (action != null) {
                // Entity requests all current subscriptions
                getSubscriptions(iq, childElement);
                return true;
            }
            action = childElement.element("affiliations");
            if (action != null) {
                // Entity requests all current affiliations
                getAffiliations(iq, childElement);
                return true;
            }
            action = childElement.element("items");
            if (action != null) {
                // Subscriber requests all active items
                getPublishedItems(iq, action);
                return true;
            }
            action = childElement.element("retract");
            if (action != null) {
                // Entity deletes an item
                deleteItems(iq, action);
                return true;
            }
            // Unknown action requested
            sendErrorPacket(iq, PacketError.Condition.bad_request, null);
            return true;
        }
        else if ("http://jabber.org/protocol/pubsub#owner".equals(namespace)) {
            Element action = childElement.element("configure");
            if (action != null) {
                String nodeID = action.attributeValue("node");
                if (nodeID == null) {
                    // if user is not sysadmin then return nodeid-required error
                    if (!service.isServiceAdmin(iq.getFrom()) ||
                            !service.isCollectionNodesSupported()) {
                        // Configure elements must have a node attribute so answer an error
                        Element pubsubError = DocumentHelper.createElement(QName.get(
                                "nodeid-required", "http://jabber.org/protocol/pubsub#errors"));
                        sendErrorPacket(iq, PacketError.Condition.bad_request, pubsubError);
                        return true;
                    }
                    else {
                        // Sysadmin is trying to configure root collection node
                        nodeID = service.getRootCollectionNode().getNodeID();
                    }
                }
                if (IQ.Type.get == iq.getType()) {
                    // Owner requests configuration form of a node
                    getNodeConfiguration(iq, childElement, nodeID);
                }
                else {
                    // Owner submits or cancels node configuration form
                    configureNode(iq, action, nodeID);
                }
                return true;
            }
            action = childElement.element("default");
            if (action != null) {
                // Owner requests default configuration options for
                // leaf or collection nodes
                getDefaultNodeConfiguration(iq, childElement, action);
                return true;
            }
            action = childElement.element("delete");
            if (action != null) {
                // Owner deletes a node
                deleteNode(iq, action);
                return true;
            }
            action = childElement.element("subscriptions");
            if (action != null) {
                if (IQ.Type.get == iq.getType()) {
                    // Owner requests all affiliated entities
                    getNodeSubscriptions(iq, action);
                }
                else {
                    modifyNodeSubscriptions(iq, action);
                }
                return true;
            }
            action = childElement.element("affiliations");
            if (action != null) {
                if (IQ.Type.get == iq.getType()) {
                    // Owner requests all affiliated entities
                    getNodeAffiliations(iq, action);
                }
                else {
                    modifyNodeAffiliations(iq, action);
                }
                return true;
            }
            action = childElement.element("purge");
            if (action != null) {
                // Owner purges items from a node
                purgeNode(iq, action);
                return true;
            }
            // Unknown action requested so return error to sender
            sendErrorPacket(iq, PacketError.Condition.bad_request, null);
            return true;
        }
        else if ("http://jabber.org/protocol/commands".equals(namespace)) {
            // Process ad-hoc command
            IQ reply = manager.process(iq);
            router.route(reply);
            return true;
        }
        return false;
    }

    /**
     * Handles Presence packets sent to the pubsub service. Only process available and not
     * available presences.
     *
     * @param presence the Presence packet sent to the pubsub service.
     */
    public void process(Presence presence) {
        if (presence.isAvailable()) {
            JID subscriber = presence.getFrom();
            Map<String, String> fullPresences = barePresences.get(subscriber.toBareJID());
            if (fullPresences == null) {
                synchronized (subscriber.toBareJID().intern()) {
                    fullPresences = barePresences.get(subscriber.toBareJID());
                    if (fullPresences == null) {
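
The listing breaks off here at the end of page 1. As an illustration of the dispatch rule used by process(IQ) above (check the payload namespace, then probe for known child elements in a fixed order and fall back to bad-request), here is a minimal standalone sketch. It is not part of Wildfire: the class PubSubDispatchSketch and its route method are hypothetical, it uses the JDK DOM parser instead of dom4j, it covers only the http://jabber.org/protocol/pubsub namespace, and it returns the action name as a string instead of calling a handler.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

// Hypothetical sketch, not the Wildfire implementation.
class PubSubDispatchSketch {

    static final String PUBSUB_NS = "http://jabber.org/protocol/pubsub";

    // Same probe order as the pubsub branch of process(IQ) in the listing.
    static final String[] ACTIONS = {
        "publish", "subscribe", "options", "create", "unsubscribe",
        "subscriptions", "affiliations", "items", "retract"
    };

    /**
     * Returns the name of the first known action found among the direct children
     * of the payload, "bad-request" if the namespace matches but no known action
     * is present, or "unhandled" if the namespace is a different one.
     */
    static String route(String payloadXml) {
        Element root;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            root = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(payloadXml)))
                    .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Payload is not well-formed XML", e);
        }
        if (!PUBSUB_NS.equals(root.getNamespaceURI())) {
            return "unhandled";
        }
        for (String action : ACTIONS) {
            if (hasChildElement(root, action)) {
                return action;
            }
        }
        return "bad-request";
    }

    // True if parent has a direct child element with the given local name.
    static boolean hasChildElement(Element parent, String localName) {
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n.getNodeType() == Node.ELEMENT_NODE && localName.equals(n.getLocalName())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String payload = "<pubsub xmlns='" + PUBSUB_NS + "'><subscribe node='news'/></pubsub>";
        System.out.println(route(payload));
    }
}
```

Because the probes run in a fixed order, a payload that carries several action elements is routed by the first name in the list rather than by document order. That matches the if-chain in the listing.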

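The barePresences field is documented above as a map from bare JID to a map of full JID to show value, with no entry for offline users. The truncated process(Presence) method appears to begin a check-then-lock lookup on that map, but the rest of it is not shown on this page. The sketch below illustrates only the documented data structure. It is an assumption-laden alternative, not the original code: the class PresenceRegistrySketch and its methods are hypothetical, JIDs are plain strings, and it swaps the synchronized-on-interned-string pattern for ConcurrentHashMap.compute, which needs Java 8 or later (the 2006 source predates it).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the documented registry shape, not the Wildfire code.
class PresenceRegistrySketch {

    // Key: bare JID. Value: map of full JID -> last received show value.
    private final Map<String, Map<String, String>> barePresences =
            new ConcurrentHashMap<>();

    /** Records the show value of a connected resource (show must be non-null). */
    void available(String bareJID, String fullJID, String show) {
        // compute() runs atomically per key, so creating the inner map and
        // adding the resource cannot interleave with a concurrent removal.
        barePresences.compute(bareJID, (key, fullPresences) -> {
            if (fullPresences == null) {
                fullPresences = new ConcurrentHashMap<>();
            }
            fullPresences.put(fullJID, show);
            return fullPresences;
        });
    }

    /** Forgets a resource; drops the bare JID entry once no resource is left. */
    void unavailable(String bareJID, String fullJID) {
        barePresences.computeIfPresent(bareJID, (key, fullPresences) -> {
            fullPresences.remove(fullJID);
            // Returning null removes the entry, so offline users have no entry.
            return fullPresences.isEmpty() ? null : fullPresences;
        });
    }

    boolean isOnline(String bareJID) {
        return barePresences.containsKey(bareJID);
    }

    /** Returns the last show value for the resource, or null if unknown. */
    String show(String bareJID, String fullJID) {
        Map<String, String> fullPresences = barePresences.get(bareJID);
        return fullPresences == null ? null : fullPresences.get(fullJID);
    }

    public static void main(String[] args) {
        PresenceRegistrySketch registry = new PresenceRegistrySketch();
        registry.available("user@example.com", "user@example.com/home", "away");
        System.out.println(registry.isOnline("user@example.com"));
    }
}
```

Doing both the insert and the cleanup inside compute and computeIfPresent keeps each bare JID's update atomic without a separate lock object. The names and the choice to require a non-null show value are illustrative only.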