JMS

Some of the Java clients are based on Java Message Service (JMS). JMS is a message oriented middleware API, which is a part of the Java Platform Enterprise Edition. More information about Java Message Service can be found at the Oracle website - http://www.oracle.com/technetwork/java/index-jsp-142945.html.

The goal of this chapter is not to provide a comprehensive guide to JMS API, but to provide code snippets illustrating the work with the Eurex Clearing FIXML/FpML/Margin Calculator Interface in Java.

The details which are specific to the different JMS implementations will be described in the subsequent chapters.

Java Naming and Directory Interface (JNDI)

The JMS applications typically use the Java Naming and Directory Interface (JNDI) to obtain a connection factory, connection URI and message source / target adresses. The JNDI configuration might be kept separate from the application – for example stored in a properties file. But it can be also dynamically created Properties or HashMap object.

The JNDI properties are used by the client application to connect to the broker and send or receive messages. The connection factory name as well as the syntax of the connection URI and addresses are different for different APIs.

The properties file has to be loaded and processed into the application. The classes Properties (java.util) and InitialContext (javax.naming) will be used. Following example shows how to load the properties from a file:

InitialContext ctx;

Properties properties = new Properties();

properties.load(new FileInputStream("<PropertiesFile>"));

ctx = new InitialContext(properties);

The <PropertiesFile> has to be replaced according to the Member’s environment. As a result, an InitalContext object is created in variable ctx containing all JNDI resources defined in the properties file. The context will be used later to retrieve the connection string and the destinations.

Preparing connection and session

The connection (class Connection from javax.jms) is created using the ConnectionFactory class (javax.jms). The connection factory has to be initialized using the connection string from our context:

ConnectionFactory fact = (ConnectionFactory)ctx.lookup("connection");

The connection factory is used to create a connection:

Connection conn;conn = fact.createConnection();

After these steps, the connection is created in the conn object and connects to the AMQP broker. However, it is in state STOPPED. The STOPPED state allows the applications to send messages, but not to receive them. In order to receive messages, the connection has to be started (see chapter 3.2.4). This gives the application enough time to prepare for receiving of messages (create receivers, queues, listeners, …).

Using the prepared connection, a session can be created.6 The session is an instance of class Session from package javax.jms:

Session sess;sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

The Session.CLIENT_ACKNOWLEDGE parameter is instructing the session, that the acknowledgments of the messages will be done manually by the client application. In case the acknowledgement should be done automatically by the application, the Session.AUTO_ACKNOWLEDGE option should be used. Using auto-acknowledgements without transactions is not recommended (see Eurex Clearing FIXML/FpML/Margin Calculator Interface Specification, “Volume E: AMQP Setup & Internals” for more details about reliability).

A session should be used as a long lasting resource and shouldn’t be created too often. For instance creating a new session for sending each message in a tight loop can result in following exception:

javax.jms.JMSException (Exception when sending message:timed out waiting for session to become open (state=DETACHED))

Instead, the application should create a session before entering the loop and re-use the session.

When using the Spring framework, the SingleConnectionFactory should not be used because it recreates Session and Producer each time a message is to be sent. Instead, for example the CachingConnectionFactory should be used. The CachingConnectionFactory keeps both Session and Producer created and attached.

Creating a receiver/sender

After the connection and session have been prepared, a receiver or producer can be prepared next. The producer is an instance of class MessageProducer (javax.jms). The producer can be created by the session, using the method createProducer(…). The producer is always bound to a specific destination, queue or topic which can be created from the context which has been prepared in chapter 3.2.1:

MessageProducer requestProducer;Destination requestDest = (Destination)ctx.lookup("requestAddress");requestProducer = sess.createProducer(requestDest);

The message receiver is an instance of class MessageConsumer7 (javax.jms). The receiver is created in the same way as the producer. Just instead of using the session’s createProducer(…) method, the method createConsumer(…) is used. The receivers for receiving responses or broadcasts are created in the same way. Just the destination (and the address which has been used to create the destination) is different:

MessageConsumer responseConsumer;Destination responseDest = (Destination)ctx.lookup("responseAddress");responseConsumer = sess.createConsumer(responseDest);

There are multiple methods to get the messages from the receiver. One of them is a usage of a message listener. Message listener is a special object, which implements the MessageListener interface from package javax.jms. In order to use the listener, it has to be registered with the producer. The registration can be done using the setMessageListener(…) method of the receiver:

