Spring Cloud Steam

Après une journée à rechercher comment fonctionne Spring Cloud Stream, je me rends compte que la dernière version a changé pas mal de choses.

En effet depuis la version 3, la gestion des streams s’est orienté vers du code fonctionnel et on trouve encore beaucoup d’exemples utilisant “l’ancienne” méthode basée sur les annotations. Même dans la documentation officielle, je ne trouve pas ça super clair.

Du coup je vous propose d’étudier un petit exemple de mise en place simple avec cette nouvelle façon de faire.

La création d’évènements

Souvent dans les tutos les évènements sont créés par des Suppliers comme ça :

@Bean
public Supplier<Item> sendItem() {
  return () -> new Item("tomato", 10);
}

Mais dans un projet pour une API, c’est souvent créé par un call HTTP. Dans ce cas il faut utiliser un StreamBridge

streamBridge.send("sendItem-out-0", new Item("tomato", 10));

Avec la nouvelle version fonctionnelle de spring cloud stream, la gestion des définitions des files de message n’est plus faite par annotation mais par la configuration basée sur des règles de nommage.

Donc mon code signifie que je vais envoyer l’object Item comme premier élément de sortie de la fonction sendItem. Il faudra aussi ajouter dans le fichier application.properties la ligne

spring.cloud.stream.bindings.sendItem-out-0.destination=listItems

On était habitué à faire toute la configuration via annotation, ce n’est plus le cas.

Consommer un évènement

Toujours avec un code fonctionnel, un exemple de consommateur :

@Bean
public Consumer<Item> printItem() {
    return data -> LOG.info("Item " + data.name + "(" + data.price + ")");
}

Dans cet exemple on consomme un object Item pour l’afficher dans les logs. Il ne fonctionnera que si dans la configuration on ajoute :

spring.cloud.stream.function.definition=printItem
spring.cloud.stream.bindings.printItem-in-0.destination=listItems

Il faut donc définir la fonction printItem comme fonction gérant un stream et ensuite dire qu’elle consomme les objets de la file “listItems”

Transformer un événement

En plus des consommateurs et des producteurs, on peut faire des fonctions de composition.

@Bean
public Function<Item, Integer> getPrice() {
    return item -> item.price;
}

Dans cet exemple, la fonction transforme un object Item et entier. La encore il faut le configurer

spring.cloud.stream.function.definition=getPrice;printItem
spring.cloud.stream.bindings.getPrice-in-0.destination=buyItems
spring.cloud.stream.bindings.getPrice-out-0.destination=addPrices

Donc on ajoute le nom de la méthode getPrice dans la liste des fonctions gérant des stream et on ajoute le nom de la file dans laquelle on consomme un évènement et celle où on envoie le résultat

Test unitaire

Pour tester que la configuration et le routage fonctionnent correctement on teste qu’en fonction d’une donnée en entrée on récupère bien le bon résultat en sortie pour une fonction de composition

@Test
void testMessages() {
  input.send(new GenericMessage<>(new Item("Tomato", 20)));
  BlockingQueue<Message<?>> messages = collector.forChannel(output);
  assertThat(messages,receivesPayloadThat(CoreMatchers.is("20")));
}

En sachant qu’il est aussi assez simple de tester unitairement la fonction en direct :

@Test
void testFunction() {
  assertEquals(20,service.getPrice().apply(new Item("Tomato",20)).intValue());
}

Et voilà, c’est juste un petit exemple mais j’ai personnellement eu du mal à retrouver tous ces points dans un autre tuto. L’ensemble du code est sur ce repo github.

Dans mon exemple, il y a un docker-compose.yml pour lancer une instance de RabbiMQ avec un compte sécurisé avec un mot de passe et la configuration en découlant dans le fichier application.properties. Et oui je n’utilise pas de YAML pour la configuration de spring, je suis beaucoup trop vieux pour ça !

Références

Doc spring

Bon exemple en Kotlin

Exemples officiels