The Java Message Service (JMS) is used mostly for asynchronous
communication, but it can also be used for synchronous request/reply communication. It
has the advantage of fully decoupling the client from the service.
The example below shows how to send a request from a client (a J2SE application) to a message-driven bean (MDB). Once the MDB receives the message, it processes it and sends a response back to the client application.
Look up the connection factory from the initial context. From the connection factory, create a connection, and from the connection create a session. Then look up the request queue by name from the initial context. From the session, create a TextMessage. Before sending the message to the queue, a few properties need to be set on it:
setJMSReplyTo:
This property tells the consumer (here, our MDB) which queue the response message should be sent to. It takes a Destination (here, the reply queue) as its argument.
setJMSCorrelationID:
Because the responses are sent to a queue, any consumer on that queue could pick them up. However, each response should be delivered only to the client that initiated the request.
The correlation ID ties a response to its request: the client sets a unique ID on the request and uses a message selector to consume only the message whose correlation ID matches it.
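The client listing uses the third-party org.safehaus.uuid library to generate the ID. As a minimal sketch of the same idea, assuming only the standard library, java.util.UUID can produce the ID and the selector string can be built from it. The class and method names here (CorrelationIds, newCorrelationId, selectorFor) are hypothetical and not part of the original example.

```java
import java.util.UUID;

// Hypothetical helper, not part of the original example.
final class CorrelationIds {

    // Generates a random (version 4) UUID string to use as the JMSCorrelationID.
    static String newCorrelationId() {
        return UUID.randomUUID().toString();
    }

    // Builds a selector expression that matches exactly one correlation ID.
    // The string literal is wrapped in single quotes, as in the client listing.
    static String selectorFor(String correlationId) {
        return "JMSCorrelationID = '" + correlationId + "'";
    }

    public static void main(String[] args) {
        String id = newCorrelationId();
        System.out.println(selectorFor(id));
    }
}
```

The resulting string is what would be passed as the second argument to session.createConsumer(replyQueue, selector).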
Client:
    package com.passion4java.requestreply;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    import org.safehaus.uuid.UUID;
    import org.safehaus.uuid.UUIDGenerator;

    public class Client {

        static int counter = 0;
        public boolean responseReceived = false;
        public String response = "";

        public static void main(String[] args) throws Exception {
            new Client().send2Queue();
        }

        public void send2Queue() throws Exception {
            // Setting JNDI values, here JBoss JNDI values
            System.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
            System.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
            System.setProperty("java.naming.provider.url", "jnp://localhost:1099");
            InitialContext ctx = new InitialContext();

            String requestQueueName = "jms/requestQueue";
            String replyQueueName = "jms/replyQueue";
            String connectionFactoryName = "jms/requestReplyQCF";

            // Sending the message to the MDB
            Connection connection = null;
            Session session = null;
            Destination destination = null;
            MessageProducer messageProducer = null;

            ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup(connectionFactoryName);
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = (Destination) ctx.lookup(requestQueueName);
            messageProducer = session.createProducer(destination);
            connection.start();

            this.sendMessage(session, ctx, messageProducer, replyQueueName, " FIRST MDB");

            messageProducer.close();
            session.close();
            connection.close();
        }

        public void sendMessage(Session session, InitialContext ctx, MessageProducer messageProducer,
                String replyQueueName, String requestText) throws RuntimeException {
            try {
                // Creating a unique correlation ID
                UUIDGenerator uuid = UUIDGenerator.getInstance();
                UUID u = uuid.generateRandomBasedUUID();
                TextMessage message = session.createTextMessage(requestText);
                String correlationId = u.toString();

                // Look up the reply-to queue and set it on the request
                Destination replyQueue = (Destination) ctx.lookup(replyQueueName);
                message.setJMSReplyTo(replyQueue);
                message.setJMSCorrelationID(correlationId);

                // Consume only the reply whose correlation ID matches this request
                String messgeSelector = "JMSCorrelationID = '" + correlationId + "'";
                MessageConsumer replyConsumer = session.createConsumer(replyQueue, messgeSelector);

                messageProducer.send(message, javax.jms.DeliveryMode.PERSISTENT,
                        javax.jms.Message.DEFAULT_PRIORITY, 1800000);
                System.out.println("++++++++++++++++++++++++++++++++++++++++++");
                System.out.println("Message sent to Bean" + requestText);
                System.out.println("messgeSelector name is : " + messgeSelector.toString());
                System.out.println("++++++++++++++++++++++++++++++++++++++++++");

                // Block until the matching reply arrives
                Message replayMessage = replyConsumer.receive();
                TextMessage textMessage = (TextMessage) replayMessage;
                String replayText = textMessage.getText();
                System.out.println("Request Messge -->" + requestText);
                System.out.println("Response Message -->" + replayText);
            } catch (JMSException je) {
                throw new RuntimeException(je);
            } catch (NamingException ne) {
                throw new RuntimeException(ne);
            }
        }
    }
Server:
    package com.passion4java.requestreply;

    import javax.ejb.EJBException;
    import javax.ejb.MessageDrivenBean;
    import javax.ejb.MessageDrivenContext;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    public class ServerMDB implements MessageListener, MessageDrivenBean {

        private Session session;
        Context ctx;

        public ServerMDB() {
            try {
                ctx = new InitialContext();
            } catch (NamingException e) {
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            String text = null;
            Connection connection = null;
            try {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    text = textMessage.getText();
                    System.out.println("****************************************************");
                    System.out.println("Received Message -->" + text);
                    System.out.println("****************************************************");

                    // Send the reply to the queue named by the request
                    Destination replyDestination = message.getJMSReplyTo();
                    // Correlation ID taken from the client's message
                    String correlationID = message.getJMSCorrelationID();
                    System.out.println("input correlationID" + correlationID);

                    ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("jms/requestReplyQCF");
                    connection = connectionFactory.createConnection();
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer replyProducer = session.createProducer(replyDestination);

                    TextMessage replyMessage = session.createTextMessage();
                    replyMessage.setText("Hi i am a Server MDB. Thanks for using request/reply");
                    replyMessage.setJMSCorrelationID(correlationID);
                    replyProducer.send(replyMessage);

                    System.out.println("****************************************************");
                    System.out.println("Sent reply");
                    System.out.println("\tTime: " + System.currentTimeMillis() + " ms");
                    System.out.println("\tMessage ID: " + replyMessage.getJMSMessageID());
                    System.out.println("\tCorrel. ID: " + replyMessage.getJMSCorrelationID());
                    System.out.println("\tReply to: " + replyMessage.getJMSReplyTo());
                    System.out.println("\tContents: " + replyMessage.getText());
                    System.out.println("****************************************************");
                } else {
                    System.err.println("Expected a Text Message");
                }
            } catch (Throwable t) {
                t.printStackTrace();
            } finally {
                // Close the connection only if it was opened; it stays null for non-text messages
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void ejbRemove() throws EJBException {
            // Nothing to clean up
        }

        public void setMessageDrivenContext(MessageDrivenContext arg0) throws EJBException {
            // Context not used in this example
        }

        public void ejbCreate() {
        }
    }
1. Once the message is sent, it is received by the consumer (the MDB). The consumer then does the necessary processing. In our example, the MDB just logs the incoming text and replies with a fixed text message.
2. Once the server is done with the processing, it is time to send the response back to the same client. The server gets the reply queue from the incoming message itself by calling getJMSReplyTo(). Once it has the reply queue, it creates a message.
3. After creating the message, the server sets the correlation ID on the response message. It must be the same as the correlation ID of the request message.
4. This is necessary because the client uses a message selector based on the correlation ID when it listens on the reply queue (where the server sends the response).
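The server-side steps can be sketched without a JMS provider. The following is a minimal in-memory stand-in, assuming a plain BlockingQueue in place of the reply queue; the ReplySketch and Msg names are hypothetical, and echoing the request text in the reply is an illustrative choice rather than what the MDB above does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the server steps; all names here are hypothetical.
final class ReplySketch {

    // Minimal stand-in for a JMS message: body, correlation ID, reply destination.
    static final class Msg {
        final String text;
        final String correlationId;
        final BlockingQueue<Msg> replyTo;

        Msg(String text, String correlationId, BlockingQueue<Msg> replyTo) {
            this.text = text;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Mirrors onMessage(): process the request, copy its correlation ID onto
    // the reply, and send the reply to the destination named by the request.
    static void onMessage(Msg request) {
        String replyText = request.text + " (processed by server)";
        Msg reply = new Msg(replyText, request.correlationId, null);
        request.replyTo.add(reply);
    }

    public static void main(String[] args) {
        BlockingQueue<Msg> replyQueue = new LinkedBlockingQueue<>();
        onMessage(new Msg("FIRST MDB", "id-1", replyQueue));
        Msg reply = replyQueue.poll();
        System.out.println(reply.correlationId + " -> " + reply.text);
    }
}
```

The point of the sketch is that the server needs no configuration of its own for the reply: both the destination and the correlation ID come from the request.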
Back to the Client:
1. The server sends the message to the reply queue.
2. The client listens on that queue with a message selector on its unique correlation ID. Because the reply carries the same correlation ID that the client used in its selector, the message sent by the server is received by the client that made the request.
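To illustrate the effect of the selector, here is a minimal in-memory sketch, assuming a plain list in place of the reply queue. It is not how a JMS provider implements selectors; the SelectorSketch, Reply and receiveMatching names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// In-memory illustration of selector-style filtering; all names are hypothetical.
final class SelectorSketch {

    static final class Reply {
        final String correlationId;
        final String text;

        Reply(String correlationId, String text) {
            this.correlationId = correlationId;
            this.text = text;
        }
    }

    // Removes and returns the first reply whose correlation ID matches,
    // or null if there is none. Non-matching replies stay on the queue
    // so that the clients they belong to can still receive them.
    static Reply receiveMatching(List<Reply> replyQueue, String correlationId) {
        Iterator<Reply> it = replyQueue.iterator();
        while (it.hasNext()) {
            Reply reply = it.next();
            if (reply.correlationId.equals(correlationId)) {
                it.remove();
                return reply;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Reply> replyQueue = new ArrayList<>();
        replyQueue.add(new Reply("id-A", "reply for client A"));
        replyQueue.add(new Reply("id-B", "reply for client B"));

        Reply mine = receiveMatching(replyQueue, "id-B");
        System.out.println(mine.text);
        System.out.println(replyQueue.size() + " reply left for other clients");
    }
}
```

In the real client, replyConsumer.receive() blocks until a matching reply arrives. JMS also offers receive(long timeout), which returns null if nothing arrives within the timeout, so a client can avoid waiting forever when the server never answers.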
2 comments:
Wow, that's a lot of code.
This is cool and all, but you can do this with a lot less code using something like camel, spring integration, or even the base spring framework.
This is a good example, though.
That's really true; you can write a few lines using Camel that do the same. But Camel requires a lot of dependencies and is mainly used for routing, so to keep it simple I thought of writing this with only JMS and MDBs. I am planning to write a simple Camel Hello World that does the same soon ;) Anyway, thanks for your comments.