Spring Boot + Spring Integration でいろいろ試してみる ( その31 )( Aggregator のサンプルを作ってみる )
概要
記事一覧はこちらです。
- Spring Integration DSL で 8.4. Aggregator を使用したサンプルを作成します。
- Aggregator でメッセージがどのように集約されるのかを見られるようにするために、単体の Redis サーバを Docker Compose で構築して MessageStore として利用します。Redis クライアントには JSON のデータを見たいので Medis を使用します。
- サンプルは以前流行ったズンドコキヨシをベースに Aggregator の動きが分かるように少しルールを変えたものを作成してみます。
参照したサイト・書籍
Spring Integration - 8.4. Aggregator
https://docs.spring.io/spring-integration/docs/current/reference/html/#aggregatorluin/medis
https://github.com/luin/medis
目次
- ksbysample-eipapp-aggregator プロジェクトを作成する
- Docker Compose で単一の Redis サーバを構築する
- Redis の GUI クライアントとして Medis をインストールする
- "ずん" あるいは "どこ" のメッセージを送信して5つずつ aggregate するサンプルを作成する
- "ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後にバラバラに分解してから再び1セットのメッセージに aggregate するサンプルを作成する
手順
ksbysample-eipapp-aggregator プロジェクトを作成する
Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。
plugins { id 'org.springframework.boot' version '2.1.5.RELEASE' id 'java' } apply plugin: 'io.spring.dependency-management' group = 'ksbysample.eipapp' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' repositories { mavenCentral() } configurations { // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする testAnnotationProcessor.extendsFrom annotationProcessor testImplementation.extendsFrom compileOnly } dependencies { def lombokVersion = "1.18.6" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.boot:spring-boot-starter-data-redis") implementation("org.springframework.integration:spring-integration-redis") implementation("org.springframework.boot:spring-boot-starter-json") { exclude group: "org.springframework", module: "spring-web" } testImplementation("org.springframework.boot:spring-boot-starter-test") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") }
- Redis を MessageStore として利用するので以下の行を追加します。
implementation("org.springframework.boot:spring-boot-starter-data-redis")
implementation("org.springframework.integration:spring-integration-redis")
- Redis に JSON フォーマットでメッセージを格納したいので以下の行を追加します。spring-boot-starter-json を追加すると
org.springframework:spring-web
も依存関係に追加されてしまうのですが、今回は不要なので exclude で除外します。implementation("org.springframework.boot:spring-boot-starter-json")
- ログを出力する時に
@Slf4j
アノテーションを使いたいので以下の行を追加して lombok を使えるようにします。annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
compileOnly("org.projectlombok:lombok:${lombokVersion}")
ディレクトリ構成は以下のようにしています。
Docker Compose で単一の Redis サーバを構築する
プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。
version: '3' services: # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it redis /bin/sh # # 起動したコンテナの redis に redis-cli でアクセスするには以下のコマンドを実行する # docker exec -it redis redis-cli # ############################################################################# # 単体 Redis サーバ redis: image: redis:5.0.5 container_name: redis ports: - "6379:6379"
docker-compose up -d
コマンドを実行して Redis を起動します。
Redis の GUI クライアントとして Medis をインストールする
https://github.com/luin/medis から Medis on Windows のインストーラをダウンロードしてインストールします。
"ずん" あるいは "どこ" のメッセージを送信して5つ ずつ aggregate するサンプルを作成する
以下の動作を行うサンプルを作成します。
- "ずん"、"どこ" をランダムに出力する MessageSource を作成する。”ずん" を 4/5、"どこ" を 1/5 の確立で返す。
- MessageSource からは 15秒ごとに取得する(aggregator の動作を確認するために時間を遅めにする)。
- aggregator は Redis を MessageStore として使用し、メッセージが5つ溜まったら集約して1つのメッセージにして次の処理に流す。タイムアウトは設定しない(5つ溜まるまでずっと待つ)。
- 最後に集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。
src/main/java/ksbysample/eipapp/aggregator の下に RandomZundokoFlowConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.aggregator; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.redis.store.RedisMessageStore; import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.messaging.support.MessageBuilder; import java.util.Arrays; import java.util.List; import java.util.Random; @Slf4j @Configuration public class RandomZundokoFlowConfig { private static final Random r = new Random(); private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"}; private static final long POLLER_DELAY_PERIOD = 15000L; // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties // の spring.redis.~ を参照。 private final RedisConnectionFactory redisConnectionFactory; public RandomZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } /** * Redis を MessageStore として使用するための設定。 * Redis に格納する message は JSON フォーマットにする。 * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store * 参照。 * * @return {@link RedisMessageStore} object */ @Bean public RedisMessageStore redisMessageStore() { RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer); return store; } @Bean public MessageSource<String> zundokoMessageSource() { return () -> MessageBuilder.withPayload(ZUNDOKO[r.nextInt(5)]).build(); } @Bean public IntegrationFlow zundokoFlow() { return IntegrationFlows.from(zundokoMessageSource() , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD))) // kiyoshiFlow にメッセージを送信する .channel(kiyoshiFlow().getInputChannel()) .get(); } @Bean public IntegrationFlow kiyoshiFlow() { return f -> f // メッセージが5つ溜まったら集約して次の処理に流す // * 流れてくる message の header に correlationId がないので // correlationExpression("1") を記述して固定で "1" という扱いにする。 // これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。 // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを // 次の処理に流した後に再びメッセージが蓄積されるようになる。 .aggregate(a -> a.correlationExpression("1") .messageStore(redisMessageStore()) .releaseStrategy(g -> g.size() == 5) .expireGroupsUponCompletion(true)) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .<List<String>>handle((p, h) -> { if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { log.error("きよし!"); } return null; }); } }
実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" + <correlationIdとして使用する文字列(今回は "1" 固定)> のレコードと個々の message のレコードがあることが確認できます。
message が5つ蓄積されて aggregator により1つの message に集約されて次の処理に渡されると、蓄積されていた MESSAGE_GROUP と MESSAGE は削除されます。
コンソールには以下のように出力されます。payload=[ずん, ずん, ずん, ずん, どこ] になっていると "きよし!" の文字列が出力されています。またスレッド名を見るとバラバラでした。1つのスレッドで処理するわけではないようです。
.outputProcessor(...)
を使うと次の処理に渡す時の palyload にセットする値を変更することができます。デフォルトでは List でまとめられますが、以下のように実装するとカンマで結合した String オブジェクトになります。
@Bean public IntegrationFlow kiyoshiFlow() { return f -> f // メッセージが5つ溜まったら集約して次の処理に流す // * 流れてくる message の header に correlationId がないので // correlationExpression("1") を記述して固定で "1" という扱いにする。 // これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。 // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを // 次の処理に流した後に再びメッセージが蓄積されるようになる。 .aggregate(a -> a.correlationExpression("1") .messageStore(redisMessageStore()) .releaseStrategy(g -> g.size() == 5) .outputProcessor(g -> { String msg = g.getMessages().stream() .map(m -> (String) m.getPayload()) .collect(Collectors.joining(",")); if (msg.equals("ずん,ずん,ずん,ずん,どこ")) { msg += ",きよし!"; } return msg; }) .expireGroupsUponCompletion(true)) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .nullChannel(); // .<List<String>>handle((p, h) -> { // if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { // log.error("きよし!"); // } // return null; // }); }
Aggregator 用のクラスを定義して @CorrelationStrategy、@ReleaseStrategy、@Aggregator アノテーションを付与したメソッドを用意する書き方もあります。
@Bean public IntegrationFlow kiyoshiFlow() { return f -> f // メッセージが5つ溜まったら集約して次の処理に流す // * 流れてくる message の header に correlationId がないので // correlationExpression("1") を記述して固定で "1" という扱いにする。 // これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。 // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを // 次の処理に流した後に再びメッセージが蓄積されるようになる。 .aggregate(a -> a.processor(new ZundokoAggregator()) .messageStore(redisMessageStore()) .expireGroupsUponCompletion(true)) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .<List<String>>handle((p, h) -> { if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { log.error("きよし!"); } return null; }); } static class ZundokoAggregator { @CorrelationStrategy public String correlationKey(Message<?> message) { return "1"; } @ReleaseStrategy public boolean canRelease(List<Message<?>> messages) { return messages.size() == 5; } @Aggregator public List<String> aggregate(List<String> payloads) { // .outputProcessor(...) での加工処理を実装したい場合にはこのメソッド内に記述する return payloads; } }
"ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後分解してから再び1セットのメッセージに aggregate するサンプルを作成する
次に以下の動作を行うサンプルを作成します。
- "ずん", "ずん", "ずん", "ずん", "どこ" の配列が palyload にセットされたメッセージを出力する MessageSource を作成する。
- MessageSource からは 5秒ごとに取得する。
- MessageSource からメッセージを取得したら Splitter で分解して、マルチスレッドで処理させるための channel に渡す。
- マルチスレッド側ではランダムで決められた秒数(最大5秒)待機してから aggregator へ渡す。
- aggregator では Splitter で分割されていたメッセージを全て集めたら1つのメッセージにして次の処理に流す。また3秒以内に分割していたメッセージが全て集まらなかった時には MESSAGE_GROUP を破棄する。
- 集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。
src/main/java/ksbysample/eipapp/aggregator/RandomZundokoFlowConfig.java を実行しないためにクラスに付与した @Configuration アノテーションをコメントアウトした後、
@Slf4j //@Configuration public class RandomZundokoFlowConfig {
src/main/java/ksbysample/eipapp/aggregator の下に SplitZundokoFlowConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.aggregator; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.DelayerEndpointSpec; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.redis.store.RedisMessageStore; import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.messaging.support.MessageBuilder; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; @Slf4j @Configuration public class SplitZundokoFlowConfig { private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"}; private static final long POLLER_DELAY_PERIOD = 5000L; private static final int DELAY_MAX_VALUE = 5000; private static final long MESSAGE_GROUP_TIMEOUT = 3000L; // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties // の spring.redis.~ を参照。 private final RedisConnectionFactory redisConnectionFactory; public SplitZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } /** * Redis を MessageStore として使用するための設定。 * Redis に格納する message は JSON フォーマットにする。 * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store * 参照。 * * @return {@link RedisMessageStore} object */ @Bean public RedisMessageStore redisMessageStore() { RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer); return store; } @Bean public MessageSource<String[]> zundokoMessageSource() { return () -> MessageBuilder.withPayload(ZUNDOKO).build(); } @Bean public IntegrationFlow zundokoFlow() { return IntegrationFlows.from(zundokoMessageSource() , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD))) // Message<String[]> --> Message<String> x 5 に分割する .split() // ここから下はマルチスレッドで処理する .channel(c -> c.executor(Executors.newFixedThreadPool(5))) // 5秒以内(ランダムで決める)の間 delay する // group timeout した時にどのメッセージが原因だったのかが分かるようにするために // delay header を追加して値をセットする .enrichHeaders(h -> h.headerFunction("delay", m -> String.valueOf(ThreadLocalRandom.current().nextInt(DELAY_MAX_VALUE)))) .delay("ZUNDOKO_DELAYER", (DelayerEndpointSpec e) -> e.delayExpression("headers.delay")) // kiyoshiFlow にメッセージを送信する .channel(kiyoshiFlow().getInputChannel()) .get(); } @Bean public IntegrationFlow kiyoshiFlow() { return f -> f .aggregate(a -> a .messageStore(redisMessageStore()) .releaseStrategy(g -> g.getMessages().size() == g.getSequenceSize()) .expireGroupsUponCompletion(true) // .groupTimeout(...) で指定した時間内に aggregator で処理されなかったメッセージは // .discardChannel(...) で指定した channel に送信される // ※既に MESSAGE_GROUP に蓄積されていたメッセージも discardChannel に送信される .groupTimeout(MESSAGE_GROUP_TIMEOUT) .discardChannel(discardFlow().getInputChannel())) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .<List<String>>handle((p, h) -> { if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { log.error("きよし!"); } return null; }); } /** * MESSAGE_GROUP_TIMEOUT で指定された時間(3秒)以内に aggregator で処理されなかった MESSAGE GROUP * のメッセージが送信されてくる Flow * * @return {@link IntegrationFlow} object */ @Bean public IntegrationFlow discardFlow() { return f -> f .log(LoggingHandler.Level.WARN) .nullChannel(); } }
実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" の後には correlationId としてセットされている UUID が表示されています。
コンソールを見ると "ずん", "ずん", "ずん", "ずん", "どこ" に戻った場合には "きよし!" のログが出力されており、また3秒以内にメッセージが全て集まらなかった場合には "Expiring MessageGroup with correlaionKey[...]" のログと既に蓄積されていたメッセージが出力されて、その後に後からきたメッセージが出力されていました(header の delay を見ると4秒以上の遅延時間がセットされていることが分かります)。
履歴
2019/06/15
初版発行。