responseConsumer.setMessageListener(new Listener());

The listener will be described in detail in chapter 3.2.6.3.

Starting the connection

With connection, session and receiver ready, the connection can be started:

conn.start();

Only when the connection is started, the application can receive messages from the AMQP broker. If the application is intended to only send messages, the start of the connection is not necessary.

Thread safety

The JMS Session object is not thread safe. Since a MessageProducer/MessageConsumer is bound to a Session it cannot be used from more than one thread at the same time. For multi-threaded access it is necessary to use a separate session (and underlying objects) from each thread.

Receiving/sending messages

Preparing a request message

To prepare a new message, the TextMessage class (javax.jms) can be used. For request messages, only the message body and the reply to key have to be filled. The message body can be entered when a new message is constructed, using the session’s method createTextMessage(…). After preparing the message, the reply to destination, queue or topic can be assigned to it. As before, the destination for the reply to parameter is created from the context object. With the destination being ready, the method setJMSReplyTo(…) can be used to assign it to the message.

TextMessage message = sess.createTextMessage("<FIXML>...</FIXML>");Destination replyDest = (Destination)ctx.lookup("replyAddress");message.setJMSReplyTo(replyDest);

Sending a request message

The message prepared in the previous chapter can be sent using the message producer. Since the producer has been initialized with the destination already at the beginning, it is not necessary to use the request destination again:

requestProducer.send(message);

Depending on the specific client, the messages might be by default sent synchronously or asynchronously.

The request queues have only limited capacity and when the queue is almost full a flow control mechanism will be activated by the broker (the exact queue sizes as well as the flow control thresholds for different interfaces can be found in the Volume E of this documentation). When the flow control is activated for the given request queue, the broker will delay sending the confirmations of received messages. However, the flow control support in the Java JMS API is only limited and when the flow control is activated the client will only wait for certain time and afterwards the send call fails with an exception. When sending the messages asynchronously, the client will continue sending messages and can exceed the queue capacity despite the flow control.

The recommended way how to ensure the queue capacity will not be exceeded in Java JMS client is to track the number of outstanding requests (requests which were sent and not yet responded to) within the application and stop sending messages when the number of outstanding requests reaches the flow control threshold.

Receiving a message using Message listener

Message listener can be every object which implements the MessageListener interface (javax.jms). The assignment of the listener to the receiver is described in chapter 3.2.3. The MessageListener interface has only one method, called onMessage(…). This method is called whenever the receiver receives a new message. The message is passed to the onMessage() method as a parameter and can be either processed inside of the method or passed to another object. In case the session has been created with manual acknowledgements, the message should be acknowledged after its processing is finished (see chapter 3.2.2 for more details). The acknowledgement can be done using the call of the acknowledge() method of the message. When using auto-acknowledgements, it is not necessary to acknowledge the message manually. One listener object can be used by multiple receivers.

