📄 emailexecutor.java
字号:
+ methodName.toString() + ") argument"); if (log.isLoggable(Level.FINE)) log.fine(methodName.toString() + "(" + msgArr.length + ") with emailSessionId=" + getEmailSessionId()); String requestId = null; try { // We use the RequestReplyExecutor.java for the request/reply pattern MsgInfo msgInfo = new MsgInfo(this.glob, MsgInfo.INVOKE_BYTE, methodName, getSecretSessionId(), super.getProgressListener()); msgInfo.addMessage(msgArr); requestId = msgInfo.createRequestId(null); if (expectingResponse) { // register at the POP3 poller getPop3Driver().registerForEmail(getEmailSessionId(), requestId, this); } // super calls our sendMessage() which effectively sends the message Object response = super.requestAndBlockForReply(msgInfo, expectingResponse, false); return response; } catch (XmlBlasterException e) { // ErrorCode.USER* errors can't arrive here throw e; } catch (Throwable e) { e.printStackTrace(); throw new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION, ME, "Sorry, email sending " + methodName.toString() + " failed, no mail sent to " + addressBase.getRawAddress(), e); } finally { if (expectingResponse) getPop3Driver().deregisterForEmail(getEmailSessionId(), requestId); } } /** * Notification by Pop3Driver when a (response) email message arrives. * Enforced by I_ResponseListener */ public void incomingMessage(String requestId, Object response) { EmailData emailData = (EmailData) response; AttachmentHolder msgUnitAttachmentHolder = null; String pop3Url = null; try { pop3Url = getPop3Driver().getPop3Url(); // for logging only msgUnitAttachmentHolder = emailData.getMsgUnitAttachment(); // "*.xbf", "*.xbfz", "*.xmlz", ... } catch (Throwable e) { log.warning("Error parsing email data from " + pop3Url + ", please check the format: " + e.toString()); return; } if (msgUnitAttachmentHolder == null) { log.warning("Got email from POP3 but there was no MsgUnit attachment, we ignore it: " + emailData.toXml(true)); return; } byte[] encodedMsgUnit = msgUnitAttachmentHolder.getContent(); MsgInfo[] msgInfos = null; try { if (MsgInfo.isCompressed(msgUnitAttachmentHolder.getFileName(), msgUnitAttachmentHolder.getContentType())) { // Decompress the bytes int length = encodedMsgUnit.length; this.decompressor.reset(); this.decompressor.setInput(encodedMsgUnit, 0, length); byte[] buf = new byte[2048+length]; ByteArrayOutputStream out = new ByteArrayOutputStream(2048+length); while (!this.decompressor.finished()) { int resultLength = this.decompressor.inflate(buf); if (resultLength > 0) out.write(buf, 0, resultLength); } encodedMsgUnit = out.toByteArray(); if (log.isLoggable(Level.FINE)) log.fine("Decompressed message from " + length + " to " + encodedMsgUnit.length + " bytes"); } String parserClassName = MsgInfoParserFactory.instance().guessParserName(msgUnitAttachmentHolder.getFileName(), msgUnitAttachmentHolder.getContentType()); msgInfos = MsgInfo.parse(glob, this.progressListener, encodedMsgUnit, parserClassName, this.pluginConfig); if (msgInfos.length < 1) { // spam? log.warning("Unexpected msgInfo with length==0, requestId=" + requestId + " data=" + emailData.toXml(true)); Thread.dumpStack(); return; } for (int j=0; j<msgInfos.length; j++) { MsgInfo msgInfo = msgInfos[j]; msgInfo.setRequestIdGuessed(emailData.isRequestIdFromSentDate()); msgInfo.setBounceObject(BOUNCE_MAILFROM_KEY, emailData.getFrom()); // The messageId could be in the subject and not in the attachment msgInfo.setBounceObject(BOUNCE_MESSAGEID_KEY, emailData.getMessageId()); AttachmentHolder[] attachments = emailData.getAttachments(); for (int i=0; i<attachments.length; i++) { AttachmentHolder a = attachments[i]; if (a == msgUnitAttachmentHolder) continue; // TODO: Determine which attachments to bounce msgInfo.setBounceObject(a.getFileName(), a); } } } catch (Throwable e) { log.warning("Error parsing email data from " + pop3Url + ", check if client and server have identical compression settings: " + e.toString() + ": " + emailData.toXml(true)); return; } // Response and Exception messages should NEVER expire if (emailData.isExpired() && msgInfos[0].isInvoke()) { log.warning("Message is expired, we discard it: " + emailData.toString()); return; } // For XmlScript && INVOKE we could have multiple message bundled // else length is always 1! for (int i=0; i<msgInfos.length; i++) { MsgInfo msgInfo = msgInfos[i]; // If counterside has stripped information we add it again from the messageId attachment if (msgInfo.getRequestId().length() == 0) msgInfo.setRequestId(emailData.getRequestId()); if (msgInfo.getSecretSessionId().length() == 0) msgInfo.setSecretSessionId(emailData.getSessionId()); try { if (i==0 && msgInfo.isInvoke()) { if (isLoopingMail(msgInfo, emailData)) return; } /* Memory leak cleanup is handled by EmailDriver.java on LogoutEvent if (MethodName.DISCONNECT.equals(msgInfo.getMethodName())) { removeFromLoopProtection(emailData.getFrom()); } */ // This wakes up the blocking thread of sendEmail() and returns the // returnQos or the received invocation // On server side it typically invokes the core connect() or publish() etc. if (receiveReply(msgInfo, false) == false) { log.warning("Error parsing email data from " + getPop3Driver().getPop3Url() + ", CONNECT etc is not yet implemented"); } if (i==0 && msgInfos.length > 1 && MethodName.CONNECT.equals(msgInfo.getMethodName())) { // If multiple requests where bundled pass the others the secret session id for (int k=1; k<msgInfos.length; k++) msgInfos[k].setSecretSessionId(msgInfo.getSecretSessionId()); } } catch (Throwable e) { log.warning("Can't process email data from " + pop3Url + ": " + e.toString()); return; } } } /** * Some weak looping protection. * Assume requestId to be strictly increasing * to detect email duplicates (which can be produced by MTAs) */ protected boolean isLoopingMail(MsgInfo msgInfo, EmailData emailData) { try { LoopProtection loopProtection = null; synchronized (this.senderLoopProtectionMap) { loopProtection = (LoopProtection)this.senderLoopProtectionMap.get(emailData.getFrom()); if (loopProtection == null) { loopProtection = new LoopProtection(emailData.getFrom(), -1, -1); this.senderLoopProtectionMap.put(loopProtection.key, loopProtection); } } long currRequestId = new Long(msgInfo.getRequestId()).longValue(); if (MethodName.PING.equals(msgInfo.getMethodName())) { if (loopProtection.lastPingRequestId >= 0 && currRequestId <= loopProtection.lastPingRequestId) { log.warning("Can't process email data from " + getPop3Driver().getPop3Url() + ", it seems to be looping as requestId="+currRequestId+" (last="+loopProtection.lastPingRequestId+") has been processed already" + ": " + emailData.toXml(true)); return true; } loopProtection.lastPingRequestId = currRequestId; } else { if (loopProtection.lastRequestId >= 0 && currRequestId <= loopProtection.lastRequestId) { log.warning("Can't process email data from " + getPop3Driver().getPop3Url() + ", it seems to be looping as requestId="+currRequestId+" (last="+loopProtection.lastRequestId+") has been processed already" + ": " + emailData.toXml(true)); return true; } loopProtection.lastRequestId = currRequestId; } } catch (Throwable e) { log.warning("Cant handle requestId '"+msgInfo.getRequestId()+"' to be of type long"); } return false; } /** * Cleanup * @param key this is the sender email address, for example "xmlBlaster@localhost" for a client * or "demo@localhost" on server side (emailData.getFrom()) * @return The removed entry or null if not found */ protected LoopProtection removeFromLoopProtection(String key) { LoopProtection loopProtection = null; synchronized (this.senderLoopProtectionMap) { loopProtection = (LoopProtection)this.senderLoopProtectionMap.remove(key); if (loopProtection == null) { // Is OK for email plugins which only invokes log.fine("No loopProtection entry found for sender " + key + " which is OK in most cases, current known are: " + getLoopProtectionList()); } } return loopProtection; } /** * Returns a comma seperated list of all 'from email addresses'. * Used for JMX. * @return For example "joe@localhost,blue@localhost", never null */ public String getLoopProtectionList() { LoopProtection[] arr = getLoopProtections(); StringBuffer sb = new StringBuffer(512); for (int i=0; i<arr.length; i++) { if (i>0) sb.append(","); sb.append(arr[i].key); } return sb.toString(); } protected LoopProtection[] getLoopProtections() { synchronized (this.senderLoopProtectionMap) { return (LoopProtection[])this.senderLoopProtectionMap.values().toArray(new LoopProtection[this.senderLoopProtectionMap.size()]); } } /** * @return messageId="<messageId><sessionId>sessionId:4423c443</sessionId><requestId>3</requestId><methodName>subscribe</methodName></messageId>" */ protected String createMessageId(MsgInfo msgInfo, String requestId, MethodName methodName, Timestamp expiryTimestamp) { String messageId = (String)msgInfo.getBounceObject(BOUNCE_MESSAGEID_KEY); if (messageId == null) { String sessionId = getEmailSessionId(msgInfo); { // Hardcoded simplification of email subject for messages send manually from a normal email client // like this the GUI user can simply push the reply button to send further publish() etc. // RE: <messageId><sessionId>sessionId:127.0.0.2-null-1134497522115--102159664-3</sessionId></messageId> //if (methodName.equals(MethodName.CONNECT)) { // We send the secretSessionId in the SUBJECT of a ConnectReturnQos //sessionId = msgInfo.getSecretSessionId(); //} if (!msgInfo.isInvoke()) // Responses and exceptions should never expire expiryTimestamp = null; if (msgInfo.isRequestIdGuessed()) { requestId = null; methodName = null; } } messageId = EmailData.createMessageId(sessionId, requestId, methodName, expiryTimestamp); } return messageId; } /** * Extends RequestReplyExecutor.sendMessage */ protected void sendMessage(MsgInfo msgInfo, String requestId, MethodName methodName, boolean udp) throws XmlBlasterException,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -