かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その35 )( Docker Compose でサーバを構築する、Kafka 編2 )

概要

記事一覧はこちらです。

Kafka+zookeeper の環境を Docker Compose で構築して Spring Integration を利用したサンプルを作成するだけのつもりでいたのですが、KafkaApache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読んでみたら結構面白いサーバであることが分かったので、しばらくいろいろ試してみることにします。

  • docker-compose.yml を confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう書き直します。
  • partition を複数作成すれば複数の consumer で分散して受信できるようなので試します。

参照したサイト・書籍

  1. Kafka

    Kafka

    Kafka

  2. Apache Kafka 分散メッセージングシステムの構築と活用

    Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE)

    Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE)

  3. confluentinc/cp-kafka(Docker Hub のページ)
    https://hub.docker.com/r/confluentinc/cp-kafka

  4. confluentinc/cp-zookeeper(Docker Hub のページ)
    https://hub.docker.com/r/confluentinc/cp-zookeeper

  5. confluentinc/cp-docker-images
    https://github.com/confluentinc/cp-docker-images

    • examples の下に Kafka を cluster 構成にする docker-compose.yml のサンプルがあります。
  6. Confluent Platform Docker Images
    https://docs.confluent.io/current/installation/docker/index.html

  7. Confluent Platform Quick Start (Docker)
    https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html

    • control-center や ksql 等 confluentinc/cp-docker-images で入れられる機能を試したい場合の手順が記載されています(ただし今回の記事では試していません)。
  8. Generic Spring Kafka Listener
    https://stackoverflow.com/questions/42045033/generic-spring-kafka-listener

    • @KafkaListener による consumer で topicPartitions 属性で partition を指定したい場合の例が書かれています。

目次

  1. confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す
  2. topic に partition を複数作成すればメッセージの受信を分散できる

手順

confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す

Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 ) で Kafka の Docker コンテナを作成した時に Kafka は Official Images がないのかな?と思っていたのですが、Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読むと Kafka のコミッタの半数以上が Confluent という会社に所属していることとインストールで Confluent が配布している Confluent Platform の OSS 版を利用することが記載されていました。

Docker Hub で "confluent" で検索してみると confluentinc/cp-kafkaconfluentinc/cp-zookeeper を見つけたので、この2つを利用するよう docker-compose.yml を書き直すことにします。

cp-docker-images/examples/ の下にサンプルが多数ありますので、今回は kafka-single-node/docker-compose.yml を参考にします(というよりほぼコピペです)。

Conflulent の Docker Image で利用可能なオプションは Docker Configuration に記載があります。

version: '3'

services:
  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it cp-zookeeper /bin/bash
  #############################################################################
  # 単体 zookeeper
  cp-zookeeper:
    image: confluentinc/cp-zookeeper:5.2.2
    container_name: cp-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it cp-kafka /bin/bash
  #############################################################################
  # 単体 Kafka
  # topic の作成・一覧取得・詳細表示
  # kafka-topics --zookeeper cp-zookeeper:2181 --create --topic Topic1 --partitions 1 --replication-factor 1 --if-not-exists
  # kafka-topics --zookeeper cp-zookeeper:2181 --alter --topic Topic1 --partitions 3
  # kafka-topics --zookeeper cp-zookeeper:2181 --list
  # kafka-topics --zookeeper cp-zookeeper:2181 --topic Topic1 --describe
  # kafka-console-producer --broker-list localhost:9092 --topic Topic1
  # kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1
  cp-kafka:
    image: confluentinc/cp-kafka:5.2.2
    container_name: cp-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - cp-zookeeper

docker-compose up -d コマンドを実行します。

f:id:ksby:20190713130313p:plain

IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。

f:id:ksby:20190713130411p:plain

topic を作成します。

f:id:ksby:20190713133500p:plain

kafka-console-producerkafka-console-consumer コマンドでメッセージの送受信が出来ることを確認します。

f:id:ksby:20190713133827p:plain

wurstmeister/kafka と比較すると Topic を作成するのに KAFKA_CREATE_TOPICS が使用できなくなりますが(コンテナに接続して kafka-topics --create コマンドを実行する必要があります)、cp-docker-images/examples/cp-all-in-one/docker-compose.yml を見ると control-center(管理画面らしい)や schema-registry(serializer, deserializer に Apache Avro を利用するのに必要らしい)、 ksql-server, ksql-cli(KSQLが試せる?)等もあるようなので、Kafka のいろいろな機能を試すならこちらの方が良さそうな気がします。

topic に partition を複数作成すればメッセージの受信を分散できる

partition が1つだけだと1つの consumer しか受信できませんでしたが、複数の partition を作成すれば複数の consumer で分散して受信できるようです。

Topic1 を partition = 3 で作成して試してみます。

f:id:ksby:20190713192841p:plain

src/test/java/ksbysample/eipapp/kafka/KafkaSendAndReceiveTest.java を以下のように変更してメッセージを 10件連続で送信できるようにする、かつ @KafkaListener による consumer は削除する、ようにします。またメッセージ送信時に正常に送信できていることを確認するよう kafkaTemplate.send(TOPIC_NAME, String.valueOf(i));kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get(); に変更します(こうすると送信時にエラーが出た時には例外が throw されます)。

    @Test
    void sendToKafkaTest() throws ExecutionException, InterruptedException {
//        kafkaTemplate.send(TOPIC_NAME, "test message");
        for (int i = 1; i <= 10; i++) {
            kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get();
        }
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
//        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

//        @KafkaListener(topics = TOPIC_NAME)
//        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
//            log.warn(cr.toString());
//        }

    }

コマンドプロンプトを2つ起動して、どちらも kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1 を実行します。

f:id:ksby:20190713193538p:plain

テストを実行してメッセージを 10件送信すると、2つの consumer で重複せずにメッセージを受信するようになりました。

f:id:ksby:20190713194257p:plain

テストクラスで複数の consumer で分散して受信してみます。KafkaSendAndReceiveTest.java を以下のように変更して consumer を2つ作成してから、

    @Test
    void sendToKafkaTest() throws ExecutionException, InterruptedException {
        // @KafkaListener の consumer が登録されるまで少し時間がかかるので 15秒 sleep する
        SleepUtils.sleep(15_000);

//        kafkaTemplate.send(TOPIC_NAME, "test message");
        for (int i = 1; i <= 10; i++) {
            kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get();
        }
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

        @KafkaListener(topics = TOPIC_NAME)
        public void listenByConsumerRecord1(ConsumerRecord<?, ?> cr) {
            log.warn(String.format("partition = %d, message = %s", cr.partition(), cr.value()));
        }

        @KafkaListener(topics = TOPIC_NAME)
        public void listenByConsumerRecord2(ConsumerRecord<?, ?> cr) {
            log.error(String.format("partition = %d, message = %s", cr.partition(), cr.value()));
        }

    }

テストを実行すると consumer が受信する partition がアサインされて、

f:id:ksby:20190713203516p:plain

それぞれの consumer でメッセージを受信します。この場合、partition = 0, 1 がアサインされたのが listenByConsumerRecord1 で、partition = 2 がアサインされたのが listenByConsumerRecord2 でした。

