lunes, 11 de abril de 2022

WSO2 MI: Envío y Recepción con ActiveMQ

Buenas, hoy veremos un ejemplo completo de como interacturar desde WSO2 con una cola ActiveMQ. Mostraremos como configurar el Micro Integrator, MI, escribir y leer de la cola y consultar cuantos registros tiene. 

Para el ejemplo utilizaremos las siguientes versiones:

  • Micro Integrator 4.0.0
  • Active MQ 5.8.14. 

    1. Docker Compose

Por un lado debemos incluir las librerías y dependencias de ActiveMQ en el docker compose base que tenemos:

  • activemq-broker, activemq-client y activemq-openwire-legacy 5.14.0
  • geronimo-j2ee-management_1_1_spec-1.0.1 y geronimo-jms_1.1_spec-1.1.1
  • hawtbu-1.11
He incluir la instancia de ActiveMQ:

  activemq:
    image: rmohr/activemq:5.14.5
    mem_limit: 1G
    hostname: activemq
    container_name: activemq
    ports:
      - 8161:8161   #UI
      - 61616:61616 #JMS
    networks:
      wso2-net:
        ipv4_address: 172.16.238.20

Si no configuramos nada más, cuando intentemos conectarnos con ActiveMQ, recibiremos el siguiente error:

Unexpected error during sending message out org.apache.axis2.AxisFault: 
The system cannot infer the transport information from the jms:/.....

Por tanto deberemos modificar el fichero de configuración deployment.toml para añadir:

[[transport.jms.sender]]
name = "myQueueSender"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://activemq:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.cache_level = "producer"

Y para poder leer de una cola necesitamos configurar un JMS Listener también en el fichero de configuración deployment.toml

[[transport.jms.listener]]
name = "myQueueConnectionFactory"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "failover:tcp://activemq:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.cache_level = "consumer"

[[transport.jms.listener]]
name = "default"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "failover:tcp://activemq:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"

En esta configuración es muy importante indicar la configuración de failover. Puesto que de lo contrario si se cayese el nodo de ActiveMQ, no se podría volver a conectar con el mismo a menos que se reiniciase el MI. 

    2. Envío de mensajes

Ahora procederemos a crear el primer recurso de una API, con el cual enviaremos información al activeMQ. Pero previamente transformaremos el mensaje, simplemente por añadir una pequeña lógica al ejemplo.

