かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 )

概要

記事一覧はこちらです。

Spring Integration のアプリケーションで使用するサーバを Docker Compose で構築します。

  • Kafka+zookeeper の環境を構築します。
  • Kafka の Dockerイメージは wurstmeister/kafka を使用します。
  • zookeeper の Dockerイメージは zookeeper を使用します。
  • 今回は Kafka、zookeeper どちらも単体サーバで構成します。

参照したサイト・書籍

  1. kafka - Documentation
    https://kafka.apache.org/documentation/

  2. wurstmeister/kafka(Docker Hub のページ)
    https://hub.docker.com/r/wurstmeister/kafka/

  3. wurstmeister/kafka-docker(GitHub のページ)
    https://github.com/wurstmeister/kafka-docker

  4. zookeeper(Docker Hub のページ)
    https://hub.docker.com/_/zookeeper

  5. 31z4/zookeeper-docker(GitHub のページ)
    https://github.com/31z4/zookeeper-docker

  6. Deploy a Kafka broker in a Docker container
    https://www.kaaproject.org/kafka-docker

  7. Apache Kafkaをインストールして、コマンドラインツールを試す
    http://pppurple.hatenablog.com/entry/2018/10/10/232733

  8. Spring for Apache Kafka
    https://docs.spring.io/spring-kafka/reference/html/

  9. Intro to Apache Kafka with Spring
    https://www.baeldung.com/spring-kafka

  10. Apache Kafkaって本当に大丈夫?~故障検証のオーバービューと興味深い挙動の紹介~
    https://www.slideshare.net/hadoopxnttdata/hadoop-spark-conference-japan-2019nttdata

  11. LINEの大規模データパイプラインを支える、Apache Kafkaプラットフォームの運用の裏側
    https://logmi.jp/tech/articles/320330

  12. Apache Kafkaの概要とアーキテクチャ
    https://qiita.com/sigmalist/items/5a26ab519cbdf1e07af3

目次

  1. ksbysample-eipapp-kafka プロジェクトを作成する
  2. docker-compose.yml を作成する
  3. サーバを起動する
  4. KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する
  5. group.id が同じ consumer はどれか1つでしかメッセージを受信しない
  6. group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する

手順

ksbysample-eipapp-kafka プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。

plugins {
    id "org.springframework.boot" version "2.1.6.RELEASE"
    id "java"
    id "idea"
}

apply plugin: "io.spring.dependency-management"

group = "ksbysample.eipapp"
version = "0.0.1-SNAPSHOT"

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.5.0")
    }
}

dependencies {
    def lombokVersion = "1.18.8"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}
  • dependencies block に implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE") を追加します。spring-integration-kafka は Appendix F. Dependency versions に記載されていませんので、バージョン番号も明記します。

    spring-integration-kafka を追加すると以下のモジュールが依存関係に追加されます。

\--- org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE
     +--- org.springframework.integration:spring-integration-core:5.1.6.RELEASE (*)
     \--- org.springframework.kafka:spring-kafka:2.2.7.RELEASE
          +--- org.springframework:spring-context:5.1.7.RELEASE -> 5.1.8.RELEASE (*)
          +--- org.springframework:spring-messaging:5.1.7.RELEASE -> 5.1.8.RELEASE (*)
          +--- org.springframework:spring-tx:5.1.7.RELEASE -> 5.1.8.RELEASE (*)
          +--- org.springframework.retry:spring-retry:1.2.4.RELEASE
          \--- org.apache.kafka:kafka-clients:2.0.1
               +--- org.lz4:lz4-java:1.4.1
               +--- org.xerial.snappy:snappy-java:1.1.7.1
               \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.26

ディレクトリ構成は以下のようにしています。

f:id:ksby:20190707100947p:plain

docker-compose.yml を作成する

今回は zookeeper、kafka どちらも単体のサーバで起動します。

プロジェクトのルートディレクトリ直下に docker-compose.yml を新規作成し、以下の内容を記述します。

version: '3'

services:
  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it zookeeper /bin/sh
  #############################################################################
  # 単体 zookeeper
  zookeeper:
    image: zookeeper:3.5.5
    container_name: zookeeper-1
    hostname: zookeeper-1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181
    restart: always

  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it kafka /bin/sh
  #############################################################################
  # 単体 Kafka
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      # Topic1 will have 1 partition and 1 replicas
      KAFKA_CREATE_TOPICS: "Topic1:1:1"
    depends_on:
      - zookeeper

サーバを起動する

コマンドプロンプトを起動し docker-compose.yml のあるディレクトリへ移動して docker-compose up -d コマンドを実行して起動します。

f:id:ksby:20190707103616p:plain

IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。

f:id:ksby:20190707103808p:plain

kafka コンテナに接続して kafka-topics.sh で topics の作成状況を確認します(kafka-topics.sh は kafka の Documentation 参照)。

f:id:ksby:20190707104734p:plain

kafka-console-consumer.sh と kafka-console-producer.sh でメッセージの送受信のテストをしてみます。

  • kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1 を実行して Topic1 にメッセージが送信されるのを待機します。
  • kafka-console-producer.sh --broker-list localhost:9092 --topic Topic1 を実行後、文字列を入力してメッセージを送信します。

f:id:ksby:20190707110012p:plain

kafka-console-producer.sh を起動した方でメッセージを入力すると kafka-console-consumer.sh を起動した方で入力されたメッセージが表示されました。メッセージの送受信は問題なく出来るようです。kafka-console-producer.sh は Ctrl+D を、kafka-console-consumer.sh は Ctrl+C を入力して終了します。

Topic1 のメッセージをクリアしたいので、docker-compose downdocker-compose up -d コマンドを実行してコンテナを起動し直します。

KafkaTemplate と @KafkaListener でデータを送受信するサンプルを作成する

Spring Integration DSL で書く前に KafkaTemplate と @KafkaListener でデータを送受信するサンプルを JUnit 5 のテストで作成します。

src/main/resources/application.properties を以下のように変更します。

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=topic1-consumer-group1

src/test/java/ksbysample/eipapp/kafka を作成した後、その下に KafkaSendAndReceiveTest.java を新規作成して以下の内容を記述します。

package ksbysample.eipapp.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
@SpringBootTest
public class KafkaSendAndReceiveTest {

    private static final String TOPIC_NAME = "Topic1";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void sendToKafkaTest() {
        kafkaTemplate.send(TOPIC_NAME, "test message");
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

        @KafkaListener(topics = TOPIC_NAME)
        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
            log.warn(cr.toString());
        }

    }

    static class SleepUtils {

        public static void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }

}

テストを実行すると送信したメッセージがを @KafkaListener アノテーションを付与した listenByConsumerRecord メソッドで受信してログが出力されます。

f:id:ksby:20190707122134p:plain

log.warn(cr.toString()); では ConsumerRecord(topic = Topic1, partition = 0, offset = 16, CreateTime = 1562469618138, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test message) というメッセージが出力されていました。

group.id が同じ consumer はどれか1つでしかメッセージを受信しない

kafka-console-consumer.sh を使って動作確認します。

まずコマンドプロンプトを2つ起動してから docker exec -it kafka /bin/sh コマンドで kafka のコンテナに接続した後、kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1 コマンドを実行して同じ group.id でメッセージ送信を待ちます。

f:id:ksby:20190707140347p:plain

KafkaSendAndReceiveTest.java から SleepUtils.sleep(5_000); と @KafkaListener をコメントアウトして、メッセージを受信せず、かつ送信後すぐにテストが終了するようにします。

    @Test
    void sendToKafkaTest() {
        kafkaTemplate.send(TOPIC_NAME, "test message");
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
//        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

//        @KafkaListener(topics = TOPIC_NAME)
//        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
//            log.warn(cr.toString());
//            SleepUtils.sleep(1_000);
//        }

    }

テストを実行してメッセージを送信すると、2つ起動している consumer の内1つだけ受信します。

f:id:ksby:20190707141042p:plain

追加で2回テストを実行してみるとメッセージを受信したのは最初に受信した consumer でした。

f:id:ksby:20190707141314p:plain

テストクラスで 1000件連続してメッセージを送信するように変更してみましたが、

    @Test
    void sendToKafkaTest() {
//        kafkaTemplate.send(TOPIC_NAME, "test message");
        for (int i = 1; i <= 1000; i++) {
            kafkaTemplate.send(TOPIC_NAME, String.valueOf(i));
            kafkaTemplate.flush();
        }
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
//        SleepUtils.sleep(5_000);
    }

やっぱり一方の consumer だけしか受信しません。

f:id:ksby:20190707141842p:plain

受信していた consumer を落とすと、今度はもう一方の接続していた consumer でメッセージを受信しました。

f:id:ksby:20190707142330p:plain

ここまでの動作をまとめると、

  • 同じ group.id の consumer を複数接続してもメッセージを受信するのはその内の1つ。
  • 大量のメッセージを送信してもメッセージを受信するのは決められた1つだけである。複数接続している consumer で分散して受信する訳ではない。
  • 分散する設定があるのかは不明。

group.id が異なれば、group1 の consumer でメッセージを受信した後 group2 の consumer でもメッセージを受信する

docker-compose downdocker-compose up -d コマンドを実行してコンテナを起動し直した後、今度は group.id が異なる kafka-console-consumer.sh を実行します。

f:id:ksby:20190707150931p:plain

テストを実行してメッセージを送信すると、各 consumer でメッセージを受信しました。

f:id:ksby:20190707151254p:plain

もう1点。コンテナを起動し直して Topic1 のメッセージをクリアしてから group.id が topic1-consumer-group1 の kafka-console-consumer.sh だけ起動しておきます。

f:id:ksby:20190707151649p:plain

テストを実行してメッセージを送信すると起動しておいた consumer でメッセージを受信しますが、

f:id:ksby:20190707151837p:plain

後から別の group.id(topic1-consumer-group2)の kafka-console-consumer.sh を起動すると、この時は topic1-consumer-group2 の consumer ではメッセージを受信しません。

f:id:ksby:20190707152612p:plain

ただし1度 group.id = topic1-consumer-group2 の接続情報が登録されると、一旦 topic1-consumer-group2 の kafka-console-consumer.sh を終了してから、

f:id:ksby:20190707152847p:plain

テストを実行してメッセージを送信して topic1-consumer-group1 の kafka-console-consumer.sh でメッセージを受信した後、

f:id:ksby:20190707153024p:plain

topic1-consumer-group2 の kafka-console-consumer.sh を起動すると、起動前に送信されたメッセージが受信されます。

f:id:ksby:20190707153204p:plain

KafkaSendAndReceiveTest.java 内で異なる group.id の @KafkaListener を用意してメッセージを受信するには以下のように変更します。

    @Test
    void sendToKafkaTest() {
        kafkaTemplate.send(TOPIC_NAME, "test message");
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

        @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group1")
        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
            log.warn(cr.toString());
        }

        // group.id は @KafkaListener アノテーションの groupId 属性で指定可能
        // メソッドの引数にはメッセージのデータの型(今回は String)の引数を指定することも可能
        @KafkaListener(topics = TOPIC_NAME, groupId = "topic1-consumer-group2")
        public void listenByString(String msg) {
            log.error(msg);
        }

    }

テストを実行すると @KafkaListener アノテーションを付与したメソッドそれぞれでメッセージを受信していることが確認できます。

f:id:ksby:20190707153939p:plain

Spring Integration DSL でのサンプルや Kafka、zookeeper を複数サーバ起動する場合のサンプルを作成したいので、あと数回続きます。

履歴

2019/07/07
初版発行。