f:id:ksby:20190713203615p:plain

履歴

2019/07/17
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 )

概要

記事一覧はこちらです。

Spring Integration のアプリケーションで使用するサーバを Docker Compose で構築します。

  • Kafka+zookeeper の環境を構築します。
  • Kafka の Dockerイメージは wurstmeister/kafka を使用します。
  • zookeeper の Dockerイメージは zookeeper を使用します。
  • 今回は Kafka、zookeeper どちらも単体サーバで構成します。

参照したサイト・書籍

  1. kafka - Documentation
    https://kafka.apache.org/documentation/

  2. wurstmeister/kafka(Docker Hub のページ)
    https://hub.docker.com/r/wurstmeister/kafka/

  3. wurstmeister/kafka-docker(GitHub のページ)
    https://github.com/wurstmeister/kafka-docker

  4. zookeeper(Docker Hub のページ)
    https://hub.docker.com/_/zookeeper

  5. 31z4/zookeeper-docker(GitHub のページ)
    https://github.com/31z4/zookeeper-docker

  6. Deploy a Kafka broker in a Docker container
    https://www.kaaproject.org/kafka-docker

  7. Apache Kafkaをインストールして、コマンドラインツールを試す
    http://pppurple.hatenablog.com/entry/2018/10/10/232733

  8. Spring for Apache Kafka
    https://docs.spring.io/spring-kafka/reference/html/

  9. Intro to Apache Kafka with Spring
    https://www.baeldung.com/spring-kafka

  10. Apache Kafkaって本当に大丈夫?~故障検証のオーバービューと興味深い挙動の紹介~
    https://www.slideshare.net/hadoopxnttdata/hadoop-spark-conference-japan-2019nttdata

  11. LINEの大規模データパイプラインを支える、Apache Kafkaプラットフォームの運用の裏側
    https://logmi.jp/tech/articles/320330

  12. Apache Kafkaの概要とアーキテクチャ
    https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3

目次

  1. ksbysample-eipapp-kafka プロジェクトを作成する
  2. docker-compose.yml を作成する
  3. サーバを起動する
  4. KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する
  5. group.id が同じ consumer はどれか1つでしかメッセージを受信しない
  6. group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する

手順

ksbysample-eipapp-kafka プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。

plugins {
    id "org.springframework.boot" version "2.1.6.RELEASE"
    id "java"
    id "idea"
}

apply plugin: "io.spring.dependency-management"

group = "ksbysample.eipapp"
version = "0.0.1-SNAPSHOT"

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.5.0")
    }
}

dependencies {
    def lombokVersion = "1.18.8"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}
  • dependencies block に implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE") を追加します。spring-integration-kafka は Appendix F. Dependency versions に記載されていませんので、バージョン番号も明記します。

    spring-integration-kafka を追加すると以下のモジュールが依存関係に追加されます。

\--- org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE
     +--- org.springframework.integration:spring-integration-core:5.1.6.RELEASE (*)
     \--- org.springframework.kafka:spring-kafka:2.2.7.RELEASE
          +--- org.springframework:spring-context:5.1.7.RELEASE -> 5.1.8.RELEASE (*)
          +--- org.springframework:spring-messaging:5.1.7.RELEASE -> 5.1.8.RELEASE (*)
          +--- org.springframework:spring-tx:5.1.7.RELEASE -> 5.1.8.RELEASE (*)
          +--- org.springframework.retry:spring-retry:1.2.4.RELEASE
          \--- org.apache.kafka:kafka-clients:2.0.1
               +--- org.lz4:lz4-java:1.4.1
               +--- org.xerial.snappy:snappy-java:1.1.7.1
               \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.26

ディレクトリ構成は以下のようにしています。

f:id:ksby:20190707100947p:plain

docker-compose.yml を作成する

今回は zookeeper、kafka どちらも単体のサーバで起動します。

プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。

version: '3'

services:
  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it zookeeper /bin/sh
  #############################################################################
  # 単体 zookeeper
  zookeeper:
    image: zookeeper:3.5.5
    container_name: zookeeper-1
    hostname: zookeeper-1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
    restart: always

  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it kafka /bin/sh
  #############################################################################
  # 単体 Kafka
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      # Topic1 will have 1 partition and 1 replicas
      KAFKA_CREATE_TOPICS: "Topic1:1:1"
    depends_on:
      - zookeeper

サーバを起動する

コマンドプロンプトを起動し docker-compose.yml のあるディレクトリへ移動して docker-compose up -d コマンドを実行して起動します。

f:id:ksby:20190707103616p:plain

IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。

f:id:ksby:20190707103808p:plain

kafka コンテナに接続して kafka-topics.sh で topics の作成状況を確認します(kafka-topics.sh は kafka の Documentation 参照)。

f:id:ksby:20190707104734p:plain

kafka-console-consumer.sh と kafka-console-producer.sh でメッセージの送受信のテストをしてみます。

  • kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1 を実行して Topic1 にメッセージが送信されるのを待機します。
  • kafka-console-producer.sh --broker-list localhost:9092 --topic Topic1 を実行後、文字列を入力してメッセージを送信します。

f:id:ksby:20190707110012p:plain

kafka-console-producer.sh を起動した方でメッセージを入力すると kafka-console-consumer.sh を起動した方で入力されたメッセージが表示されました。メッセージの送受信は問題なく出来るようです。kafka-console-producer.sh は Ctrl+D を、kafka-console-consumer.sh は Ctrl+C を入力して終了します。

Topic1 のメッセージをクリアしたいので、docker-compose downdocker-compose up -d コマンドを実行してコンテナを起動し直します。

KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する

Spring Integration DSL で書く前に KafkaTemplate と @KafkaListener でデータを送受信するサンプルを JUnit 5 のテストで作成します。

src/main/resources/application.properties を以下のように変更します。

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=topic1-consumer-group1

src/test/java/ksbysample/eipapp/kafka を作成した後、その下に KafkaSendAndReceiveTest.java を新規作成して以下の内容を記述します。

package ksbysample.eipapp.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
@SpringBootTest
public class KafkaSendAndReceiveTest {

    private static final String TOPIC_NAME = "Topic1";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void sendToKafkaTest() {
        kafkaTemplate.send(TOPIC_NAME, "test message");
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

        @KafkaListener(topics = TOPIC_NAME)
        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
            log.warn(cr.toString());
        }

    }

    static class SleepUtils {

        public static void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }

}

テストを実行すると送信したメッセージがを @KafkaListener アノテーションを付与した listenByConsumerRecord メソッドで受信してログが出力されます。

f:id:ksby:20190707122134p:plain

log.warn(cr.toString()); では ConsumerRecord(topic = Topic1, partition = 0, offset = 16, CreateTime = 1562469618138, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test message) というメッセージが出力されていました。

group.id が同じ consumer はどれか1つでしかメッセージを受信しない

kafka-console-consumer.sh を使って動作確認します。

まずコマンドプロンプトを2つ起動してから docker exec -it kafka /bin/sh コマンドで kafka のコンテナに接続した後、kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1 コマンドを実行して同じ group.id でメッセージ送信を待ちます。

