jeudi 18 juin 2009

Utiliser les Advanced Queue en Java sur Oracle !

Après 3 jours de nuits blanches à chercher sur des milliers de sites pour intégrer les queues et JMS (Java Message Service) sur mon serveur Oracle 9i, j'ai enfin réussi à faire ce que je voulais... ouf! :)

Voici ce que je cherche à faire :
J'ai une base Oracle 9i, lorsqu'un enregistrement dans une table (par exemple la table Banque) est modifié/créé, je veux que mon application Java soit directement mise au courant lorsque l'opération est effectuée sur la base de données.

Pour "mettre au courant" mon application, on utilise un service très pratique introduit dans J2EE : JMS (Java Message Service). Comme son nom l'indique, c'est un service de message. Les messages sont produits dans une Queue et lus par les destinataires à partir de celle çi.

La base Oracle, à la modification/création d'un élément de la table Banque, produira un message JMS qui sera capté par mon application Java. Oui vous avez bien lus : Oracle est capable (je crois depuis la version 8) de produire des messages JMS : ce système sur Oracle s'appelle Advanced Queue (AQ).

Pour réaliser ce tour de magie, on définit tout d'abord un trigger sur la table (trigger en update, create,delete etc...). Ce trigger appellera une procédure stockée qui se chargera d'écrire et d'envoyer un message JMS. Coté Java, on aura besoin de se connecter à la queue Oracle et d'être prévenu lorsqu'un message arrive pour le traiter ensuite.

