かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その32 )( Aggregator のサンプルを作ってみる2 )

概要

記事一覧はこちらです。

  • 引き続き Spring Integration DSL8.4. Aggregator を使用したサンプルを作成します。
  • 今回は Aggregator で動作がよく分かっていない点をテストクラスを作成して確認してみます。

参照したサイト・書籍

目次

  1. Aggretator の動作をまとめてみる(よく分かっていない点も含めて)
  2. Spring Boot を 2.1.5 → 2.1.6 にバージョンアップし、JUnit 5 のモジュールを依存関係に追加する
  3. correlationId は同じだが sequenceSize を 3,4,5 と増やしたメッセージを A1→A2→A3 の順に送信した場合(sequenceSize = 3 が正しい)、Aggregator は A3 を受信した時に release するのか?
  4. complete フラグが true の MESSAGE_STORE が残っている時に同じ correlationId を持つメッセージが送信されてきたらどうなるのか?
  5. aggregator が複数ある場合(a1,a2を用意)に同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信したら a1 で release されるのか?

手順

Aggretator の動作をまとめてみる(よく分かっていない点も含めて)

  • メッセージの header にある correlationId を元に MESSAGE_GROUP を作成する。CorrelationStrategy を実装して correlationId 以外のデータをキーとして蓄積させることも可能。
  • MESSAGE_GROUP にメッセージの header にある sequenceSize の数だけメッセージが蓄積されたら、蓄積されているメッセージの payload のデータを List にまとめて、そのデータを payload にセットしたメッセージを作成して次に送信する(release する)。sequenceSize は一番最初のメッセージのものを参照するのか、最新のメッセージのものを見るのかは不明。
  • ReleaseStrategy を実装すれば release する条件を sequenceSize 以外に設定することが可能。
  • ReleaseStrategy はメッセージを受信する毎に実行される。
  • release された MESSAGE_GROUP の complete フラグは false → true に変わる。.expireGroupsUponCompletion(true) が指定されていると complete フラグが true になった MESSAGE_GROUP は即時削除される。.expireGroupsUponCompletion(false) の場合(記述していない場合は false になる)、SimpleMessageStore ならば削除されるがそれ以外(RedisMessageStore を含む)は残る。
    • org.springframework.integration.aggregator.AggregatingMessageHandler#afterRelease には以下のように実装されており、 f:id:ksby:20190623133649p:plain
    • SimpleMessageStore や RedisMessageStore のダイアログを作成してみると以下のようになる。 f:id:ksby:20190623133828p:plain
  • complete フラグが true になって残ったままになっている MESSAGE_STORE と同じ correlationId を持つメッセージが送信されてきた場合、Aggregator は何もしない。メッセージをそのまま次には流さないが、破棄するのか discardChannel に送信するのかは不明。
  • MESSAGE_GROUP に蓄積されたメッセージは無期限で待つ。.groupTimeout(...) を記述すればタイムアウトする時間を設定できる。タイムアウトすると蓄積されていたメッセージ、及び後から遅れてきたメッセージが全て discardChannel に送信される(.discardChannel(...) で設定する)。discardChannel を指定していなければ破棄される。

不明な点をサンプルを作成して確認してみます。

Spring Boot を 2.1.5 → 2.1.6 にバージョンアップし、JUnit 5 のモジュールを依存関係に追加する

今回はテストクラスで動作確認をしたいので JUnit 5 を使用可能にします。また Spring Boot の 2.1.6 がリリースされていたのでバージョンアップします。

build.gradle を以下のように変更します。

plugins {
    id 'org.springframework.boot' version '2.1.6.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'ksbysample.eipapp'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.4.2")
    }
}