f:id:ksby:20190707140347p:plain

KafkaSendAndReceiveTest.java から SleepUtils.sleep(5_000); と @KafkaListener をコメントアウトして、メッセージを受信せず、かつ送信後すぐにテストが終了するようにします。

    @Test
    void sendToKafkaTest() {
        kafkaTemplate.send(TOPIC_NAME, "test message");
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
//        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

//        @KafkaListener(topics = TOPIC_NAME)
//        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
//            log.warn(cr.toString());
//            SleepUtils.sleep(1_000);
//        }

    }

テストを実行してメッセージを送信すると、2つ起動している consumer の内1つだけ受信します。

f:id:ksby:20190707141042p:plain

追加で2回テストを実行してみるとメッセージを受信したのは最初に受信した consumer でした。

f:id:ksby:20190707141314p:plain

テストクラスで 1000件連続してメッセージを送信するように変更してみましたが、

    @Test
    void sendToKafkaTest() {
//        kafkaTemplate.send(TOPIC_NAME, "test message");
        for (int i = 1; i <= 1000; i++) {
            kafkaTemplate.send(TOPIC_NAME, String.valueOf(i));
            kafkaTemplate.flush();
        }
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
//        SleepUtils.sleep(5_000);
    }

やっぱり一方の consumer だけしか受信しません。

f:id:ksby:20190707141842p:plain

受信していた consumer を落とすと、今度はもう一方の接続していた consumer でメッセージを受信しました。

f:id:ksby:20190707142330p:plain

ここまでの動作をまとめると、

  • 同じ group.id の consumer を複数接続してもメッセージを受信するのはその内の1つ。
  • 大量のメッセージを送信してもメッセージを受信するのは決められた1つだけである。複数接続している consumer で分散して受信する訳ではない。
  • 分散する設定があるのかは不明。

group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する

docker-compose downdocker-compose up -d コマンドを実行してコンテナを起動し直した後、今度は group.id が異なる kafka-console-consumer.sh を実行します。

f:id:ksby:20190707150931p:plain

テストを実行してメッセージを送信すると、各 consumer でメッセージを受信しました。

f:id:ksby:20190707151254p:plain

もう1点。コンテナを起動し直して Topic1 のメッセージをクリアしてから group.id が topic1-consumer-group1 の kafka-console-consumer.sh だけ起動しておきます。

f:id:ksby:20190707151649p:plain

テストを実行してメッセージを送信すると起動しておいた consumer でメッセージを受信しますが、

f:id:ksby:20190707151837p:plain

後から別の group.id(topic1-consumer-group2)の kafka-console-consumer.sh を起動すると、この時は topic1-consumer-group2 の consumer ではメッセージを受信しません。

f:id:ksby:20190707152612p:plain

ただし1度 group.id = topic1-consumer-group2 の接続情報が登録されると、一旦 topic1-consumer-group2 の kafka-console-consumer.sh を終了してから、

f:id:ksby:20190707152847p:plain

テストを実行してメッセージを送信して topic1-consumer-group1 の kafka-console-consumer.sh でメッセージを受信した後、

f:id:ksby:20190707153024p:plain

topic1-consumer-group2 の kafka-console-consumer.sh を起動すると、起動前に送信されたメッセージが受信されます。

f:id:ksby:20190707153204p:plain

KafkaSendAndReceiveTest.java 内で異なる group.id の @KafkaListener を用意してメッセージを受信するには以下のように変更します。

    @Test
    void sendToKafkaTest() {
        kafkaTemplate.send(TOPIC_NAME, "test message");
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

        @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group1")
        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
            log.warn(cr.toString());
        }

        // group.id は @KafkaListener アノテーションの groupId 属性で指定可能
        // メソッドの引数にはメッセージのデータの型(今回は String)の引数を指定することも可能
        @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group2")
        public void listenByString(String msg) {
            log.error(msg);
        }

    }

テストを実行すると @KafkaListener アノテーションを付与したメソッドそれぞれでメッセージを受信していることが確認できます。

f:id:ksby:20190707153939p:plain

Spring Integration DSL でのサンプルや Kafka、zookeeper を複数サーバ起動する場合のサンプルを作成したいので、あと数回続きます。

履歴

2019/07/07
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その33 )( 5.1 からの新機能 Java Functions Improvements を試してみる )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. Spring Integration 5.1 goes GA! - Java Functions Improvements
    https://spring.io/blog/2018/10/29/spring-integration-5-1-goes-ga#java-functions-improvements

  2. spring-integration/src/reference/asciidoc/configuration.adoc
    https://github.com/spring-projects/spring-integration/blob/master/src/reference/asciidoc/configuration.adoc

目次

  1. ksbysample-eipapp-functions プロジェクトを作成する
  2. Supplier&Function&Consumer の簡単なサンプルを作成する
  3. Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能
  4. Function#apply で IntegrationFlow Bean の処理を呼び出す

手順

ksbysample-eipapp-functions プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。

plugins {
    id "org.springframework.boot" version "2.1.6.RELEASE"
    id "java"
    id "idea"
}

apply plugin: "io.spring.dependency-management"

group = "ksbysample.eipapp"
version = "0.0.1-SNAPSHOT"

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

repositories {
    mavenCentral()
}

dependencies {
    def lombokVersion = "1.18.6"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}

ディレクトリ構成は以下のようにしています。

f:id:ksby:20190630230404p:plain

Supplier&Function&Consumer の簡単なサンプルを作成する

package ksbysample.eipapp.functions;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@Slf4j
@Configuration
public class SupplierAndFunctionAndConsumerFlow {

    private AtomicInteger count = new AtomicInteger(0);

    // Supplier<?> は ? 型のオブジェクトが payload にセットされた Message を返す
    // MessageSource として利用できる
    @Bean
    public Supplier<Integer> countSupplier() {
        return () -> count.addAndGet(1);
    }

    @Bean
    public IntegrationFlow countDisplayFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .transform(DoubleAndToStrFunc())
                .handle(addDotFunc())
                .handle(printFunc())
                .get();
    }

    // Function<?, ?> は .transform(...) で利用したり、
    public Function<Integer, String> DoubleAndToStrFunc() {
        return v -> String.valueOf(v * 2);
    }

    // .handle(...) で利用できる
    public Function<String, String> addDotFunc() {
        return s -> s + "...";
    }

    // Consumer<?> は .handle(...) で利用できる
    public Consumer<String> printFunc() {
        return s -> log.warn(s);
    }

}

上のサンプルを実行すると "2..."、"4..."、"6..." と順に出力されます。

f:id:ksby:20190703010213p:plain

同じ処理を Supplier は MessageSource で置き換えて、Function と Consumer は .transform(...)、.handle(...) に直接記述すると以下のようになります。

package ksbysample.eipapp.functions;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.support.MessageBuilder;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

@Slf4j
@Configuration
public class MessageSourceAndLambdaFlow {

    private AtomicInteger count = new AtomicInteger(0);

    @Bean
    public MessageSource<Integer> countMessageSource() {
        return () -> MessageBuilder
                .withPayload(count.addAndGet(1))
                .build();
    }

    @Bean
    public IntegrationFlow countDisplayFlow() {
        return IntegrationFlows
                .from(countMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<Integer, String>transform(v -> String.valueOf(v * 2))
                .handle((Function<String, String>) s -> s + "...")
                .handle((Consumer<String>) s -> log.warn(s))
                .get();
    }

}

f:id:ksby:20190703012428p:plain

lambda 式を使っているのに (Function<String, String>)(Consumer<String>) とキャストしないと build 時にエラーが出るのがなんか今ひとつな気がします。。。

Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能

package ksbysample.eipapp.functions;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

@Configuration
public class SupplierAndFunction2Flow {

    @Autowired
    private NullChannel nullChannel;

    private AtomicInteger count = new AtomicInteger(0);

    // Supplier<Message<?>> という記述にして Message オブジェクトを返すようにすることも出来る
    // この場合 MessageSource の時のように header を追加することが可能。
    @Bean
    public Supplier<Message<Integer>> countSupplier() {
        return () -> MessageBuilder
                .withPayload(count.addAndGet(1))
                .setHeader("random", ThreadLocalRandom.current().nextInt(100))
                .build();
    }

    @Bean
    public IntegrationFlow countDisplayFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log(LoggingHandler.Level.WARN)
                .channel(nullChannel)
                .get();
    }

}

実行すると headers の中に random というデータが追加されています。

f:id:ksby:20190703013814p:plain

Function や Consumer でも Function<Message<Integer>, Message<String>>Consumer<Message<String>> のように試してみたのですが、こちらは実行時にエラーになりました。この書き方が出来るのは Supplier だけのようです。

Function#apply で IntegrationFlow Bean の処理を呼び出す

以前 @MessagingGateway アノテーションを付加した interface を用意して IntegrationFlow の処理を呼び出す方法を記載しましたが、Function<?, ?> 型のフィールドを用意して Function#apply を呼び出すと IntegrationFlow の処理を呼び出すことができます。

package ksbysample.eipapp.functions;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;

import java.util.function.Function;

import static java.util.Collections.singletonMap;

@Slf4j
@Component
public class FunctionSample implements CommandLineRunner {

    // IntegrationFlow で定義した処理を呼び出すには、
    // * IntegrationFlows.from(Function.class)... と定義する
    // * Function<?, ?> 型の変数をフィールドに定義し 
    //   @Autowired, @Qualifier("<IntegrationFlow名>.gateway"), @Lazy アノテーションを付与する  
    @Autowired
    @Qualifier("throwExceptionWithRetryFlow.gateway")
    @Lazy
    private Function<String, String> throwExceptionWithRetryFlowFunc;

    @Override
    public void run(String... args) throws Exception {
        System.out.println(throwExceptionWithRetryFlowFunc.apply("success???"));
    }

    @Configuration
    static class FlowConfig {

        /**
         * リトライ回数は最大5回、リトライ時は2秒待機する RetryTemplate を生成する
         *
         * @return {@link RetryTemplate} object
         */
        @Bean
        public RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // SimpleRetryPolicy
            retryTemplate.setRetryPolicy(
                    new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

            // FixedBackOffPolicy
            FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
            fixedBackOffPolicy.setBackOffPeriod(2000);
            retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

            return retryTemplate;
        }

        @Bean
        public RequestHandlerRetryAdvice retryAdvice() {
            RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
            retryAdvice.setRetryTemplate(retryTemplate());
            // リトライしても処理が成功しなかった場合には
            // MessageHandlingException#getCause#getMessage で取得したメッセージを payload
            // にセットした Message オブジェクトが次の処理に渡されるようにする
            retryAdvice.setRecoveryCallback(context -> {
                MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
                String errMsg = e.getCause().getMessage();
                log.error(errMsg);
                return errMsg;
            });
            return retryAdvice;
        }

        @Bean
        public IntegrationFlow throwExceptionWithRetryFlow() {

            return IntegrationFlows
                    .from(Function.class)
                    .handle((GenericHandler<Object>) (p, h) -> {
                        RetryContext retryContext = RetrySynchronizationManager.getContext();
                        log.warn("★★★ リトライ回数 = " + retryContext.getRetryCount());

                        // リトライ処理をさせたいので強制的に RuntimeException を throw する
                        if (true) {
                            throw new RuntimeException("error!!");
                        }
                        return p;
                    }, e -> e.advice(retryAdvice()))
                    .get();
        }

    }

}

実行すると5回リトライして、最後に RuntimeException に渡した "error!!" のメッセージが表示されました。

f:id:ksby:20190703235126p:plain

started throwExceptionWithRetryFlow.gatewaystopped throwExceptionWithRetryFlow.gateway のメッセージが何度も出力されるのが気になります。

複数出力される理由を調べてみると、まず org.springframework.integration.gateway.GatewayProxyFactoryBean#onInit で7個の method 分 gatewayMap に gateway(throwExceptionWithRetryFlow.gateway)が put されて、

f:id:ksby:20190705004210p:plain

org.springframework.integration.gateway.GatewayProxyFactoryBean の doStart、doStop メソッドで gateway.start();gateway.stop(); が呼び出されるからでした。

f:id:ksby:20190705004550p:plain

1つ呼び出す仕組みをつくるだけで gateway が7個 start するとは。。。 あまり多用する仕組みではないのかもしれません。

履歴

2019/07/05
新規作成。

Spring Boot + Spring Integration でいろいろ試してみる ( その32 )( Aggregator のサンプルを作ってみる2 )

概要

記事一覧はこちらです。

  • 引き続き Spring Integration DSL8.4. Aggregator を使用したサンプルを作成します。
  • 今回は Aggregator で動作がよく分かっていない点をテストクラスを作成して確認してみます。

参照したサイト・書籍

目次

  1. Aggretator の動作をまとめてみる(よく分かっていない点も含めて)
  2. Spring Boot を 2.1.5 → 2.1.6 にバージョンアップし、JUnit 5 のモジュールを依存関係に追加する
  3. correlationId は同じだが sequenceSize を 3,4,5 と増やしたメッセージを A1→A2→A3 の順に送信した場合(sequenceSize = 3 が正しい)、Aggregator は A3 を受信した時に release するのか?
  4. complete フラグが true の MESSAGE_STORE が残っている時に同じ correlationId を持つメッセージが送信されてきたらどうなるのか?
  5. aggregator が複数ある場合(a1,a2を用意)に同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信したら a1 で release されるのか?

手順

