かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その31 )( Aggregator のサンプルを作ってみる )

概要

記事一覧はこちらです。

  • Spring Integration DSL8.4. Aggregator を使用したサンプルを作成します。
  • Aggregator でメッセージがどのように集約されるのかを見られるようにするために、単体の Redis サーバを Docker Compose で構築して MessageStore として利用します。Redis クライアントには JSON のデータを見たいので Medis を使用します。
  • サンプルは以前流行ったズンドコキヨシをベースに Aggregator の動きが分かるように少しルールを変えたものを作成してみます。

参照したサイト・書籍

  1. Spring Integration - 8.4. Aggregator
    https://docs.spring.io/spring-integration/docs/current/reference/html/#aggregator

  2. luin/medis
    https://github.com/luin/medis

目次

  1. ksbysample-eipapp-aggregator プロジェクトを作成する
  2. Docker Compose で単一の Redis サーバを構築する
  3. Redis の GUI クライアントとして Medis をインストールする
  4. "ずん" あるいは "どこ" のメッセージを送信して5つずつ aggregate するサンプルを作成する
  5. "ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後にバラバラに分解してから再び1セットのメッセージに aggregate するサンプルを作成する

手順

ksbysample-eipapp-aggregator プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。

plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'ksbysample.eipapp'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

dependencies {
    def lombokVersion = "1.18.6"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.boot:spring-boot-starter-data-redis")
    implementation("org.springframework.integration:spring-integration-redis")
    implementation("org.springframework.boot:spring-boot-starter-json") {
        exclude group: "org.springframework", module: "spring-web"
    }
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}
  • Redis を MessageStore として利用するので以下の行を追加します。
    • implementation("org.springframework.boot:spring-boot-starter-data-redis")
    • implementation("org.springframework.integration:spring-integration-redis")
  • Redis に JSON フォーマットでメッセージを格納したいので以下の行を追加します。spring-boot-starter-json を追加すると org.springframework:spring-web も依存関係に追加されてしまうのですが、今回は不要なので exclude で除外します。
    • implementation("org.springframework.boot:spring-boot-starter-json")
  • ログを出力する時に @Slf4j アノテーションを使いたいので以下の行を追加して lombok を使えるようにします。
    • annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    • compileOnly("org.projectlombok:lombok:${lombokVersion}")

ディレクトリ構成は以下のようにしています。

f:id:ksby:20190615005011p:plain

Docker Compose で単一の Redis サーバを構築する

プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。

version: '3'

services:
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it redis /bin/sh
  #
  # 起動したコンテナの redis に redis-cli でアクセスするには以下のコマンドを実行する
  # docker exec -it redis redis-cli
  #
  #############################################################################
  # 単体 Redis サーバ
  redis:
    image: redis:5.0.5
    container_name: redis
    ports:
      - "6379:6379"

docker-compose up -d コマンドを実行して Redis を起動します。

f:id:ksby:20190615005616p:plain

Redis の GUI クライアントとして Medis をインストールする

https://github.com/luin/medis から Medis on Windowsインストーラをダウンロードしてインストールします。

"ずん" あるいは "どこ" のメッセージを送信して5つ ずつ aggregate するサンプルを作成する

以下の動作を行うサンプルを作成します。

  • "ずん"、"どこ" をランダムに出力する MessageSource を作成する。”ずん" を 4/5、"どこ" を 1/5 の確立で返す。
  • MessageSource からは 15秒ごとに取得する(aggregator の動作を確認するために時間を遅めにする)。
  • aggregator は Redis を MessageStore として使用し、メッセージが5つ溜まったら集約して1つのメッセージにして次の処理に流す。タイムアウトは設定しない(5つ溜まるまでずっと待つ)。
  • 最後に集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。

src/main/java/ksbysample/eipapp/aggregator の下に RandomZundokoFlowConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

@Slf4j
@Configuration
public class RandomZundokoFlowConfig {

    private static final Random r = new Random();
    private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"};

    private static final long POLLER_DELAY_PERIOD = 15000L;

    // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない
    // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties
    // の spring.redis.~ を参照。
    private final RedisConnectionFactory redisConnectionFactory;

    public RandomZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    /**
     * Redis を MessageStore として使用するための設定。
     * Redis に格納する message は JSON フォーマットにする。
     * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
     * 参照。
     *
     * @return {@link RedisMessageStore} object
     */
    @Bean
    public RedisMessageStore redisMessageStore() {
        RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
        ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
        RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
        store.setValueSerializer(serializer);
        return store;
    }