public class Listener implements MessageListener{ public void onMessage(Message msg) { // Processing of the message try { // Acknowledging the message manually msg.acknowledge(); } catch (JMSException e) { // Handling the exception } }}

Please note, that the JMS Session object is not thread-safe. Therefore, it should not be used concurrently from multiple threads. If it is planned to receive concurrently messages from multiple sources, then one should create different sessions and create for each session one MessageConsumer, since the MessageConsumer is created and assigned to one session only. Afterwards, each MessageListener will then be assigned to different MessageConsumers and therefore to different Sessions.

Using single session for multiple MessageConsumers has the effect that all calls to their onMessage() methods are serialized and the parallel message consuming is not used.

The message listener is used in the broadcast receiver example.

Receiving a message using the receive() method

Messages can be also received using the receive() method of the MessageConsumer instance:

Message msg = responseConsumer.receive();// Processing of the messagemsg.acknowledge();

Using parameters of the receive() method, the application can either wait until a message is received for a limited (pass the timeout in milliseconds as a parameter to the method) or unlimited time. Using the method receiveNoWait(), a message can be received without waiting (if there is no message waiting, the method will return null).

In case the session has been created with manual acknowledgements, the message should be acknowledged after its processing is finished (see chapter 3.2.2 for more details). The acknowledgement can be done using the call of the acknowledge() method of the message. When using auto-acknowledgements, it is not necessary to acknowledge the message manually.

The receive() method is used in the response receiver example.

Message processing

The received message is returned from the receive() method or passed to the onMessage() listener method as an instance of the more generic class Message (javax.jms). In order to process the message, it has to be casted either to TextMessage or the ByteMessage. The JMS API decides based on the message payload and message properties whether the message will be handled as TextMessage or BytesMessage. The messages received on Eurex Clearing interfaces might be presented in both types, depending on the message sender and the exact content.

With an instance of TextMessage class, it is easy to retrieve the message body. The method getText(…) will return the body as a String object. BytesMessage has the methods getBodyLength() and readByte(), which can be used to read the message body byte by byte. Since the FIXML/FpML/Margin Calculator response or broadcast messages are text based, the byte content has to be transformed to a string using a StringBuilder class.

if (msg instanceof TextMessage){ TextMessage textMessage = (TextMessage) msg; messageText = textMessage.getText(); // process the message body}else if (msg instanceof BytesMessage){ BytesMessage bytesMessage = (BytesMessage) msg; StringBuilder builder = new StringBuilder(); for (int i = 0; i < bytesMessage.getBodyLength(); i++) { builder.append((char)bytesMessage.readByte()); } // process the message body}else{ // Unexpected message delivered}

All other message attributes can be retrieved using the usual getter methods.

Closing the connection

When the application is exiting, it should properly close all AMQP related objects. The receivers, producers, session and connection all have a method close(), which will properly close them:

responseConsumer.close();requestProducer.close();sess.close();conn.close();

Error handling

In JMS the errors are handled by catching the proper exceptions. The errors are either synchronous (e.g. creating the session fails) or asynchronous (e.g. the connection to the broker is lost as the client is waiting for messages). This chapter explains how to properly handle and recover from such situations.

A client application should be designed in a way that it is resilient to the above errors, it doesn’t get stuck when error occurs and at the same time it doesn’t start consuming more and more resources. The main building blocks for each client application are (starting from the top) a connection, session and receiver/sender. Closing a session automatically closes all receivers/senders beneath it and closing a connection automatically closes all underlying sessions.

A typical way of handling the chain of creating the producer/consumer may look like:

try {
  connection = fact.createConnection();
  session = connection.createSession(false,
  Session.CLIENT_ACKNOWLEDGE);
  Destination requestDestination = (Destination)
  ctx.lookup("requestAddress");
  MessageProducer requestProducer;
  requestProducer = session.createProducer(requestDestination);
} catch (JMSException e) {
  e.printStackTrace();
} finally {
  connection.close();
}

In the above example we omitted the creation of the context and connection factory for better readability. Creation of the connection, session or message producer can result in a failure and in that case an exception is thrown. In the catch block, we print the stack trace and continue with a finally block by closing the connection, which is executed also in the case when try block finished without any failure.

The JMSException is the root class for exceptions thrown by JMS API methods. Catching JMSException provides a generic way of handling all exceptions related to the JMS API. One can catch the subclasses of this exception (e.g. IllegalStateException, InvalidDestinationException) which are described in the JMS API documentation. In some cases, depending on the type of error, it is not necessary to close the whole connection, but only the session and/or restart the producer/consumer. However, the JMS doesn’t define what happens in terms of the connection preservation in each case; neither there is a straightforward way of checking whether the connection or session is still valid. Closing and restarting the connection can be therefore considered as the safest option.

The above example illustrates how to catch exceptions synchronously, i.e. when some particular JMS API method fails.

For applications with only asynchronous message consumers, there exists an ExceptionListener interface behavior as follows. If a JMS provider detects a serious problem with a Connection object, it informs the Connection object's ExceptionListener, if one has been registered. It does this by calling the listener's onException method, passing it a JMSException argument describing the problem. In practice, when an exception listener is called, the connection is broken and the JMS service is no longer available for the connection.

The example source codes present the way how to register the ExceptionListener and how to notify the main thread in case an asynchronous exception occurs. Such a class has to implement the ExceptionListener interface and override the onException method. Afterwards, the instantiated class can be registered using Connection’s setExceptionListener method.

Generally, the client application should properly check all JMS API methods for exceptions and in case the exception occurred, an application can, at a minimum, log the problem and clean up its resources. An application can also notify any interested parties that need to be notified of such a problem. An application should be designed with a clean initialization setup, so it would be feasible to reinitialize the JMS objects when the exception occurs (either synchronous or asynchronous).

6. For more details about the differences and relationship between connection and session, please visit JMS documentation or AMQP specification.
7. The JMS and AMQP are using slightly different terminology. The JMS term Consumer corresponds to the AMQP term receiver.