📄 cbjxtransport.java
字号:
/** * {@inheritDoc} **/ public void propagate( Message msg, String serviceName, String serviceParams, String prunePeer) throws IOException { //propagate is not allowed by this endpointProtocol } /** * {@inheritDoc} **/ public boolean ping(EndpointAddress addr) { Messenger messenger = getMessenger( addr, null ); boolean reachable = (null != messenger); messenger.close(); return reachable; } /** * {@inheritDoc} **/ public void processIncomingMessage( Message message, EndpointAddress srcAddr, EndpointAddress dstAddr ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "processIncomingMessage : Received message from: " + srcAddr ); } //extract the Crypto info from the message MessageElement cryptoElement = message.getMessageElement( CBJX_MSG_NS, CBJX_MSG_INFO ); if (cryptoElement == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processIncomingMessage : No '" + CBJX_MSG_INFO + "' in the message"); } return; } message.removeMessageElement( cryptoElement ); // the cbjx message info CbJxMessageInfo cryptoInfo = null; try { cryptoInfo = new CbJxMessageInfo( cryptoElement.getStream(), cryptoElement.getMimeType() ); } catch( Throwable e) { if(LOG.isEnabledFor(Level.WARN)) { LOG.warn("processIncomingMessage : Couldn't retrieve CbJxMessageInfo from '" + CBJX_MSG_INFO + "' element", e ); } return; } Message submessage = checkCryptoInfo( message, cryptoElement, cryptoInfo ); if( null == submessage ) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn( "processIncomingMessage : discarding message from " + srcAddr ); } return; } //give back the message to the endpoint try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processIncomingMessage: delivering " + submessage + " to: " + cryptoInfo.getDestinationAddress() ); } endpoint.processIncomingMessage( submessage, cryptoInfo.getSourceAddress(), cryptoInfo.getDestinationAddress() ); } catch( Throwable all ) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("processIncomingMessage: endpoint failed to demux message", all ); } } } 
/**
 * Builds a CBJX container message around {@code submessage}.
 *
 * <p>The container holds two elements in the CBJX namespace:
 * <ul>
 *   <li>the crypto info document (local peer id, source and destination
 *       addresses, peer certificate), with its signature attached;</li>
 *   <li>the encapsulated message, with a signature computed over its
 *       wire serialization attached.</li>
 * </ul>
 * Both signatures are computed with the private key of the default
 * credential of the membership service.
 *
 * @param submessage  the message to wrap; any elements it already carries
 *                    in the CBJX namespace are removed first
 * @param destAddress the destination recorded in the crypto info
 * @return the container message, or {@code null} if one of the signatures
 *         could not be computed
 * @throws IOException if the membership service has no default credential
 */
public Message addCryptoInfo(Message submessage, EndpointAddress destAddress) throws IOException {
    if (LOG.isEnabledFor(Level.DEBUG)) {
        LOG.debug("Building CBJX wrapper for " + submessage);
    }

    // Remove all existing CbJx Elements from source
    Iterator eachCbJxElement = submessage.getMessageElementsOfNamespace(CbJxTransport.CBJX_MSG_NS);
    while (eachCbJxElement.hasNext()) {
        eachCbJxElement.next();
        eachCbJxElement.remove();
    }

    Message message = new Message();

    CbJxMessageInfo cryptoInfo = new CbJxMessageInfo();

    // set the source Id of the message
    cryptoInfo.setSourceID(localPeerID);
    cryptoInfo.setSourceAddress(localPeerAddr);
    cryptoInfo.setDestinationAddress(destAddress);

    // add the root cert into the message info
    PSECredential cred = (PSECredential) membership.getDefaultCredential();
    if (null == cred) {
        throw new IOException("No authentication available for message signing.");
    }

    Certificate cert = cred.getCertificate();
    cryptoInfo.setPeerCert(cert);

    // compute the signature of the crypto info document
    TextDocument infoDoc = (TextDocument) cryptoInfo.getDocument(MimeMediaType.XMLUTF8);
    byte[] infoSignature = null;
    try {
        infoSignature = PSEUtils.computeSignature(CbJxDefs.signAlgoName,
                                                  cred.getPrivateKey(),
                                                  infoDoc.getStream());
    } catch (Throwable e) {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("failed to sign " + submessage, e);
        }
        return null;
    }

    // make the crypto info signature into an element
    MessageElement infoSigElement =
        new ByteArrayMessageElement(CBJX_MSG_SIG, MimeMediaType.AOS, infoSignature, null);

    // add the cbjx:CryptoInfo into the message
    MessageElement cryptoInfoElement =
        new TextDocumentMessageElement(CBJX_MSG_INFO, infoDoc, infoSigElement);

    message.addMessageElement(CBJX_MSG_NS, cryptoInfoElement);

    // Compute the signature of the encapsulated message and append it to
    // the container.

    // serialize the encapsulated message
    WireFormatMessage subserial =
        WireFormatMessageFactory.toWire(submessage, WireFormatMessageFactory.DEFAULT_WIRE_MIME, null);

    // calculate the signature
    byte[] bodySignature = null;
    try {
        bodySignature = PSEUtils.computeSignature(CbJxDefs.signAlgoName,
                                                  cred.getPrivateKey(),
                                                  subserial.getStream());
    } catch (Throwable e) {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("failed to sign " + submessage, e);
        }
        return null;
    }

    // Make the signature into an element
    MessageElement bodySigElement =
        new ByteArrayMessageElement(CBJX_MSG_SIG, MimeMediaType.AOS, bodySignature, null);

    // Add the encapsulated body into the container message.
    message.addMessageElement(CBJX_MSG_NS,
        new JxtaMessageMessageElement(CBJX_MSG_BODY,
                                      new MimeMediaType("application/x-jxta-msg"),
                                      submessage,
                                      bodySigElement));

    return message;
}

