jueves, 9 de abril de 2020

Java 9: Reactive Streams

Bueno ya hemos visto algo de Java 9 aqui. Ahora vamos a ver un poco más en detalle otra de sus características principales, los Reactive Streams. Los cuales nos llevarán más adelante a ver frameworks que los tienen como base para realizar programación reactiva.

Antes de empezar debemos tener claro en que se basa el paradigma de la programación reactiva. Enfocada en el trabajo con flujos de datos de forma asíncrona, entre un publicador y un subscriptor. Teniendo esto como base, los Reactive Stream se implementan en Java 9 a través de la clase Flow y sus interfaces internas:
  • Publisher, nos permite la publicación de datos.
  • Subscriber, nos permite recibir esos mismos datos. 
  • Subscription, nos permite vincular al suscriptor con el publicador. 
  • Processor, nos permite actuar como suscriptor y publicador.
La interfaz Publisher contará con un método principal, subscribe, el cual recibe un suscriptor. Además también contamos dentro de la JDK con la clase SubmissionPublisher, que implementa la interfaz Publisher. Donde a traves de su método submit, podremos enviar mensajes a todos sus suscriptores. 

Para nuestro ejemplo crearemos una clase que implemente Subscriber. Y para ello debe implementar los métodos:
  • onComplete: Metodo final invocado al terminar el flujo de datos.
  • onError: Método que es invocado cuando ocurre un fallo durante la comunicación.
  • onNext: Método invocado cada vez que el publisher envia un elemento. 
  • onSubscribe: Método invocado antes de comenzar el flujo 
public class IntBasicSubscriber implements Subscriber<Integer> {
 private Subscription subscription;
 @Override
 public void onComplete() {
  System.out.println("Subscription ends");
 }
 @Override
 public void onError(final Throwable throwable) {
  System.out.println("error: " + throwable);
 }
 @Override
 public void onNext(final Integer item) {
  System.out.println("Integer obtain: " + item);
//  subscription.request(1);
 }
 @Override
 public void onSubscribe(final Subscription subscription) {
  System.out.println("Subscription starts");
  this.subscription = subscription;
  this.subscription.request(1);
 }
}

En el código podéis ver como almacenamos el objeto de tipo Subscription que obtenemos en el método onSubscribe. Con este objeto y através del método request, podemos solicitar al Publisher que nos envíe tantos elementos como le digamos.

Para el ejemplo creamos una instancia de SubmissionPublisher, a la que le pasamos 5 números. Para enviar un elemento al publicador, utilizamos el método submit tal y como indicamos antes.

public static void main(final String[] args) throws InterruptedException {
  ExecutorService executor = Executors.newFixedThreadPool(1);
  SubmissionPublisher<Integer> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
  sb.subscribe(new IntBasicSubscriber());
  IntStream.range(0, 5).forEachOrdered(sb::submit);
  sb.close();
  executor.shutdown();
 }

Como en el suscriptor sólo hemos puesto una vez la llamada al método request, la salida que obtendremos será la siguiente:

Subscription starts
Integer obtain: 0

Si quisiéramos obtener más elementos del publicador y ver como se llama tambien al metodo onComplete del suscriptor, podemos descomentar la llamada al método request dentro del metodo onNext. De esta forma obtendremos todos los elementos. 

No hay comentarios:

Publicar un comentario