<resource methods="POST" uri-template="/activeMQ/">
    <inSequence>
        <payloadFactory media-type="xml">
            <format>
                <m:getQuote xmlns:m="http://services.samples">
                    <m:request>
                        <m:book>
                            <m:isbn>$1</m:isbn>
                            <m:release>$2</m:release>
                        </m:book>
                    </m:request>
                </m:getQuote>
            </format>
            <args>
                <arg expression="$.isbn" evaluator="json"/>
                <arg expression="$.year" evaluator="json"/>
            </args>
        </payloadFactory>
        <property name="OUT_ONLY" value="true"/>
        <call>
        <endpoint>
            <address uri="jms:/myqueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory
                   &amp;java.naming.provider.url=failover:(tcp://activemq:61616)
                   &amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
                   &amp;transport.jms.DestinationType=queue&amp;redeliveryPolicy.redeliveryDelay=30000
                   &amp;maximumRedeliveries=10" />
        </endpoint>
        </call>
        <respond />
    </inSequence>
    <outSequence/>
    <faultSequence/>
</resource>

En este código hay dos apartados interesantes:

  • La configuración de la propiedad OUT_ONLY que nos permite indicarle al MI que no se quede a la espera de una respuesta por parte del ActiveMQ. 
  • La configuración de la invocación al ActiveMQ, donde aparte de los valores básicos para la configuración,  podemos incluir la política de reintentos a través de las propiedades de redelivery o aumentar la resiliencia con la configuración de failover, donde podemos incluir varios nodos de ActiveMQ.
Si hubiésemos configurado con valores más específicos, no necesitaríamos un address endpoint con tanto detalle. Solo necesitaríamos indicar la cola y el transport sender a utilizar. 

<address uri="jms:/myqueue?transport.jms.ConnectionFactory=myCustomSender" />

    3. Lectura de mensajes en la cola

Desde la versión 5.8 ActiveMQ incluye una API a través de la cual podemos enviar y consultar mensajes. Además con la ayuda de Jolokia pone a disposición de los usuarios una API de gestión. Esta  API será la que utilicemos para ver cuantos mensajes hay disponibles en la cola.   

La lógica de nuestro recurso será sencillo: 
    - Creamos una cabecera de autorización básica para la invocación de la API de activeMQ.
    - En la llamada a la API indicamos el broker y la cola de la cual deseamos los datos. Los cuales obtenemos desde nuestro propio recurso.
    - Leemos la respuesta y en caso de ser correcta en base al valor del campo status. (a tener en cuenta que siempre devuelve HTTP Status Code a 200 y Content-Type a text/plain). Leemos el campo value y lo devolvemos.  

<resource methods="GET" uri-template="/activeMQ/{broker}/{queue}/count">
    <inSequence>
        <property name="Authorization" expression="fn:concat('Basic ', base64Encode('admin:admin'))" scope="transport"/>  
        <send>
            <endpoint xmlns="http://ws.apache.org/ns/synapse" name="mock_v1_activemqCount_httpEp">
                <http method="GET" uri-template="http://activemq:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={uri.var.broker},destinationType=Queue,destinationName={uri.var.queue}/QueueSize" />
            </endpoint>
        </send>
    </inSequence>
    <outSequence>
        <property name="ContentType" scope="axis2" type="STRING" value="application/json"/>
        <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
        <switch source="json-eval($.status)">
            <case regex="200">
                <payloadFactory media-type="json">
                    <format> { "count": $1 } </format>
                    <args> <arg evaluator="json" expression="json-eval($.value)"/> </args>
                </payloadFactory>
            </case>
            <default>
                <property name="HTTP_SC" value="500" scope="axis2" />
                <payloadFactory media-type="json">
                    <format>{ "msg": "Unexpected error"}</format>
                    <args />
                </payloadFactory>
            </default>
        </switch>
        <respond/>
    </outSequence>
    <faultSequence/>
</resource>

    4. Lectura de mensajes 

Por último crearemos un Proxy Service que nos permitirá la lectura de esa misma cola. En este caso es importante también devolverle una respuesta a la cola, puesto que de lo contrarío reenviaría el mensaje a la cola de Dead Letter Channel, ActiveMQ.DLQ. 

<proxy xmlns="http://ws.apache.org/ns/synapse" name="activemqExample_v1_consumer_pr" transports="jms" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <log level="full">
                <property name="MSG" value="Receiving messages from Queue" />
            </log>
            <property action="set" name="OUT_ONLY" value="true"/>
            <drop />
        </inSequence>
        <faultSequence />
    </target>
    <parameter name="redeliveryPolicy.maximumRedeliveries">1</parameter>
    <parameter name="transport.jms.DestinationType">queue</parameter>
    <parameter name="transport.jms.SessionTransacted">true</parameter>
    <parameter name="transport.jms.Destination">myqueue</parameter>
    <parameter name="redeliveryPolicy.redeliveryDelay">2000</parameter>
    <parameter name="transport.jms.CacheLevel">consumer</parameter>
    <description />
</proxy>

Ahora podremos realizar la siguiente invocación y comprobar como por un lado enviamos el mensaje y lo leemos por el otro. 

curl --request POST 'https://localhost:8253/mock/activeMQ' \
--header 'Content-Type: application/json' \
--data-raw '{
    "author":"Alfred Bester",
    "name":"the stars my destination",
    "isbn":"1-85798-814-0",
    "year": 1956
}'

Con esto hemos visto un completo ejemplo sobre que podemos hacer para leer y escribir en una cola de ActiveMQ con WSO2. Si nos adentramos un poco más en la configuración podremos realizar tareas más complejas. Como siempre, todo el código lo podemos ver aquí.

No hay comentarios:

Publicar un comentario