Apache Qpid JMS client for AMQP 0-10
Connection Factory
The Apache Qpid JMS API for AMQP 0-10 has its own connection factory, which is used to resolve the JNDI properties. The properties file has to contain the identification of the context factory:
java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory
Connection URI
The connection URI specifies where the AMQP client should connect and which connection parameters should be used. In the JNDI properties, the connection URI is defined as follows:
connectionfactory.[jndiname]=<ConnectionURL>
For example:
connectionfactory.connection=<ConnectionURL>
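The two JNDI entries described so far can also be assembled programmatically. The following is a minimal sketch using only java.util.Properties; the class name JndiPropertiesSketch and the method build are hypothetical and chosen for illustration. In a real application the resulting properties would typically be passed to javax.naming.InitialContext, with the Qpid client library on the class path.

```java
import java.util.Properties;

final class JndiPropertiesSketch {

    /** Builds the JNDI environment for a single connection factory entry. */
    static Properties build(String jndiName, String connectionUrl) {
        Properties props = new Properties();
        // Context factory of the Qpid JMS client for AMQP 0-10 (taken from the text above)
        props.setProperty("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        // Pattern: connectionfactory.[jndiname]=<ConnectionURL>
        props.setProperty("connectionfactory." + jndiName, connectionUrl);
        return props;
    }

    public static void main(String[] args) {
        // "<ConnectionURL>" is a placeholder, not a usable URL
        Properties props = build("connection", "<ConnectionURL>");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```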
For connecting to Eurex Clearing interfaces the connection string needs to specify:
- The correct IP address/hostname of the broker
- The correct port of the broker
- The usage of SSL / TLS
- The usage of EXTERNAL authentication (authentication using certificates)
- The path to the keystore with the client certificate
- Alias of the member certificate in the keystore
- The path to the truststore with the broker public keys
- Passwords for the keystore and truststore
Example connection string:
amqp://<Username>:<Password>@<ClientID>/<VirtualHost>?brokerlist='tcp://
The username and password are not mandatory, because the authentication will be based only on the certificate. The client ID is the identifier of a Member application and can be defined according to the Member’s needs. The virtual host is also not mandatory. An example connection string for the Eurex Clearing FIXML Interface may then look like this:
amqp://:@App1/?brokerlist='tcp://ecag-fixml-simu1.deutsche-boerse.com:10170?ssl='true'&trust_store='truststore.jks'&trust_store_password='123456'&key_store='ABCFR_ABCFRALMMACC1.keystore'&key_store_password='123456'&sasl_mechs='EXTERNAL'&ssl_cert_alias='abcfr_abcfralmmacc1''
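Because the nested quoting in the connection string is easy to get wrong, it can help to assemble it from its parts. The sketch below is one possible way to do this, assuming an empty username, password and virtual host as in the example above. The class and method names are hypothetical, and the option values are the sample values from the example, not real credentials.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class ConnectionUrlSketch {

    /** Renders options as key='value' pairs joined by '&', in insertion order. */
    static String joinOptions(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> e.getKey() + "='" + e.getValue() + "'")
                .collect(Collectors.joining("&"));
    }

    /** Builds a connection URL with empty username, password and virtual host. */
    static String build(String clientId, String host, int port,
                        Map<String, String> brokerOptions) {
        String broker = "tcp://" + host + ":" + port + "?" + joinOptions(brokerOptions);
        // The whole broker entry, including its options, is quoted once more
        return "amqp://:@" + clientId + "/?brokerlist='" + broker + "'";
    }

    /** Broker options of the FIXML example above (sample values only). */
    static Map<String, String> exampleOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("ssl", "true");
        options.put("trust_store", "truststore.jks");
        options.put("trust_store_password", "123456");
        options.put("key_store", "ABCFR_ABCFRALMMACC1.keystore");
        options.put("key_store_password", "123456");
        options.put("sasl_mechs", "EXTERNAL");
        options.put("ssl_cert_alias", "abcfr_abcfralmmacc1");
        return options;
    }

    public static void main(String[] args) {
        System.out.println(build("App1", "ecag-fixml-simu1.deutsche-boerse.com",
                10170, exampleOptions()));
    }
}
```

Keeping the broker options in an ordered map makes it straightforward to add further broker options later without touching the quoting logic.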
The application based on this client is also able to automatically reconnect in case the connection to the AMQP broker is lost:
amqp://:@App1/?brokerlist='tcp://ecag-fixml-simu1.deutsche-boerse.com:10170?ssl='true'&trust_store='truststore.jks'&trust_store_password='123456'&key_store='ABCFR_ABCFRALMMACC1.keystore'&key_store_password='123456'&sasl_mechs='EXTERNAL'&ssl_cert_alias='abcfr_abcfralmmacc1'&retries='10'&heartbeat='60''&failover='roundrobin'&cyclecount='10'
The failover behavior can be modified using additional connection options – for example cyclecount or retries. The failover based on a node list distributed by the broker (using amq.failover exchange) is not supported on Eurex Clearing brokers.
Additionally, the heartbeat option can be used to specify the heartbeat interval. The default heartbeat interval is 120 seconds; the heartbeat option can be used to decrease it. The heartbeat itself is implemented directly by the Apache Qpid Java API. The recommended heartbeat interval is between 30 and 120 seconds.
A detailed description of all possible parameters can be found in the Apache Qpid and Red Hat MRG Messaging documentation.
Addresses
Addresses are used to describe the message target or message source.[8] The address is a string, which is passed as a parameter to a receiver or a sender, where it is handled. An address always resolves to a node, which is either a queue or a topic (in AMQP 0-10 a topic is an AMQP exchange). The complete description of the addressing concept can be found in the Red Hat MRG Messaging/Apache Qpid documentation. This chapter focuses on the specific address strings that can be used to interact with the Eurex Clearing interfaces.
Every application will need 4 different address string types in order to fully utilize the Eurex Clearing interfaces:
- Receiving broadcasts
- Receiving responses
- Sending requests
- “ReplyTo” address in requests
In the JNDI properties, the type destination should be used for all addresses:
destination.[jndiname]=<Address>
For example:
destination.requestAddress=<Address>
NOTE: The formatting of the sample addresses below is for display purposes only. Actual address strings are formatted as a single line and do not contain line breaks.
Receiving responses to requests
As described in the Eurex Clearing FIXML/FpML/Margin Calculator Interface Specification, “Volume E: AMQP Setup & Internals”, to receive responses to requests, an auto-delete response queue has to be created and bound to the response exchange. With a proper address string, the receiver will do this automatically when being created. The following address string can be used as a template:
<ResponseQueueName>;
{
create: receiver,
assert: never,
node:
{
type: queue,
x-declare:
{
auto-delete: true,
exclusive: false,
arguments:
{
'qpid.policy_type': ring,
'qpid.max_count': <QueueCount>,
'qpid.max_size': <QueueSize>
}
},
x-bindings:
[
{
exchange: '<ResponseExchange>',
queue: '<ResponseQueueName>',
key: '<ResponseRoutingKey>'
}
]
}
}
The placeholders in this template have to be replaced with the appropriate values:
response.ABCFR_ABCFRALMMACC1.response_queue_1;
{
create: receiver,
assert: never,
node:
{
type: queue,
x-declare:
{
auto-delete: true,
exclusive: false,
arguments:
{
'qpid.policy_type': ring,
'qpid.max_count': 1000,
'qpid.max_size': 1000000
}
},
x-bindings:
[
{
exchange: 'response',
queue: 'response.ABCFR_ABCFRALMMACC1.response_queue_1',
key: 'response.ABCFR_ABCFRALMMACC1.response_queue_1'
}
]
}
}
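Since actual address strings must be a single line, one option is to generate them from the template rather than maintain them by hand. The following sketch fills in the placeholders of the template above with String.format. The class and method names are hypothetical, and the single-line layout is simply the template with its line breaks removed.

```java
import java.util.Locale;

final class ResponseAddressSketch {

    /** Returns the response queue address from the template above as a single line. */
    static String build(String queue, String exchange, String routingKey,
                        int maxCount, long maxSize) {
        return String.format(Locale.ROOT,
                "%1$s; {create: receiver, assert: never, node: {type: queue, "
              + "x-declare: {auto-delete: true, exclusive: false, arguments: "
              + "{'qpid.policy_type': ring, 'qpid.max_count': %4$d, 'qpid.max_size': %5$d}}, "
              + "x-bindings: [{exchange: '%2$s', queue: '%1$s', key: '%3$s'}]}}",
                queue, exchange, routingKey, maxCount, maxSize);
    }

    public static void main(String[] args) {
        // Sample values from the example above
        String queue = "response.ABCFR_ABCFRALMMACC1.response_queue_1";
        System.out.println(build(queue, "response", queue, 1000, 1000000L));
    }
}
```

The resulting string can be used as the value of a destination.[jndiname] entry in the JNDI properties.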
Alternatively, the AMQP 0-10 client can also use the predefined response queue introduced with the AMQP 1.0 support:
<PredefinedResponseQueue>;
{
node:
{
type: queue
},
create: never,
mode: consume,
assert: never
}
e.g.:
response.ABCFR_ABCFRALMMACC1;
{
node:
{
type: queue
},
create: never,
mode: consume,
assert: never
}
Sending requests
The request messages should be sent to the request exchange, which is specific for each Member. Since the request exchange is already predefined, the address string is simpler than the address string for receiving responses:
<RequestExchange>;
{
node:
{
type: topic
},
create: never
}
The placeholders in this template have to be replaced with the appropriate values:
request.ABCFR_ABCFRALMMACC1;
{
node:
{
type: topic
},
create: never
}
“ReplyTo” address in requests
The “ReplyTo” address is assigned as a property to the request message. It encodes both the reply-to exchange and the reply-to routing key:
<ResponseExchange>/<ResponseRoutingKey>;
{
create: receiver,
node:
{
type: topic
}
}
The placeholders in this template have to be replaced with the appropriate values:
response/response.ABCFR_ABCFRALMMACC1.response_queue_1;
{
create: receiver,
node:
{
type: topic
}
}
When using the predefined response queue, the appropriate routing key has to be used:
response/response.ABCFR_ABCFRALMMACC1;
{
create: receiver,
node:
{
type: topic
}
}
The response to a request message sent with the reply to address above can be received by a receiver created using the example address from chapter 3.3.3.1.
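In both ReplyTo examples above, the routing key is identical to the name of the response queue that will receive the reply. A small helper can make that relationship explicit. This is only an illustrative sketch with hypothetical class and method names; it produces the single-line form of the template.

```java
final class ReplyToAddressSketch {

    /** Returns the ReplyTo address "<exchange>/<routingKey>; {...}" as a single line. */
    static String build(String responseExchange, String routingKey) {
        return responseExchange + "/" + routingKey
                + "; {create: receiver, node: {type: topic}}";
    }

    public static void main(String[] args) {
        // Response queue created by the receiver itself
        System.out.println(build("response",
                "response.ABCFR_ABCFRALMMACC1.response_queue_1"));
        // Predefined response queue
        System.out.println(build("response", "response.ABCFR_ABCFRALMMACC1"));
    }
}
```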
Receiving broadcasts
To receive broadcast messages, it is necessary to create a receiver on the broadcast queues predefined during the technical maintenance. The following address string can be used as a template:
<PredefinedBroadcastQueue>;
{
node:
{
type: queue
},
create: never,
mode: consume,
assert: never
}
The <PredefinedBroadcastQueue> placeholder has to be replaced by the real name of the Member’s broadcast queue. The address string corresponding to the queue for the trade confirmation broadcast stream of Member ABCFR, account ABCFR_ABCFRALMMACC1, is as follows:
broadcast.ABCFR_ABCFRALMMACC1.TradeConfirmation;
{
node:
{
type: queue
},
create: never,
mode: consume,
assert: never
}
Performance
Receive pre-fetching
AMQP brokers typically push messages to client consumers without explicit client requests (asynchronously, in the background) up to a certain number of unacknowledged messages. The next time a message would be passed on to the application code, it is usually taken from this buffer (avoiding synchronous I/O). This buffering capacity of a client is configurable and it is typically set to hundreds of messages by default. Setting it too low can have a negative impact on message throughput (less overlap of message processing and background I/O). Setting it too high can have a negative impact on client memory consumption (pre-fetch buffers need to hold many messages). Also, all messages pre-fetched by one consumer are “locked” to that consumer (and will not be delivered to any other consumer reading the same queue) until the consumer releases/rejects them. This can lead to a less than ideal load balancing in case of parallel consumption and processing of messages from a single broker queue.
A client normally cannot have more outstanding (unacknowledged) messages than the configured pre-fetch since a broker will stop pushing messages in that situation.
In Java, the pre-fetch capacity can be specified in the connection address string, e.g.:
amqp://:@App1/?brokerlist='tcp://ecag-fixml-simu1.deutsche-boerse.com:10170?ssl='true'&trust_store='truststore.jks'&trust_store_password='123456'&key_store='ABCFR_ABCFRALMMACC1.keystore'&key_store_password='123456'&sasl_mechs='EXTERNAL'&ssl_cert_alias='abcfr_abcfralmmacc1''&maxprefetch='100'
Message acknowledgement
Message acknowledgement is asynchronous by default. In case a client application requires synchronous message acknowledgement (e.g. to be absolutely sure that a message was removed from a broker queue before proceeding further), it can be enabled via the “sync_ack” connection parameter, e.g.:
amqp://:@App1/?brokerlist='tcp://ecag-fixml-simu1.deutsche-boerse.com:10170?ssl='true'&trust_store='truststore.jks'&trust_store_password='123456'&key_store='ABCFR_ABCFRALMMACC1.keystore'&key_store_password='123456'&sasl_mechs='EXTERNAL'&ssl_cert_alias='abcfr_abcfralmmacc1''&maxprefetch='100'&sync_ack='true'
When using explicit acknowledgement of received messages (as described in section 3.7.4), doing one-by-one synchronous acknowledgement of messages can severely degrade performance. Message consumption rate is then limited by the network round-trip latency between the client and the broker. For example, a round-trip time of 100ms would limit the synchronous throughput to no more than 10 messages per second.
Message receiving performance in the synchronous acknowledge mode can be improved via block processing of messages. The following Java code example is an extension of the code example from section 3.2.6.4 and illustrates block processing. It will receive and process up to 100 messages, then finally acknowledge all received messages in a single call (thus mitigating the impact of network latency):
int maxCount = 100;
Message message, lastMessage = null;
// receive and process up to 100 messages
while ((message = responseConsumer.receiveNoWait()) != null) {
    // process the message
    System.out.println(message.toString());
    // remember the last processed message
    lastMessage = message;
    // check total block size
    if (--maxCount <= 0) {
        break;
    }
}
// acknowledge ALL previously received messages
if (lastMessage != null) {
    lastMessage.acknowledge();
}
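The benefit of block processing can be estimated with simple arithmetic. The sketch below assumes that each synchronous acknowledgement costs exactly one network round trip and ignores processing time, so it gives only a rough upper bound. The class and method names are hypothetical.

```java
final class AckThroughputSketch {

    /**
     * Rough upper bound on consumed messages per second when each
     * acknowledgement waits for one network round trip.
     */
    static double maxMessagesPerSecond(double roundTripMillis, int messagesPerAck) {
        if (roundTripMillis <= 0 || messagesPerAck <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        return messagesPerAck * 1000.0 / roundTripMillis;
    }

    public static void main(String[] args) {
        // One-by-one acknowledgement with a 100 ms round trip
        System.out.println(maxMessagesPerSecond(100, 1));
        // Blocks of 100 messages with the same round trip
        System.out.println(maxMessagesPerSecond(100, 100));
    }
}
```

Under these assumptions, a 100 ms round trip allows at most 10 messages per second with one-by-one acknowledgement, and at most 1000 messages per second when 100 messages are acknowledged in a single call.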
Sender synchronization
When sending messages (as shown in section 3.2.6.2), the send call is asynchronous by default. This means that a message being sent is not guaranteed to arrive at a broker by the time the send call returns in a client application.
Send calls in JMS can be made synchronous (i.e. return only after message delivery was confirmed by the broker) via the “sync_publish” connection parameter, e.g.:
amqp://:@App1/?brokerlist='tcp://ecag-fixml-simu1.deutsche-boerse.com:10170?ssl='true'&trust_store='truststore.jks'&trust_store_password='123456'&key_store='ABCFR_ABCFRALMMACC1.keystore'&key_store_password='123456'&sasl_mechs='EXTERNAL'&ssl_cert_alias='abcfr_abcfralmmacc1''&maxprefetch='100'&sync_publish='all'
However, this can have a negative impact on performance in case of significant network latency between the client and the server. It is similar to the synchronous acknowledge after every message discussed in section 3.3.4.2.
Unlike in C++, it is not possible to invoke explicit session synchronization in JMS (to achieve block or batch sending and mitigate the negative impact of network latency).
Logging
The Apache Qpid JMS AMQP 0-10 client libraries use the Simple Logging Facade for Java (SLF4J), which serves as a simple facade or abstraction for various logging frameworks. SLF4J allows the end user to plug in the desired logging framework at deployment time. The only mandatory dependency for using the client API is the slf4j-api library.
According to the desired logging framework, one of the following SLF4J bindings can be included:
- slf4j-log4j12 - Binding for log4j, a widely used logging framework. The log4j.jar library must also be placed on the class path.
- slf4j-jdk14 - Binding for java.util.logging, also referred to as JDK 1.4 logging
- slf4j-nop - Binding for NOP, silently discarding all logging.
- slf4j-simple - Binding for Simple implementation, which outputs all events to System.err. Only messages of level INFO and higher are printed. This binding may be useful in the context of small applications.
- slf4j-jcl - Binding for Jakarta Commons Logging. This binding will delegate all SLF4J logging to JCL.
The simplest way to see the Java Qpid API log messages is to include the slf4j-simple binding library in the project’s class path. All Qpid messages of level INFO and higher are then printed to the standard error output.
For more advanced configuration, or if the project already uses the log4j logging framework, the slf4j-log4j12 binding can be used. In that case, the log4j.jar library must also be placed on the project’s class path. The log4j configuration options are extensive and are documented on the project’s website (http://logging.apache.org/log4j/). By default, when no configuration file can be located, the DefaultConfiguration is used, which sends logging output to the console.
8. Client APIs from other providers will use a different approach for defining message targets and sources. Please refer to the documentation of the used API for more details.