Replicated Caching using JMS
Introduction
As of version 1.6, JMS can be used as the underlying mechanism for the replicated operations in Ehcache with the jmsreplication module.
JMS (Java Message Service) is a mechanism for interacting with message queues. Message queues themselves are a very mature piece of infrastructure used in many enterprise software contexts. Because they are a required part of the Java EE standard, the large enterprise vendors all provide their own implementations. There are also several open source choices including Open MQ and Active MQ. Ehcache is integration tested against both of these.
The Ehcache jmsreplication module lets organisations with a message queue investment leverage it for caching.
It provides:
- replication between cache nodes using a replication topic, in accordance with ehcache’s standard replication mechanism
- pushing of data directly to cache nodes from external topic publishers, in any language. This is done by sending the data to the replication topic, where it automatically picked up by the cache subscribers.
- a JMSCacheLoader, which sends cache load requests to a queue. Either an Ehcache cluster node, or an external queue receiver can respond.
Ehcache Replication and External Publishers
Ehcache replicates using JMS as follows:
- Each cache node subscribes to a predefined topic, configured as the <topicBindingName> in ehcache.xml.
- Each replicated cache publishes cache
Element
s to that topic. Replication is configured per cache.
To set up replicated caching based on JMS you need to configure a JMSCacheManagerPeerProviderFactory which is done globally for a CacheManager.
For each cache that wishing to replicate, you add a JGroupsCacheReplicatorFactory element to the cache element.
Configuration
Message Queue Configuration
Each cluster needs to use a fixed topic name for replication. Set up a topic using the tools in your message queue. Out of the box, both ActiveMQ and Open MQ support auto creation of destinations, so this step may be optional.
Ehcache Configuration
Configuration is done in the ehcache.xml.
There are two things to configure:
- The JMSCacheManagerPeerProviderFactory which is done once per CacheManager and therefore once per ehcache.xml file.
- The JMSCacheReplicatorFactory which is added to each cache’s configuration if you want that cache replicated.
The main configuration happens in the JGroupsCacheManagerPeerProviderFactory connect sub-property. A connect property is passed directly to the JGroups channel and therefore all the protocol stacks and options available in JGroups can be set.
Configuring the JMSCacheManagerPeerProviderFactory
Following is the configuration instructions as it appears in the sample ehcache.xml shipped with ehcache:
{Configuring JMS replication}. ===========================The JMS PeerProviderFactory uses JNDI to maintain message queue independence. Refer to the manual for full configuration examples using ActiveMQ and Open Message Queue. Valid properties are: * initialContextFactoryName (mandatory) - the name of the factory used to create the message queue initial context. * providerURL (mandatory) - the JNDI configuration information for the service provider to use. * topicConnectionFactoryBindingName (mandatory) - the JNDI binding name for the TopicConnectionFactory * topicBindingName (mandatory) - the JNDI binding name for the topic name * securityPrincipalName - the JNDI java.naming.security.principal * securityCredentials - the JNDI java.naming.security.credentials * urlPkgPrefixes - the JNDI java.naming.factory.url.pkgs * userName - the user name to use when creating the TopicConnection to the Message Queue * password - the password to use when creating the TopicConnection to the Message Queue * acknowledgementMode - the JMS Acknowledgement mode for both publisher and subscriber. The available choices are AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE and SESSION_TRANSACTED. The default is AUTO_ACKNOWLEDGE. * listenToTopic - true or false. If false, this cache will send to the JMS topic but will not listen for updates. * Default is true.
Example Configurations
Usage is best illustrated with concrete examples for Active MQ and Open MQ.
Configuring the JMSCacheManagerPeerProviderFactory for Active MQ
This configuration works with Active MQ out of the box.
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.jms.JMSCacheManagerPeerProviderFactory"
properties="initialContextFactoryName=ExampleActiveMQInitialContextFactory,
providerURL=tcp://localhost:61616,
topicConnectionFactoryBindingName=topicConnectionFactory,
topicBindingName=ehcache"
propertySeparator=","
/>
You need to provide your own ActiveMQInitialContextFactory for the initialContextFactoryName. An example which should work for most purposes is:
public class ExampleActiveMQInitialContextFactory
extends ActiveMQInitialContextFactory {
/**
* {@inheritDoc"/>
*/
@Override
@SuppressWarnings("unchecked")
public Context getInitialContext(Hashtable environment)
throws NamingException
{
Map<String, Object> data = new ConcurrentHashMap<String, Object>();
String factoryBindingName =
(String)environment.get(JMSCacheManagerPeerProviderFactory
.TOPIC_CONNECTION_FACTORY_BINDING_NAME);
try {
data.put(factoryBindingName, createConnectionFactory(environment));
} catch (URISyntaxException e) {
throw new NamingException("Error initialisating ConnectionFactory"
+ " with message "
+ e.getMessage());
"/>
String topicBindingName =
(String)environment.get(JMSCacheManagerPeerProviderFactory
.TOPIC_BINDING_NAME);
data.put(topicBindingName, createTopic(topicBindingName));
return createContext(environment, data);
"/>
}
Configuring the JMSCacheManagerPeerProviderFactory for {Open MQ”/>
This configuration works with an out of the box Open MQ.
<cacheManagerPeerProviderFactory
class="net.sf.ehcache.distribution.jms.JMSCacheManagerPeerProviderFactory"
properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
providerURL=file:///tmp,
topicConnectionFactoryBindingName=MyConnectionFactory,
topicBindingName=ehcache"
propertySeparator=","
/>
To set up the Open MQ file system initial context to work with this example use the
following imqobjmgr
commands to create the requires objects in the context.
imqobjmgr add -t tf -l 'MyConnectionFactory' -j java.naming.provider.url \ =file:///tmp -j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory -f imqobjmgr add -t t -l 'ehcache' -o 'imqDestinationName=EhcacheTopicDest' -j java.naming.provider.url\ =file:///tmp -j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory -f
Configuring the JMSCacheReplicatorFactory
This is the same as configuring any of the cache replicators. The class should be
net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory
.
See the following example:
<cache name="sampleCacheAsync"
maxEntriesLocalHeap="1000"
eternal="false"
timeToIdleSeconds="1000"
timeToLiveSeconds="1000"
overflowToDisk="false">
<cacheEventListenerFactory
class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory"
properties="replicateAsynchronously=true,
replicatePuts=true,
replicateUpdates=true,
replicateUpdatesViaCopy=true,
replicateRemovals=true,
asynchronousReplicationIntervalMillis=1000"
propertySeparator=","/>
</cache>
External JMS Publishers
Anything that can publish to a message queue can also add cache entries to ehcache. These are called non-cache publishers.
Required Message Properties
Publishers need to set up to four String properties on each message: cacheName, action, mimeType and key.
cacheName
Property
A JMS message property which contains the name of the cache to operate on. If no cacheName is set the message will be <ignored>. A warning log message will indicate that the message has been ignored.
action
Property
A JMS message property which contains the action to perform on the cache.
Available actions are strings labeled PUT
, REMOVE
and REMOVE_ALL
.
If not set no action is performed. A warning log message will indicate that the message has been ignored.
mimeType
Property
A JMS message property which contains the mimeType of the message. Applies to the PUT
action. If not set the message is interpreted as follows:
ObjectMessage - if it is an net.sf.ehcache.Element, then it is treated as such and stored in the cache. For other objects, a new Element is created using the object in the ObjectMessage as the value and the key property as a key. Because objects are already typed, the mimeType is ignored.
TextMessage - Stored in the cache as value of MimeTypeByteArray. The mimeType should be specified. If not specified
it is stored as type text/plain
.
BytesMessage - Stored in the cache as value of MimeTypeByteArray. The mimeType should be specified. If not
specified it is stored as type application/octet-stream
.
Other message types are not supported.
To send XML use a TextMessage or BytesMessage and set the mimeType to application/xml
.It will be stored in the cache
as a value of MimeTypeByteArray.
The REMOVE
and REMOVE_ALL
actions do not require a mimeType
property.
key
Property
The key in the cache on which to operate on. The key is of type String.
The REMOVE_ALL
action does not require a key property.
If an ObjectMessage of type net.sf.ehcache.Element is sent, the key is contained in the element. Any key set as a property is ignored.
If the key is required but not provided, a warning log message will indicate that the message has been ignored.
Code Samples
These samples use Open MQ as the message queue and use it with out of the box defaults. They are heavily based on Ehcache’s own JMS integration tests. See the test source for more details.
Messages should be sent to the topic that Ehcache is listening on. In these samples it is EhcacheTopicDest
.
All samples get a Topic Connection using the following method:
private TopicConnection getMQConnection() throws JMSException {
com.sun.messaging.ConnectionFactory factory =
new com.sun.messaging.ConnectionFactory();
factory.setProperty(ConnectionConfiguration.imqAddressList, "localhost:7676");
factory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
TopicConnection myConnection = factory.createTopicConnection();
return myConnection;
"/>
PUT a Java Object into an Ehcache Cluster
String payload = "this is an object";
TopicConnection connection = getMQConnection();
connection.start();
TopicSession publisherSession =
connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
ObjectMessage message = publisherSession.createObjectMessage(payload);
message.setStringProperty(ACTION_PROPERTY, "PUT");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
//don't set. Should work.
//message.setStringProperty(MIME_TYPE_PROPERTY, null);
//should work. Key should be ignored when sending an element.
message.setStringProperty(KEY_PROPERTY, "1234");
Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);
connection.stop();
Ehcache will create an Element in cache “sampleCacheAsync” with key “1234” and a Java class String value of “this is an object”.
PUT XML into an Ehcache Cluster
TopicConnection connection = getMQConnection();
connection.start();
TopicSession publisherSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
String value = "<?xml version=\"1.0\"?>\n" +
"<oldjoke>\n" +
"<burns>Say <quote>goodnight</quote>,\n" +
"Gracie.</burns>\n" +
"<allen><quote>Goodnight, \n" +
"Gracie.</quote></allen>\n" +
"<applause/>\n" +
"</oldjoke>";
TextMessage message = publisherSession.createTextMessage(value);
message.setStringProperty(ACTION_PROPERTY, "PUT");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
message.setStringProperty(MIME_TYPE_PROPERTY, "application/xml");
message.setStringProperty(KEY_PROPERTY, "1234");
Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);
connection.stop();
Ehcache will create an Element in cache “sampleCacheAsync” with key “1234” and a value of type MimeTypeByteArray.
On a get from the cache the MimeTypeByteArray will be returned. It is an Ehcache value object from which a mimeType and byte[] can be retrieved. The mimeType will be “application/xml”. The byte[] will contain the XML String encoded in bytes, using the platform’s default charset.
PUT arbitrary bytes into an Ehcache Cluster
byte[] bytes = new byte[]{0x34, (byte) 0xe3, (byte) 0x88};
TopicConnection connection = getMQConnection();
connection.start();
TopicSession publisherSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
BytesMessage message = publisherSession.createBytesMessage();
message.writeBytes(bytes);
message.setStringProperty(ACTION_PROPERTY, "PUT");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
message.setStringProperty(MIME_TYPE_PROPERTY, "application/octet-stream");
message.setStringProperty(KEY_PROPERTY, "1234");
Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);
Ehcache will create an Element in cache “sampleCacheAsync” with key “1234” in and a value of type MimeTypeByteArray.
On a get from the cache the MimeTypeByteArray will be returned. It is an Ehcache value object from which a mimeType and byte[] can be retrieved. The mimeType will be “application/octet-stream”. The byte[] will contain the original bytes.
REMOVE
TopicConnection connection = getMQConnection();
connection.start();
TopicSession publisherSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
ObjectMessage message = publisherSession.createObjectMessage();
message.setStringProperty(ACTION_PROPERTY, "REMOVE");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
message.setStringProperty(KEY_PROPERTY, "1234");
Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);
Ehcache will remove the Element with key “1234” from cache “sampleCacheAsync” from the cluster.
REMOVE_ALL
TopicConnection connection = getMQConnection();
connection.start();
TopicSession publisherSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
ObjectMessage message = publisherSession.createObjectMessage();
message.setStringProperty(ACTION_PROPERTY, "REMOVE_ALL");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);
connection.stop();
Ehcache will remove all Elements from cache “sampleCacheAsync” in the cluster.
Using the JMSCacheLoader
The JMSCacheLoader is a CacheLoader which loads objects into the cache by sending requests to a JMS Queue.
The loader places an ObjectMessage of type JMSEventMessage on the getQueue with an Action of type GET.
It is configured with the following String properties, loaderArgument
.
The defaultLoaderArgument, or the loaderArgument if specified on the load request. To work with the JMSCacheManagerPeerProvider this should be the name of the cache to load from. For custom responders, it can be anything which has meaning to the responder.
A queue responder will respond to the request. You can either create your own or use the one built-into the JMSCacheManagerPeerProviderFactory, which attempts to load the queue from its cache.
The JMSCacheLoader uses JNDI to maintain message queue independence. Refer to the manual for full configuration examples using ActiveMQ and Open Message Queue.
It is configured as per the following example:
<cacheLoaderFactory class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory"
properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
providerURL=file:///tmp,
replicationTopicConnectionFactoryBindingName=MyConnectionFactory,
replicationTopicBindingName=ehcache,
getQueueConnectionFactoryBindingName=queueConnectionFactory,
getQueueBindingName=ehcacheGetQueue,
timeoutMillis=20000
defaultLoaderArgument=/>
Valid properties are:
- initialContextFactoryName (mandatory) - the name of the factory used to create the message queue initial context.
- providerURL (mandatory) - the JNDI configuration information for the service provider to use.
- getQueueConnectionFactoryBindingName (mandatory) - the JNDI binding name for the QueueConnectionFactory
- getQueueBindingName (mandatory) - the JNDI binding name for the queue name used to do make requests.
- defaultLoaderArgument - (optional) - an application specific argument. If not supplied as a cache.load() parameter this default value will be used. The argument is passed in the JMS request as a StringProperty called loaderArgument.
- timeoutMillis - time in milliseconds to wait for a reply.
- securityPrincipalName - the JNDI java.naming.security.principal
- securityCredentials - the JNDI java.naming.security.credentials
- urlPkgPrefixes - the JNDI java.naming.factory.url.pkgs
- userName - the user name to use when creating the TopicConnection to the Message Queue
- password - the password to use when creating the TopicConnection to the Message Queue
- acknowledgementMode - the JMS Acknowledgement mode for both publisher and subscriber. The available choices are AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE and SESSION_TRANSACTED. The default is AUTO_ACKNOWLEDGE.
Example Configuration Using Active MQ
<cache name="sampleCacheNorep"
maxEntriesLocalHeap="1000"
eternal="false"
timeToIdleSeconds="1000"
timeToLiveSeconds="1000"
overflowToDisk="false">
<cacheEventListenerFactory
class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory"
properties="replicateAsynchronously=false, replicatePuts=false,
replicateUpdates=false, replicateUpdatesViaCopy=false,
replicateRemovals=false, loaderArgument=sampleCacheNorep"
propertySeparator=","/>
<cacheLoaderFactory
class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory"
properties="initialContextFactoryName=net.sf.ehcache.distribution.jms.
TestActiveMQInitialContextFactory,
providerURL=tcp://localhost:61616,
replicationTopicConnectionFactoryBindingName=topicConnectionFactory,
getQueueConnectionFactoryBindingName=queueConnectionFactory,
replicationTopicBindingName=ehcache,
getQueueBindingName=ehcacheGetQueue,
timeoutMillis=10000"/>
</cache>
Example Configuration Using Open MQ
<cache name="sampleCacheNorep"
maxEntriesLocalHeap="1000"
eternal="false"
timeToIdleSeconds="100000"
timeToLiveSeconds="100000"
overflowToDisk="false">
<cacheEventListenerFactory
class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory"
properties="replicateAsynchronously=false, replicatePuts=false,
replicateUpdates=false, replicateUpdatesViaCopy=false,
replicateRemovals=false"
propertySeparator=","/>
<cacheLoaderFactory
class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory"
properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
providerURL=file:///tmp,
replicationTopicConnectionFactoryBindingName=MyConnectionFactory,
replicationTopicBindingName=ehcache,
getQueueConnectionFactoryBindingName=queueConnectionFactory,
getQueueBindingName=ehcacheGetQueue,
timeoutMillis=10000,
userName=test,
password=test"/>
</cache>
Configuring Clients for Message Queue Reliability
Ehcache replication and cache loading is designed to gracefully degrade if the message queue infrastructure stops. Replicates and loads will fail. But when the message queue comes back, these operations will start up again.
For this to work, the ConnectionFactory used with the specific message queue needs to be configured correctly. For example, with Open MQ, reconnection is configured as follows:
- imqReconnect=’true’ - without this reconnect will not happen
- imqPingInterval=’5’ - Consumers will not reconnect until they notice the connection is down. The ping interval
- does this. The default is 30. Set it lower if you want the Ehcache cluster to reform more quickly.
- Finally, unlimited retry attempts are recommended. This is also the default.
For greater reliability consider using a message queue cluster. Most message queues support clustering. The cluster configuration is once again placed in the ConnectionFactory configuration.
Tested Message Queues
Open MQ
This open source message queue is tested in integration tests. It works perfectly.
Active MQ
This open source message queue is tested in integration tests. It works perfectly other than having a problem with temporary reply queues which prevents the use of JMSCacheLoader. JMSCacheLoader is not used during replication.
Oracle AQ
Versions up to an including 0.4 do not work, due to Oracle not supporting the unified API (send) for topics.
JBoss Queue
Works as reported by a user.
Known Issues
Active MQ Temporary Destinatons
ActiveMQ seems to have a bug in at least ActiveMQ 5.1 where it does not cleanup temporary queues, even though they have been deleted. That bug appears to be long standing but was though to have been fixed. See http://issues.apache.org/activemq/browse/AMQ-1255.
The JMSCacheLoader uses temporary reply queues when loading. The Active MQ issue is readily reproduced in Ehcache integration testing. Accordingly, use of the JMSCacheLoader with ActiveMQ is not recommended. Open MQ tests fine.
Active MQ works fine for replication.
WebSphere 5 and 6
Websphere Application Server prevents MessageListeners, which are not MDBs, from being created in the container.
While this is a general Java EE limitation, most other app servers either are permissive or can be configured to be permissive. WebSphere 4 worked, but 5 and 6 enforce the restriction.
Accordingly, Ehcache together with JMS cannot be used with WebSphere 5 and 6.