Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 )
概要
記事一覧はこちらです。
Spring Integration のアプリケーションで使用するサーバを Docker Compose で構築します。
- Kafka+zookeeper の環境を構築します。
- Kafka の Dockerイメージは wurstmeister/kafka を使用します。
- zookeeper の Dockerイメージは zookeeper を使用します。
- 今回は Kafka、zookeeper どちらも単体サーバで構成します。
参照したサイト・書籍
kafka - Documentation
https://kafka.apache.org/documentation/wurstmeister/kafka(Docker Hub のページ)
https://hub.docker.com/r/wurstmeister/kafka/wurstmeister/kafka-docker(GitHub のページ)
https://github.com/wurstmeister/kafka-dockerzookeeper(Docker Hub のページ)
https://hub.docker.com/_/zookeeper31z4/zookeeper-docker(GitHub のページ)
https://github.com/31z4/zookeeper-dockerDeploy a Kafka broker in a Docker container
https://www.kaaproject.org/kafka-dockerApache Kafkaをインストールして、コマンドラインツールを試す
http://pppurple.hatenablog.com/entry/2018/10/10/232733Spring for Apache Kafka
https://docs.spring.io/spring-kafka/reference/html/Intro to Apache Kafka with Spring
https://www.baeldung.com/spring-kafkaApache Kafkaって本当に大丈夫?~故障検証のオーバービューと興味深い挙動の紹介~
https://www.slideshare.net/hadoopxnttdata/hadoop-spark-conference-japan-2019nttdataLINEの大規模データパイプラインを支える、Apache Kafkaプラットフォームの運用の裏側
https://logmi.jp/tech/articles/320330Apache Kafkaの概要とアーキテクチャ
https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3
目次
- ksbysample-eipapp-kafka プロジェクトを作成する
- docker-compose.yml を作成する
- サーバを起動する
- KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する
- group.id が同じ consumer はどれか1つでしかメッセージを受信しない
- group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する
手順
ksbysample-eipapp-kafka プロジェクトを作成する
Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。
plugins { id "org.springframework.boot" version "2.1.6.RELEASE" id "java" id "idea" } apply plugin: "io.spring.dependency-management" group = "ksbysample.eipapp" version = "0.0.1-SNAPSHOT" sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } configurations { // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする testAnnotationProcessor.extendsFrom annotationProcessor testImplementation.extendsFrom compileOnly } repositories { mavenCentral() } dependencyManagement { imports { mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES) mavenBom("org.junit:junit-bom:5.5.0") } } dependencies { def lombokVersion = "1.18.8" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE") testImplementation("org.springframework.boot:spring-boot-starter-test") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") // for JUnit 5 testCompile("org.junit.jupiter:junit-jupiter") testRuntime("org.junit.platform:junit-platform-launcher") } test { // for JUnit 5 useJUnitPlatform() testLogging { events "STARTED", "PASSED", "FAILED", "SKIPPED" } }
dependencies block に
implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE")
を追加します。spring-integration-kafka は Appendix F. Dependency versions に記載されていませんので、バージョン番号も明記します。spring-integration-kafka を追加すると以下のモジュールが依存関係に追加されます。
\--- org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE +--- org.springframework.integration:spring-integration-core:5.1.6.RELEASE (*) \--- org.springframework.kafka:spring-kafka:2.2.7.RELEASE +--- org.springframework:spring-context:5.1.7.RELEASE -> 5.1.8.RELEASE (*) +--- org.springframework:spring-messaging:5.1.7.RELEASE -> 5.1.8.RELEASE (*) +--- org.springframework:spring-tx:5.1.7.RELEASE -> 5.1.8.RELEASE (*) +--- org.springframework.retry:spring-retry:1.2.4.RELEASE \--- org.apache.kafka:kafka-clients:2.0.1 +--- org.lz4:lz4-java:1.4.1 +--- org.xerial.snappy:snappy-java:1.1.7.1 \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.26
ディレクトリ構成は以下のようにしています。
docker-compose.yml を作成する
今回は zookeeper、kafka どちらも単体のサーバで起動します。
プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。
version: '3' services: ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it zookeeper /bin/sh ############################################################################# # 単体 zookeeper zookeeper: image: zookeeper:3.5.5 container_name: zookeeper-1 hostname: zookeeper-1 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 restart: always ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it kafka /bin/sh ############################################################################# # 単体 Kafka kafka: image: wurstmeister/kafka:2.12-2.2.1 container_name: kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181 # Topic1 will have 1 partition and 1 replicas KAFKA_CREATE_TOPICS: "Topic1:1:1" depends_on: - zookeeper
サーバを起動する
コマンドプロンプトを起動し docker-compose.yml のあるディレクトリへ移動して docker-compose up -d
コマンドを実行して起動します。
IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。
kafka コンテナに接続して kafka-topics.sh で topics の作成状況を確認します(kafka-topics.sh は kafka の Documentation 参照)。
kafka-console-consumer.sh と kafka-console-producer.sh でメッセージの送受信のテストをしてみます。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1
を実行して Topic1 にメッセージが送信されるのを待機します。kafka-console-producer.sh --broker-list localhost:9092 --topic Topic1
を実行後、文字列を入力してメッセージを送信します。
kafka-console-producer.sh を起動した方でメッセージを入力すると kafka-console-consumer.sh を起動した方で入力されたメッセージが表示されました。メッセージの送受信は問題なく出来るようです。kafka-console-producer.sh は Ctrl+D を、kafka-console-consumer.sh は Ctrl+C を入力して終了します。
Topic1 のメッセージをクリアしたいので、docker-compose down
、docker-compose up -d
コマンドを実行してコンテナを起動し直します。
KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する
Spring Integration DSL で書く前に KafkaTemplate と @KafkaListener でデータを送受信するサンプルを JUnit 5 のテストで作成します。
src/main/resources/application.properties を以下のように変更します。
spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=topic1-consumer-group1
src/test/java/ksbysample/eipapp/kafka を作成した後、その下に KafkaSendAndReceiveTest.java を新規作成して以下の内容を記述します。
package ksbysample.eipapp.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; @Slf4j @SpringBootTest public class KafkaSendAndReceiveTest { private static final String TOPIC_NAME = "Topic1"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Test void sendToKafkaTest() { kafkaTemplate.send(TOPIC_NAME, "test message"); // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { @KafkaListener(topics = TOPIC_NAME) public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { log.warn(cr.toString()); } } static class SleepUtils { public static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
テストを実行すると送信したメッセージがを @KafkaListener アノテーションを付与した listenByConsumerRecord メソッドで受信してログが出力されます。
log.warn(cr.toString());
では ConsumerRecord(topic = Topic1, partition = 0, offset = 16, CreateTime = 1562469618138, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test message)
というメッセージが出力されていました。
group.id が同じ consumer はどれか1つでしかメッセージを受信しない
kafka-console-consumer.sh を使って動作確認します。
まずコマンドプロンプトを2つ起動してから docker exec -it kafka /bin/sh
コマンドで kafka のコンテナに接続した後、kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1
コマンドを実行して同じ group.id でメッセージ送信を待ちます。
KafkaSendAndReceiveTest.java から SleepUtils.sleep(5_000);
と @KafkaListener をコメントアウトして、メッセージを受信せず、かつ送信後すぐにテストが終了するようにします。
@Test void sendToKafkaTest() { kafkaTemplate.send(TOPIC_NAME, "test message"); // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する // SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { // @KafkaListener(topics = TOPIC_NAME) // public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { // log.warn(cr.toString()); // SleepUtils.sleep(1_000); // } }
テストを実行してメッセージを送信すると、2つ起動している consumer の内1つだけ受信します。
追加で2回テストを実行してみるとメッセージを受信したのは最初に受信した consumer でした。
テストクラスで 1000件連続してメッセージを送信するように変更してみましたが、
@Test void sendToKafkaTest() { // kafkaTemplate.send(TOPIC_NAME, "test message"); for (int i = 1; i <= 1000; i++) { kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)); kafkaTemplate.flush(); } // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する // SleepUtils.sleep(5_000); }
やっぱり一方の consumer だけしか受信しません。
受信していた consumer を落とすと、今度はもう一方の接続していた consumer でメッセージを受信しました。
ここまでの動作をまとめると、
- 同じ group.id の consumer を複数接続してもメッセージを受信するのはその内の1つ。
- 大量のメッセージを送信してもメッセージを受信するのは決められた1つだけである。複数接続している consumer で分散して受信する訳ではない。
- 分散する設定があるのかは不明。
group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する
docker-compose down
、docker-compose up -d
コマンドを実行してコンテナを起動し直した後、今度は group.id が異なる kafka-console-consumer.sh を実行します。
テストを実行してメッセージを送信すると、各 consumer でメッセージを受信しました。
もう1点。コンテナを起動し直して Topic1 のメッセージをクリアしてから group.id が topic1-consumer-group1 の kafka-console-consumer.sh だけ起動しておきます。
テストを実行してメッセージを送信すると起動しておいた consumer でメッセージを受信しますが、
後から別の group.id(topic1-consumer-group2)の kafka-console-consumer.sh を起動すると、この時は topic1-consumer-group2 の consumer ではメッセージを受信しません。
ただし1度 group.id = topic1-consumer-group2 の接続情報が登録されると、一旦 topic1-consumer-group2 の kafka-console-consumer.sh を終了してから、
テストを実行してメッセージを送信して topic1-consumer-group1 の kafka-console-consumer.sh でメッセージを受信した後、
topic1-consumer-group2 の kafka-console-consumer.sh を起動すると、起動前に送信されたメッセージが受信されます。
KafkaSendAndReceiveTest.java 内で異なる group.id の @KafkaListener を用意してメッセージを受信するには以下のように変更します。
@Test void sendToKafkaTest() { kafkaTemplate.send(TOPIC_NAME, "test message"); // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group1") public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { log.warn(cr.toString()); } // group.id は @KafkaListener アノテーションの groupId 属性で指定可能 // メソッドの引数にはメッセージのデータの型(今回は String)の引数を指定することも可能 @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group2") public void listenByString(String msg) { log.error(msg); } }
テストを実行すると @KafkaListener アノテーションを付与したメソッドそれぞれでメッセージを受信していることが確認できます。
Spring Integration DSL でのサンプルや Kafka、zookeeper を複数サーバ起動する場合のサンプルを作成したいので、あと数回続きます。
履歴
2019/07/07
初版発行。