かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その35 )( Docker Compose でサーバを構築する、Kafka 編2 )

概要

記事一覧はこちらです。

Kafka+zookeeper の環境を Docker Compose で構築して Spring Integration を利用したサンプルを作成するだけのつもりでいたのですが、KafkaApache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読んでみたら結構面白いサーバであることが分かったので、しばらくいろいろ試してみることにします。

  • docker-compose.yml を confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう書き直します。
  • partition を複数作成すれば複数の consumer で分散して受信できるようなので試します。

参照したサイト・書籍

  1. Kafka

    Kafka

    Kafka

  2. Apache Kafka 分散メッセージングシステムの構築と活用

    Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE)

    Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE)

  3. confluentinc/cp-kafka(Docker Hub のページ)
    https://hub.docker.com/r/confluentinc/cp-kafka

  4. confluentinc/cp-zookeeper(Docker Hub のページ)
    https://hub.docker.com/r/confluentinc/cp-zookeeper

  5. confluentinc/cp-docker-images
    https://github.com/confluentinc/cp-docker-images

    • examples の下に Kafka を cluster 構成にする docker-compose.yml のサンプルがあります。
  6. Confluent Platform Docker Images
    https://docs.confluent.io/current/installation/docker/index.html

  7. Confluent Platform Quick Start (Docker)
    https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html

    • control-center や ksql 等 confluentinc/cp-docker-images で入れられる機能を試したい場合の手順が記載されています(ただし今回の記事では試していません)。
  8. Generic Spring Kafka Listener
    https://stackoverflow.com/questions/42045033/generic-spring-kafka-listener

    • @KafkaListener による consumer で topicPartitions 属性で partition を指定したい場合の例が書かれています。

目次

  1. confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す
  2. topic に partition を複数作成すればメッセージの受信を分散できる

手順

confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す

Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 ) で Kafka の Docker コンテナを作成した時に Kafka は Official Images がないのかな?と思っていたのですが、Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読むと Kafka のコミッタの半数以上が Confluent という会社に所属していることとインストールで Confluent が配布している Confluent Platform の OSS 版を利用することが記載されていました。

Docker Hub で "confluent" で検索してみると confluentinc/cp-kafkaconfluentinc/cp-zookeeper を見つけたので、この2つを利用するよう docker-compose.yml を書き直すことにします。

cp-docker-images/examples/ の下にサンプルが多数ありますので、今回は kafka-single-node/docker-compose.yml を参考にします(というよりほぼコピペです)。

Conflulent の Docker Image で利用可能なオプションは Docker Configuration に記載があります。

version: '3'

services:
  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it cp-zookeeper /bin/bash
  #############################################################################
  # 単体 zookeeper
  cp-zookeeper:
    image: confluentinc/cp-zookeeper:5.2.2
    container_name: cp-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  #############################################################################
  # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する
  # docker exec -it cp-kafka /bin/bash
  #############################################################################
  # 単体 Kafka
  # topic の作成・一覧取得・詳細表示
  # kafka-topics --zookeeper cp-zookeeper:2181 --create --topic Topic1 --partitions 1 --replication-factor 1 --if-not-exists
  # kafka-topics --zookeeper cp-zookeeper:2181 --alter --topic Topic1 --partitions 3
  # kafka-topics --zookeeper cp-zookeeper:2181 --list
  # kafka-topics --zookeeper cp-zookeeper:2181 --topic Topic1 --describe
  # kafka-console-producer --broker-list localhost:9092 --topic Topic1
  # kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1
  cp-kafka:
    image: confluentinc/cp-kafka:5.2.2
    container_name: cp-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - cp-zookeeper

docker-compose up -d コマンドを実行します。

f:id:ksby:20190713130313p:plain

IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。

f:id:ksby:20190713130411p:plain

topic を作成します。

f:id:ksby:20190713133500p:plain

kafka-console-producerkafka-console-consumer コマンドでメッセージの送受信が出来ることを確認します。

f:id:ksby:20190713133827p:plain

wurstmeister/kafka と比較すると Topic を作成するのに KAFKA_CREATE_TOPICS が使用できなくなりますが(コンテナに接続して kafka-topics --create コマンドを実行する必要があります)、cp-docker-images/examples/cp-all-in-one/docker-compose.yml を見ると control-center(管理画面らしい)や schema-registry(serializer, deserializer に Apache Avro を利用するのに必要らしい)、 ksql-server, ksql-cli(KSQLが試せる?)等もあるようなので、Kafka のいろいろな機能を試すならこちらの方が良さそうな気がします。

topic に partition を複数作成すればメッセージの受信を分散できる

partition が1つだけだと1つの consumer しか受信できませんでしたが、複数の partition を作成すれば複数の consumer で分散して受信できるようです。

Topic1 を partition = 3 で作成して試してみます。

f:id:ksby:20190713192841p:plain

src/test/java/ksbysample/eipapp/kafka/KafkaSendAndReceiveTest.java を以下のように変更してメッセージを 10件連続で送信できるようにする、かつ @KafkaListener による consumer は削除する、ようにします。またメッセージ送信時に正常に送信できていることを確認するよう kafkaTemplate.send(TOPIC_NAME, String.valueOf(i));kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get(); に変更します(こうすると送信時にエラーが出た時には例外が throw されます)。

    @Test
    void sendToKafkaTest() throws ExecutionException, InterruptedException {
//        kafkaTemplate.send(TOPIC_NAME, "test message");
        for (int i = 1; i <= 10; i++) {
            kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get();
        }
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
//        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

//        @KafkaListener(topics = TOPIC_NAME)
//        public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) {
//            log.warn(cr.toString());
//        }

    }

コマンドプロンプトを2つ起動して、どちらも kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1 を実行します。

f:id:ksby:20190713193538p:plain

テストを実行してメッセージを 10件送信すると、2つの consumer で重複せずにメッセージを受信するようになりました。

f:id:ksby:20190713194257p:plain

テストクラスで複数の consumer で分散して受信してみます。KafkaSendAndReceiveTest.java を以下のように変更して consumer を2つ作成してから、

    @Test
    void sendToKafkaTest() throws ExecutionException, InterruptedException {
        // @KafkaListener の consumer が登録されるまで少し時間がかかるので 15秒 sleep する
        SleepUtils.sleep(15_000);

//        kafkaTemplate.send(TOPIC_NAME, "test message");
        for (int i = 1; i <= 10; i++) {
            kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get();
        }
        // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する
        SleepUtils.sleep(5_000);
    }

    @TestConfiguration
    static class TestConfig {

        @KafkaListener(topics = TOPIC_NAME)
        public void listenByConsumerRecord1(ConsumerRecord<?, ?> cr) {
            log.warn(String.format("partition = %d, message = %s", cr.partition(), cr.value()));
        }

        @KafkaListener(topics = TOPIC_NAME)
        public void listenByConsumerRecord2(ConsumerRecord<?, ?> cr) {
            log.error(String.format("partition = %d, message = %s", cr.partition(), cr.value()));
        }

    }

テストを実行すると consumer が受信する partition がアサインされて、

f:id:ksby:20190713203516p:plain

それぞれの consumer でメッセージを受信します。この場合、partition = 0, 1 がアサインされたのが listenByConsumerRecord1 で、partition = 2 がアサインされたのが listenByConsumerRecord2 でした。

f:id:ksby:20190713203615p:plain

履歴

2019/07/17
初版発行。