Split Flux Reactor

La question de comment faire pour splitter un Flux Reactor revient souvent sur stackoverflow. Comment faire en sorte qu’un flux exécute des opérations différentes sur ces éléments en fonction d’un critère prédéfini ? Et surtout que la notion de stream soit conservée sur ces actions ?

Split Reactor Flux

Contexte

Nous avons rencontré la problématique pour l’un de nos modules réactifs. Un fichier CSV, fourni par un prestataire, contenant des lignes de données à intégrer dans nos bases. La première colonne indique le type d’opération à effectuer sur la donnée : INSERT, UPDATE ou DELETE.

Tout le composant est réactif, on souhaite streamer le contenu de ce fichier qui peut peser plusieurs centaines de Mo. Nous avons donc besoin, à une étape du traitement, de diviser notre flux de lignes en 3 Flux distincts pour chacune des 3 opérations possibles et re-fusionner le tout en un flux de sortie.

Prenons l’exemple simplifié des traitements suivant :

    public Flux<String> delete(Flux<CsvLine> toBeDeleted) {
        return toBeDeleted.map(csvLine -> "Delete " + csvLine.data);
    }

    public Flux<String> update(Flux<CsvLine> toBeUpdated) {
        return toBeUpdated.map(csvLine -> "Update " + csvLine.data);
    }

    public Flux<String> insert(Flux<CsvLine> toBeInserted) {
        return toBeInserted.map(csvLine -> "Insert " + csvLine.data);
    }

La méthode groupBy

C’est la réponse qui revient presque toujours sur stackoverflow. Utiliser la fonction groupBy présente sur les Flux Reactor pour traiter la problématique. On pourrait l’écrire de la façon suivante :

    @Test
    void should_split_flux_with_groupby() {
        Flux<String> actual = Flux.fromIterable(List.of(
                CsvLine.of("DELETE", "1", "data 1"),
                CsvLine.of("UPDATE", "2", "data 2"),
                CsvLine.of("UPDATE", "3", "data 3"),
                CsvLine.of("INSERT", "4", "data 4"),
                CsvLine.of("DELETE", "5", "data 5")

        )).groupBy(l -> l.type).flatMap(gf -> {
            switch (gf.key()) {
                case "DELETE":
                    return delete(gf);
                case "UPDATE":
                    return update(gf);
                case "INSERT":
                    return insert(gf);
                default:
                    return Flux.error(new IllegalArgumentException("Unknown key " + gf.key()));
            }
        });

        StepVerifier.create(actual)
                .expectNext(
                        "Delete data 1",
                        "Update data 2",
                        "Update data 3",
                        "Insert data 4",
                        "Delete data 5"
                ).expectComplete()
                .verify();
    }

Cette méthode fonctionne bien. Il s’agit de la solution initialement mise en place dans notre code pour gérer la problématique de division de flux. Mais très vite nous avons constaté des comportements “aléatoires” et parfois des blocages.

En se penchant sur la javadoc de la fonction groupBy on peut lire :

Les groupes doivent être drainés et consommés en aval pour que groupBy fonctionne correctement. Notamment lorsque les critères produisent une grande quantité de groupes, cela peut conduire à un blocage si les groupes ne sont pas correctement consommés en aval (par exemple en raison d’un flatMap avec un paramètre maxConcurrency défini trop bas).
JavaDoc Flux::groupBy

L’exemple précédent n’utilise que trois groupes mais notre module en avait quelques autres. De plus le traitement en aval des données consiste à écrire en base de données. Ce n’est pas forcément le traitement le plus rapide pour consommer les groupes. Enfin, cette implémentation, bien qu’efficace pour des cas simples, reste assez rigide et ne permettra pas des traitements plus élaborés.

De l’expérience que nous en avons eu, même si le nombre de clés n’est pas important, le groupBy n’est pas adapté à des volumes importants d’éléments et à des sous-flux complexes.

L’utilisation des Sinks