Ca parait très simple sur le papier, et ça l'est en pratique à condition que ça marche du premier coup qui n'a pas été mon cas !! Les docs et tutorials Oracles sur la toile sont assez nombreux et simples mais malheureusement (je peux citer par exemple http://gbowyer.freeshell.org/oracle-aq.html), en cas d'erreur (j'avais une exception que je ne comprenais pas d'où elle venait) on est vite paumé pour trouver une solution. Heureusement, à travers cet article et mon expérience acquise, je pense que je pourrai en aider certains !

Les pré requis nécessaires :
  • Une base Oracle 8 au minimum installée sur un serveur. Cet article se base sur la 9i
  • Les outils clients d'Oracle pour votre base (ces outils intégrent les drivers JDBC et le nécessaire pour lire les messages Oracle) installés sur votre machine de développement. ATTENTION : je tiens à avertir, car j'ai eu le cas, comme j'avais la version 9i, ça me paraissais logique d'installer les outils clients pour la 9i. Ben non ! j'ai pas du tout réussi à faire marcher mon programmes avec les librairies pour la 9i !! J'ai mis un jour et demi pour m'en rendre compte que ça venait des bibliothèques d'Oracle ! :( (j'avais des exceptions du type AbstractError à la connexion à Oracle). J'ai téléchargé les outils pour la version 10g (pas essayé sur la version 11g), et ça marche nickel même sur la base 9i !! (Peut être téléchargé à l'adresse : http://www.oracle.com/technology/software/products/database/oracle10g/htdocs/10201winsoft.html)
  • Un environnement de développement J2EE (J2EE 5 conseillé), j'utilise pour ma part RAD 7 (Rational Application Developper)
  • Un projet J2EE avec Spring de configuré (pas la peine de présenter Spring ? ...)
Passons à la mise en pratique :

1. Création de la queue dans Oracle

Pour commencer, on va s'attacher à créer la queue sur la base Oracle.

Connectez vous sur la base avec SQL*Plus en tant que SysDBA.

Il faut créer un utilisateur qui aura accès aux queues (et pourra écrire dedans). Une fois l'utilisateur créé, il faut lui donner les droits suivants : DBMS_AQ et DBMS_AQADM
GRANT EXECUTE ON DBMS_AQ TO MON_USER;
GRANT EXECUTE ON DBMS_AQADM TO MON_USER;
où MON_USER est le nom de l'utilisateur créé


Pour créer la queue, il suffit de lancer la commande suivante :
EXEC dbms_aqadm.create_queue_table(queue_table=>'content_queue', queue_payload_type=>'sys.aq$_jms_text_message', multiple_consumers=>TRUE);
EXEC dbms_aqadm.create_queue(queue_name=>'cnt_queue', queue_table=>'content_queue');
EXEC dbms_aqadm.start_queue(queue_name=>'cnt_queue');
commit;


cnt_queue est le nom de la queue à créer.
content_queue le nom de la table où Oracle va stocker les données.
queue_payload_type est le type des données dans les messages. Ici j'ai mis texte.
Je vous laisse le soin de lire par exemple http://hell.org.ua/Docs/oreilly/oracle/bipack/ch05_05.htm pour la liste complète des paramètres.

2. Création de la procédure et du trigger sur une table

Je le rappelle, le but de la procédure est d'écrire un message JMS dans la queue créée plus haut. Cette procédure sera appelée par un trigger sur la table.

Voici comment créer la procédure, toujours à partir de SQL*Plus :
CREATE OR REPLACE PROCEDURE PROC_CACHE_CTRL_NOTIFICATION(TABLE_NAME IN VARCHAR) AS
msg SYS.AQ$_JMS_TEXT_MESSAGE;
queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
msg_props DBMS_AQ.MESSAGE_PROPERTIES_T;
msg_id RAW(16);
no_consumers_error EXCEPTION;
PRAGMA EXCEPTION_INIT(no_consumers_error, -24033);
BEGIN
-- Ensure what is sent will be a JMS message
msg := SYS.AQ$_JMS_TEXT_MESSAGE.CONSTRUCT();

msg.set_text('TABLE_NAME=' || TABLE_NAME);
DBMS_AQ.ENQUEUE( queue_name => 'cnt_queue'
, enqueue_options => queue_options
, message_properties => msg_props
, payload => msg
, msgid => msg_id);
-- Without the following, the procedure will die if noone
-- Is listening for cache control
EXCEPTION
WHEN no_consumers_error
THEN
-- Output it in case, but otherwise swallow
DBMS_OUTPUT.PUT_LINE('No interested parties are listening to cache-control messages');
END;


Cette petite procédure va écrire un message texte TABLE_NAME=la valeur passée en paramètre de la procèdure. Je vous laisse le loisir de personnaliser un peu cette procédure. Notez l'utilisation de DBMS_AQ.ENQUEUE pour poster un message.

Il faut créer désormais le trigger : sur la table Banque par exemple :
CREATE OR REPLACE TRIGGER TRGR_BANQUE
AFTER DELETE OR INSERT OR UPDATE
ON BANQUE
CALL PROC_CACHE_CTRL_NOTIFICATION('BANQUE');


3. Création des classes Java pour la connexion et réception des messages

Ouvrir votre projet J2EE avec Spring, on va créer trois classes pour la reception des messages.

Mais pour commencer, il faut modifier le classpath (Chemin de génération JAVA sous Eclipse ou RAD) :
  • Ajouter les jars aqapi13.jar et jmscommon.jar. Ces jars ce situent dans le répertoire rdbms/jlib du répertoire d'installation des outils clients d'Oracle (cf. plus haut). Si elles n'y sont pas, c'est que vous avez pas tous installé....
  • Ajouter le jar ojdbc14.jar. Ce jar est dans le répertoire jdbc/lib d'Oracle
Créer la classe de création de la connexion à la base Oracle :

package trigger;

import javax.jms.ConnectionFactory;
import oracle.jms.AQjmsFactory;

public class OracleAQTopicConnectionFactory
{

public ConnectionFactory createConnectionFactory() throws Exception
{
String url="jdbc:oracle:thin:MON_USER/MON_MDP@MON_SERVER:MON_PORT:MON_SID";

ConnectionFactory c= AQjmsFactory.getTopicConnectionFactory(url,null);
return c;
}
}

où MON_USER et MON_MDP sont respectivement le login et mot de passe de l'utilisateur ayant accés aux queues sur la base Oracle (voir plus haut)
MON_SERVEUR, MON_PORT sont le nom du serveur et le port pour se connecter à la base
MON_SID : le nom de l'instance Oracle

Note importante : vous remarquez que j'ai mis le login et mot de passe dans l'URL de connexion, je sais pas pourquoi, la connexion ne MARCHERA PAS si ces deux infos ne sont pas présentes ... (j'ai mis 3h pour m'en apercevoir !)

La deuxième classe à créer est une classe de connexion à la queue Oracle (Topic) :
package trigger;

import org.springframework.beans.factory.FactoryBean;
import oracle.jms.AQjmsSession;

import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;

public class OracleAQTopicDestinationFactory implements FactoryBean {

private TopicConnectionFactory connectionFactory;
private String queueUser;
private String queueName;


public Object getObject() throws Exception {

AQjmsSession session = (AQjmsSession) connectionFactory
.createTopicConnection()
.createTopicSession(true, 0);


Topic t= session.getTopic(queueUser , queueName);
return t;
}

public Class getObjectType() {
return javax.jms.Topic.class;
}

public boolean isSingleton() {
return true;
}

public void setConnectionFactory(TopicConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

public void setQueueUser(String queueUser) {
this.queueUser = queueUser;
}

public void setQueueName(String queueName) {
this.queueName = queueName;
}
}


Enfin la troisième, est une classe qui écoute les messages. C'est celle çi qui va recevoir un message dès qu'Oracle en publie un.

package trigger;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;


public class OracleNotificationListener implements MessageListener
{

public void onMessage(Message message)
{
try
{
String text = ((TextMessage) message).getText();
System.out.println("Message reçus : " + text);

} catch (Throwable t)
{
t.printStackTrace();
}
}
}

A la réception d'un message, on l'affiche bêtement dans la console.

3. Paramétrage de Spring

Il ne reste plus qu'a paramétrer dans Spring pour que tout fonctionne. Dans votre applicationContext.xml, ajouter les lignes :

<beans xmlns="http://www.springframework.org/schema/beans" xsi="http://www.w3.org/2001/XMLSchema-instance" tx="http://www.springframework.org/schema/tx" schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd ">

<bean id="oracleAQJMSListener" class="trigger.OracleNotificationListener">

</bean>

<bean id="oracleAqConnFactory" bean="oracleAqConnectionFactoryHandler" method="createConnectionFactory">

<bean id="oracleAqConnectionFactoryHandler" class="trigger.OracleAQTopicConnectionFactory">
</bean>


<bean id="cnt_queue" class="trigger.OracleAQTopicDestinationFactory">
<property name="connectionFactory" ref="oracleAqConnFactory">
<property name="queueName" value="cnt_queue">
<property name="queueUser" value="MON_USER">
</property>

<bean id="oracleAqJMSQueue" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oracleAqConnFactory">
<property name="destination" ref="cnt_queue">
<property name="messageListener" ref="oracleAQJMSListener">
<property name="pubSubDomain" value="true">
</property>
</property></property></property></bean></property></property></bean></bean></beans>

Il ne faut pas oublier d'adapter les valeur class en respectant vos packages et nom de classes (ici les classes sont dans le package trigger).
Ici la queue s'appelle cnt_queue.
Le bean oracleAqJMSQueue est le bean d'écoute des messages (troisième classe créé).

Voila c'est tout !

Pour tester :

Lancer votre application.
Effectuez une modification sur la table où est le trigger
Un message devrait apparaitre dans la console... ça marche !!

1 commentaire:

  1. Hi,
    IT is a nice article. But how do we create a durable subscriber with the same configuration

    Thanks
    phani
    vsphani@gmail.com

    RépondreSupprimer