Apache Qpid JMS client for AMQP 1.0

Connection Factory

The Apache Qpid JMS client for AMQP 1.0 has its own connection factory, which is used to resolve the JNDI properties. The properties file has to contain the identification of the context factory:

java.naming.factory.initial=org.apache.qpid.jms.jndi.JmsInitialContextFactory

Connection URI

The connection URI specifies where the AMQP client should connect and what connection parameters should be used. In the JNDI properties, the connection URI should be placed like this:connectionfactory.[jndiname]=<ConnectionURL>

for example:connectionfactory.connection=<ConnectionURL>

For connecting to Eurex Clearing interfaces the connection string needs to specify:

  • The correct IP address/hostname of the broker
  • The correct port of the broker
  • The path to the keystore with the client certificate
  • Alias of the member certificate in the keystore
  • The path to the truststore with the broker public keys
  • Passwords for the keystore and truststore
  • Idle timeout / heartbeat

The connection URI has a following format:amqp://hostname:port[?option=value[&option2=value...]]

or for SSL connections:amqps://hostname:port[?option=value[&option2=value...]]

The options needed to connect to the Eurex AMQP Interfaces are:

  • jms.clientID
  • transport.trustStoreLocation
  • transport.trustStorePassword
  • transport.keyStoreLocation
  • transport.keyStorePassword
  • transport.keyAlias
  • amqp.idleTimeout

Example connection string:

amqps://<Hostname>:<Port>?jms.clientID=<ClientID>&transport.trustStoreLocation=<PathToTruststore>&transport.trustStorePassword=<TruststorePassword>&transport.keyStoreLocation=<PathToKeystore>&transport.keyStorePassword=<KeystorePassword>&transport.keyAlias=<KeystoreAlias>&amqp.idleTimeout=<HeartbeatInterval>

The client ID is the identifier of a Member application and can be defined according to the Member’s needs. The Qpid JMS client is using default idle timeout / heartbeat of 60 seconds. Because Eurex Clearing AMQP brokers currently do not support heartbeat, it has to be set to 0 to switch it off.

An example connection string for the Eurex Clearing FIXML Interface may then look like this:

amqps://ecag-fixml-simu1.deutsche-boerse.com:10170?jms.clientID=my-test-client1&transport.trustStoreLocation=truststore.jks&transport.trustStorePassword=123456&transport.keyStoreLocation=ABCFR_ABCFRALMMACC1.keystore&transport.keyStorePassword=123456&transport.keyAlias=abcfr_abcfralmmacc1&amqp.idleTimeout=0

Additional connection options can be found in the documentation on http://qpid.apache.org/documentation.html

The client supports automatic failover / reconnect. To enable the failover, the connection URI has to be wrapped into a failover prefix. The options starting with “jms.” Should be used outside of the failover enclosure while the other options (e.g. starting with “amqp.” or “transport.”) should stay inside. Alternatively, the other options can used outside of the failover enclosure with the prefix “failover.nested.” – such options would apply to all brokers.

