📄 massmessagequeue.java.svn-base
字号:
package com.infobank.superchannel.daemon.messagelauncher;import java.util.Hashtable;import java.util.LinkedList;import java.util.Queue;import java.util.Set;import java.util.concurrent.atomic.AtomicInteger;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.util.DaemonConfiguration;/** * @author Xiaoguang Sun * @version 类说明: 批量短信息队列实现 */public class MassMessageQueue implements MessageQueue { // 子消息队列散列表,保存有所有当前活动的子队列 private Hashtable<String, Queue<TextMessage>> messagesQueue=new Hashtable<String,Queue<TextMessage> >(); // 当前使用中的子队列 private Queue<TextMessage> currentSubQueue=null; // 当前子队列已使用的配额 private AtomicInteger usedQuota=new AtomicInteger(0); // 批量短信息队列当前尺寸 private AtomicInteger queueSize=new AtomicInteger(0); // 子队列数目 private AtomicInteger numberOfSubQueue=new AtomicInteger(0); // maximum queue size private static int capacity=0; // 当前通道的设定配额 private static AtomicInteger channelQuota=new AtomicInteger(0); // 所有短信息的内容的集合 private String keySet[]=new String[16]; // 当前发送子队列的索引 private AtomicInteger keyIndex=new AtomicInteger(0); // message launcher daemon private static MessageLauncherDaemon daemon=null; /** * 初始化批量短信队列 * @param daemon 消息发送服务器对象 */ public static boolean init(MessageLauncherDaemon daemon){ MassMessageQueue.daemon=daemon; String tag=daemon.getTag(); // 获取批量短信息队列单一内容短信一次发送配额 DaemonConfiguration config=DaemonConfiguration.getInstance(); StringBuilder sb=new StringBuilder(); sb.append("/daemon[@type='MessageLauncher' and @tag='"); sb.append(tag); sb.append("']/message/queue[@type='Mass']/"); String basePath=sb.toString(); try{ sb=new StringBuilder(); sb.append(basePath); sb.append("quota"); channelQuota.set(config.getInt(sb.toString())); sb=new StringBuilder(); sb.append(basePath); sb.append("capacity"); capacity=config.getInt(sb.toString()); } catch(Exception ex){ return false; } return true; } /** * 新增待发短消息 * @param messages 待发送短消息 */ public void addMessage(Queue<TextMessage> messages){ // 一次传递进入的消息拥有一样的内容 // 如果系统中还有没有发完的短信也拥有一样的内容,则将此批信息都放入同一个队列 queueSize.addAndGet(messages.size()); String content=messages.peek().getMessage(); Queue<TextMessage> subQueue=messagesQueue.get(content); if(subQueue!=null){ synchronized(subQueue){ subQueue.addAll(messages); } }else{ messagesQueue.put(content, messages); resetKeySet(true); } numberOfSubQueue.set(messagesQueue.size()); } /** * 新增待发短消息 * @param message 待发送短消息 * @deprecated 批量短信队列不应该出现单独增加一条短信息的情况 */ public void addMessage(TextMessage message){ } /** * 从消息队列中取出一定数目的消息 * @return 取出的待发送短消息 */ public Queue<TextMessage> getMessage(int number) { if(number<=0) return null; // 试图循环取得number条短信 Queue<TextMessage> messages=new LinkedList<TextMessage>(); for(int idx=0;idx<number;++idx){ TextMessage message=getMessage(); if(message==null){ // 系统队列中没有消息可发退出 break; } messages.add(message); } return messages; } /** * 重置子队列索引集 * @param callSwitch 是否换出当前子队列 */ private void resetKeySet(boolean callSwitch){ // 重新设置队列索引集 Set<String> keySet=messagesQueue.keySet(); this.keySet=keySet.toArray(this.keySet); if(callSwitch) switchQueue(); } /** * 切换当前子队列 */ private void switchQueue(){ if(keyIndex.get()>=keySet.length) keyIndex.set(0); if(keySet[keyIndex.get()]==null){ keyIndex.set(0); } String key=keySet[keyIndex.get()]; if(key==null){ resetKeySet(false); return; } currentSubQueue=messagesQueue.get(key); usedQuota.set(0); keyIndex.addAndGet(1); } /** * 从消息队列中取出一条待发短消息 * @return 取出的待发送短消息 */ public TextMessage getMessage() { if(messagesQueue.isEmpty()) return null; if(usedQuota.get()>=channelQuota.get()){ try{ // 当前子队列所代表内容的短信已经发送条数超过配额,需要进行自队列切换 switchQueue(); }catch(Exception ex){ daemon.getObserver().fatal(ex); } } if(currentSubQueue==null){ // 当前子队列为空 return null; } TextMessage result=currentSubQueue.poll(); if(result!=null){ usedQuota.addAndGet(1); queueSize.decrementAndGet(); } try{ // 当前子队列已经为空,需要将其删除 String message=result.getMessage(); if(currentSubQueue.isEmpty()&&message!=null){ messagesQueue.remove(message); resetKeySet(true); } }catch(Exception ex){ daemon.getObserver().fatal(ex); } return result; } /** * 取得消息队列当前长度 * @return 消息队列当前长度 */ public int getMessageQueueSize() { return queueSize.get(); } public boolean isFull() { return queueSize.get()>=capacity; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -