SwiftMQ AMQP 1.0 client

Unlike the Apache Qpid client, the SwiftMQ client is not JMS based client. By default, this client doesn’t support the SSL / TLS client authentication. But the support can be easily added using a custom SSL socket factory.11

SSL Client Authentication

While the SwiftMQ client doesn’t support SSL / TLS client authentication, it allows the use of custom SSL socket factories. That can be utilized to add the support for the SSL client authentication. The new socket factory will be based on the original socket factory shipped with the SwiftMQ client, which supports only SSL / TLS server authentication.

The new socket factory will use a custom key manager when initializing the SSL context. Everything else will be left for the original JSSESocketFactory from the package com.swiftmq.net:

public class MySSLSocketFactory extends JSSESocketFactory {

private String alias;

public MySSLSocketFactory(String alias)

{

this.alias = alias;

}

public Socket createSocket(InetAddress addr, int port) throws UnknownHostException, IOException

{

return initializeSSLContext().getSocketFactory().createSocket(addr, port);

}

public Socket createSocket(String host, int port) throws UnknownHostException, IOException

{

return initializeSSLContext().getSocketFactory().createSocket(host, port);

}

private SSLContext initializeSSLContext()

{

SSLContext sslContext = null;

KeyManager[] keyManagers = null;

try {

sslContext = SSLContext.getInstance("TLS");

} catch (NoSuchAlgorithmException e) {

e.printStackTrace();

}

try {

keyManagers = new KeyManager[] { new MyKeyManager(alias) };

} catch (GeneralSecurityException e1) {

e1.printStackTrace();

} catch (IOException e1) {

e1.printStackTrace();

}

try {

sslContext.init(keyManagers, null, null);

} catch (KeyManagementException e) {

e.printStackTrace();

}

return sslContext;

}

}

The key manager will pass the alias of the certificate which should be used for the client authentication:

public class MyKeyManager extends X509ExtendedKeyManager {

private String alias;

private X509ExtendedKeyManager originalKeyManager;

public MyKeyManager(String alias) throws GeneralSecurityException, IOException

{

this.alias = alias;

KeyStore ks = KeyStore.getInstance("JKS");

ks.load(new FileInputStream(new File(System.getProperty("javax.net.ssl.keyStore"))), System.getProperty("javax.net.ssl.keyStorePassword").toCharArray());

KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());

kmf.init(ks, System.getProperty("javax.net.ssl.keyStorePassword").toCharArray());

originalKeyManager = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];

}

@Override

public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {

return alias;

}

@Override

public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {

return originalKeyManager.chooseServerAlias(keyType, issuers, socket);

}

public X509Certificate[] getCertificateChain(String alias)

{

return originalKeyManager.getCertificateChain(alias);

}

public String[] getClientAliases(String keyType, Principal[] issuers)

{

return new String[]{alias};

}

public PrivateKey getPrivateKey(String alias)

{

return originalKeyManager.getPrivateKey(alias);

}

public String[] getServerAliases(String keyType, Principal[] issuers)

{

return originalKeyManager.getServerAliases(keyType, issuers);

}

public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)

{

return alias;

}

public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine)

{

return originalKeyManager.chooseEngineServerAlias(keyType, issuers, engine);

}

}

The key manager will make sure that the proper certificate is used for the authentication.

Preparing connection and session

The keystore and truststore files which will be used for establishing the connection with the AMQP broker are specified as system properties:

System.setProperty("javax.net.ssl.trustStore", <PathToTruststore>);

System.setProperty("javax.net.ssl.trustStorePassword", <TruststorePassword>);

System.setProperty("javax.net.ssl.keyStore", <PathToKeystore>);

System.setProperty("javax.net.ssl.keyStorePassword", <KeystorePassword>);

e.g.:

System.setProperty("javax.net.ssl.trustStore", "truststore");

System.setProperty("javax.net.ssl.trustStorePassword", "123456");

System.setProperty("javax.net.ssl.keyStore", "ABCFR_ABCFRALMMACC1.keystore");

System.setProperty("javax.net.ssl.keyStorePassword", "123456");

With the new socket factory ready, the connection can be established. First, the AMQPContext has to be initialized to specify that the SwiftMQ library should behave as AMQP client. With the context, the Connection can be created. The username and password should be left as an empty string. To be able to connect to Eurex Clearing interfaces, the SASL mechanism needs to be set to EXTERNAL. Additionally, the client has to be instructed to use the the socket factory prepared in previous chapter. Once everything is ready, the client can connect to the broker:

AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);

Connection connection = new Connection(ctx, <Hostname>, <Port>, "", "");

connection.setMechanism("EXTERNAL");

connection.setSocketFactory(new MySSLSocketFactory(<CertificateAlias>));

connection.connect();

e.g.:

AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);

Connection connection = new Connection(ctx, ecag-fixml-simu1.deutsche-boerse.de, 10170, "", "");

connection.setMechanism("EXTERNAL");

connection.setSocketFactory(new MySSLSocketFactory("abcfr_abcfralmmacc1"));

connection.connect();

Once the connection is established, the session can be created on top of it:

Session session = connection.createSession(1000, 1000);

The createSession(…) method accepts two parameters. These define the size of the incoming and outgoing windows – how many unsettled (unacknowledged) messages can be received from the broker or sent to the broker.

Creating consumer and producer

Once the session is ready, it can be used to create message consumer or producer. To create a consumer, the createConsumer(…) method should be used. It requires several parameters:

  • The address of the message source (queue name)
  • The quality of service
  • Link credit
  • No local filter
  • Selector filter

The quality of service should be in most case AT_LEAST_ONCE, because AT_MOST_ONCE reliability can result in a message loss and EXACTLY_ONCE reliability is not supported by the AMQP broker used on Eurex Clearing interfaces. The no local filter is irrelevant for Eurex Clearing interfaces, because the client application is not allowed to send messages to the same queues where it receives the messages from. The selector filter can be used to receive only selected messages from the queue.

Consumer c = session.createConsumer(<QueueName>, 1000, QoS.AT_LEAST_ONCE, true, null);

For example to receive responses:

Consumer c = session.createConsumer("response.ABCFR_ABCFRALMMACC1 ", 1000, QoS.AT_LEAST_ONCE, true, null);

Or to receive broadcasts:

Consumer c = session.createConsumer("response. ABCFR_ABCFRALMMACC1.TradeConfirmation", 1000, QoS.AT_LEAST_ONCE, true, null);

Creating a message producer is very similar to the consumer. The createProducer(…) method requires only two parameters:

  • The address of the message target (AMQP exchange / topic name)
  • The quality of service

Again, the quality of service should be in most case AT_LEAST_ONCE, because AT_MOST_ONCE reliability can result in a message loss and EXACTLY_ONCE reliability is not supported by the AMQP broker used on Eurex Clearing interfaces.

Producer p = session.createProducer(<ExchangeName>, QoS.AT_LEAST_ONCE);

For example to send requests:

Producer p = session.createProducer("request.CBKFR_TESTCALMMACC1", QoS.AT_LEAST_ONCE);

Sending messages

Messages sent to Eurex Clearing interfaces should contain the XML based payload as String type value:

AMQPMessage msg = new AMQPMessage();

msg.setAmqpValue(new AmqpValue(new AMQPString("<FIXML>…</FIXML>")));

Every request message should contain the reply to information, which tells Eurex system where should the response message be sent.

Properties msgProp = new Properties();

msgProp.setReplyTo(<ReplyToAddress>);

msg.setProperties(msgProp);

The reply to should be set to response exchange / response routing key:

Properties msgProp = new Properties();

msgProp.setReplyTo(new AddressString("response/response.ABCFR_ABCFRALMMACC1"));

msg.setProperties(msgProp);

Once the message is prepared, it can be sent using the producer:

p.send(msg);

In SwiftMQ, the messages are sent asynchronously.

Receiving messages

Messages can be received from the message consumer using several different receive(…) methods. The SwiftMQ client offers a choice between several blocking and non-blocking variants. It also supports possibility to setup asynchronous callback when a message is available. More details about the different methods can be found in the SwiftMQ documentation.

Following example shows how to receive the message using a blocking receiver with timeout. It also shows how to extract the XML from the message and accept it after it is processed.

while (true) {

respMsg = c.receive(60000);

if(respMsg == null) {

System.out.println("-I- No message in queue for 60 seconds. Finishing ...");

break;

}

System.out.println("-I- Response message received");

Data data = respMsg.getData().get(0);

StringBuilder builder = new StringBuilder();

byte[] bytes = data.getValue();

for (int i = 0; i < bytes.length; i++) {

builder.append((char) bytes[i]);

}

System.out.println("XML payload: " + builder.toString());

// Process message

respMsg.accept();

}

Closing the connection

When the client application is exiting, it should close all consumers, producers, session and the connection itself. All these objects can be closed using their close() method.

p.close();

c.close();

session.close();

connection.close();

Logging

A debugging mode can be enabled by setting following system properties to true:

  • swiftmq.amqp.debug
  • swiftmq.amqp.frame.debug

The first one would print additional debug messages, while the second one would print even the sent / received AMQP frames.

11. http://blog.effectivemessaging.com/2013/10/connecting-with-swiftmq-java-client-to.html