Anciennement connus comme les EmitterProcessor, les Sinks permettent de créer des Flux qui seront peuplés programmatiquement. Le Publisher est le traitement, il n’y a pas de Publisher pré-défini.

Le code précédent implémenté avec des Sinks devient :

    @Test
    void should_split_flux_with_sinks() {

        Sinks.Many<CsvLine> toDelete = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<CsvLine> toInsert = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<CsvLine> toUpdate = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<CsvLine> onError = Sinks.many().unicast().onBackpressureBuffer();

        Flux<String> deleted = delete(toDelete.asFlux());
        Flux<String> inserted = insert(toInsert.asFlux());
        Flux<String> updated = update(toUpdate.asFlux());
        Mono<Long> countErrors = onError.asFlux().count().doOnNext(c -> log.error("{} lines with errors", c));

        Flux<String> actual = Flux.fromIterable(List.of(
                CsvLine.of("DELETE", "1", "data 1"),
                CsvLine.of("UPDATE", "2", "data 2"),
                CsvLine.of("UPDATE", "3", "data 3"),
                CsvLine.of("INSERT", "4", "data 4"),
                CsvLine.of("DELETE", "5", "data 5")
        )).doOnNext(csvLine -> {
            switch (csvLine.type) {
                case "DELETE":
                    toDelete.tryEmitNext(csvLine);
                    break;
                case "UPDATE":
                    toUpdate.tryEmitNext(csvLine);
                    break;
                case "INSERT":
                    toInsert.tryEmitNext(csvLine);
                    break;
                default:
                    onError.tryEmitNext(csvLine)
            }
        }).doOnComplete(() -> {
            toDelete.tryEmitComplete();
            toUpdate.tryEmitComplete();
            toInsert.tryEmitComplete();
            onError.tryEmitComplete();
        }).then(countErrors)
                .thenMany(Flux.merge(deleted, inserted, updated));

        StepVerifier.create(actual)
                .expectNext(
                        "Delete data 1",
                        "Delete data 5",
                        "Insert data 4",
                        "Update data 2",
                        "Update data 3"
                ).expectComplete()
                .verify();
    }

Contrairement au premier cas, les Flux sont consommés dans l’ordre où ils sont renseignés dans le merge. Le test est mono-thread donc les éléments des flux suivants sont bufferizés le temps que le flux en cours soit terminé. Cependant en multi-thread, les flux seront traités en parallèle et la bufferization des éléments restera maîtrisée. C’est ce que nous avons sur notre module où les traitements de delete, update et insert sont des appels de base de données souscrits sur des Schedulers dédiés.

La gestion des erreurs

L’utilisation des Sinks, va aussi permettre plus de souplesse dans nos implémentations. Par exemple pour la gestion des erreurs. Une ligne de CSV peut générer une erreur à plusieurs moments de son traitement : le parsing, la transformation, la persistance… En déclarant un Sink pour le traitement d’erreur, il est alors facile de lui pousser les lignes depuis n’importe quelle phase de leur traitement, puis de terminer le flux par le traitement des exceptions.

Il devient aussi possible d’ordonner les Flux secondaires et de faire en sorte qu’un Flux s’exécute après la fin d’un autre. Il est alors possible de paralléliser une partie des Flux et d’exécuter le reste des opérations séquentiellement une fois que les précédents traitements sont terminés.

En production

La solution à base de Sinks est celle qui s’est avérée la plus performante dans notre cas d’usage. En production depuis plus d’un an, elle traite des fichiers de 320K lignes (120Mo) sans problème avec une utilisation de la mémoire stable.

Le groupBy reste une solution très efficace pour la majorité des cas de figure. Cependant, pour les besoins plus complexes, il est intéressant d’étudier l’utilisation des Sinks et de savoir comment les utiliser.

Le code complet du test est disponible sur Github. En plus de JUnit5, vous aurez besoin des dépendances suivantes :

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.5</version>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.4.5</version>
    <scope>test</scope>
</dependency>