failover:(amqps://hostname:port[?option=value[&option2=value...]])[?failoverOption=value[&failoverOption2=value...][&jmsOption=value...][&nestedOption=value...]]

The failover supports among other following options:

  • failover.reconnectDelay
  • failover.maxReconnectAttempts
  • failover.useReconnectBackOff
  • failover.reconnectBackOffMultiplier

These options can be used to control how many times and in which time intervals the client should try to reconnect to the broker. For example:

failover:(amqps://ecag-fixml-simu1.deutsche-boerse.com:10170?...)?failover.reconnectDelay=30000&failover.maxReconnectAttempts=10&failover.useReconnectBackOff=false&jms.clientID=myClient&failover.nested.amqp.idleTimeout=0

Additional failover options can be found in the documentation on http://qpid.apache.org/documentation.html

Addresses

Addresses are used to describe the message target or message source.9 The address is a string, which is passed as a parameter to a receiver or a sender, where it is handled. An address always resolves to a node – either queue or topic. This chapter will focus on the specific address strings, which can be used to interact with the Eurex Clearing interfaces.

Every application needs 4 different address string types in order to fully utilize the Eurex Clearing interfaces:

  1. Receiving broadcasts
  2. Receiving responses
  3. Sending requests
  4. “ReplyTo” address in requests

In the JNDI properties, the type queue should be used for receiving messages (broadcast address and response address) and the type topic for sending messages (reply address and request address):

queue.[jndiname]=<Address>topic.[jndiname]=<Address>

for example:

topic.requestAddress=<Address>queue.responseAddress=<Address>

Receiving responses to requests

As described in the Eurex Clearing FIXML/FpML/Margin Calculator Interface Specification, “Volume E: AMQP Setup & Internals”, receiving responses to requests can be done either using an auto-delete response queue which has to be created by the client application and bound to the response exchange or using the predefined response queue. Unlike the AMQP 0-10 client, which receives the responses using the auto-delete response queues, the 1.0 client is already using the predefined response queue.

The following address string can be used as a template:

<ResponseQueueName>

The placeholders in this template have to be replaced with the appropriate values:

queue.responseAddress=response.ABCFR_ABCFRALMMACC1

Sending requests

The request messages should be sent to the request exchange, which is specific for each Member. Since the request exchange is already predefined, the address string is simpler than the address string for receiving responses:

<RequestExchange>

The placeholders in this template have to be replaced with the appropriate values:

topic.requestAddress=request.ABCFR_ABCFRALMMACC1

“ReplyTo” address in requests

The “ReplyTo” address is assigned as a property to the request message. It encodes both the reply to exchange as well as the reply to routing key:

<ResponseExchange>/<ResponseRoutingKey>

The placeholders in this template have to be replaced with the appropriate values:

topic.replyAddress=response/response.ABCFR_ABCFRALMMACC1

The response to a request message sent with the reply to address above can be received by a receiver created using the example address from chapter 3.4.3.1.

Receiving broadcasts

To receive broadcast messages, it is necessary to create a receiver on the broadcast queues predefined during the technical maintenance. The following address string can be used as a template:

<PredefinedBroadcastQueue>

The <PredefinedBroadcastQueue> placeholder has to be replaced by the real name of the Members’ broadcast queue. The address string corresponding to the queue for the trade confirmation broadcast stream of Member ABCFR, account ABCFR_ABCFRALMMACC1 will be as follows:

queue.responseAddress=broadcast.ABCFR_ABCFRALMMACC1.TradeConfirmation

Performance

Receive pre-fetching

AMQP brokers typically push messages to client consumers without explicit client requests (asynchronously, in the background) up to a certain number of unsettled messages. The next time a message would be passed on to the application code, it is usually taken from this buffer (avoiding synchronous I/O). This buffering capacity of a client is configurable and it is typically set to hundreds of messages by default. Setting it too low can have a negative impact on message throughput (less overlap of message processing and background I/O). Setting it too high can have a negative impact on client memory consumption (pre-fetch buffers need to hold many messages). Also, all messages pre-fetched by one consumer are “locked” to that consumer (and will not be delivered to any other consumer reading the same queue) until the consumer releases/rejects them. This can lead to a less than ideal load balancing in case of parallel consumption and processing of messages from a single broker queue.

In Apache Qpid JMS client for AMQP 1.0 client, the pre-fetch capacity can be specified in the connection address string using several options. For receiving from queues, following two options are relevant:

  • jms.prefetchPolicy.queuePrefetch
  • jms.prefetchPolicy.all

The default prefetch limit is set to 1000. Additional prefetch options can be found in the documentation on http://qpid.apache.org/documentation.html

Message acknowledgement

Message acknowledgement is synchronous by default. In case a client application requires asynchronous message acknowledgement (e.g. doesn’t require guarantee that the acknowledged message was removed from a broker queue before proceeding further), it can be enabled using the “jms.sendAcksAsync” connection option.

When using explicit acknowledgement of received messages (as described in section 3.7.4), doing one-by-one synchronous acknowledgement of messages can severely degrade performance. Message consumption rate is then limited by the network round-trip latency between the client and the broker. For example, a round-trip time of 100ms would limit the synchronous throughput to no more than 10 messages per second.

Message receiving performance in the synchronous acknowledge mode can be improved via block processing of messages. The following Java code example is an extension of the code example from section 3.2.6.4 and illustrates block processing. It will receive and process up to 100 messages, then finally acknowledge all received messages in a single call (thus mitigating the impact of network latency):

int maxCount = 100;

Message message, lastMessage = null;

// receive and process up to 100 messages

while ((message = responseConsumer.receiveNoWait()) != null) {

// process the message

System.out.println(message.toString());

// remember the last processed message

lastMessage = message;

// check total block size

if (--maxCount <= 0) {

break;

}

}

// acknowledge ALL previously received messages

if (lastMessage != null) {

lastMessage.acknowledge();

}

Sender synchronization

When sending messages (as shown in section 3.2.6.2), the send call is by default synchronous for messages which are persistent and are not part of a transaction. All other messages are sent asynchronously. Asynchronous sending means that a message being sent is not guaranteed to arrive at a broker by the time the send call returns in a client application. When a message is sent synchronously, the send call will wait for confirmation from the broker. However, sending messages synchronously can have a significant negative impact on performance in case of significant network latency between the client and the server.

Unlike in C++, it is not possible to invoke explicit session synchronization in JMS (to achieve block or batch sending and mitigate the negative impact of network latency).

Following options in the connection URI can be used to change this behavior:

  • jms.forceAsyncSend
  • jms.alwaysSyncSend

The option jms.alwaysSyncSend can force the client to send all messages synchronously. The option jms.forceAsyncSend does the exact opposite – it forces the client to send all messages asynchronously.

Logging

The Apache Qpid JMS client for AMQP 1.0 uses Simple Logging Facade for Java (SLF4J), which serves as a simple facade or abstraction for various logging frameworks. SLF4J allows the end-user to plug in the desired logging framework at deployment time. There is only a single mandatory dependency, namely slf4j-api library for using the client API.

According to the desired logging framework, one of the following SLF4J bindings can be included:

  • slf4j-log4j12 - Binding for log4j, a widely used logging framework. Needs to place log4j.jar on the class path.
  • slf4j-jdk14 - Binding for java.util.logging, also referred to as JDK 1.4 logging
  • slf4j-nop - Binding for NOP, silently discarding all logging.
  • slf4j-simple - Binding for Simple implementation, which outputs all events to System.err. Only messages of level INFO and higher are printed. This binding may be useful in the context of small applications.
  • slf4j-jcl - Binding for Jakarta Commons Logging. This binding will delegate all SLF4J logging to JCL.

The simplest way to see the Java Qpid API log messages is to include the slf4j-simple binding library into the project path. All Qpid messages from level INFO and higher will be printed into the standard error output.

For more advanced configuration and also if the project already uses log4j logging framework, one can use slf4j-log4j12 binding. It is required to place also the log4j.jar library into the project’s class path. Configuration for the log4j is extensive and can be found on the project’s website (http://logging.apache.org/log4j/). By default when no configuration file could be located the DefaultConfiguration will be used. This will cause logging output to go to the console.

To enable the logging of AMQP frames sent / received by the client, the Frame logger can be enabled by using following option in the connection URI:

amqp.traceFrames=true

Another option can be used to display the raw AMQP bytes which the client sends / receives:

amqp.traceBytes=true

9. Client APIs from other providers will use a different approach for defining message targets and sources. Please refer to the documentation of the used API for more details.