/**
 * Validates a CBJX container message and returns the message it
 * encapsulates.
 *
 * <p>The following checks are applied in order; the first failure is
 * logged and causes {@code null} to be returned:
 * <ol>
 *   <li>the container has a body element, and it is an encapsulated
 *       message element;</li>
 *   <li>the crypto info carries a certificate with an RSA public key;</li>
 *   <li>the certificate verifies against its own public key;</li>
 *   <li>the UUID of the source peer id equals the UUID of the peer id
 *       generated from the group id and the encoded public key;</li>
 *   <li>the signature attached to the crypto info element is valid;</li>
 *   <li>the signature attached to the body element is valid.</li>
 * </ol>
 *
 * @param message       the container message; its body element is removed
 *                      as a side effect
 * @param cryptoElement the element the crypto info was parsed from
 * @param cryptoInfo    the parsed crypto info
 * @return the encapsulated message, or {@code null} if any check fails
 */
public Message checkCryptoInfo(Message message,
                               MessageElement cryptoElement,
                               CbJxMessageInfo cryptoInfo) {
    // extract the body element from the message
    MessageElement rawBody = message.getMessageElement(CBJX_MSG_NS, CBJX_MSG_BODY);

    if (null == rawBody) {
        if (LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("No '" + CBJX_MSG_BODY + "' in " + message);
        }
        return null;
    }

    // The container comes from a remote peer: reject a body of the wrong
    // kind instead of letting the cast below throw.
    if (!(rawBody instanceof JxtaMessageMessageElement)) {
        if (LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("'" + CBJX_MSG_BODY + "' in " + message + " is not an encapsulated message");
        }
        return null;
    }

    JxtaMessageMessageElement bodyElement = (JxtaMessageMessageElement) rawBody;

    message.removeMessageElement(bodyElement);

    // extract the peer certificate
    Certificate peerCert = cryptoInfo.getPeerCert();

    // A missing certificate or a non-RSA key is treated like any other
    // validation failure: the message is discarded, nothing is thrown.
    if ((null == peerCert) || !(peerCert.getPublicKey() instanceof RSAPublicKey)) {
        if (LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("Missing peer cert or non-RSA public key in " + message);
        }
        return null;
    }

    // and from it the public key
    RSAPublicKey publicKey = (RSAPublicKey) peerCert.getPublicKey();

    // check the cert validity
    try {
        peerCert.verify(publicKey);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("Invalid peer cert", e);
        }
        return null;
    }

    // check the cbid
    try {
        net.jxta.impl.id.CBID.PeerID srcPeerID =
            (net.jxta.impl.id.CBID.PeerID) cryptoInfo.getSourceID();

        byte[] pub_der = publicKey.getEncoded();
        net.jxta.impl.id.CBID.PeerID genID =
            (net.jxta.impl.id.CBID.PeerID) IDFactory.newPeerID(group.getPeerGroupID(), pub_der);

        if (!srcPeerID.getUUID().equals(genID.getUUID())) {
            // the cbid is not valid. Discard the message
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("CBID of " + message + " is not valid : " + srcPeerID + " != " + genID);
            }
            return null;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("CBID of the message is valid");
        }
    } catch (Throwable e) {
        if (LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("failed to verify cbid", e);
        }
        return null;
    }

    // verify the signature of the cryptinfo message
    try {
        boolean valid = PSEUtils.verifySignature(CbJxDefs.signAlgoName,
                                                 peerCert,
                                                 cryptoElement.getSignature().getBytes(false),
                                                 cryptoElement.getStream());
        if (!valid) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Failed to verify the signature of cryptinfo for " + message);
            }
            return null;
        }
    } catch (Throwable e) {
        if (LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("Failed to verify the signature of cryptinfo for " + message, e);
        }
        return null;
    }

    // then verify the signature of the body; this is routine progress
    // information, so it is reported at DEBUG rather than WARN
    if (LOG.isEnabledFor(Level.DEBUG)) {
        LOG.debug("verifying signature");
    }

    // verify the signature of the message
    try {
        boolean valid = PSEUtils.verifySignature(CbJxDefs.signAlgoName,
                                                 peerCert,
                                                 bodyElement.getSignature().getBytes(false),
                                                 bodyElement.getStream());
        if (!valid) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("failed to verify the signature of " + message);
            }
            return null;
        }
    } catch (Throwable e) {
        if (LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("failed to verify the signature of " + message, e);
        }
        return null;
    }

    // the message is valid
    return bodyElement.getMessage();
}

