Friday, October 7, 2011

Oracle AQ with Spring

In my current project we use a Java SEDA. The MOM to support this is IBM (Websphere) MQ. Our most used object is the queue, which enables us to handle events asynchronously and by multiple consumers which greatly improves scalability and robustness.

From a Java point of view the MOM implementation is really not that important, as it is accessed via the JMS API. So whether it's Websphere MQ, JBoss MQ, ... as long as it has JMS support it's pretty transparent. We do use some minor MQ specific extensions (PCF) to get queue depths and the like, but that is more from an operational management point of view.

The choice of MQ was made before I joined the project, probably because other legacy subsystems have the least trouble dealing with MQ since they are already IBM based. We don't benefit much from its possibilities though, since there is no QueueManager to QueueManager communication or the like, which is where MQ is really strong. But it has to be said that MQ is a solid and mature product with a lot of possibilities.

The downside is probably its price (especially if you under-use it) and that it requires specific MQ knowledge to operate and maintain a running instance. For example, moving messages from one queue to another natively on Solaris is not trivial if you're not into MQ administration (no, the 'MOVE' command is not supported on MQ Solaris).

Since we are using 2 resources most of the time, this also implies that our backends are running XA transactions to make 2PC work between our MOM and RDBMS (Oracle).
A while ago someone threw the idea on the table to switch to Oracle AQ (Advanced Queuing), which is the Oracle MOM implementation. I'm not going to compare MQ and AQ here, but the fact is that AQ supports JMS and is a fully fledged MOM (it also has Topics, btw), so on paper it is more than enough for our usage.

A cool detail is that the JMS Connection you obtain is actually backed by a normal (JDBC) database connection. In fact, the AQ driver uses a datasource under the hood. If you send a message to a queue or receive one from it, the AQ driver translates that into stored procedure calls and sends them through the SQL datasource you instantiated it with. This also means we could drop XA, since we only need to enlist a single resource for both our MOM and RDBMS access.


To set this up my first idea was to look for a resource adapter (RAR) which would enable AQ via the application server (WebSphere MQ also ships with a JEE RAR). At that point I did not know how it would handle the JDBC connection sharing if the connection were made via the RAR, but anyway. I quickly found out that there is no real AQ resource adapter available for JEE servers other than Oracle AS itself (for this I was using Glassfish btw).

There is: genericjmsra but you cannot use it "properties based" like you enter the uri/username/password of the MOM. See here for its AQ specific manual:

Quote:

Oracle JMS client does not allow creation of ConnectionFactory, QueueConnectionFactory or
TopicConnectionFactory utilizing JavaBean approach. The factory creation is only possible through AQjmsFactory class provided in the Oracle jms client api. However fortunately, Oracle does support the JNDI lookup approach. We will be focusing on the JNDI approach for Oracle AQ and glassfish integration

This means you need an Oracle LDAP server in which some remote objects are published which are then looked up by the RA. So sharing the same JDBC connection for relational access and AQ will certainly not be possible this way.

Fortunately you can use the AQjmsFactory (the main factory: you feed it a datasource and it gives you back a JMS ConnectionFactory) directly from your code, but that would require some boilerplate code, as the AQjmsFactory checks that the actual connection is a direct Oracle connection.

If you are using a JDBC pool, for example c3p0 or Commons DBCP, it will wrap the connections (in order to suppress closes etc.) and these connections will be rejected because they are not direct instances of the Oracle connection. Thankfully a new Spring module was released at the right time and comes to the rescue: Spring Data JDBC Extensions.
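
Why the wrapping matters can be shown with a tiny sketch that uses no Oracle or pool classes at all. The names SimpleConnection, VendorConnection, DirectConnection and PooledConnection are hypothetical stand-ins for the JDBC connection, the Oracle connection type and a pool's wrapper, and the check in accept() only mimics the kind of type check described above; it is not the AQ driver's code.

```java
/** Toy illustration: a pool wrapper is not an instance of the vendor connection type. */
class WrappedConnectionDemo {

    /** What application code and pools work against (stand-in for java.sql.Connection). */
    interface SimpleConnection {
        void close();
    }

    /** Hypothetical vendor-specific type (stand-in for the Oracle connection). */
    interface VendorConnection extends SimpleConnection {
    }

    /** A "direct" vendor connection. */
    static class DirectConnection implements VendorConnection {
        @Override
        public void close() {
            // a real driver would close the physical connection here
        }
    }

    /** What a pool hands out: a wrapper that only implements the generic interface. */
    static class PooledConnection implements SimpleConnection {
        private final SimpleConnection delegate;

        PooledConnection(SimpleConnection delegate) {
            this.delegate = delegate;
        }

        @Override
        public void close() {
            // a pool would hand the delegate back to the pool instead of closing it
        }

        SimpleConnection unwrap() {
            return delegate;
        }
    }

    /** Mimics a factory that insists on the vendor type. */
    static boolean accept(SimpleConnection connection) {
        return connection instanceof VendorConnection;
    }

    public static void main(String[] args) {
        SimpleConnection direct = new DirectConnection();
        PooledConnection pooled = new PooledConnection(direct);

        System.out.println("direct accepted:    " + accept(direct));
        System.out.println("pooled accepted:    " + accept(pooled));
        System.out.println("unwrapped accepted: " + accept(pooled.unwrap()));
    }
}
```

The wrapper is rejected while the direct and the unwrapped connection pass, which is why something has to sit between the pool and the factory.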

This is the boilerplate you want in order to seamlessly integrate Oracle AQ with your existing Spring managed datasource and transactions. The extension will make sure the Oracle AQjmsFactory is given a proxy which will be an instance of the Oracle connection. The proxy enables us to control what we give the Oracle AQ implementation.

For example, when it tries to call 'close' we suppress the call, since we know it will be handled by the transaction manager (datasource, hibernate, jta, ...) later on. If you're interested in this, check the source at: org.springframework.data.jdbc.config.oracle.AqJmsFactoryBeanFactory.
That is the custom namespace handler for the AQ Spring XML config which creates the appropriate beans to do the boilerplate.
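
The close suppression itself is a small trick. The following is not the source of the Spring extension, just a minimal sketch of the same idea using only JDK classes: a dynamic proxy around a java.sql.Connection that swallows close() and delegates everything else. The class name, suppressClose() and the recording stand-in connection are made up for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Sketch of a connection proxy that swallows close() and delegates everything else. */
class CloseSuppressingProxyDemo {

    /** Wraps the target in a proxy whose close() does nothing. */
    static Connection suppressClose(Connection target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName())) {
                return null; // leave closing to whoever owns the transaction
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the original exception
            }
        };
        return (Connection) Proxy.newProxyInstance(
                CloseSuppressingProxyDemo.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                handler);
    }

    /** Stand-in for a real connection: records the names of the methods called on it. */
    static Connection recordingConnection(List<String> calls) {
        InvocationHandler handler = (proxy, method, args) -> {
            calls.add(method.getName());
            return null; // good enough here because only void methods are called
        };
        return (Connection) Proxy.newProxyInstance(
                CloseSuppressingProxyDemo.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                handler);
    }

    /** Calls commit() and close() through the proxy and returns what reached the target. */
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        Connection guarded = suppressClose(recordingConnection(calls));
        try {
            guarded.commit(); // delegated to the target
            guarded.close();  // swallowed by the proxy
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("calls that reached the target: " + run());
    }
}
```

Only the commit reaches the underlying connection; the close never does, so the component that opened the connection stays in charge of releasing it.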

In this first example we create a scenario in which an event is received (Q1), a database record is inserted (T1) and a second event is published (Q2). All of this should run in one transaction, so if there is a failure at any point everything should be reverted (1 message back on Q1, no records in T1, and no messages on Q2). If everything succeeds, the message from Q1 should be processed, the record inserted and a new message published on Q2.
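
To make the expected outcome concrete before touching any real infrastructure, here is a purely in-memory toy model of that unit of work. It involves no JMS or JDBC; the class UnitOfWorkDemo and its fields are invented for this sketch, and the "transaction" is simulated by working on copies that are only swapped in on success.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory toy model of the Q1 -> T1 -> Q2 unit of work; no JMS or JDBC involved. */
class UnitOfWorkDemo {

    final Deque<String> q1 = new ArrayDeque<>();
    final List<String> t1 = new ArrayList<>();
    final Deque<String> q2 = new ArrayDeque<>();

    /**
     * Processes one message from Q1. All changes are made on copies and only
     * swapped in at the end, so a failure leaves Q1, T1 and Q2 untouched.
     */
    boolean processOne(boolean failBeforeCommit) {
        Deque<String> newQ1 = new ArrayDeque<>(q1);
        List<String> newT1 = new ArrayList<>(t1);
        Deque<String> newQ2 = new ArrayDeque<>(q2);

        String message = newQ1.poll();      // receive from Q1
        if (message == null) {
            return false;                   // nothing to do
        }
        newT1.add(message);                 // insert into T1
        newQ2.add("payload");               // publish to Q2

        if (failBeforeCommit) {
            return false;                   // "rollback": the copies are simply dropped
        }
        // "commit": make all three changes visible together
        q1.clear();
        q1.addAll(newQ1);
        t1.clear();
        t1.addAll(newT1);
        q2.clear();
        q2.addAll(newQ2);
        return true;
    }

    public static void main(String[] args) {
        UnitOfWorkDemo failed = new UnitOfWorkDemo();
        failed.q1.add("testing 123");
        failed.processOne(true);
        System.out.println("after failure: Q1=" + failed.q1 + " T1=" + failed.t1 + " Q2=" + failed.q2);

        UnitOfWorkDemo ok = new UnitOfWorkDemo();
        ok.q1.add("testing 123");
        ok.processOne(false);
        System.out.println("after success: Q1=" + ok.q1 + " T1=" + ok.t1 + " Q2=" + ok.q2);
    }
}
```

After a failure the message is still on Q1 and nothing else changed; after a success Q1 is empty, T1 has one record and Q2 has one message. That is exactly the behaviour the real setup has to deliver.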

To start, I'm going to set up the two AQ queues and their queue tables:

EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'Q1_T', queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'Q1',  Queue_table => 'Q1_T', max_retries => 2147483647);
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'Q1');

EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'Q2_T', queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');
EXECUTE DBMS_AQADM.CREATE_QUEUE (Queue_name => 'Q2',  Queue_table => 'Q2_T', max_retries => 2147483647);
EXECUTE DBMS_AQADM.START_QUEUE (Queue_name => 'Q2');

On AQ each Queue needs to have a corresponding queue table. The queue table is the table where the data is physically stored. You will never talk to a queue table directly from your application, but you can query it with plain SQL from your favorite database IDE. On each you can specify additional properties: on the queue table you have to specify which payload type it will have; on the queue itself you can specify after how many unsuccessful dequeues the message is moved to the exception queue.

In our project we make use of an application level failover and DLQ management system with separate queueing. So we don't need this feature. There is however no way to turn this off, so we've chosen the max setting (which is Integer.MAX_VALUE). Btw; the exception queues are generated automatically, you have no control over them.

To check if everything is created:

select * from all_queues where name like 'Q1%' or name like 'AQ$_Q1%' or name like 'Q2%' or name like 'AQ$_Q2%'
The results:

NAME        QUEUE_TABLE  QID     QUEUE_TYPE       MAX_RETRIES
Q2          Q2_T         365831  NORMAL_QUEUE     2147483647
AQ$_Q2_T_E  Q2_T         365830  EXCEPTION_QUEUE  0
Q1          Q1_T         365816  NORMAL_QUEUE     2147483647
AQ$_Q1_T_E  Q1_T         365815  EXCEPTION_QUEUE  0

Next we'll set up our Spring config. The goal is to create a message consumer that listens for messages on Q1 and processes them. Our processing will consist of inserting a record in T1 and putting a message on Q2.
 <!-- Sets up the JMS ConnectionFactory, in this case backed by Oracle AQ -->
 <bean id="oracleNativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.SimpleNativeJdbcExtractor"/>
 <orcl:aq-jms-connection-factory id="connectionFactory" data-source="dataSource" use-local-data-source-transaction="true" native-jdbc-extractor="oracleNativeJdbcExtractor"/>

 <tx:annotation-driven/>

 <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" lazy-init="true">
  <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
  <property name="url" value="jdbc:oracle:thin:@host:port:SID"/>
  <property name="username" value="Scott"/>
  <property name="password" value="Tiger"/>
 </bean>

 <!-- Using DataSourceTxManager, but could also be HibernateTxManager or JtaTxManager -->
 <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
  <property name="dataSource" ref="dataSource"/>
 </bean>

 <!-- You can also construct the JMSTemplate in code, but we'll do it here so its all together in one place -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="defaultDestinationName" value="Q2"/>
  <property name="sessionTransacted" value="true"/>
 </bean>

 <bean id="myMessageListener" class="be.error.jms.MyMessageListener">
  <property name="dataSource" ref="dataSource"/>
  <property name="jmsTemplate" ref="jmsTemplate"/>
 </bean>

 <!-- Once it is started, it will try to read messages from Q1 and let 'messageListener' process them -->
 <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="transactionManager" ref="transactionManager"/>
  <property name="destinationName" value="Q1"/>
  <property name="messageListener" ref="myMessageListener"/>
  <property name="sessionTransacted" value="true"/>
 </bean>
As you can see, the magic is in orcl:aq-jms-connection-factory, which makes a JMS ConnectionFactory available under the id 'connectionFactory', using our datasource to do the AQ queueing.

Very important: if you don't want to spend half a day investigating some weird transaction behaviour (I even mistakenly thought it was a bug and pointed that out here) I suggest you read this:

In my configuration you will see that the 'sessionTransacted' is set to "true" for the JmsTemplate and for the DefaultMessageListenerContainer. This makes sense as we are running outside of a JEE managed environment and we want to have local transactions for our JMS operations. The theory behind it is however a bit more complex.

When running outside of a JEE managed environment you have the choice of letting your session interaction be part of a local transaction. This is controlled by the sessionTransacted setting (it maps directly onto the JMS API). This means that if you consume messages from different destinations through the same session, they are controlled by a single transaction.
For example, I create QueueSession #1 and use it to consume a message from Q1 and another message from Q2. After consuming both messages, I can issue a session.rollback() and everything is brought back to its initial state. If I had not used transactions, I would be working with an acknowledgement mode. Suppose I had chosen CLIENT_ACKNOWLEDGE; then I would have to acknowledge explicitly that my messages were successfully consumed. So I would first retrieve message #1 from Q1 and then message #2 from Q2 (all via QueueSession #1). In the end I would have to do:

messageOne.acknowledge();
//system crashes here
messageTwo.acknowledge();
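This could of course create inconsistency, as in my example messageOne was marked consumed but messageTwo wasn't. (Strictly speaking, the JMS specification says that acknowledge() acknowledges all messages the session has consumed so far, so with a compliant provider the vulnerable window is rather between consuming a message and acknowledging it; the conclusion is the same.) This is only a problem if your unit of work should be treated atomically. If it should, you need at least local transactions.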

When you want to consume/produce messages from a queue and also interact with another resource (an RDBMS, for example), you would normally use a distributed transaction manager (in our case that would mean JTA). But remember that we are not dealing with different resources here; it all comes down to a single database connection. So in our case the "local transaction" stretches a bit further than it normally would, as it also includes all (SQL) calls made on the same database connection the JMS infrastructure is using.

In our case the DataSourceTransactionManager controls the local transaction, and that includes JMS operations as well as SQL operations issued via JDBC. It is that component which calls commit or rollback. There is no need for intermediate commits on the queueSession.

So basically: by setting sessionTransacted to true, no one performs intermediate commits and everything is left to whoever controls the transaction, in our case the DataSourceTransactionManager.
Make sure you use JdbcTemplate for direct JDBC access and JmsTemplate for MOM access. Make sure sessionTransacted is set to true if you create the JmsTemplate in code. Also, the DefaultMessageListenerContainer is a JMS receiver and must also have sessionTransacted set to true for the same reason.
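
Why an intermediate commit is so harmful can be seen in a toy model of one shared connection. This is not how the AQ driver is implemented; FakeConnection and failingUnitOfWork() are hypothetical names, and the operations are just strings in a log. The sketch only shows what ends up committed when processing fails after the dequeue.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one shared connection; shows why an intermediate commit breaks the unit of work. */
class IntermediateCommitDemo {

    /** Minimal stand-in for a database connection with a pending and a committed log. */
    static class FakeConnection {
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void execute(String operation) {
            pending.add(operation);
        }

        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /**
     * Dequeues, inserts, then fails and rolls back. With commitAfterDequeue the
     * dequeue is committed on its own first, as a non-transacted session would do.
     */
    static List<String> failingUnitOfWork(boolean commitAfterDequeue) {
        FakeConnection connection = new FakeConnection();
        connection.execute("dequeue from Q1");
        if (commitAfterDequeue) {
            connection.commit();            // intermediate commit by the JMS layer
        }
        connection.execute("insert into T1");
        connection.rollback();              // processing failed: the transaction manager rolls back
        return connection.committed;
    }

    public static void main(String[] args) {
        System.out.println("single commit owner: " + failingUnitOfWork(false));
        System.out.println("intermediate commit: " + failingUnitOfWork(true));
    }
}
```

With a single owner of the commit, nothing is committed after the failure and the message stays on Q1. With the intermediate commit, the dequeue is already permanent when the rollback happens, so the message is gone while the insert never made it.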

You might be tempted to remove sessionTransacted from the JmsTemplate and DefaultMessageListenerContainer if you are running in a JEE environment. The JMS API says that the values of sessionTransacted and acknowledgementMode are ignored in that case.
While this is true in general, it is not true in this case. The Oracle AQ driver will not properly detect that it is running in a JEE JTA environment if you are using anything other than Oracle AS. If you remove the property, the driver will perform intermediate commits and your transaction will be broken. So also in JEE mode you will have to leave this set to true!

But don't worry: in the JTA case your datasource will be XA enabled and the transaction manager performing commits will be the JtaTransactionManager. As far as the AQ driver is concerned it sees no difference (all transaction handling and coordination is done at a higher level).

Also, I'm using a DataSourceTransactionManager here, since I only require direct JDBC access.
If you would be using hibernate, you could use HibernateTransactionManager. You could then do AQ, plain JDBC access and work with hibernate's SessionFactory at the same time.
If you have yet another resource (maybe a 2nd RDBMS) and still want XA, you can simply plug in the JTA transaction manager without any problem (it's just a matter of switching configuration).

For the Java messageListener part, this is all standard:

public class MyMessageListener implements SessionAwareMessageListener<Message> {

 private DataSource dataSource;
 private JmsTemplate jmsTemplate;


 @Override
 public void onMessage(Message message, Session session) throws JMSException {
  //Message received from Q1 via 'messageListenerContainer'
  TextMessage textMessage = (TextMessage) message;
  System.out.println("Received message with content:" + textMessage.getText());

  //Insert its content into T1
  new JdbcTemplate(dataSource).update("insert into T1 values (?)", textMessage.getText());
  System.out.println("Inserted into table T1");

  //Publish a message to Q2
  jmsTemplate.send(new MessageCreator() {
   @Override
   public Message createMessage(Session session) throws JMSException {
    TextMessage textMessage = session.createTextMessage();
    textMessage.setText("payload");
    return textMessage;
   }
  });
  System.out.println("Sended message to Q2");
 }

 public void setDataSource(DataSource dataSource) {
  this.dataSource = dataSource;
 }

 public void setJmsTemplate(JmsTemplate jmsTemplate) {
  this.jmsTemplate = jmsTemplate;
 }
}
I then created a small forever blocking test case to quickly fire up the application context so that the DefaultMessageListenerContainer could start looking for messages on Q1.

@Test
@ContextConfiguration(locations = { "classpath:/spring/aq-test.xml" })
public class OracleAqTransactionResourceTest extends AbstractTestNGSpringContextTests {

 @Autowired
 private DataSource dataSource;
 private JdbcTemplate jdbcTemplate;

 @BeforeMethod
 public void setup() {
  jdbcTemplate = new JdbcTemplate(dataSource);
 }

 public void testSingleTransaction() {
  System.out.println("Running...");
  blockUntillReadyOrTimeout();
  System.out.println("Done.");
 }

 private void blockUntillReadyOrTimeout() {
  while (true) {
   try {
    Thread.sleep(2000);
   } catch (InterruptedException e) {
    throw new RuntimeException(e);
   }
  }
 }
}
After launching the test, I inject a message into Q1 (I use Oracle SQL developer):

DECLARE
    msg SYS.AQ$_JMS_TEXT_MESSAGE;
    queue_options       DBMS_AQ.ENQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_id RAW(30);

BEGIN
      msg := SYS.AQ$_JMS_TEXT_MESSAGE.CONSTRUCT();  
      msg.set_text('testing 123');
      DBMS_AQ.ENQUEUE(
        queue_name => 'Q1',
        enqueue_options => queue_options,
        message_properties => message_properties,
        payload => msg,
        msgid => message_id);
        commit;
END;
And off we go:

Running...
Received message with content:testing 123
Inserted into table T1
Sended message to Q2
In Oracle we see that the message is present on Q2 (at least in its queue table) and that a record is inserted into T1.

You are free to play with some transaction scenarios, such as creating multiple (possibly nested) transactions, letting them roll back, etc. I performed 5 scenarios and they all worked fine.

PS. make sure you use at least spring-jdbc 1.0_M2 (or up) since we discovered a small bug in M1 which could cost you some time to investigate :)

18 comments:

  1. Why not just use a JmsTransactionManager on both the JmsTemplate and the MessageListener instead of setting "sessionTransacted=true" on both? Using a transactionmanager reference it's just a matter of switching bean definitions between JmsTransactionManager for local transactions and JtaTransactionManager for XA transactions, or am I missing something?

  2. That is a good question, but if you re-read my post you'll notice there *IS* a transaction manager handling the transactions (DataSourceTransactionManager). Remember that JMS also boils down to JDBC operations when using AQ. The transaction settings on the JmsTemplate and DefaultMessageListenerContainer are required specifically to leave the commit/rollback to the transaction manager. Without those settings (sessionTransacted = false) you would fall back to acknowledgement mode and you would have to acknowledge messages manually (or let the implementation decide automatically).

    Also, notice that I'm using the DataSourceTransactionManager (or HibernateTransactionManager when using Hibernate) and NOT the JmsTransactionManager. The JmsTransactionManager is a JMS specific "local" transaction manager for transacted sessions. If you have transacted sessions, someone has to call 'commit' or 'rollback' on the JMS session when your UOW ends. That is what the JmsTransactionManager will do for you, declaratively if you use Spring's @Transactional.

    The JmsTransactionManager is per default not synchronized. This means that if you access another resource, it will not detect a running transaction and will not synchronize with it.

    Suppose you have a DefaultMessageListenerContainer that processes a message and you use the JmsTransactionManager. A transaction will be started and the message received in that transaction: all good. But problems will arise if you try to do JDBC access (via JdbcTemplate for example), since it will not detect a running transaction. In this case a new connection is opened and closed directly after the statement; thus the JDBC operation was not part of the transaction. If somewhere in the processing an exception is thrown, the JmsTransactionManager will roll back the transaction and the message will be left on the queue; but the JDBC operation was committed.

    If you use the DataSourceTransactionManager things are different. Before a message is received a new JDBC connection is opened and attached to a transaction (synchronized). The message is then received (remember: this boils down to a JDBC operation by the AQ driver) and every JDBC call you make (while processing the message) will participate in that transaction as usual, since in this case the transaction IS detected by for example the JdbcTemplate. At the end the DataSourceTransactionManager will commit the JDBC connection (and thereby also commit all JMS work you did). Finally the JMS sessions are closed via transaction synchronization hooks (when a session was opened it was registered for transaction synchronization). Closing the session will trigger a second commit, which will again end up on the JDBC connection, but that is no problem and merely a side effect.

    So: if you want to perform JDBC access and make it part of the same transaction as where the JMS message is processed in, you need to use DataSourceTransactionManager.
    If you also use Hibernate, you of course need HibernateTransactionManager instead of DataSourceTransactionManager since this has explicit Hibernate support (such as flushing the session first etc).

    Btw; maybe if you set the transaction synchronization to "always" on the JmsTransactionManager you might get away with it, but then only in the JMS/JDBC case (if you use Hibernate it will still not be enough).

  3. Thanks for such a great post. Following your example, I have one simple scenario. I am reading a message from a Topic and inserting that information in a table. If my DB write fails (I am deliberately throwing an exception to make it fail), the transaction is rolled back but my message is not picked up again by the listener. How can I add retry behaviour in this case?

  4. Very interesting. Since "the magic is in the orcl:aq-jms-connection-factory", where do we get that?

  5. I wasn't very clear about that in the article:

    This is a custom namespace parsed by Spring data JDBC extensions.
    You can find info here: http://static.springsource.org/spring-data/jdbc/docs/1.0.0.RELEASE/reference/htmlsingle/

    You will have to declare the namespace (xmlns:orcl="http://www.springframework.org/schema/data/orcl") and then you'll be able to use the aq-jms-connection-factory in your beans config.

  6. Just so it's clear...this solution still needs the Oracle aqapi.jar file, correct? And thanks for explaining the need for the Spring extensions.

  7. Yes, it needs the aqapi.jar and of course the oracle jdbc driver (ojdbcX.jar).
    You will find them inside the Oracle database installation (perform a search starting from the oracle database home dir and you'll get there).

  8. And again-- thanks for your very helpful advice.

  9. Hi

    I followed the sample and when i submit message to the queue, i don't see it being grabbed by the listener..am i missing anything?

  10. I think that by default (unless you changed the log level) failures are silent. So in such cases set logging for org.springframework.jms.listener.DefaultMessageListenerContainer to TRACE. Or put a breakpoint in the inner class AsyncMessageListenerInvoker inside the DefaultMessageListenerContainer. There is a line: messageReceived = (invokeListener() || messageReceived); If it fails you can check the exception there to see what is going on.

  11. Hi Koen,

    It's already an old post, but I am wondering why you explicitly define the transactionManager on the DMLC. Are you using a JTA transactionManager?



    This effectively disables caching on the DMLC, see:

    org/springframework/jms/listener/DefaultMessageListenerContainer.java:514

    this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);

    Did you do any performance tests on your AQ dequeue rates? Best I get on my local machine is a dequeue rate of 135 messages p/s on a 4 cpu machine. Which is not really impressive IMHO.

    Cheers, Stephan

    Replies
    1. Hi Stephan,

      We are indeed using JTA. I did not yet look into (read: I was not aware:) ) of the caching thing you mentioned. However, we do not have any performance issues with dequeues. I have to say we do use multiple threads by default and our messages are not spectacular in size.

  12. Hi Koen,

    Thanks for your wonderful post.

    Quick question:
    https://jira.spring.io/plugins/servlet/mobile#issue/DATAJDBC-8

    Is this issue resolved?

    I'm facing the same problem with Hibernate transaction manager - memory leak. Please refer to the below issue.

    http://stackoverflow.com/questions/36239392/jdbc-connection-leak-on-jdbc-pool-when-using-aq

    Appreciate your great help.

    Thanks,
    Ravs

    Replies
    1. Hi,

      Yes, our problem got fixed. In fact we are using it for several years now without any problems. A suggestion is to strip your application to a very small test-case which still can reproduce the issue (in that case I can also have a look at it if you want). Simultaneously you could also try to tackle it from another angle by setting a break point on the raw oracle connection 'close' method. If you are experiencing leaks in the jdbc pool, it is either because connections are not closed, or, the pool is not aware of connections being closed. The latter is the one pointed out in the jira. While this particular issue got fixed, it might be that there is another bug or you are somehow using the connection outside springs control.

    2. Hi Koen,

      Thanks for your response. When I replace Hibernate Transaction Manager with JDBC Template everything works fine. Yes, it definitely not closing connections and connection not going back to pool. Unfortunately I can't reproduce in local. I can only reproduce this in Unix environment. Thanks for your help.

      Cheers,
      Ravi

  13. Hi Koen,
    Great post. This will sound strange but i cant find the correct Oracle jms client api (with the AQjmsFactory class inside) anywhere. where can i download this jar ? I have found aqapi13.jar but the getQueueConnectionFactory method has other input params (java.lang.NoSuchMethodError: oracle.jms.AQjmsFactory.getConnectionFactory(Ljavax/sql/DataSource;)Ljavax/jms/ConnectionFactory)

    Thanks in advance,

    Replies
    1. Hi Julian,

      This file can normally be found somewhere in the installation directories of your Oracle RDBMS. After installation do a search in the installation dir (I don't remember the original name, but chances are that it at least contains 'aq' and ends with '.jar'). Btw, if I'm not mistaken the "13" reflects the Java version (1.3), which may indicate this is a rather old version.

    2. you're right! i'll try with this new version. many thanks !