Aggretator の動作をまとめてみる(よく分かっていない点も含めて)

  • メッセージの header にある correlationId を元に MESSAGE_GROUP を作成する。CorrelationStrategy を実装して correlationId 以外のデータをキーとして蓄積させることも可能。
  • MESSAGE_GROUP にメッセージの header にある sequenceSize の数だけメッセージが蓄積されたら、蓄積されているメッセージの payload のデータを List にまとめて、そのデータを payload にセットしたメッセージを作成して次に送信する(release する)。sequenceSize は一番最初のメッセージのものを参照するのか、最新のメッセージのものを見るのかは不明。
  • ReleaseStrategy を実装すれば release する条件を sequenceSize 以外に設定することが可能。
  • ReleaseStrategy はメッセージを受信する毎に実行される。
  • release された MESSAGE_GROUP の complete フラグは false → true に変わる。.expireGroupsUponCompletion(true) が指定されていると complete フラグが true になった MESSAGE_GROUP は即時削除される。.expireGroupsUponCompletion(false) の場合(記述していない場合は false になる)、SimpleMessageStore ならば削除されるがそれ以外(RedisMessageStore を含む)は残る。
    • org.springframework.integration.aggregator.AggregatingMessageHandler#afterRelease には以下のように実装されており、 f:id:ksby:20190623133649p:plain
    • SimpleMessageStore や RedisMessageStore のダイアログを作成してみると以下のようになる。 f:id:ksby:20190623133828p:plain
  • complete フラグが true になって残ったままになっている MESSAGE_STORE と同じ correlationId を持つメッセージが送信されてきた場合、Aggregator は何もしない。メッセージをそのまま次には流さないが、破棄するのか discardChannel に送信するのかは不明。
  • MESSAGE_GROUP に蓄積されたメッセージは無期限で待つ。.groupTimeout(...) を記述すればタイムアウトする時間を設定できる。タイムアウトすると蓄積されていたメッセージ、及び後から遅れてきたメッセージが全て discardChannel に送信される(.discardChannel(...) で設定する)。discardChannel を指定していなければ破棄される。

不明な点をサンプルを作成して確認してみます。

Spring Boot を 2.1.5 → 2.1.6 にバージョンアップし、JUnit 5 のモジュールを依存関係に追加する

今回はテストクラスで動作確認をしたいので JUnit 5 を使用可能にします。また Spring Boot の 2.1.6 がリリースされていたのでバージョンアップします。

build.gradle を以下のように変更します。

plugins {
    id 'org.springframework.boot' version '2.1.6.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'ksbysample.eipapp'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.4.2")
    }
}

dependencies {
    def lombokVersion = "1.18.6"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.boot:spring-boot-starter-data-redis")
    implementation("org.springframework.integration:spring-integration-redis")
    implementation("org.springframework.boot:spring-boot-starter-json") {
        exclude group: "org.springframework", module: "spring-web"
    }
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.assertj:assertj-core:3.12.2")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}
  • plugins block 内で id 'org.springframework.boot' version '2.1.5.RELEASE'id 'org.springframework.boot' version '2.1.6.RELEASE' に変更します。
  • dependencyManagement { ... } を追加します。
  • dependencies block に以下の記述を追加します。
    • testImplementation("org.assertj:assertj-core:3.12.2")
    • testCompile("org.junit.jupiter:junit-jupiter")
    • testRuntime("org.junit.platform:junit-platform-launcher")
  • test { ... } を追加します。

correlationId は同じだが sequenceSize を 3,4,5 と増やしたメッセージを A1→A2→A3 の順に送信した場合(sequenceSize = 3 が正しい)、Aggregator は A3 を受信した時に release するのか?

src/main/java/ksbysample/eipapp/aggregator/SplitZundokoFlowConfig.java のクラスに付与した @Configuration アノテーションコメントアウトした後、

@Slf4j
//@Configuration
public class SplitZundokoFlowConfig {

src/test/java/ksbysample/eipapp/aggregator を新規作成してから、その下に IncrementSequenceSizeFlowTest.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringBootTest
public class IncrementSequenceSizeFlowTest {

    @Autowired
    private MessageBuilderFactory messageBuilderFactory;

    @Autowired
    private IntegrationFlow aggregateFlow;

    @Autowired
    private QueueChannel outputChannel;

    @Test
    void sequenceSizeをインクリメントするメッセージを3件送信した時に最後のメッセージを受信したaggregagorがreleaseするかのテスト() {
        // メッセージで使用する correlationId を発番する
        UUID correlationId = IntegrationObjectSupport.generateId();

        // A1, A2, A3 のメッセージを送信する
        // sequenceSize を 3, 4, 5 と1つずつ増やす(3が正しい値)
        Arrays.asList(1, 2, 3).forEach(n -> {
            String payload = String.format("A%d", n);
            Message<String> msg = messageBuilderFactory
                    .fromMessage(MessageBuilder.withPayload(payload).build())
                    .pushSequenceDetails(correlationId, n, n + 2)
                    .build();
            aggregateFlow.getInputChannel().send(msg);
        });

        // aggregator が release しているか確認する
        Message<?> result = outputChannel.receive(5_000);
        assertThat(result).isNotNull();
        assertThat(result.getPayload()).isEqualTo(Arrays.asList("A1", "A2", "A3"));
        assertThat(result.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE))
                .isEqualTo(3);
    }

    @TestConfiguration
    static class FlowConfig {

        @Autowired
        private RedisConnectionFactory redisConnectionFactory;

        /**
         * Redis を MessageStore として使用するための設定。
         * Redis に格納する message は JSON フォーマットにする。
         * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
         * 参照。
         *
         * @return {@link RedisMessageStore} object
         */
        @Bean
        public RedisMessageStore redisMessageStore() {
            RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
            ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
            RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
            store.setValueSerializer(serializer);
            return store;
        }

        @Bean
        public IntegrationFlow aggregateFlow() {
            return f -> f
                    .log(LoggingHandler.Level.WARN)
                    .aggregate(a -> a.messageStore(redisMessageStore()))
                    .log(LoggingHandler.Level.ERROR)
                    .channel(outputChannel());
        }

        @Bean
        public MessageChannel outputChannel() {
            return MessageChannels.queue().get();
        }

    }

}

テストを実行してみると aggregator は release していますが、release された後のメッセージの sequenceSize は 3 ではなく 5 になるという結果でした。

f:id:ksby:20190622211313p:plain

debug 実行して2件メッセージを送信したところで止めて Redis のデータを見てみると、

f:id:ksby:20190622211814p:plain f:id:ksby:20190622211928p:plain

MESSAGE_GROUP のデータには sequenceSize は含まれていませんでした。sequenceSize のチェックには一番最初に受信したメッセージのデータを使用し、release 時には一番最後に受信した(release の判定をした)メッセージの sequenceSize を集約したメッセージの header に入れる、という仕様のようです。

complete フラグが true の MESSAGE_STORE が残っている時に同じ correlationId を持つメッセージが送信されてきたらどうなるのか?

以下の内容のテストを作成し、同じ correlationId で sequenceSize = 1 のメッセージを2件送信して動作確認してみました。

    @Test
    void sampleTest() {
        // メッセージで使用する correlationId を発番する
        UUID correlationId = IntegrationObjectSupport.generateId();

        // A1 メッセージを送信する
        // sequenceSize = 1 なので、このメッセージを受信すると aggregagtor は release する
        // MESSAGE_STORE も complete = true になる
        Message<String> msg = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A1").build())
                .pushSequenceDetails(correlationId, 1, 1)
                .build();
        aggregateFlow.getInputChannel().send(msg);

        // A1 と correlationId が同じ A2 メッセージを送信する
        Message<String> msg2 = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A2").build())
                .pushSequenceDetails(correlationId, 2, 1)
                .build();
        aggregateFlow.getInputChannel().send(msg2);
    }

aggregator の処理は org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler#handleMessageInternal を通るのですが、

f:id:ksby:20190623183947p:plain

  • 1件目のメッセージを受信した時には①→②→③→④を通り MESSAGE_GROUP の complete フラグが true になります。.expireGroupsUponCompletion(true) を指定していないので、complete フラグが true になった MESSAGE_GROUP は削除されずに残ります。
  • 2件目のメッセージを受信した時には①の処理で MESSAGE_GROUP を取得しますが if (!messageGroup.isComplete() && messageGroup.canAdd(message)) { が false となるため(messageGroup.isComplete() が true)、②以降の処理が実行されませんでした。

結論として、complete フラグが true の MESSAGE_GROUP が残っている時に同じ correlationId を持つメッセージが送信されてくると何もされません(MessageStore にメッセージが蓄積されず releaseStrategy も実行されません)。

aggregator が複数ある場合(a1,a2を用意)に同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信したら a1 で release されるのか?

1つ上の記述で見た org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler#handleMessageInternal の処理を見ると①~④の処理開始前に lockRegistry から取得した Lock オブジェクトでロックしています。

Spring Integration - Lock Registry を見ると For synchronizing updates across servers where a shared MessageGroupStore is being used, you must configure a shared lock registry. という記述がありました。

LockRegistry インターフェースの実装クラスを確認すると RedisLockRegistry クラスがあります。

f:id:ksby:20190623213307p:plain

これらの内容から同じ LockRegistry を使用すれば Aggregator が複数存在しても適切にロックしながら処理をするので、同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信しても a1 で release されるものと考えられます。

src/test/java/ksbysample/eipapp/aggregator/MultiAggregatorTest.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringBootTest
public class MultiAggregatorTest {

    @Autowired
    private MessageBuilderFactory messageBuilderFactory;

    @Autowired
    private IntegrationFlow a1Flow;

    @Autowired
    private IntegrationFlow a2Flow;

    @Autowired
    private QueueChannel outputChannel;

    @Test
    void sampleTest() {
        final int SEQUENCE_SIZE = 3;

        // メッセージで使用する correlationId を発番する
        UUID correlationId = IntegrationObjectSupport.generateId();

        int sequenceNumber = 1;
        Message<String> msg = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A1").build())
                .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE)
                .build();
        a1Flow.getInputChannel().send(msg);

        Message<String> msg2 = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A2").build())
                .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE)
                .build();
        a2Flow.getInputChannel().send(msg2);

        SleepUtils.sleep(1_000);
        Message<String> msg3 = messageBuilderFactory
                .fromMessage(MessageBuilder.withPayload("A3").build())
                .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE)
                .build();
        a1Flow.getInputChannel().send(msg3);

        Message<?> result = outputChannel.receive(15_000);
        assertThat(result).isNotNull();
        assertThat(result.getPayload()).isEqualTo(Arrays.asList("A1", "A2", "A3"));
        assertThat(result.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE))
                .isEqualTo(3);
    }

    @TestConfiguration
    static class FlowConfig {

        @Autowired
        private RedisConnectionFactory redisConnectionFactory;

        /**
         * Redis を MessageStore として使用するための設定。
         * Redis に格納する message は JSON フォーマットにする。
         * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
         * 参照。
         *
         * @return {@link RedisMessageStore} object
         */
        @Bean
        public RedisMessageStore redisMessageStore() {
            RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
            ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
            RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
            store.setValueSerializer(serializer);
            return store;
        }

        @Bean
        public IntegrationFlow a1Flow() {
            return f -> f
                    .log(LoggingHandler.Level.WARN)
                    // 以降の処理を別スレッドにすることで、send メソッド呼び出し時に待機しないようにする
                    .channel(c -> c.executor(Executors.newFixedThreadPool(1)))
                    .aggregate(a -> a.messageStore(redisMessageStore())
                            // a1Flow, a2Flow で同じ LockRegistry を使用するよう設定する
                            .lockRegistry(aggretatorLockRegistry())
                            .releaseStrategy(g -> {
                                log.error("PASS_A1");
                                return g.getMessages().size() == g.getSequenceSize();
                            }))
                    .log(LoggingHandler.Level.ERROR)
                    .channel(outputChannel());
        }

        @Bean
        public IntegrationFlow a2Flow() {
            return f -> f
                    .log()
                    .channel(c -> c.executor(Executors.newFixedThreadPool(1)))
                    .aggregate(a -> a.messageStore(redisMessageStore())
                            .lockRegistry(aggretatorLockRegistry())
                            .releaseStrategy(g -> {
                                SleepUtils.sleep(10_000);
                                log.error("PASS_A2");
                                return g.getMessages().size() == g.getSequenceSize();
                            }))
                    .log();
        }

        @Bean
        public LockRegistry aggretatorLockRegistry() {
            return new RedisLockRegistry(redisConnectionFactory, "AGGREGATOR_LOCK");
        }

        @Bean
        public MessageChannel outputChannel() {
            return MessageChannels.queue().get();
        }

    }

    static class SleepUtils {

        public static void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }

}

テストを実行すると、

f:id:ksby:20190623214053p:plain

  • send メッセージで処理が待機しないように実装しているので(..aggregate(...) の処理を別スレッドで実行するように実装しているので)、3件のメッセージ送信は1つ前のメッセージ送信の aggregator の処理を待たずに連続で送信されている。
  • 2件目のメッセージの releaseStrategy が処理されるのは送信してから 10秒後。
  • 3件目のメッセージの releaseStrategy は2件目のメッセージの releaseStrategy が終わった後に実行される。
  • 同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信すると a1 で release された。

という結果になりました。まあ、実際に複数の aggregator が必要になるようなデータ量を処理することになったらこんな実装をするのかは分かりませんが。。。

履歴

2019/06/23
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その31 )( Aggregator のサンプルを作ってみる )

概要

記事一覧はこちらです。

  • Spring Integration DSL8.4. Aggregator を使用したサンプルを作成します。
  • Aggregator でメッセージがどのように集約されるのかを見られるようにするために、単体の Redis サーバを Docker Compose で構築して MessageStore として利用します。Redis クライアントには JSON のデータを見たいので Medis を使用します。
  • サンプルは以前流行ったズンドコキヨシをベースに Aggregator の動きが分かるように少しルールを変えたものを作成してみます。

参照したサイト・書籍

  1. Spring Integration - 8.4. Aggregator
    https://docs.spring.io/spring-integration/docs/current/reference/html/#aggregator

  2. luin/medis
    https://github.com/luin/medis

目次

  1. ksbysample-eipapp-aggregator プロジェクトを作成する
  2. Docker Compose で単一の Redis サーバを構築する
  3. Redis の GUI クライアントとして Medis をインストールする
  4. "ずん" あるいは "どこ" のメッセージを送信して5つずつ aggregate するサンプルを作成する
  5. "ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後にバラバラに分解してから再び1セットのメッセージに aggregate するサンプルを作成する

手順

ksbysample-eipapp-aggregator プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。

plugins {
    id 'org.springframework.boot' version '2.1.5.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'ksbysample.eipapp'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

dependencies {
    def lombokVersion = "1.18.6"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.boot:spring-boot-starter-data-redis")
    implementation("org.springframework.integration:spring-integration-redis")
    implementation("org.springframework.boot:spring-boot-starter-json") {
        exclude group: "org.springframework", module: "spring-web"
    }
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}
  • Redis を MessageStore として利用するので以下の行を追加します。
    • implementation("org.springframework.boot:spring-boot-starter-data-redis")
    • implementation("org.springframework.integration:spring-integration-redis")
  • Redis に JSON フォーマットでメッセージを格納したいので以下の行を追加します。spring-boot-starter-json を追加すると org.springframework:spring-web も依存関係に追加されてしまうのですが、今回は不要なので exclude で除外します。
    • implementation("org.springframework.boot:spring-boot-starter-json")
  • ログを出力する時に @Slf4j アノテーションを使いたいので以下の行を追加して lombok を使えるようにします。
    • annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    • compileOnly("org.projectlombok:lombok:${lombokVersion}")

ディレクトリ構成は以下のようにしています。

f:id:ksby:20190615005011p:plain

Docker Compose で単一の Redis サーバを構築する

プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。

version: '3'

services:
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it redis /bin/sh
  #
  # 起動したコンテナの redis に redis-cli でアクセスするには以下のコマンドを実行する
  # docker exec -it redis redis-cli
  #
  #############################################################################
  # 単体 Redis サーバ
  redis:
    image: redis:5.0.5
    container_name: redis
    ports:
      - "6379:6379"

docker-compose up -d コマンドを実行して Redis を起動します。

f:id:ksby:20190615005616p:plain

Redis の GUI クライアントとして Medis をインストールする

https://github.com/luin/medis から Medis on Windowsインストーラをダウンロードしてインストールします。

"ずん" あるいは "どこ" のメッセージを送信して5つ ずつ aggregate するサンプルを作成する

以下の動作を行うサンプルを作成します。

  • "ずん"、"どこ" をランダムに出力する MessageSource を作成する。”ずん" を 4/5、"どこ" を 1/5 の確立で返す。
  • MessageSource からは 15秒ごとに取得する(aggregator の動作を確認するために時間を遅めにする)。
  • aggregator は Redis を MessageStore として使用し、メッセージが5つ溜まったら集約して1つのメッセージにして次の処理に流す。タイムアウトは設定しない(5つ溜まるまでずっと待つ)。
  • 最後に集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。

src/main/java/ksbysample/eipapp/aggregator の下に RandomZundokoFlowConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

@Slf4j
@Configuration
public class RandomZundokoFlowConfig {

    private static final Random r = new Random();
    private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"};

    private static final long POLLER_DELAY_PERIOD = 15000L;

    // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない
    // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties
    // の spring.redis.~ を参照。
    private final RedisConnectionFactory redisConnectionFactory;

    public RandomZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    /**
     * Redis を MessageStore として使用するための設定。
     * Redis に格納する message は JSON フォーマットにする。
     * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
     * 参照。
     *
     * @return {@link RedisMessageStore} object
     */
    @Bean
    public RedisMessageStore redisMessageStore() {
        RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
        ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
        RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
        store.setValueSerializer(serializer);
        return store;
    }

    @Bean
    public MessageSource<String> zundokoMessageSource() {
        return () -> MessageBuilder.withPayload(ZUNDOKO[r.nextInt(5)]).build();
    }

    @Bean
    public IntegrationFlow zundokoFlow() {
        return IntegrationFlows.from(zundokoMessageSource()
                , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD)))
                // kiyoshiFlow にメッセージを送信する
                .channel(kiyoshiFlow().getInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                // メッセージが5つ溜まったら集約して次の処理に流す
                // * 流れてくる message の header に correlationId がないので
                //   correlationExpression("1") を記述して固定で "1" という扱いにする。
                //   これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。
                // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを
                //   次の処理に流した後に再びメッセージが蓄積されるようになる。
                .aggregate(a -> a.correlationExpression("1")
                        .messageStore(redisMessageStore())
                        .releaseStrategy(g -> g.size() == 5)
                        .expireGroupsUponCompletion(true))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .<List<String>>handle((p, h) -> {
                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
                        log.error("きよし!");
                    }
                    return null;
                });
    }

}

実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" + <correlationIdとして使用する文字列(今回は "1" 固定)> のレコードと個々の message のレコードがあることが確認できます。

f:id:ksby:20190615100441p:plain f:id:ksby:20190615100523p:plain

message が5つ蓄積されて aggregator により1つの message に集約されて次の処理に渡されると、蓄積されていた MESSAGE_GROUP と MESSAGE は削除されます。

f:id:ksby:20190615100821p:plain

コンソールには以下のように出力されます。payload=[ずん, ずん, ずん, ずん, どこ] になっていると "きよし!" の文字列が出力されています。またスレッド名を見るとバラバラでした。1つのスレッドで処理するわけではないようです。

f:id:ksby:20190615102346p:plain

.outputProcessor(...) を使うと次の処理に渡す時の palyload にセットする値を変更することができます。デフォルトでは List でまとめられますが、以下のように実装するとカンマで結合した String オブジェクトになります。

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                // メッセージが5つ溜まったら集約して次の処理に流す
                // * 流れてくる message の header に correlationId がないので
                //   correlationExpression("1") を記述して固定で "1" という扱いにする。
                //   これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。
                // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを
                //   次の処理に流した後に再びメッセージが蓄積されるようになる。
                .aggregate(a -> a.correlationExpression("1")
                        .messageStore(redisMessageStore())
                        .releaseStrategy(g -> g.size() == 5)
                        .outputProcessor(g -> {
                            String msg = g.getMessages().stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.joining(","));
                            if (msg.equals("ずん,ずん,ずん,ずん,どこ")) {
                                msg += ",きよし!";
                            }
                            return msg;
                        })
                        .expireGroupsUponCompletion(true))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .nullChannel();
//                .<List<String>>handle((p, h) -> {
//                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
//                        log.error("きよし!");
//                    }
//                    return null;
//                });
    }

f:id:ksby:20190615105148p:plain

Aggregator 用のクラスを定義して @CorrelationStrategy、@ReleaseStrategy、@Aggregator アノテーションを付与したメソッドを用意する書き方もあります。

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                // メッセージが5つ溜まったら集約して次の処理に流す
                // * 流れてくる message の header に correlationId がないので
                //   correlationExpression("1") を記述して固定で "1" という扱いにする。
                //   これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。
                // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを
                //   次の処理に流した後に再びメッセージが蓄積されるようになる。
                .aggregate(a -> a.processor(new ZundokoAggregator())
                        .messageStore(redisMessageStore())
                        .expireGroupsUponCompletion(true))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .<List<String>>handle((p, h) -> {
                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
                        log.error("きよし!");
                    }
                    return null;
                });
    }

    static class ZundokoAggregator {

        @CorrelationStrategy
        public String correlationKey(Message<?> message) {
            return "1";
        }

        @ReleaseStrategy
        public boolean canRelease(List<Message<?>> messages) {
            return messages.size() == 5;
        }

        @Aggregator
        public List<String> aggregate(List<String> payloads) {
            // .outputProcessor(...) での加工処理を実装したい場合にはこのメソッド内に記述する
            return payloads;
        }

    }

f:id:ksby:20190615110538p:plain

"ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後分解してから再び1セットのメッセージに aggregate するサンプルを作成する

次に以下の動作を行うサンプルを作成します。

  • "ずん", "ずん", "ずん", "ずん", "どこ" の配列が palyload にセットされたメッセージを出力する MessageSource を作成する。
  • MessageSource からは 5秒ごとに取得する。
  • MessageSource からメッセージを取得したら Splitter で分解して、マルチスレッドで処理させるための channel に渡す。
  • マルチスレッド側ではランダムで決められた秒数(最大5秒)待機してから aggregator へ渡す。
  • aggregator では Splitter で分割されていたメッセージを全て集めたら1つのメッセージにして次の処理に流す。また3秒以内に分割していたメッセージが全て集まらなかった時には MESSAGE_GROUP を破棄する。
  • 集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。

src/main/java/ksbysample/eipapp/aggregator/RandomZundokoFlowConfig.java を実行しないためにクラスに付与した @Configuration アノテーションコメントアウトした後、

@Slf4j
//@Configuration
public class RandomZundokoFlowConfig {

src/main/java/ksbysample/eipapp/aggregator の下に SplitZundokoFlowConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

@Slf4j
@Configuration
public class SplitZundokoFlowConfig {

    private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"};

    private static final long POLLER_DELAY_PERIOD = 5000L;
    private static final int DELAY_MAX_VALUE = 5000;
    private static final long MESSAGE_GROUP_TIMEOUT = 3000L;

    // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない
    // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties
    // の spring.redis.~ を参照。
    private final RedisConnectionFactory redisConnectionFactory;

    public SplitZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    /**
     * Redis を MessageStore として使用するための設定。
     * Redis に格納する message は JSON フォーマットにする。
     * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store
     * 参照。
     *
     * @return {@link RedisMessageStore} object
     */
    @Bean
    public RedisMessageStore redisMessageStore() {
        RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
        ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
        RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
        store.setValueSerializer(serializer);
        return store;
    }

    @Bean
    public MessageSource<String[]> zundokoMessageSource() {
        return () -> MessageBuilder.withPayload(ZUNDOKO).build();
    }

    @Bean
    public IntegrationFlow zundokoFlow() {
        return IntegrationFlows.from(zundokoMessageSource()
                , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD)))
                // Message<String[]> --> Message<String> x 5 に分割する
                .split()
                // ここから下はマルチスレッドで処理する
                .channel(c -> c.executor(Executors.newFixedThreadPool(5)))
                // 5秒以内(ランダムで決める)の間 delay する
                // group timeout した時にどのメッセージが原因だったのかが分かるようにするために
                // delay header を追加して値をセットする
                .enrichHeaders(h -> h.headerFunction("delay",
                        m -> String.valueOf(ThreadLocalRandom.current().nextInt(DELAY_MAX_VALUE))))
                .delay("ZUNDOKO_DELAYER",
                        (DelayerEndpointSpec e) -> e.delayExpression("headers.delay"))
                // kiyoshiFlow にメッセージを送信する
                .channel(kiyoshiFlow().getInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow kiyoshiFlow() {
        return f -> f
                .aggregate(a -> a
                        .messageStore(redisMessageStore())
                        .releaseStrategy(g -> g.getMessages().size() == g.getSequenceSize())
                        .expireGroupsUponCompletion(true)
                        // .groupTimeout(...) で指定した時間内に aggregator で処理されなかったメッセージは
                        // .discardChannel(...) で指定した channel に送信される
                        // ※既に MESSAGE_GROUP に蓄積されていたメッセージも discardChannel に送信される
                        .groupTimeout(MESSAGE_GROUP_TIMEOUT)
                        .discardChannel(discardFlow().getInputChannel()))
                // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する
                .log()
                .<List<String>>handle((p, h) -> {
                    if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) {
                        log.error("きよし!");
                    }
                    return null;
                });
    }

    /**
     * MESSAGE_GROUP_TIMEOUT で指定された時間(3秒)以内に aggregator で処理されなかった MESSAGE GROUP
     * のメッセージが送信されてくる Flow
     *
     * @return {@link IntegrationFlow} object
     */
    @Bean
    public IntegrationFlow discardFlow() {
        return f -> f
                .log(LoggingHandler.Level.WARN)
                .nullChannel();
    }

}

実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" の後には correlationId としてセットされている UUID が表示されています。

f:id:ksby:20190615125003p:plain

コンソールを見ると "ずん", "ずん", "ずん", "ずん", "どこ" に戻った場合には "きよし!" のログが出力されており、また3秒以内にメッセージが全て集まらなかった場合には "Expiring MessageGroup with correlaionKey[...]" のログと既に蓄積されていたメッセージが出力されて、その後に後からきたメッセージが出力されていました(header の delay を見ると4秒以上の遅延時間がセットされていることが分かります)。

f:id:ksby:20190615125319p:plain

履歴

2019/06/15
初版発行。

IntelliJ IDEA を 2019.1.2 → 2019.1.3 へバージョンアップ

IntelliJ IDEA を 2019.1.2 → 2019.1.3 へバージョンアップする

IntelliJ IDEA の 2019.1.3 がリリースされているのでバージョンアップします。

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  2. IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20190605084348p:plain

  3. Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20190605084441p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  5. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20190605085056p:plain

  6. IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.1.3 へバージョンアップされていることを確認します。

  7. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  8. 更新可能な Plugin があるというダイアログが画面右下に表示されたので、再度 IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  9. IDE and Plugin Updates」ダイアログが表示されますので「Update」ボタンをクリックします。

    f:id:ksby:20190605085316p:plain

  10. Plugin がインストールされます。画面右下に Restart のダイアログが表示されますので、リンクをクリックして IntelliJ IDEA を再起動します。

  11. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

  12. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20190605090613p:plain

  13. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20190605091450p:plain f:id:ksby:20190605092209p:plain

IntelliJ IDEA を 2019.1.1 → 2019.1.2 へバージョンアップ

IntelliJ IDEA を 2019.1.1 → 2019.1.2 へバージョンアップする

IntelliJ IDEA の 2019.1.2 がリリースされているのでバージョンアップします。

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  2. IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20190515205305p:plain

  3. Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20190515205410p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  5. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20190515210152p:plain

  6. IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.1.2 へバージョンアップされていることを確認します。

  7. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  8. 更新可能な Plugin があるというダイアログが画面右下に表示されたので、再度 IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  9. IDE and Plugin Updates」ダイアログが表示されますので「Update」ボタンをクリックします。

  10. Plugin がインストールされます。画面右下に Restart のダイアログが表示されますので、リンクをクリックして IntelliJ IDEA を再起動します。

  11. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

  12. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20190515211804p:plain

  13. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20190515232215p:plain f:id:ksby:20190515232935p:plain