sábado, 22 de octubre de 2022

Apache Camel: ActiveMQ

Hace poco vimos como hacer una integración con WSO2 y ActiveMQ, que puedes ver aquí. Esta vez. veremos el mismo ejemplo pero con Apache Camel y Spring Boot. Donde además añadiremos la complejidad de utilizar colas que requieren autenticación. Para el ejemplo utilizaremos las siguientes versiones:

  • Apache Camel 3.11.0
  • Spring Boot 2.5.1
  • Active MQ 5.16.2 

1. Docker Compose y configuración

activemq:
  image: rmohr/activemq
  mem_limit: 1G
  hostname: sandbox-activemq
  container_name: sandbox-activemq
  ports:
    - 8161:8161
    - 61616:61616
  volumes:
    - ./activemq.xml:/opt/apache-activemq-5.15.6/conf/activemq.xml

Sobre esta configuración podemos indicar lo siguiente:

  • El puerto 8161 es el de administración y el que nos permitirá acceder a la consola de gestión a través de la URL http://localhost:8161/admin/index.jsp. 
  • El puerto 61616 es el puerto TCP utilizado para la comunicación con ActiveMQ
  • El fichero activemq.xml, el cual se puede encontrar dentro de la misma imagen, es el fichero que nos permitirá configurar el comportamiento de ActiveMQ. Entre otras cosas, la gestión de los usuarios que tengan acceso a las colas o topics. 
Si queremos autenticación para hacer uso de las colas deberemos incluir el siguiente trozo de código en el fichero activemq.xml:

<plugins>
    <simpleAuthenticationPlugin anonymousAccessAllowed="true">
        <users>
            <authenticationUser username="privateUser" password="P#s5W0rd" groups="admins" />
            <authenticationUser username="system" password="manager" groups="admins,publishers,consumers"/>
        </users>
    </simpleAuthenticationPlugin>
    <authorizationPlugin>
        <map>
            <authorizationMap>
                <authorizationEntries>
                    <authorizationEntry queue="privateQueue" read="admins" write="admins" admin="admins" /> 
                    <authorizationEntry queue=">" write="anonymous,admins" read="anonymous,admins" admin="anonymous,admins" />
                    <authorizationEntry topic="ActiveMQ.Advisory.>" write="*" read="*" admin="*" />
                </authorizationEntries>
            </authorizationMap>
        </map>
     </authorizationPlugin>
</plugins>

Con este código estamos configurando lo siguiente:

  • simpleAuthenticationPlugin: Nos permite configurar manualmente usuarios, contraseñas y los grupos a los que están asociados. La otra opción para securizar el acceso, más profesional,  sería a través del JAAS plugin. 
  • anonymousAccessAllowed: Nos permite indicar que también permitiremos el acceso anónimo. De esta forma podemos tener algunas colas securizadas y otras no. El usuario y grupo asociado por defecto es anonymous
  • Hay que añadir al usuario system para evitar errores al realizar operaciones desde la consola. 
  • authorizationEntries: Es la configuración que queremos aplicar a las colas de forma genérica o específica. En esta configuración permitimos:
    • Crear, escribir o leer en la cola 'privateQueue' solo a los usuarios administradores. 
    • Crear, escribir o leer de cualquier cola. El símbolo '>' indica 'todos'. Esta configuración esta asociada a la inferior, que permite la creación de los canales de asesoramiento a cualquier usuario, 'ActiveMQ.Advisory.>'. 
Para temrinar la configuración, respecto a Apache Camel/Spring Boot, como usamos la dependencia camel-activemq-starter, no necesitaremos configurar nada especial. Simplemente cual es la ruta de comunicación del broker:
camel.component.activemq.brokerUrl=tcp://localhost:61616

2. Envío de mensajes

Ahora que lo tenemos correctamente configurado podemos proceder a crear el recurso que nos permita realizar el envío de mensajes. La idea es crear un servicio REST que almacene la información que recibe en la cola de ActiveMQ. 

rest().post("activemq/public").produces(MediaType.APPLICATION_JSON_VALUE)
  .consumes(MediaType.APPLICATION_JSON_VALUE).route()
  .routeId("postPublicQueue").log("Message incoming - ${body}")
  .to("activemq:queue:publicQueue?exchangePattern=InOnly")
  .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));

Como vemos en la configuración, necesitamos indicar al Apache que no se quede esperando por respuesta, ya que no recibirá ninguna del ActiveMQ. Esto lo podemos hacer a través de la configuración 'exchangePattern=InOnly'. De otra forma, en cada llamada, recibiríamos un mensaje de error similar al siguiente:

org.apache.camel.ExchangeTimedOutException: The OUT message was not received
within: 20000 millis due reply message with correlationID

Ahora sí hiciéramos la siguiente invocación podríamos mandar un mensaje a la cola pública. 

curl --location --request POST 'http://localhost:8080/camel/activemq/public' \
--header 'Content-Type: application/json' \
--data-raw '{"prop":"value"}'

Y para la cola privada será un poco más complicado. Primero creamos una conexión de tipo ActiveMQConnectionFactory que permita configurar un usuario y contraseña. Por defecto usa SingleConnectionFactory y este no admite la autenticación. 

@Bean
private ConnectionFactory activeMQConnectionFactory() {
  return new ActiveMQConnectionFactory();
}

Y segundo, será indicar en la cadena de conexión el usuario y contraseña. Si lo quisiésemos de forma generalizada podríamos configurarlo a nivel del fichero application.properties. Pero en este caso solo se lo queremos aplicar a un route

rest().post("activemq/private").produces(MediaType.APPLICATION_JSON_VALUE)
    .consumes(MediaType.APPLICATION_JSON_VALUE).route().routeId("postPrivateQueue")
    .log("Private message incoming - ${body}")
    .to("activemq:queue:privateQueue?connectionFactory=activeMQConnectionFactory&exchangePattern=InOnly&username=privateUser&password={{activemq.privateQueue.password}}")
    .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));

3. Lectura de mensajes en la cola

Ahora veremos como hacer un recurso de la API que nos permita leer de la cola. Esta es la parte más sencilla, pues simplemente será cambiar el método to que nos permite producir, por el método from que nos permite consumir. La configuración será la misma. 

// Consumers
from("activemq:queue:publicQueue").log("Reading public message incoming - ${body}").end();

from(
    "activemq:queue:privateQueue?connectionFactory=activeMQConnectionFactory&exchangePattern=InOnly&username=privateUser&password={{activemq.privateQueue.password}}")
        .log("Reading private message incoming - ${body}").end();

Y esto ha sido todo, espero que haya sido util. Yy como siempre puedes ver todo el código aquí

No hay comentarios:

Publicar un comentario