📄 leafnode.java
字号:
for (Element item : itemElements) { itemID = item.attributeValue("id"); List entries = item.elements(); payload = entries.isEmpty() ? null : (Element) entries.get(0); // Create a published item from the published data and add it to the node and the db synchronized (publishedItems) { // Make sure that the published item has an ID and that it's unique in the node if (itemID == null) { itemID = StringUtils.randomString(15); } while (itemsByID.get(itemID) != null) { itemID = StringUtils.randomString(15); } // Create a new published item newItem = new PublishedItem(this, publisher, itemID, new Date()); newItem.setPayload(payload); // Add the new item to the list of published items newPublishedItems.add(newItem); // Add the published item to the list of items to persist (using another thread) // but check that we don't exceed the limit. Remove oldest items if required. while (!publishedItems.isEmpty() && publishedItems.size() >= maxPublishedItems) { PublishedItem removedItem = publishedItems.remove(0); itemsByID.remove(removedItem.getID()); // Add the removed item to the queue of items to delete from the database. The // queue is going to be processed by another thread service.queueItemToRemove(removedItem); } addPublishedItem(newItem); // Add the new published item to the queue of items to add to the database. The // queue is going to be processed by another thread service.queueItemToAdd(newItem); } } } // Build event notification packet to broadcast to subscribers Message message = new Message(); Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event"); // Broadcast event notification to subscribers and parent node subscribers Set<NodeAffiliate> affiliatesToNotify = new HashSet<NodeAffiliate>(affiliates); // Get affiliates that are subscribed to a parent in the hierarchy of parent nodes for (CollectionNode parentNode : getParents()) { for (NodeSubscription subscription : parentNode.getSubscriptions()) { affiliatesToNotify.add(subscription.getAffiliate()); } } // TODO Use another thread for this (if # of subscribers is > X)???? for (NodeAffiliate affiliate : affiliatesToNotify) { affiliate.sendPublishedNotifications(message, event, this, newPublishedItems); } } /** * Deletes the list of published items from the node. Event notifications may be sent to * subscribers for the deleted items. When an affiliate has many subscriptions to the node, * the affiliate will get a notification for each set of items that affected the same list * of subscriptions.<p> * * For performance reasons the deleted published items are saved to the database * using a background thread. Sending event notifications to node subscribers may * also use another thread to ensure good performance.<p> * * @param toDelete list of items that were deleted from the node. */ public void deleteItems(List<PublishedItem> toDelete) { synchronized (publishedItems) { for (PublishedItem item : toDelete) { // Remove items to delete from memory publishedItems.remove(item); // Update fast look up cache of published items itemsByID.remove(item.getID()); } } // Remove deleted items from the database for (PublishedItem item : toDelete) { service.queueItemToRemove(item); } if (isNotifiedOfRetract()) { // Broadcast notification deletion to subscribers // Build packet to broadcast to subscribers Message message = new Message(); Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event"); // Send notification that items have been deleted to subscribers and parent node // subscribers Set<NodeAffiliate> affiliatesToNotify = new HashSet<NodeAffiliate>(affiliates); // Get affiliates that are subscribed to a parent in the hierarchy of parent nodes for (CollectionNode parentNode : getParents()) { for (NodeSubscription subscription : parentNode.getSubscriptions()) { affiliatesToNotify.add(subscription.getAffiliate()); } } // TODO Use another thread for this (if # of subscribers is > X)???? for (NodeAffiliate affiliate : affiliatesToNotify) { affiliate.sendDeletionNotifications(message, event, this, toDelete); } } } /** * Sends an IQ result with the list of items published to the node. Item ID and payload * may be included in the result based on the node configuration. * * @param originalRequest the IQ packet sent by a subscriber (or anyone) to get the node items. * @param publishedItems the list of published items to send to the subscriber. * @param forceToIncludePayload true if the item payload should be include if one exists. When * false the decision is up to the node. */ void sendPublishedItems(IQ originalRequest, List<PublishedItem> publishedItems, boolean forceToIncludePayload) { IQ result = IQ.createResultIQ(originalRequest); Element childElement = originalRequest.getChildElement().createCopy(); result.setChildElement(childElement); Element items = childElement.element("items"); for (PublishedItem publishedItem : publishedItems) { Element item = items.addElement("item"); if (isItemRequired()) { item.addAttribute("id", publishedItem.getID()); } if ((forceToIncludePayload || isPayloadDelivered()) && publishedItem.getPayload() != null) { item.add(publishedItem.getPayload().createCopy()); } } // Send the result service.send(result); } public PublishedItem getPublishedItem(String itemID) { if (!isItemRequired()) { return null; } synchronized (publishedItems) { return itemsByID.get(itemID); } } public List<PublishedItem> getPublishedItems() { synchronized (publishedItems) { return Collections.unmodifiableList(publishedItems); } } public List<PublishedItem> getPublishedItems(int recentItems) { synchronized (publishedItems) { int size = publishedItems.size(); if (recentItems > size) { // User requested more items than the one the node has so return the current list return Collections.unmodifiableList(publishedItems); } else { // Return the number of recent items the user requested List<PublishedItem> recent = publishedItems.subList(size - recentItems, size); return new ArrayList<PublishedItem>(recent); } } } public PublishedItem getLastPublishedItem() { synchronized (publishedItems) { if (publishedItems.isEmpty()) { return null; } return publishedItems.get(publishedItems.size()-1); } } /** * Returns true if the last published item is going to be sent to new subscribers. * * @return true if the last published item is going to be sent to new subscribers. */ public boolean isSendItemSubscribe() { return sendItemSubscribe; } void setMaxPayloadSize(int maxPayloadSize) { this.maxPayloadSize = maxPayloadSize; } void setPersistPublishedItems(boolean persistPublishedItems) { this.persistPublishedItems = persistPublishedItems; } void setMaxPublishedItems(int maxPublishedItems) { this.maxPublishedItems = maxPublishedItems; } void setSendItemSubscribe(boolean sendItemSubscribe) { this.sendItemSubscribe = sendItemSubscribe; } /** * Purges items that were published to the node. Only owners can request this operation. * This operation is only available for nodes configured to store items in the database. All * published items will be deleted with the exception of the last published item. */ public void purge() { List<PublishedItem> toDelete = null; // Calculate items to delete synchronized (publishedItems) { if (publishedItems.size() > 1) { // Remove all items except the last one toDelete = new ArrayList<PublishedItem>( publishedItems.subList(0, publishedItems.size() - 1)); // Remove items to delete from memory publishedItems.removeAll(toDelete); // Update fast look up cache of published items itemsByID = new HashMap<String, PublishedItem>(); itemsByID.put(publishedItems.get(0).getID(), publishedItems.get(0)); } } if (toDelete != null) { // Delete purged items from the database for (PublishedItem item : toDelete) { service.queueItemToRemove(item); } // Broadcast purge notification to subscribers // Build packet to broadcast to subscribers Message message = new Message(); Element event = message.addChildElement("event", "http://jabber.org/protocol/pubsub#event"); Element items = event.addElement("purge"); items.addAttribute("node", nodeID); // Send notification that the node configuration has changed broadcastNodeEvent(message, false); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -