Spring Boot + Spring Integration でいろいろ試してみる ( その35 )( Docker Compose でサーバを構築する、Kafka 編2 )
概要
記事一覧はこちらです。
Kafka+zookeeper の環境を Docker Compose で構築して Spring Integration を利用したサンプルを作成するだけのつもりでいたのですが、Kafka や Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読んでみたら結構面白いサーバであることが分かったので、しばらくいろいろ試してみることにします。
- docker-compose.yml を confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう書き直します。
- partition を複数作成すれば複数の consumer で分散して受信できるようなので試します。
参照したサイト・書籍
Kafka
- 作者: Neha Narkhede,Gwen Shapira,Todd Palino,上岡真也,笹井崇司
- 出版社/メーカー: オライリージャパン
- 発売日: 2018/08/03
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
Apache Kafka 分散メッセージングシステムの構築と活用
Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE)
- 作者: 株式会社NTTデータ,佐々木徹,岩崎正剛,猿田浩輔,都築正宜,吉田耕陽,下垣徹,土橋昌
- 出版社/メーカー: 翔泳社
- 発売日: 2018/10/30
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
confluentinc/cp-kafka(Docker Hub のページ)
https://hub.docker.com/r/confluentinc/cp-kafkaconfluentinc/cp-zookeeper(Docker Hub のページ)
https://hub.docker.com/r/confluentinc/cp-zookeeperconfluentinc/cp-docker-images
https://github.com/confluentinc/cp-docker-images- examples の下に Kafka を cluster 構成にする docker-compose.yml のサンプルがあります。
Confluent Platform Docker Images
https://docs.confluent.io/current/installation/docker/index.htmlConfluent Platform Quick Start (Docker)
https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html- control-center や ksql 等 confluentinc/cp-docker-images で入れられる機能を試したい場合の手順が記載されています(ただし今回の記事では試していません)。
Generic Spring Kafka Listener
https://stackoverflow.com/questions/42045033/generic-spring-kafka-listener- @KafkaListener による consumer で topicPartitions 属性で partition を指定したい場合の例が書かれています。
目次
- confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す
- topic に partition を複数作成すればメッセージの受信を分散できる
手順
confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す
Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 ) で Kafka の Docker コンテナを作成した時に Kafka は Official Images がないのかな?と思っていたのですが、Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読むと Kafka のコミッタの半数以上が Confluent という会社に所属していることとインストールで Confluent が配布している Confluent Platform の OSS 版を利用することが記載されていました。
Docker Hub で "confluent" で検索してみると confluentinc/cp-kafka と confluentinc/cp-zookeeper を見つけたので、この2つを利用するよう docker-compose.yml を書き直すことにします。
cp-docker-images/examples/ の下にサンプルが多数ありますので、今回は kafka-single-node/docker-compose.yml を参考にします(というよりほぼコピペです)。
Conflulent の Docker Image で利用可能なオプションは Docker Configuration に記載があります。
version: '3' services: ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it cp-zookeeper /bin/bash ############################################################################# # 単体 zookeeper cp-zookeeper: image: confluentinc/cp-zookeeper:5.2.2 container_name: cp-zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it cp-kafka /bin/bash ############################################################################# # 単体 Kafka # topic の作成・一覧取得・詳細表示 # kafka-topics --zookeeper cp-zookeeper:2181 --create --topic Topic1 --partitions 1 --replication-factor 1 --if-not-exists # kafka-topics --zookeeper cp-zookeeper:2181 --alter --topic Topic1 --partitions 3 # kafka-topics --zookeeper cp-zookeeper:2181 --list # kafka-topics --zookeeper cp-zookeeper:2181 --topic Topic1 --describe # kafka-console-producer --broker-list localhost:9092 --topic Topic1 # kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1 cp-kafka: image: confluentinc/cp-kafka:5.2.2 container_name: cp-kafka ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 depends_on: - cp-zookeeper
docker-compose up -d
コマンドを実行します。
IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。
topic を作成します。
kafka-console-producer
、kafka-console-consumer
コマンドでメッセージの送受信が出来ることを確認します。
wurstmeister/kafka と比較すると Topic を作成するのに KAFKA_CREATE_TOPICS が使用できなくなりますが(コンテナに接続して kafka-topics --create
コマンドを実行する必要があります)、cp-docker-images/examples/cp-all-in-one/docker-compose.yml を見ると control-center(管理画面らしい)や schema-registry(serializer, deserializer に Apache Avro を利用するのに必要らしい)、 ksql-server, ksql-cli(KSQLが試せる?)等もあるようなので、Kafka のいろいろな機能を試すならこちらの方が良さそうな気がします。
topic に partition を複数作成すればメッセージの受信を分散できる
partition が1つだけだと1つの consumer しか受信できませんでしたが、複数の partition を作成すれば複数の consumer で分散して受信できるようです。
Topic1 を partition = 3 で作成して試してみます。
src/test/java/ksbysample/eipapp/kafka/KafkaSendAndReceiveTest.java を以下のように変更してメッセージを 10件連続で送信できるようにする、かつ @KafkaListener による consumer は削除する、ようにします。またメッセージ送信時に正常に送信できていることを確認するよう kafkaTemplate.send(TOPIC_NAME, String.valueOf(i));
→ kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get();
に変更します(こうすると送信時にエラーが出た時には例外が throw されます)。
@Test void sendToKafkaTest() throws ExecutionException, InterruptedException { // kafkaTemplate.send(TOPIC_NAME, "test message"); for (int i = 1; i <= 10; i++) { kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get(); } // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する // SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { // @KafkaListener(topics = TOPIC_NAME) // public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { // log.warn(cr.toString()); // } }
コマンドプロンプトを2つ起動して、どちらも kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1
を実行します。
テストを実行してメッセージを 10件送信すると、2つの consumer で重複せずにメッセージを受信するようになりました。
テストクラスで複数の consumer で分散して受信してみます。KafkaSendAndReceiveTest.java を以下のように変更して consumer を2つ作成してから、
@Test void sendToKafkaTest() throws ExecutionException, InterruptedException { // @KafkaListener の consumer が登録されるまで少し時間がかかるので 15秒 sleep する SleepUtils.sleep(15_000); // kafkaTemplate.send(TOPIC_NAME, "test message"); for (int i = 1; i <= 10; i++) { kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get(); } // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { @KafkaListener(topics = TOPIC_NAME) public void listenByConsumerRecord1(ConsumerRecord<?, ?> cr) { log.warn(String.format("partition = %d, message = %s", cr.partition(), cr.value())); } @KafkaListener(topics = TOPIC_NAME) public void listenByConsumerRecord2(ConsumerRecord<?, ?> cr) { log.error(String.format("partition = %d, message = %s", cr.partition(), cr.value())); } }
テストを実行すると consumer が受信する partition がアサインされて、
それぞれの consumer でメッセージを受信します。この場合、partition = 0, 1 がアサインされたのが listenByConsumerRecord1 で、partition = 2 がアサインされたのが listenByConsumerRecord2 でした。
履歴
2019/07/17
初版発行。
Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 )
概要
記事一覧はこちらです。
Spring Integration のアプリケーションで使用するサーバを Docker Compose で構築します。
- Kafka+zookeeper の環境を構築します。
- Kafka の Dockerイメージは wurstmeister/kafka を使用します。
- zookeeper の Dockerイメージは zookeeper を使用します。
- 今回は Kafka、zookeeper どちらも単体サーバで構成します。
参照したサイト・書籍
kafka - Documentation
https://kafka.apache.org/documentation/wurstmeister/kafka(Docker Hub のページ)
https://hub.docker.com/r/wurstmeister/kafka/wurstmeister/kafka-docker(GitHub のページ)
https://github.com/wurstmeister/kafka-dockerzookeeper(Docker Hub のページ)
https://hub.docker.com/_/zookeeper31z4/zookeeper-docker(GitHub のページ)
https://github.com/31z4/zookeeper-dockerDeploy a Kafka broker in a Docker container
https://www.kaaproject.org/kafka-dockerApache Kafkaをインストールして、コマンドラインツールを試す
http://pppurple.hatenablog.com/entry/2018/10/10/232733Spring for Apache Kafka
https://docs.spring.io/spring-kafka/reference/html/Intro to Apache Kafka with Spring
https://www.baeldung.com/spring-kafkaApache Kafkaって本当に大丈夫?~故障検証のオーバービューと興味深い挙動の紹介~
https://www.slideshare.net/hadoopxnttdata/hadoop-spark-conference-japan-2019nttdataLINEの大規模データパイプラインを支える、Apache Kafkaプラットフォームの運用の裏側
https://logmi.jp/tech/articles/320330Apache Kafkaの概要とアーキテクチャ
https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3
目次
- ksbysample-eipapp-kafka プロジェクトを作成する
- docker-compose.yml を作成する
- サーバを起動する
- KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する
- group.id が同じ consumer はどれか1つでしかメッセージを受信しない
- group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する
手順
ksbysample-eipapp-kafka プロジェクトを作成する
Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。
plugins { id "org.springframework.boot" version "2.1.6.RELEASE" id "java" id "idea" } apply plugin: "io.spring.dependency-management" group = "ksbysample.eipapp" version = "0.0.1-SNAPSHOT" sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } configurations { // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする testAnnotationProcessor.extendsFrom annotationProcessor testImplementation.extendsFrom compileOnly } repositories { mavenCentral() } dependencyManagement { imports { mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES) mavenBom("org.junit:junit-bom:5.5.0") } } dependencies { def lombokVersion = "1.18.8" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE") testImplementation("org.springframework.boot:spring-boot-starter-test") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") // for JUnit 5 testCompile("org.junit.jupiter:junit-jupiter") testRuntime("org.junit.platform:junit-platform-launcher") } test { // for JUnit 5 useJUnitPlatform() testLogging { events "STARTED", "PASSED", "FAILED", "SKIPPED" } }
dependencies block に
implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE")
を追加します。spring-integration-kafka は Appendix F. Dependency versions に記載されていませんので、バージョン番号も明記します。spring-integration-kafka を追加すると以下のモジュールが依存関係に追加されます。
\--- org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE +--- org.springframework.integration:spring-integration-core:5.1.6.RELEASE (*) \--- org.springframework.kafka:spring-kafka:2.2.7.RELEASE +--- org.springframework:spring-context:5.1.7.RELEASE -> 5.1.8.RELEASE (*) +--- org.springframework:spring-messaging:5.1.7.RELEASE -> 5.1.8.RELEASE (*) +--- org.springframework:spring-tx:5.1.7.RELEASE -> 5.1.8.RELEASE (*) +--- org.springframework.retry:spring-retry:1.2.4.RELEASE \--- org.apache.kafka:kafka-clients:2.0.1 +--- org.lz4:lz4-java:1.4.1 +--- org.xerial.snappy:snappy-java:1.1.7.1 \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.26
ディレクトリ構成は以下のようにしています。
docker-compose.yml を作成する
今回は zookeeper、kafka どちらも単体のサーバで起動します。
プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。
version: '3' services: ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it zookeeper /bin/sh ############################################################################# # 単体 zookeeper zookeeper: image: zookeeper:3.5.5 container_name: zookeeper-1 hostname: zookeeper-1 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 restart: always ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it kafka /bin/sh ############################################################################# # 単体 Kafka kafka: image: wurstmeister/kafka:2.12-2.2.1 container_name: kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181 # Topic1 will have 1 partition and 1 replicas KAFKA_CREATE_TOPICS: "Topic1:1:1" depends_on: - zookeeper
サーバを起動する
コマンドプロンプトを起動し docker-compose.yml のあるディレクトリへ移動して docker-compose up -d
コマンドを実行して起動します。
IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。
kafka コンテナに接続して kafka-topics.sh で topics の作成状況を確認します(kafka-topics.sh は kafka の Documentation 参照)。
kafka-console-consumer.sh と kafka-console-producer.sh でメッセージの送受信のテストをしてみます。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1
を実行して Topic1 にメッセージが送信されるのを待機します。kafka-console-producer.sh --broker-list localhost:9092 --topic Topic1
を実行後、文字列を入力してメッセージを送信します。
kafka-console-producer.sh を起動した方でメッセージを入力すると kafka-console-consumer.sh を起動した方で入力されたメッセージが表示されました。メッセージの送受信は問題なく出来るようです。kafka-console-producer.sh は Ctrl+D を、kafka-console-consumer.sh は Ctrl+C を入力して終了します。
Topic1 のメッセージをクリアしたいので、docker-compose down
、docker-compose up -d
コマンドを実行してコンテナを起動し直します。
KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する
Spring Integration DSL で書く前に KafkaTemplate と @KafkaListener でデータを送受信するサンプルを JUnit 5 のテストで作成します。
src/main/resources/application.properties を以下のように変更します。
spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=topic1-consumer-group1
src/test/java/ksbysample/eipapp/kafka を作成した後、その下に KafkaSendAndReceiveTest.java を新規作成して以下の内容を記述します。
package ksbysample.eipapp.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; @Slf4j @SpringBootTest public class KafkaSendAndReceiveTest { private static final String TOPIC_NAME = "Topic1"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Test void sendToKafkaTest() { kafkaTemplate.send(TOPIC_NAME, "test message"); // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { @KafkaListener(topics = TOPIC_NAME) public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { log.warn(cr.toString()); } } static class SleepUtils { public static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
テストを実行すると送信したメッセージがを @KafkaListener アノテーションを付与した listenByConsumerRecord メソッドで受信してログが出力されます。
log.warn(cr.toString());
では ConsumerRecord(topic = Topic1, partition = 0, offset = 16, CreateTime = 1562469618138, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test message)
というメッセージが出力されていました。
group.id が同じ consumer はどれか1つでしかメッセージを受信しない
kafka-console-consumer.sh を使って動作確認します。
まずコマンドプロンプトを2つ起動してから docker exec -it kafka /bin/sh
コマンドで kafka のコンテナに接続した後、kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1
コマンドを実行して同じ group.id でメッセージ送信を待ちます。
KafkaSendAndReceiveTest.java から SleepUtils.sleep(5_000);
と @KafkaListener をコメントアウトして、メッセージを受信せず、かつ送信後すぐにテストが終了するようにします。
@Test void sendToKafkaTest() { kafkaTemplate.send(TOPIC_NAME, "test message"); // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する // SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { // @KafkaListener(topics = TOPIC_NAME) // public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { // log.warn(cr.toString()); // SleepUtils.sleep(1_000); // } }
テストを実行してメッセージを送信すると、2つ起動している consumer の内1つだけ受信します。
追加で2回テストを実行してみるとメッセージを受信したのは最初に受信した consumer でした。
テストクラスで 1000件連続してメッセージを送信するように変更してみましたが、
@Test void sendToKafkaTest() { // kafkaTemplate.send(TOPIC_NAME, "test message"); for (int i = 1; i <= 1000; i++) { kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)); kafkaTemplate.flush(); } // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する // SleepUtils.sleep(5_000); }
やっぱり一方の consumer だけしか受信しません。
受信していた consumer を落とすと、今度はもう一方の接続していた consumer でメッセージを受信しました。
ここまでの動作をまとめると、
- 同じ group.id の consumer を複数接続してもメッセージを受信するのはその内の1つ。
- 大量のメッセージを送信してもメッセージを受信するのは決められた1つだけである。複数接続している consumer で分散して受信する訳ではない。
- 分散する設定があるのかは不明。
group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する
docker-compose down
、docker-compose up -d
コマンドを実行してコンテナを起動し直した後、今度は group.id が異なる kafka-console-consumer.sh を実行します。
テストを実行してメッセージを送信すると、各 consumer でメッセージを受信しました。
もう1点。コンテナを起動し直して Topic1 のメッセージをクリアしてから group.id が topic1-consumer-group1 の kafka-console-consumer.sh だけ起動しておきます。
テストを実行してメッセージを送信すると起動しておいた consumer でメッセージを受信しますが、
後から別の group.id(topic1-consumer-group2)の kafka-console-consumer.sh を起動すると、この時は topic1-consumer-group2 の consumer ではメッセージを受信しません。
ただし1度 group.id = topic1-consumer-group2 の接続情報が登録されると、一旦 topic1-consumer-group2 の kafka-console-consumer.sh を終了してから、
テストを実行してメッセージを送信して topic1-consumer-group1 の kafka-console-consumer.sh でメッセージを受信した後、
topic1-consumer-group2 の kafka-console-consumer.sh を起動すると、起動前に送信されたメッセージが受信されます。
KafkaSendAndReceiveTest.java 内で異なる group.id の @KafkaListener を用意してメッセージを受信するには以下のように変更します。
@Test void sendToKafkaTest() { kafkaTemplate.send(TOPIC_NAME, "test message"); // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group1") public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { log.warn(cr.toString()); } // group.id は @KafkaListener アノテーションの groupId 属性で指定可能 // メソッドの引数にはメッセージのデータの型(今回は String)の引数を指定することも可能 @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group2") public void listenByString(String msg) { log.error(msg); } }
テストを実行すると @KafkaListener アノテーションを付与したメソッドそれぞれでメッセージを受信していることが確認できます。
Spring Integration DSL でのサンプルや Kafka、zookeeper を複数サーバ起動する場合のサンプルを作成したいので、あと数回続きます。
履歴
2019/07/07
初版発行。
Spring Boot + Spring Integration でいろいろ試してみる ( その33 )( 5.1 からの新機能 Java Functions Improvements を試してみる )
概要
記事一覧はこちらです。
- Spring Integration DSL でも RSocket が使えるようになるのか。。。と spring-integration/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java を見ていたのですが、その中に
return IntegrationFlows.from(Function.class)...
という見慣れない実装を見つけました。 - 調べたところ https://spring.io/blog/2018/10/29/spring-integration-5-1-goes-ga#java-functions-improvements に「Java Functions Improvements」という記述がありました。Spring Integration 5.1 からの新機能でした。興味が湧いたので試してみます。
参照したサイト・書籍
Spring Integration 5.1 goes GA! - Java Functions Improvements
https://spring.io/blog/2018/10/29/spring-integration-5-1-goes-ga#java-functions-improvementsspring-integration/src/reference/asciidoc/configuration.adoc
https://github.com/spring-projects/spring-integration/blob/master/src/reference/asciidoc/configuration.adoc
目次
- ksbysample-eipapp-functions プロジェクトを作成する
- Supplier&Function&Consumer の簡単なサンプルを作成する
- Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能
- Function#apply で IntegrationFlow Bean の処理を呼び出す
手順
ksbysample-eipapp-functions プロジェクトを作成する
Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。
plugins { id "org.springframework.boot" version "2.1.6.RELEASE" id "java" id "idea" } apply plugin: "io.spring.dependency-management" group = "ksbysample.eipapp" version = "0.0.1-SNAPSHOT" sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } configurations { // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする testAnnotationProcessor.extendsFrom annotationProcessor testImplementation.extendsFrom compileOnly } repositories { mavenCentral() } dependencies { def lombokVersion = "1.18.6" implementation("org.springframework.boot:spring-boot-starter-integration") testImplementation("org.springframework.boot:spring-boot-starter-test") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") }
ディレクトリ構成は以下のようにしています。
Supplier&Function&Consumer の簡単なサンプルを作成する
package ksbysample.eipapp.functions; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @Slf4j @Configuration public class SupplierAndFunctionAndConsumerFlow { private AtomicInteger count = new AtomicInteger(0); // Supplier<?> は ? 型のオブジェクトが payload にセットされた Message を返す // MessageSource として利用できる @Bean public Supplier<Integer> countSupplier() { return () -> count.addAndGet(1); } @Bean public IntegrationFlow countDisplayFlow() { return IntegrationFlows .from(countSupplier() , e -> e.poller(Pollers.fixedDelay(1000))) .transform(DoubleAndToStrFunc()) .handle(addDotFunc()) .handle(printFunc()) .get(); } // Function<?, ?> は .transform(...) で利用したり、 public Function<Integer, String> DoubleAndToStrFunc() { return v -> String.valueOf(v * 2); } // .handle(...) で利用できる public Function<String, String> addDotFunc() { return s -> s + "..."; } // Consumer<?> は .handle(...) で利用できる public Consumer<String> printFunc() { return s -> log.warn(s); } }
上のサンプルを実行すると "2..."、"4..."、"6..." と順に出力されます。
同じ処理を Supplier は MessageSource で置き換えて、Function と Consumer は .transform(...)、.handle(...) に直接記述すると以下のようになります。
package ksbysample.eipapp.functions; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.messaging.support.MessageBuilder; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @Slf4j @Configuration public class MessageSourceAndLambdaFlow { private AtomicInteger count = new AtomicInteger(0); @Bean public MessageSource<Integer> countMessageSource() { return () -> MessageBuilder .withPayload(count.addAndGet(1)) .build(); } @Bean public IntegrationFlow countDisplayFlow() { return IntegrationFlows .from(countMessageSource() , e -> e.poller(Pollers.fixedDelay(1000))) .<Integer, String>transform(v -> String.valueOf(v * 2)) .handle((Function<String, String>) s -> s + "...") .handle((Consumer<String>) s -> log.warn(s)) .get(); } }
lambda 式を使っているのに (Function<String, String>)
や (Consumer<String>)
とキャストしないと build 時にエラーが出るのがなんか今ひとつな気がします。。。
Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能
package ksbysample.eipapp.functions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.handler.LoggingHandler; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @Configuration public class SupplierAndFunction2Flow { @Autowired private NullChannel nullChannel; private AtomicInteger count = new AtomicInteger(0); // Supplier<Message<?>> という記述にして Message オブジェクトを返すようにすることも出来る // この場合 MessageSource の時のように header を追加することが可能。 @Bean public Supplier<Message<Integer>> countSupplier() { return () -> MessageBuilder .withPayload(count.addAndGet(1)) .setHeader("random", ThreadLocalRandom.current().nextInt(100)) .build(); } @Bean public IntegrationFlow countDisplayFlow() { return IntegrationFlows .from(countSupplier() , e -> e.poller(Pollers.fixedDelay(1000))) .log(LoggingHandler.Level.WARN) .channel(nullChannel) .get(); } }
実行すると headers の中に random というデータが追加されています。
Function や Consumer でも Function<Message<Integer>, Message<String>>
や Consumer<Message<String>>
のように試してみたのですが、こちらは実行時にエラーになりました。この書き方が出来るのは Supplier だけのようです。
Function#apply で IntegrationFlow Bean の処理を呼び出す
以前 @MessagingGateway アノテーションを付加した interface を用意して IntegrationFlow の処理を呼び出す方法を記載しましたが、Function<?, ?> 型のフィールドを用意して Function#apply を呼び出すと IntegrationFlow の処理を呼び出すことができます。
package ksbysample.eipapp.functions; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.handler.GenericHandler; import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.messaging.MessageHandlingException; import org.springframework.retry.RetryContext; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetrySynchronizationManager; import org.springframework.retry.support.RetryTemplate; import org.springframework.stereotype.Component; import java.util.function.Function; import static java.util.Collections.singletonMap; @Slf4j @Component public class FunctionSample implements CommandLineRunner { // IntegrationFlow で定義した処理を呼び出すには、 // * IntegrationFlows.from(Function.class)... と定義する // * Function<?, ?> 型の変数をフィールドに定義し // @Autowired, @Qualifier("<IntegrationFlow名>.gateway"), @Lazy アノテーションを付与する @Autowired @Qualifier("throwExceptionWithRetryFlow.gateway") @Lazy private Function<String, String> throwExceptionWithRetryFlowFunc; @Override public void run(String... args) throws Exception { System.out.println(throwExceptionWithRetryFlowFunc.apply("success???")); } @Configuration static class FlowConfig { /** * リトライ回数は最大5回、リトライ時は2秒待機する RetryTemplate を生成する * * @return {@link RetryTemplate} object */ @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // SimpleRetryPolicy retryTemplate.setRetryPolicy( new SimpleRetryPolicy(5, singletonMap(Exception.class, true))); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(2000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public RequestHandlerRetryAdvice retryAdvice() { RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice(); retryAdvice.setRetryTemplate(retryTemplate()); // リトライしても処理が成功しなかった場合には // MessageHandlingException#getCause#getMessage で取得したメッセージを payload // にセットした Message オブジェクトが次の処理に渡されるようにする retryAdvice.setRecoveryCallback(context -> { MessageHandlingException e = (MessageHandlingException) context.getLastThrowable(); String errMsg = e.getCause().getMessage(); log.error(errMsg); return errMsg; }); return retryAdvice; } @Bean public IntegrationFlow throwExceptionWithRetryFlow() { return IntegrationFlows .from(Function.class) .handle((GenericHandler<Object>) (p, h) -> { RetryContext retryContext = RetrySynchronizationManager.getContext(); log.warn("★★★ リトライ回数 = " + retryContext.getRetryCount()); // リトライ処理をさせたいので強制的に RuntimeException を throw する if (true) { throw new RuntimeException("error!!"); } return p; }, e -> e.advice(retryAdvice())) .get(); } } }
実行すると5回リトライして、最後に RuntimeException に渡した "error!!" のメッセージが表示されました。
started throwExceptionWithRetryFlow.gateway
と stopped throwExceptionWithRetryFlow.gateway
のメッセージが何度も出力されるのが気になります。
複数出力される理由を調べてみると、まず org.springframework.integration.gateway.GatewayProxyFactoryBean#onInit で7個の method 分 gatewayMap に gateway(throwExceptionWithRetryFlow.gateway)が put されて、
org.springframework.integration.gateway.GatewayProxyFactoryBean の doStart、doStop メソッドで gateway.start();
、gateway.stop();
が呼び出されるからでした。
1つ呼び出す仕組みをつくるだけで gateway が7個 start するとは。。。 あまり多用する仕組みではないのかもしれません。
履歴
2019/07/05
新規作成。
Spring Boot + Spring Integration でいろいろ試してみる ( その32 )( Aggregator のサンプルを作ってみる2 )
概要
記事一覧はこちらです。
- 引き続き Spring Integration DSL で 8.4. Aggregator を使用したサンプルを作成します。
- 今回は Aggregator で動作がよく分かっていない点をテストクラスを作成して確認してみます。
参照したサイト・書籍
目次
- Aggretator の動作をまとめてみる(よく分かっていない点も含めて)
- Spring Boot を 2.1.5 → 2.1.6 にバージョンアップし、JUnit 5 のモジュールを依存関係に追加する
- correlationId は同じだが sequenceSize を 3,4,5 と増やしたメッセージを A1→A2→A3 の順に送信した場合(sequenceSize = 3 が正しい)、Aggregator は A3 を受信した時に release するのか?
- complete フラグが true の MESSAGE_STORE が残っている時に同じ correlationId を持つメッセージが送信されてきたらどうなるのか?
- aggregator が複数ある場合(a1,a2を用意)に同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信したら a1 で release されるのか?
手順
Aggretator の動作をまとめてみる(よく分かっていない点も含めて)
- メッセージの header にある correlationId を元に MESSAGE_GROUP を作成する。CorrelationStrategy を実装して correlationId 以外のデータをキーとして蓄積させることも可能。
- MESSAGE_GROUP にメッセージの header にある sequenceSize の数だけメッセージが蓄積されたら、蓄積されているメッセージの payload のデータを List にまとめて、そのデータを payload にセットしたメッセージを作成して次に送信する(release する)。sequenceSize は一番最初のメッセージのものを参照するのか、最新のメッセージのものを見るのかは不明。
- ReleaseStrategy を実装すれば release する条件を sequenceSize 以外に設定することが可能。
- ReleaseStrategy はメッセージを受信する毎に実行される。
- release された MESSAGE_GROUP の complete フラグは false → true に変わる。
.expireGroupsUponCompletion(true)
が指定されていると complete フラグが true になった MESSAGE_GROUP は即時削除される。.expireGroupsUponCompletion(false)
の場合(記述していない場合は false になる)、SimpleMessageStore ならば削除されるがそれ以外(RedisMessageStore を含む)は残る。- org.springframework.integration.aggregator.AggregatingMessageHandler#afterRelease には以下のように実装されており、
- SimpleMessageStore や RedisMessageStore のダイアログを作成してみると以下のようになる。
- complete フラグが true になって残ったままになっている MESSAGE_STORE と同じ correlationId を持つメッセージが送信されてきた場合、Aggregator は何もしない。メッセージをそのまま次には流さないが、破棄するのか discardChannel に送信するのかは不明。
- MESSAGE_GROUP に蓄積されたメッセージは無期限で待つ。
.groupTimeout(...)
を記述すればタイムアウトする時間を設定できる。タイムアウトすると蓄積されていたメッセージ、及び後から遅れてきたメッセージが全て discardChannel に送信される(.discardChannel(...) で設定する
)。discardChannel を指定していなければ破棄される。
不明な点をサンプルを作成して確認してみます。
Spring Boot を 2.1.5 → 2.1.6 にバージョンアップし、JUnit 5 のモジュールを依存関係に追加する
今回はテストクラスで動作確認をしたいので JUnit 5 を使用可能にします。また Spring Boot の 2.1.6 がリリースされていたのでバージョンアップします。
build.gradle を以下のように変更します。
plugins { id 'org.springframework.boot' version '2.1.6.RELEASE' id 'java' } apply plugin: 'io.spring.dependency-management' group = 'ksbysample.eipapp' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする testAnnotationProcessor.extendsFrom annotationProcessor testImplementation.extendsFrom compileOnly } repositories { mavenCentral() } dependencyManagement { imports { mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES) mavenBom("org.junit:junit-bom:5.4.2") } } dependencies { def lombokVersion = "1.18.6" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.boot:spring-boot-starter-data-redis") implementation("org.springframework.integration:spring-integration-redis") implementation("org.springframework.boot:spring-boot-starter-json") { exclude group: "org.springframework", module: "spring-web" } testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.assertj:assertj-core:3.12.2") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") // for JUnit 5 testCompile("org.junit.jupiter:junit-jupiter") testRuntime("org.junit.platform:junit-platform-launcher") } test { // for JUnit 5 useJUnitPlatform() testLogging { events "STARTED", "PASSED", "FAILED", "SKIPPED" } }
- plugins block 内で
id 'org.springframework.boot' version '2.1.5.RELEASE'
→id 'org.springframework.boot' version '2.1.6.RELEASE'
に変更します。 dependencyManagement { ... }
を追加します。- dependencies block に以下の記述を追加します。
testImplementation("org.assertj:assertj-core:3.12.2")
testCompile("org.junit.jupiter:junit-jupiter")
testRuntime("org.junit.platform:junit-platform-launcher")
test { ... }
を追加します。
correlationId は同じだが sequenceSize を 3,4,5 と増やしたメッセージを A1→A2→A3 の順に送信した場合(sequenceSize = 3 が正しい)、Aggregator は A3 を受信した時に release するのか?
src/main/java/ksbysample/eipapp/aggregator/SplitZundokoFlowConfig.java のクラスに付与した @Configuration アノテーションをコメントアウトした後、
@Slf4j //@Configuration public class SplitZundokoFlowConfig {
src/test/java/ksbysample/eipapp/aggregator を新規作成してから、その下に IncrementSequenceSizeFlowTest.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.aggregator; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.context.IntegrationObjectSupport; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.redis.store.RedisMessageStore; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import java.util.Arrays; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @Slf4j @SpringBootTest public class IncrementSequenceSizeFlowTest { @Autowired private MessageBuilderFactory messageBuilderFactory; @Autowired private IntegrationFlow aggregateFlow; @Autowired private QueueChannel outputChannel; @Test void sequenceSizeをインクリメントするメッセージを3件送信した時に最後のメッセージを受信したaggregagorがreleaseするかのテスト() { // メッセージで使用する correlationId を発番する UUID correlationId = IntegrationObjectSupport.generateId(); // A1, A2, A3 のメッセージを送信する // sequenceSize を 3, 4, 5 と1つずつ増やす(3が正しい値) Arrays.asList(1, 2, 3).forEach(n -> { String payload = String.format("A%d", n); Message<String> msg = messageBuilderFactory .fromMessage(MessageBuilder.withPayload(payload).build()) .pushSequenceDetails(correlationId, n, n + 2) .build(); aggregateFlow.getInputChannel().send(msg); }); // aggregator が release しているか確認する Message<?> result = outputChannel.receive(5_000); assertThat(result).isNotNull(); assertThat(result.getPayload()).isEqualTo(Arrays.asList("A1", "A2", "A3")); assertThat(result.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE)) .isEqualTo(3); } @TestConfiguration static class FlowConfig { @Autowired private RedisConnectionFactory redisConnectionFactory; /** * Redis を MessageStore として使用するための設定。 * Redis に格納する message は JSON フォーマットにする。 * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store * 参照。 * * @return {@link RedisMessageStore} object */ @Bean public RedisMessageStore redisMessageStore() { RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer); return store; } @Bean public IntegrationFlow aggregateFlow() { return f -> f .log(LoggingHandler.Level.WARN) .aggregate(a -> a.messageStore(redisMessageStore())) .log(LoggingHandler.Level.ERROR) .channel(outputChannel()); } @Bean public MessageChannel outputChannel() { return MessageChannels.queue().get(); } } }
テストを実行してみると aggregator は release していますが、release された後のメッセージの sequenceSize は 3 ではなく 5 になるという結果でした。
debug 実行して2件メッセージを送信したところで止めて Redis のデータを見てみると、
MESSAGE_GROUP のデータには sequenceSize は含まれていませんでした。sequenceSize のチェックには一番最初に受信したメッセージのデータを使用し、release 時には一番最後に受信した(release の判定をした)メッセージの sequenceSize を集約したメッセージの header に入れる、という仕様のようです。
complete フラグが true の MESSAGE_STORE が残っている時に同じ correlationId を持つメッセージが送信されてきたらどうなるのか?
以下の内容のテストを作成し、同じ correlationId で sequenceSize = 1 のメッセージを2件送信して動作確認してみました。
@Test void sampleTest() { // メッセージで使用する correlationId を発番する UUID correlationId = IntegrationObjectSupport.generateId(); // A1 メッセージを送信する // sequenceSize = 1 なので、このメッセージを受信すると aggregagtor は release する // MESSAGE_STORE も complete = true になる Message<String> msg = messageBuilderFactory .fromMessage(MessageBuilder.withPayload("A1").build()) .pushSequenceDetails(correlationId, 1, 1) .build(); aggregateFlow.getInputChannel().send(msg); // A1 と correlationId が同じ A2 メッセージを送信する Message<String> msg2 = messageBuilderFactory .fromMessage(MessageBuilder.withPayload("A2").build()) .pushSequenceDetails(correlationId, 2, 1) .build(); aggregateFlow.getInputChannel().send(msg2); }
aggregator の処理は org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler#handleMessageInternal を通るのですが、
- 1件目のメッセージを受信した時には①→②→③→④を通り MESSAGE_GROUP の complete フラグが true になります。
.expireGroupsUponCompletion(true)
を指定していないので、complete フラグが true になった MESSAGE_GROUP は削除されずに残ります。 - 2件目のメッセージを受信した時には①の処理で MESSAGE_GROUP を取得しますが
if (!messageGroup.isComplete() && messageGroup.canAdd(message)) {
が false となるため(messageGroup.isComplete() が true)、②以降の処理が実行されませんでした。
結論として、complete フラグが true の MESSAGE_GROUP が残っている時に同じ correlationId を持つメッセージが送信されてくると何もされません(MessageStore にメッセージが蓄積されず releaseStrategy も実行されません)。
aggregator が複数ある場合(a1,a2を用意)に同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信したら a1 で release されるのか?
1つ上の記述で見た org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler#handleMessageInternal の処理を見ると①~④の処理開始前に lockRegistry から取得した Lock オブジェクトでロックしています。
Spring Integration - Lock Registry を見ると For synchronizing updates across servers where a shared MessageGroupStore is being used, you must configure a shared lock registry.
という記述がありました。
LockRegistry インターフェースの実装クラスを確認すると RedisLockRegistry クラスがあります。
これらの内容から同じ LockRegistry を使用すれば Aggregator が複数存在しても適切にロックしながら処理をするので、同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信しても a1 で release されるものと考えられます。
src/test/java/ksbysample/eipapp/aggregator/MultiAggregatorTest.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.aggregator; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.context.IntegrationObjectSupport; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.redis.store.RedisMessageStore; import org.springframework.integration.redis.util.RedisLockRegistry; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.integration.support.locks.LockRegistry; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.Executors; import static org.assertj.core.api.Assertions.assertThat; @Slf4j @SpringBootTest public class MultiAggregatorTest { @Autowired private MessageBuilderFactory messageBuilderFactory; @Autowired private IntegrationFlow a1Flow; @Autowired private IntegrationFlow a2Flow; @Autowired private QueueChannel outputChannel; @Test void sampleTest() { final int SEQUENCE_SIZE = 3; // メッセージで使用する correlationId を発番する UUID correlationId = IntegrationObjectSupport.generateId(); int sequenceNumber = 1; Message<String> msg = messageBuilderFactory .fromMessage(MessageBuilder.withPayload("A1").build()) .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE) .build(); a1Flow.getInputChannel().send(msg); Message<String> msg2 = messageBuilderFactory .fromMessage(MessageBuilder.withPayload("A2").build()) .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE) .build(); a2Flow.getInputChannel().send(msg2); SleepUtils.sleep(1_000); Message<String> msg3 = messageBuilderFactory .fromMessage(MessageBuilder.withPayload("A3").build()) .pushSequenceDetails(correlationId, sequenceNumber++, SEQUENCE_SIZE) .build(); a1Flow.getInputChannel().send(msg3); Message<?> result = outputChannel.receive(15_000); assertThat(result).isNotNull(); assertThat(result.getPayload()).isEqualTo(Arrays.asList("A1", "A2", "A3")); assertThat(result.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE)) .isEqualTo(3); } @TestConfiguration static class FlowConfig { @Autowired private RedisConnectionFactory redisConnectionFactory; /** * Redis を MessageStore として使用するための設定。 * Redis に格納する message は JSON フォーマットにする。 * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store * 参照。 * * @return {@link RedisMessageStore} object */ @Bean public RedisMessageStore redisMessageStore() { RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer); return store; } @Bean public IntegrationFlow a1Flow() { return f -> f .log(LoggingHandler.Level.WARN) // 以降の処理を別スレッドにすることで、send メソッド呼び出し時に待機しないようにする .channel(c -> c.executor(Executors.newFixedThreadPool(1))) .aggregate(a -> a.messageStore(redisMessageStore()) // a1Flow, a2Flow で同じ LockRegistry を使用するよう設定する .lockRegistry(aggretatorLockRegistry()) .releaseStrategy(g -> { log.error("PASS_A1"); return g.getMessages().size() == g.getSequenceSize(); })) .log(LoggingHandler.Level.ERROR) .channel(outputChannel()); } @Bean public IntegrationFlow a2Flow() { return f -> f .log() .channel(c -> c.executor(Executors.newFixedThreadPool(1))) .aggregate(a -> a.messageStore(redisMessageStore()) .lockRegistry(aggretatorLockRegistry()) .releaseStrategy(g -> { SleepUtils.sleep(10_000); log.error("PASS_A2"); return g.getMessages().size() == g.getSequenceSize(); })) .log(); } @Bean public LockRegistry aggretatorLockRegistry() { return new RedisLockRegistry(redisConnectionFactory, "AGGREGATOR_LOCK"); } @Bean public MessageChannel outputChannel() { return MessageChannels.queue().get(); } } static class SleepUtils { public static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
テストを実行すると、
- send メッセージで処理が待機しないように実装しているので(
..aggregate(...)
の処理を別スレッドで実行するように実装しているので)、3件のメッセージ送信は1つ前のメッセージ送信の aggregator の処理を待たずに連続で送信されている。 - 2件目のメッセージの releaseStrategy が処理されるのは送信してから 10秒後。
- 3件目のメッセージの releaseStrategy は2件目のメッセージの releaseStrategy が終わった後に実行される。
- 同じ correlationId のメッセージを a1へ送信→a2へ送信(10秒待機)→a1へ送信すると a1 で release された。
という結果になりました。まあ、実際に複数の aggregator が必要になるようなデータ量を処理することになったらこんな実装をするのかは分かりませんが。。。
履歴
2019/06/23
初版発行。
Spring Boot + Spring Integration でいろいろ試してみる ( その31 )( Aggregator のサンプルを作ってみる )
概要
記事一覧はこちらです。
- Spring Integration DSL で 8.4. Aggregator を使用したサンプルを作成します。
- Aggregator でメッセージがどのように集約されるのかを見られるようにするために、単体の Redis サーバを Docker Compose で構築して MessageStore として利用します。Redis クライアントには JSON のデータを見たいので Medis を使用します。
- サンプルは以前流行ったズンドコキヨシをベースに Aggregator の動きが分かるように少しルールを変えたものを作成してみます。
参照したサイト・書籍
Spring Integration - 8.4. Aggregator
https://docs.spring.io/spring-integration/docs/current/reference/html/#aggregatorluin/medis
https://github.com/luin/medis
目次
- ksbysample-eipapp-aggregator プロジェクトを作成する
- Docker Compose で単一の Redis サーバを構築する
- Redis の GUI クライアントとして Medis をインストールする
- "ずん" あるいは "どこ" のメッセージを送信して5つずつ aggregate するサンプルを作成する
- "ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後にバラバラに分解してから再び1セットのメッセージに aggregate するサンプルを作成する
手順
ksbysample-eipapp-aggregator プロジェクトを作成する
Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。
plugins { id 'org.springframework.boot' version '2.1.5.RELEASE' id 'java' } apply plugin: 'io.spring.dependency-management' group = 'ksbysample.eipapp' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' repositories { mavenCentral() } configurations { // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする testAnnotationProcessor.extendsFrom annotationProcessor testImplementation.extendsFrom compileOnly } dependencies { def lombokVersion = "1.18.6" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.boot:spring-boot-starter-data-redis") implementation("org.springframework.integration:spring-integration-redis") implementation("org.springframework.boot:spring-boot-starter-json") { exclude group: "org.springframework", module: "spring-web" } testImplementation("org.springframework.boot:spring-boot-starter-test") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") }
- Redis を MessageStore として利用するので以下の行を追加します。
implementation("org.springframework.boot:spring-boot-starter-data-redis")
implementation("org.springframework.integration:spring-integration-redis")
- Redis に JSON フォーマットでメッセージを格納したいので以下の行を追加します。spring-boot-starter-json を追加すると
org.springframework:spring-web
も依存関係に追加されてしまうのですが、今回は不要なので exclude で除外します。implementation("org.springframework.boot:spring-boot-starter-json")
- ログを出力する時に
@Slf4j
アノテーションを使いたいので以下の行を追加して lombok を使えるようにします。annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
compileOnly("org.projectlombok:lombok:${lombokVersion}")
ディレクトリ構成は以下のようにしています。
Docker Compose で単一の Redis サーバを構築する
プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。
version: '3' services: # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it redis /bin/sh # # 起動したコンテナの redis に redis-cli でアクセスするには以下のコマンドを実行する # docker exec -it redis redis-cli # ############################################################################# # 単体 Redis サーバ redis: image: redis:5.0.5 container_name: redis ports: - "6379:6379"
docker-compose up -d
コマンドを実行して Redis を起動します。
Redis の GUI クライアントとして Medis をインストールする
https://github.com/luin/medis から Medis on Windows のインストーラをダウンロードしてインストールします。
"ずん" あるいは "どこ" のメッセージを送信して5つ ずつ aggregate するサンプルを作成する
以下の動作を行うサンプルを作成します。
- "ずん"、"どこ" をランダムに出力する MessageSource を作成する。”ずん" を 4/5、"どこ" を 1/5 の確立で返す。
- MessageSource からは 15秒ごとに取得する(aggregator の動作を確認するために時間を遅めにする)。
- aggregator は Redis を MessageStore として使用し、メッセージが5つ溜まったら集約して1つのメッセージにして次の処理に流す。タイムアウトは設定しない(5つ溜まるまでずっと待つ)。
- 最後に集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。
src/main/java/ksbysample/eipapp/aggregator の下に RandomZundokoFlowConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.aggregator; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.redis.store.RedisMessageStore; import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.messaging.support.MessageBuilder; import java.util.Arrays; import java.util.List; import java.util.Random; @Slf4j @Configuration public class RandomZundokoFlowConfig { private static final Random r = new Random(); private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"}; private static final long POLLER_DELAY_PERIOD = 15000L; // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties // の spring.redis.~ を参照。 private final RedisConnectionFactory redisConnectionFactory; public RandomZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } /** * Redis を MessageStore として使用するための設定。 * Redis に格納する message は JSON フォーマットにする。 * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store * 参照。 * * @return {@link RedisMessageStore} object */ @Bean public RedisMessageStore redisMessageStore() { RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer); return store; } @Bean public MessageSource<String> zundokoMessageSource() { return () -> MessageBuilder.withPayload(ZUNDOKO[r.nextInt(5)]).build(); } @Bean public IntegrationFlow zundokoFlow() { return IntegrationFlows.from(zundokoMessageSource() , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD))) // kiyoshiFlow にメッセージを送信する .channel(kiyoshiFlow().getInputChannel()) .get(); } @Bean public IntegrationFlow kiyoshiFlow() { return f -> f // メッセージが5つ溜まったら集約して次の処理に流す // * 流れてくる message の header に correlationId がないので // correlationExpression("1") を記述して固定で "1" という扱いにする。 // これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。 // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを // 次の処理に流した後に再びメッセージが蓄積されるようになる。 .aggregate(a -> a.correlationExpression("1") .messageStore(redisMessageStore()) .releaseStrategy(g -> g.size() == 5) .expireGroupsUponCompletion(true)) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .<List<String>>handle((p, h) -> { if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { log.error("きよし!"); } return null; }); } }
実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" + <correlationIdとして使用する文字列(今回は "1" 固定)> のレコードと個々の message のレコードがあることが確認できます。
message が5つ蓄積されて aggregator により1つの message に集約されて次の処理に渡されると、蓄積されていた MESSAGE_GROUP と MESSAGE は削除されます。
コンソールには以下のように出力されます。payload=[ずん, ずん, ずん, ずん, どこ] になっていると "きよし!" の文字列が出力されています。またスレッド名を見るとバラバラでした。1つのスレッドで処理するわけではないようです。
.outputProcessor(...)
を使うと次の処理に渡す時の palyload にセットする値を変更することができます。デフォルトでは List でまとめられますが、以下のように実装するとカンマで結合した String オブジェクトになります。
@Bean public IntegrationFlow kiyoshiFlow() { return f -> f // メッセージが5つ溜まったら集約して次の処理に流す // * 流れてくる message の header に correlationId がないので // correlationExpression("1") を記述して固定で "1" という扱いにする。 // これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。 // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを // 次の処理に流した後に再びメッセージが蓄積されるようになる。 .aggregate(a -> a.correlationExpression("1") .messageStore(redisMessageStore()) .releaseStrategy(g -> g.size() == 5) .outputProcessor(g -> { String msg = g.getMessages().stream() .map(m -> (String) m.getPayload()) .collect(Collectors.joining(",")); if (msg.equals("ずん,ずん,ずん,ずん,どこ")) { msg += ",きよし!"; } return msg; }) .expireGroupsUponCompletion(true)) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .nullChannel(); // .<List<String>>handle((p, h) -> { // if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { // log.error("きよし!"); // } // return null; // }); }
Aggregator 用のクラスを定義して @CorrelationStrategy、@ReleaseStrategy、@Aggregator アノテーションを付与したメソッドを用意する書き方もあります。
@Bean public IntegrationFlow kiyoshiFlow() { return f -> f // メッセージが5つ溜まったら集約して次の処理に流す // * 流れてくる message の header に correlationId がないので // correlationExpression("1") を記述して固定で "1" という扱いにする。 // これで全てのメッセージが1つの MESSAGE_QUEUE に蓄積される。 // * expireGroupsUponCompletion(true) を記述すると5つ集約したメッセージを // 次の処理に流した後に再びメッセージが蓄積されるようになる。 .aggregate(a -> a.processor(new ZundokoAggregator()) .messageStore(redisMessageStore()) .expireGroupsUponCompletion(true)) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .<List<String>>handle((p, h) -> { if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { log.error("きよし!"); } return null; }); } static class ZundokoAggregator { @CorrelationStrategy public String correlationKey(Message<?> message) { return "1"; } @ReleaseStrategy public boolean canRelease(List<Message<?>> messages) { return messages.size() == 5; } @Aggregator public List<String> aggregate(List<String> payloads) { // .outputProcessor(...) での加工処理を実装したい場合にはこのメソッド内に記述する return payloads; } }
"ずん"、"ずん"、"ずん"、"ずん"、"どこ" で1セットのメッセージを送信後分解してから再び1セットのメッセージに aggregate するサンプルを作成する
次に以下の動作を行うサンプルを作成します。
- "ずん", "ずん", "ずん", "ずん", "どこ" の配列が palyload にセットされたメッセージを出力する MessageSource を作成する。
- MessageSource からは 5秒ごとに取得する。
- MessageSource からメッセージを取得したら Splitter で分解して、マルチスレッドで処理させるための channel に渡す。
- マルチスレッド側ではランダムで決められた秒数(最大5秒)待機してから aggregator へ渡す。
- aggregator では Splitter で分割されていたメッセージを全て集めたら1つのメッセージにして次の処理に流す。また3秒以内に分割していたメッセージが全て集まらなかった時には MESSAGE_GROUP を破棄する。
- 集約されたメッセージが "ずん", "ずん", "ずん", "ずん", "どこ" だったら "きよし!" とログに出力する。
src/main/java/ksbysample/eipapp/aggregator/RandomZundokoFlowConfig.java を実行しないためにクラスに付与した @Configuration アノテーションをコメントアウトした後、
@Slf4j //@Configuration public class RandomZundokoFlowConfig {
src/main/java/ksbysample/eipapp/aggregator の下に SplitZundokoFlowConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.aggregator; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.DelayerEndpointSpec; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.redis.store.RedisMessageStore; import org.springframework.integration.support.json.JacksonJsonUtils; import org.springframework.messaging.support.MessageBuilder; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; @Slf4j @Configuration public class SplitZundokoFlowConfig { private static final String[] ZUNDOKO = new String[]{"ずん", "ずん", "ずん", "ずん", "どこ"}; private static final long POLLER_DELAY_PERIOD = 5000L; private static final int DELAY_MAX_VALUE = 5000; private static final long MESSAGE_GROUP_TIMEOUT = 3000L; // Redis の設定はデフォルト値をそのまま使用するので、application.poperties には何も記載していない // https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html#common-application-properties // の spring.redis.~ を参照。 private final RedisConnectionFactory redisConnectionFactory; public SplitZundokoFlowConfig(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } /** * Redis を MessageStore として使用するための設定。 * Redis に格納する message は JSON フォーマットにする。 * https://docs.spring.io/spring-integration/docs/current/reference/html/#redis-message-store * 参照。 * * @return {@link RedisMessageStore} object */ @Bean public RedisMessageStore redisMessageStore() { RedisMessageStore store = new RedisMessageStore(redisConnectionFactory); ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper(); RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper); store.setValueSerializer(serializer); return store; } @Bean public MessageSource<String[]> zundokoMessageSource() { return () -> MessageBuilder.withPayload(ZUNDOKO).build(); } @Bean public IntegrationFlow zundokoFlow() { return IntegrationFlows.from(zundokoMessageSource() , e -> e.poller(Pollers.fixedDelay(POLLER_DELAY_PERIOD))) // Message<String[]> --> Message<String> x 5 に分割する .split() // ここから下はマルチスレッドで処理する .channel(c -> c.executor(Executors.newFixedThreadPool(5))) // 5秒以内(ランダムで決める)の間 delay する // group timeout した時にどのメッセージが原因だったのかが分かるようにするために // delay header を追加して値をセットする .enrichHeaders(h -> h.headerFunction("delay", m -> String.valueOf(ThreadLocalRandom.current().nextInt(DELAY_MAX_VALUE)))) .delay("ZUNDOKO_DELAYER", (DelayerEndpointSpec e) -> e.delayExpression("headers.delay")) // kiyoshiFlow にメッセージを送信する .channel(kiyoshiFlow().getInputChannel()) .get(); } @Bean public IntegrationFlow kiyoshiFlow() { return f -> f .aggregate(a -> a .messageStore(redisMessageStore()) .releaseStrategy(g -> g.getMessages().size() == g.getSequenceSize()) .expireGroupsUponCompletion(true) // .groupTimeout(...) で指定した時間内に aggregator で処理されなかったメッセージは // .discardChannel(...) で指定した channel に送信される // ※既に MESSAGE_GROUP に蓄積されていたメッセージも discardChannel に送信される .groupTimeout(MESSAGE_GROUP_TIMEOUT) .discardChannel(discardFlow().getInputChannel())) // {"ずん", "ずん", "ずん", "ずん", "どこ"} と揃っていたら "きよし!" を出力する .log() .<List<String>>handle((p, h) -> { if (p.size() == 5 && p.equals(Arrays.asList(ZUNDOKO))) { log.error("きよし!"); } return null; }); } /** * MESSAGE_GROUP_TIMEOUT で指定された時間(3秒)以内に aggregator で処理されなかった MESSAGE GROUP * のメッセージが送信されてくる Flow * * @return {@link IntegrationFlow} object */ @Bean public IntegrationFlow discardFlow() { return f -> f .log(LoggingHandler.Level.WARN) .nullChannel(); } }
実行して Redis 内に蓄積されている message の状態を見ると、"MESSAGE_GROUP_" の後には correlationId としてセットされている UUID が表示されています。
コンソールを見ると "ずん", "ずん", "ずん", "ずん", "どこ" に戻った場合には "きよし!" のログが出力されており、また3秒以内にメッセージが全て集まらなかった場合には "Expiring MessageGroup with correlaionKey[...]" のログと既に蓄積されていたメッセージが出力されて、その後に後からきたメッセージが出力されていました(header の delay を見ると4秒以上の遅延時間がセットされていることが分かります)。
履歴
2019/06/15
初版発行。
IntelliJ IDEA を 2019.1.2 → 2019.1.3 へバージョンアップ
IntelliJ IDEA を 2019.1.2 → 2019.1.3 へバージョンアップする
IntelliJ IDEA の 2019.1.3 がリリースされているのでバージョンアップします。
- IntelliJ IDEA 2019.1.3 is here!
https://blog.jetbrains.com/idea/2019/05/intellij-idea-2019-1-3-is-here/
※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。
IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。
「IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。
Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。
Patch がダウンロードされて IntelliJ IDEA が再起動します。
IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。
IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.1.3 へバージョンアップされていることを確認します。
Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。
更新可能な Plugin があるというダイアログが画面右下に表示されたので、再度 IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。
「IDE and Plugin Updates」ダイアログが表示されますので「Update」ボタンをクリックします。
Plugin がインストールされます。画面右下に Restart のダイアログが表示されますので、リンクをクリックして IntelliJ IDEA を再起動します。
IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。
clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。
Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。
IntelliJ IDEA を 2019.1.1 → 2019.1.2 へバージョンアップ
IntelliJ IDEA を 2019.1.1 → 2019.1.2 へバージョンアップする
IntelliJ IDEA の 2019.1.2 がリリースされているのでバージョンアップします。
- IntelliJ IDEA 2019.1.2 is here!
https://blog.jetbrains.com/idea/2019/05/intellij-idea-2019-1-2-is-here/
※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。
IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。
「IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。
Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。
Patch がダウンロードされて IntelliJ IDEA が再起動します。
IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。
IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.1.2 へバージョンアップされていることを確認します。
Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。
更新可能な Plugin があるというダイアログが画面右下に表示されたので、再度 IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。
「IDE and Plugin Updates」ダイアログが表示されますので「Update」ボタンをクリックします。
Plugin がインストールされます。画面右下に Restart のダイアログが表示されますので、リンクをクリックして IntelliJ IDEA を再起動します。
IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。
clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。
Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。