    @Bean
    public MessageSource<String> zundokoMessageSource() {
        return () -> MessageBuilder.withPayload(ZUNDOKO[r.nextInt(5)]).build();
    }

    @Bean
    public IntegrationFlow zundokoFlow() {
        return IntegrationFlows.from(zundokoMessageSource()
                , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD)))
                // kiyoshiFlow にメッセージを送信する
                .channel(kiyoshiFlow().getInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                // メッセージが5つ溜まったら集約して次の処理に流す
                // * 流れてくる message の header に correlationId がないので
                //   correlationExpression("1") を記述して固定で "1" という扱いにする。
                //   これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。
                // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを
                //   次の処理に流した後に再びメッセージが蓄積されるようになる。
                .aggregate(a -> a.correlationExpression("1")
                        .messageStore(redisMessageStore())
                        .releaseStrategy(g -> g.size() == 5)
                        .expireGroupsUponCompletion(true))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .<List<String>>handle((p, h) -> {
                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
                        log.error("きよし!");
                    }
                    return null;
                });
    }

}

実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" + <correlationIdとして使用する文字列(今回は "1" 固定)> のレコードと個々の message のレコードがあることが確認できます。

f:id:ksby:20190615100441p:plain f:id:ksby:20190615100523p:plain

message が5つ蓄積されて aggregator により1つの message に集約されて次の処理に渡されると、蓄積されていた MESSAGE_GROUP と MESSAGE は削除されます。

f:id:ksby:20190615100821p:plain

コンソールには以下のように出力されます。payload=[ずん, ずん, ずん, ずん, どこ] になっていると "きよし!" の文字列が出力されています。またスレッド名を見るとバラバラでした。1つのスレッドで処理するわけではないようです。

f:id:ksby:20190615102346p:plain

.outputProcessor(...) を使うと次の処理に渡す時の palyload にセットする値を変更することができます。デフォルトでは List でまとめられますが、以下のように実装するとカンマで結合した String オブジェクトになります。

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                // メッセージが5つ溜まったら集約して次の処理に流す
                // * 流れてくる message の header に correlationId がないので
                //   correlationExpression("1") を記述して固定で "1" という扱いにする。
                //   これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。
                // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを
                //   次の処理に流した後に再びメッセージが蓄積されるようになる。
                .aggregate(a -> a.correlationExpression("1")
                        .messageStore(redisMessageStore())
                        .releaseStrategy(g -> g.size() == 5)
                        .outputProcessor(g -> {
                            String msg = g.getMessages().stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.joining(","));
                            if (msg.equals("ずん,ずん,ずん,ずん,どこ")) {
                                msg += ",きよし!";
                            }
                            return msg;
                        })
                        .expireGroupsUponCompletion(true))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .nullChannel();
//                .<List<String>>handle((p, h) -> {
//                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
//                        log.error("きよし!");
//                    }
//                    return null;
//                });
    }

f:id:ksby:20190615105148p:plain

Aggregator 用のクラスを定義して @CorrelationStrategy、@ReleaseStrategy、@Aggregator アノテーションを付与したメソッドを用意する書き方もあります。

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                // メッセージが5つ溜まったら集約して次の処理に流す
                // * 流れてくる message の header に correlationId がないので
                //   correlationExpression("1") を記述して固定で "1" という扱いにする。
                //   これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。
                // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを
                //   次の処理に流した後に再びメッセージが蓄積されるようになる。
                .aggregate(a -> a.processor(new ZundokoAggregator())
                        .messageStore(redisMessageStore())
                        .expireGroupsUponCompletion(true))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .<List<String>>handle((p, h) -> {
                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
                        log.error("きよし!");
                    }
                    return null;
                });
    }

    static class ZundokoAggregator {

        @CorrelationStrategy
        public String correlationKey(Message<?> message) {
            return "1";
        }

        @ReleaseStrategy
        public boolean canRelease(List<Message<?>> messages) {
            return messages.size() == 5;
        }

        @Aggregator
        public List<String> aggregate(List<String> payloads) {
            // .outputProcessor(...) での加工処理を実装したい場合にはこのメソッド内に記述する
            return payloads;
        }

    }

f:id:ksby:20190615110538p:plain

"ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後分解してから再び1セットのメッセージに aggregate するサンプルを作成する

次に以下の動作を行うサンプルを作成します。

  • "ずん", "ずん", "ずん", "ずん", "どこ" の配列が palyload にセットされたメッセージを出力する MessageSource を作成する。
  • MessageSource からは 5秒ごとに取得する。
  • MessageSource からメッセージを取得したら Splitter で分解して、マルチスレッドで処理させるための channel に渡す。
  • マルチスレッド側ではランダムで決められた秒数(最大5秒)待機してから aggregator へ渡す。
  • aggregator では Splitter で分割されていたメッセージを全て集めたら1つのメッセージにして次の処理に流す。また3秒以内に分割していたメッセージが全て集まらなかった時には MESSAGE_GROUP を破棄する。
  • 集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。

src/main/java/ksbysample/eipapp/aggregator/RandomZundokoFlowConfig.java を実行しないためにクラスに付与した @Configuration アノテーションコメントアウトした後、

@Slf4j
//@Configuration
public class RandomZundokoFlowConfig {

src/main/java/ksbysample/eipapp/aggregator の下に SplitZundokoFlowConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

@Slf4j
@Configuration
public class SplitZundokoFlowConfig {

    private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"};

    private static final long POLLER_DELAY_PERIOD = 5000L;
    private static final int DELAY_MAX_VALUE = 5000;
    private static final long MESSAGE_GROUP_TIMEOUT = 3000L;

    // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない
    // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties
    // の spring.redis.~ を参照。
    private final RedisConnectionFactory redisConnectionFactory;

    public SplitZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    /**
     * Redis を MessageStore として使用するための設定。
     * Redis に格納する message は JSON フォーマットにする。
     * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
     * 参照。
     *
     * @return {@link RedisMessageStore} object
     */
    @Bean
    public RedisMessageStore redisMessageStore() {
        RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
        ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
        RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
        store.setValueSerializer(serializer);
        return store;
    }

    @Bean
    public MessageSource<String[]> zundokoMessageSource() {
        return () -> MessageBuilder.withPayload(ZUNDOKO).build();
    }

    @Bean
    public IntegrationFlow zundokoFlow() {
        return IntegrationFlows.from(zundokoMessageSource()
                , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD)))
                // Message<String[]> --> Message<String> x 5 に分割する
                .split()
                // ここから下はマルチスレッドで処理する
                .channel(c -> c.executor(Executors.newFixedThreadPool(5)))
                // 5秒以内(ランダムで決める)の間 delay する
                // group timeout した時にどのメッセージが原因だったのかが分かるようにするために
                // delay header を追加して値をセットする
                .enrichHeaders(h -> h.headerFunction("delay",
                        m -> String.valueOf(ThreadLocalRandom.current().nextInt(DELAY_MAX_VALUE))))
                .delay("ZUNDOKO_DELAYER",
                        (DelayerEndpointSpec e) -> e.delayExpression("headers.delay"))
                // kiyoshiFlow にメッセージを送信する
                .channel(kiyoshiFlow().getInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                .aggregate(a -> a
                        .messageStore(redisMessageStore())
                        .releaseStrategy(g -> g.getMessages().size() == g.getSequenceSize())
                        .expireGroupsUponCompletion(true)
                        // .groupTimeout(...) で指定した時間内に aggregator で処理されなかったメッセージは
                        // .discardChannel(...) で指定した channel に送信される
                        // ※既に MESSAGE_GROUP に蓄積されていたメッセージも discardChannel に送信される
                        .groupTimeout(MESSAGE_GROUP_TIMEOUT)
                        .discardChannel(discardFlow().getInputChannel()))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .<List<String>>handle((p, h) -> {
                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
                        log.error("きよし!");
                    }
                    return null;
                });
    }

    /**
     * MESSAGE_GROUP_TIMEOUT で指定された時間(3秒)以内に aggregator で処理されなかった MESSAGE GROUP
     * のメッセージが送信されてくる Flow
     *
     * @return {@link IntegrationFlow} object
     */
    @Bean
    public IntegrationFlow discardFlow() {
        return f -> f
                .log(LoggingHandler.Level.WARN)
                .nullChannel();
    }

}

実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" の後には correlationId としてセットされている UUID が表示されています。

f:id:ksby:20190615125003p:plain

コンソールを見ると "ずん", "ずん", "ずん", "ずん", "どこ" に戻った場合には "きよし!" のログが出力されており、また3秒以内にメッセージが全て集まらなかった場合には "Expiring MessageGroup with correlaionKey[...]" のログと既に蓄積されていたメッセージが出力されて、その後に後からきたメッセージが出力されていました(header の delay を見ると4秒以上の遅延時間がセットされていることが分かります)。

f:id:ksby:20190615125319p:plain

履歴

2019/06/15
初版発行。