dependencies {
    def lombokVersion = "1.18.6"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.boot:spring-boot-starter-data-redis")
    implementation("org.springframework.integration:spring-integration-redis")
    implementation("org.springframework.boot:spring-boot-starter-json") {
        exclude group: "org.springframework", module: "spring-web"
    }
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.assertj:assertj-core:3.12.2")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}
  • plugins block 内で id 'org.springframework.boot' version '2.1.5.RELEASE'id 'org.springframework.boot' version '2.1.6.RELEASE' に変更します。
  • dependencyManagement { ... } を追加します。
  • dependencies block に以下の記述を追加します。
    • testImplementation("org.assertj:assertj-core:3.12.2")
    • testCompile("org.junit.jupiter:junit-jupiter")
    • testRuntime("org.junit.platform:junit-platform-launcher")
  • test { ... } を追加します。

correlationId は同じだが sequenceSize を 3,4,5 と増やしたメッセージを A1→A2→A3 の順に送信した場合(sequenceSize = 3 が正しい)、Aggregator は A3 を受信した時に release するのか?

src/main/java/ksbysample/eipapp/aggregator/SplitZundokoFlowConfig.java のクラスに付与した @Configuration アノテーションコメントアウトした後、

@Slf4j
//@Configuration
public class SplitZundokoFlowConfig {

src/test/java/ksbysample/eipapp/aggregator を新規作成してから、その下に IncrementSequenceSizeFlowTest.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringBootTest
public class IncrementSequenceSizeFlowTest {

    @Autowired
    private MessageBuilderFactory messageBuilderFactory;

    @Autowired
    private IntegrationFlow aggregateFlow;

    @Autowired
    private QueueChannel outputChannel;

    @Test
    void sequenceSizeをインクリメントするメッセージを3件送信した時に最後のメッセージを受信したaggregagorがreleaseするかのテスト() {
        // メッセージで使用する correlationId を発番する
        UUID correlationId = IntegrationObjectSupport.generateId();

        // A1, A2, A3 のメッセージを送信する
        // sequenceSize を 3, 4, 5 と1つずつ増やす(3が正しい値)
        Arrays.asList(1, 2, 3).forEach(n -> {
            String payload = String.format("A%d", n);
            Message<String> msg = messageBuilderFactory
                    .fromMessage(MessageBuilder.withPayload(payload).build())
                    .pushSequenceDetails(correlationId, n, n + 2)
                    .build();
            aggregateFlow.getInputChannel().send(msg);
        });

        // aggregator が release しているか確認する
        Message<?> result = outputChannel.receive(5_000);
        assertThat(result).isNotNull();
        assertThat(result.getPayload()).isEqualTo(Arrays.asList("A1", "A2", "A3"));
        assertThat(result.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE))
                .isEqualTo(3);
    }

    @TestConfiguration
    static class FlowConfig {

        @Autowired
        private RedisConnectionFactory redisConnectionFactory;

        /**
         * Redis を MessageStore として使用するための設定。
         * Redis に格納する message は JSON フォーマットにする。
         * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
         * 参照。
         *
         * @return {@link RedisMessageStore} object
         */
        @Bean
        public RedisMessageStore redisMessageStore() {
            RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
            ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
            RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
            store.setValueSerializer(serializer);
            return store;
        }

        @Bean
        public IntegrationFlow aggregateFlow() {
            return f -> f
                    .log(LoggingHandler.Level.WARN)
                    .aggregate(a -> a.messageStore(redisMessageStore()))
                    .log(LoggingHandler.Level.ERROR)
                    .channel(outputChannel());
        }

        @Bean
        public MessageChannel outputChannel() {
            return MessageChannels.queue().get();
        }

    }

}

テストを実行してみると aggregator は release していますが、release された後のメッセージの sequenceSize は 3 ではなく 5 になるという結果でした。

f:id:ksby:20190622211313p:plain

debug 実行して2件メッセージを送信したところで止めて Redis のデータを見てみると、

f:id:ksby:20190622211814p:plain f:id:ksby:20190622211928p:plain

MESSAGE_GROUP のデータには sequenceSize は含まれていませんでした。sequenceSize のチェックには一番最初に受信したメッセージのデータを使用し、release 時には一番最後に受信した(release の判定をした)メッセージの sequenceSize を集約したメッセージの header に入れる、という仕様のようです。

complete フラグが true の MESSAGE_STORE が残っている時に同じ correlationId を持つメッセージが送信されてきたらどうなるのか?

以下の内容のテストを作成し、同じ correlationId で sequenceSize = 1 のメッセージを2件送信して動作確認してみました。

    @Test
    void sampleTest() {
        // メッセージで使用する correlationId を発番する
        UUID correlationId = IntegrationObjectSupport.generateId();

        // A1 メッセージを送信する
        // sequenceSize = 1 なので、このメッセージを受信すると aggregagtor は release する
        // MESSAGE_STORE も complete = true になる
        Message<String> msg = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A1").build())
                .pushSequenceDetails(correlationId, 1, 1)
                .build();
        aggregateFlow.getInputChannel().send(msg);

        // A1 と correlationId が同じ A2 メッセージを送信する
        Message<String> msg2 = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A2").build())
                .pushSequenceDetails(correlationId, 2, 1)
                .build();
        aggregateFlow.getInputChannel().send(msg2);
    }

aggregator の処理は org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler#handleMessageInternal を通るのですが、

f:id:ksby:20190623183947p:plain

  • 1件目のメッセージを受信した時には①→②→③→④を通り MESSAGE_GROUP の complete フラグが true になります。.expireGroupsUponCompletion(true) を指定していないので、complete フラグが true になった MESSAGE_GROUP は削除されずに残ります。
  • 2件目のメッセージを受信した時には①の処理で MESSAGE_GROUP を取得しますが if (!messageGroup.isComplete() && messageGroup.canAdd(message)) { が false となるため(messageGroup.isComplete() が true)、②以降の処理が実行されませんでした。

結論として、complete フラグが true の MESSAGE_GROUP が残っている時に同じ correlationId を持つメッセージが送信されてくると何もされません(MessageStore にメッセージが蓄積されず releaseStrategy も実行されません)。

aggregator が複数ある場合(a1,a2を用意)に同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信したら a1 で release されるのか?

1つ上の記述で見た org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler#handleMessageInternal の処理を見ると①~④の処理開始前に lockRegistry から取得した Lock オブジェクトでロックしています。

Spring Integration - Lock Registry を見ると For synchronizing updates across servers where a shared MessageGroupStore is being used, you must configure a shared lock registry. という記述がありました。

LockRegistry インターフェースの実装クラスを確認すると RedisLockRegistry クラスがあります。

f:id:ksby:20190623213307p:plain

これらの内容から同じ LockRegistry を使用すれば Aggregator が複数存在しても適切にロックしながら処理をするので、同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信しても a1 で release されるものと考えられます。

src/test/java/ksbysample/eipapp/aggregator/MultiAggregatorTest.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringBootTest
public class MultiAggregatorTest {

    @Autowired
    private MessageBuilderFactory messageBuilderFactory;

    @Autowired
    private IntegrationFlow a1Flow;

    @Autowired
    private IntegrationFlow a2Flow;

    @Autowired
    private QueueChannel outputChannel;

    @Test
    void sampleTest() {
        final int SEQUENCE_SIZE = 3;

        // メッセージで使用する correlationId を発番する
        UUID correlationId = IntegrationObjectSupport.generateId();

        int sequenceNumber = 1;
        Message<String> msg = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A1").build())
                .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE)
                .build();
        a1Flow.getInputChannel().send(msg);

        Message<String> msg2 = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A2").build())
                .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE)
                .build();
        a2Flow.getInputChannel().send(msg2);

        SleepUtils.sleep(1_000);
        Message<String> msg3 = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A3").build())
                .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE)
                .build();
        a1Flow.getInputChannel().send(msg3);

        Message<?> result = outputChannel.receive(15_000);
        assertThat(result).isNotNull();
        assertThat(result.getPayload()).isEqualTo(Arrays.asList("A1", "A2", "A3"));
        assertThat(result.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE))
                .isEqualTo(3);
    }

    @TestConfiguration
    static class FlowConfig {

        @Autowired
        private RedisConnectionFactory redisConnectionFactory;

        /**
         * Redis を MessageStore として使用するための設定。
         * Redis に格納する message は JSON フォーマットにする。
         * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
         * 参照。
         *
         * @return {@link RedisMessageStore} object
         */
        @Bean
        public RedisMessageStore redisMessageStore() {
            RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
            ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
            RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
            store.setValueSerializer(serializer);
            return store;
        }

        @Bean
        public IntegrationFlow a1Flow() {
            return f -> f
                    .log(LoggingHandler.Level.WARN)
                    // 以降の処理を別スレッドにすることで、send メソッド呼び出し時に待機しないようにする
                    .channel(c -> c.executor(Executors.newFixedThreadPool(1)))
                    .aggregate(a -> a.messageStore(redisMessageStore())
                            // a1Flow, a2Flow で同じ LockRegistry を使用するよう設定する
                            .lockRegistry(aggretatorLockRegistry())
                            .releaseStrategy(g -> {
                                log.error("PASS_A1");
                                return g.getMessages().size() == g.getSequenceSize();
                            }))
                    .log(LoggingHandler.Level.ERROR)
                    .channel(outputChannel());
        }

        @Bean
        public IntegrationFlow a2Flow() {
            return f -> f
                    .log()
                    .channel(c -> c.executor(Executors.newFixedThreadPool(1)))
                    .aggregate(a -> a.messageStore(redisMessageStore())
                            .lockRegistry(aggretatorLockRegistry())
                            .releaseStrategy(g -> {
                                SleepUtils.sleep(10_000);
                                log.error("PASS_A2");
                                return g.getMessages().size() == g.getSequenceSize();
                            }))
                    .log();
        }

        @Bean
        public LockRegistry aggretatorLockRegistry() {
            return new RedisLockRegistry(redisConnectionFactory, "AGGREGATOR_LOCK");
        }

        @Bean
        public MessageChannel outputChannel() {
            return MessageChannels.queue().get();
        }

    }

    static class SleepUtils {

        public static void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }

}

テストを実行すると、

f:id:ksby:20190623214053p:plain

  • send メッセージで処理が待機しないように実装しているので(..aggregate(...) の処理を別スレッドで実行するように実装しているので)、3件のメッセージ送信は1つ前のメッセージ送信の aggregator の処理を待たずに連続で送信されている。
  • 2件目のメッセージの releaseStrategy が処理されるのは送信してから 10秒後。
  • 3件目のメッセージの releaseStrategy は2件目のメッセージの releaseStrategy が終わった後に実行される。
  • 同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信すると a1 で release された。

という結果になりました。まあ、実際に複数の aggregator が必要になるようなデータ量を処理することになったらこんな実装をするのかは分かりませんが。。。

履歴

2019/06/23
初版発行。