/**
 * Filter for incoming messages. A message addressed to this transport's
 * protocol is validated and replaced by the message it encapsulates;
 * an invalid one is discarded. Messages addressed to any other protocol
 * pass through unchanged.
 */
public class CbJxInputFilter implements MessageFilterListener {

    /**
     * Default constructor
     */
    public CbJxInputFilter() {
        super();
    }

    /**
     * {@inheritDoc}
     *
     * @return the encapsulated message when {@code dstAddr} names this
     *         transport's protocol and the container is valid,
     *         {@code null} when it names this protocol and the container
     *         is not valid, and {@code message} itself otherwise
     **/
    public Message filterMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
        if (!dstAddr.getProtocolAddress().equals(getProtocolName())) {
            // not addressed to this transport: leave it alone
            return message;
        }

        // extract the Crypto info from the message
        MessageElement cryptoElement = message.getMessageElement(CBJX_MSG_NS, CBJX_MSG_INFO);

        if (cryptoElement == null) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("No '" + CBJX_MSG_INFO + "' in the message");
            }
            return null;
        }

        message.removeMessageElement(cryptoElement);

        // the cbjx message info
        CbJxMessageInfo cryptoInfo = null;
        try {
            cryptoInfo = new CbJxMessageInfo(cryptoElement.getStream(), cryptoElement.getMimeType());
        } catch (Throwable e) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Couldn't retrieve CbJxMessageInfo from '" + CBJX_MSG_INFO + "' element", e);
            }
            return null;
        }

        return checkCryptoInfo(message, cryptoElement, cryptoInfo);
    }
}

/**
 * Filter for outgoing messages that are not sent with messengers (that
 * is, propagated messages). It wraps a copy of each message that does not
 * yet carry CBJX crypto info into a CBJX container.
 */
public class CbJxOutputFilter implements MessageFilterListener {

    /**
     * Default constructor
     */
    public CbJxOutputFilter() {
        super();
    }

    /**
     * {@inheritDoc}
     *
     * <p>The filter works on a clone, so {@code message} itself is not
     * modified.
     *
     * @return the clone if it already carries crypto info, otherwise the
     *         result of {@code addCryptoInfo} for the clone;
     *         {@code null} if the crypto info could not be added
     **/
    public Message filterMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
        Message msg = (Message) message.clone();

        if (null == msg.getMessageElement(CBJX_MSG_NS, CBJX_MSG_INFO)) {
            try {
                msg = addCryptoInfo(msg, dstAddr);
            } catch (IOException failed) {
                // Record why the message is being dropped instead of
                // swallowing the failure silently.
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("Could not add CBJX crypto info to outgoing message", failed);
                }
                return null;
            }
        }

        return msg;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -