かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その37 )( Docker Compose でサーバを構築する、Kafka 編4 - zookeeper と kafka を cluster 構成に変更する )

概要

記事一覧はこちらです。

zookeeper, kafka をどちらも単体サーバで構築していましたが、zookeeper x 3、kafka x 5 の cluster 構成に変更してみます。Topic1 も 3 partition and 3 replicas で作成します。

参照したサイト・書籍

  1. cp-docker-images/examples/kafka-cluster/docker-compose.yml
    https://github.com/confluentinc/cp-docker-images/blob/5.3.0-post/examples/kafka-cluster/docker-compose.yml

  2. Building Apache Kafka cluster using docker-compose and VirtualBox
    https://better-coding.com/building-apache-kafka-cluster-using-docker-compose-and-virtualbox/

  3. confluentinc/cp-kafkacat
    https://hub.docker.com/r/confluentinc/cp-kafkacat

  4. Kafka __consumer_offsets topic
    https://medium.com/@felipedutratine/kafka-consumer-offsets-topic-3d5483cda4a6

目次

  1. docker-compose.yml を変更する
  2. docker-compose up -d で起動し、kafkacat で broker が追加されたことを確認する
  3. Topic1 を 3 partition and 3 replicas で作成する
  4. cp-kafka2 で kafka-console-producer を、cp-kafka3 で kafka-console-consumer を実行しメッセージを送受信してみる
  5. Spring Integration DSL のアプリケーションでメッセージの送受信ができるようにする

手順

docker-compose.yml を変更する

docker-compose.yml を以下のように変更します。

version: '3'

services:
  ..........

  # zookeeper cluster
  cp-zookeeper1:
    image: confluentinc/cp-zookeeper:5.3.0
    container_name: cp-zookeeper1
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888
  cp-zookeeper2:
    image: confluentinc/cp-zookeeper:5.3.0
    container_name: cp-zookeeper2
    ports:
      - "22181:22181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888
  cp-zookeeper3:
    image: confluentinc/cp-zookeeper:5.3.0
    container_name: cp-zookeeper3
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888

  ..........

  # Kafka cluster
  # docker exec -it cp-kafka1 /bin/bash
  # docker exec -it cp-kafka2 /bin/bash
  # docker exec -it cp-kafka3 /bin/bash
  # topic の作成・一覧取得・詳細表示
  # kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  # kafka-topics --zookeeper cp-zookeeper1:12181 --list
  # kafka-topics --zookeeper cp-zookeeper1:12181 --topic Topic1 --describe
  # kafka-console-producer --broker-list localhost:19092 --topic Topic1
  # kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic1
  cp-kafka1:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka1
    ports:
      - "19092:19092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka1:19092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka2:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka2
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka2:29092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka3:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka3
    ports:
      - "39092:39092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka3:39092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka4:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka4
    ports:
      - "49092:49092"
    environment:
      KAFKA_BROKER_ID: 4
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka4:49092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka5:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka5
    ports:
      - "59092:59092"
    environment:
      KAFKA_BROKER_ID: 5
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka5:59092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  • confluentinc/cp-zookeeper, confluentinc/cp-kafka の最新バージョンが 5.3.0 になっていたので、このバージョンを使用します。
  • zookeeper の cluster 構成は以下のように設定します。
    • コンテナ名は cp-zookeeper1, cp-zookeeper2, cp-zookeeper3 と末尾に連番の数字(1~3)を付けた名前にします。
    • ポート番号は単体サーバの時は 2181 を使用しましたが、cluster 構成では cp-zookeeper1(12181番), cp-zookeeper2(22181番), cp-zookeeper3(32181番)を使用します。
    • ZOOKEEPER_SERVER_ID には各コンテナで一意の数字になるよう 1~3 の値を設定します。
    • ZOOKEEPER_INIT_LIMIT, ZOOKEEPER_SYNC_LIMIT, ZOOKEEPER_SERVERS は cp-docker-images/examples/kafka-cluster/docker-compose.yml を参考に設定します。
  • kafka の cluster 構成は以下のように設定します。
    • コンテナ名は cp-kafka1, cp-kafka2, cp-kafka3, cp-kafka4, cp-kafka5 と末尾に連番の数字(1~5)を付けた名前にします。
    • ポート番号は単体サーバの時は 9092 を使用しましたが、cluster 構成では cp-kafka1(19092番), cp-kafka2(29092番), cp-kafka3(39092番), cp-kafka4(49092番), cp-kafka5(59092番)を使用します。
    • KAFKA_BROKER_ID には各コンテナで一意の数字になるよう 1~5 の値を設定します。
    • KAFKA_ZOOKEEPER_CONNECT には cluster 構成に変更した cp-zookeeper コンテナ名+ポート番号をカンマ区切りで列挙します。
    • KAFKA_ADVERTISED_LISTENERS には PLAINTEXT://<cp-kafka コンテナ名>:<コンテナで使用するポート番号> を記述します。
    • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP、KAFKA_INTER_BROKER_LISTENER_NAME、KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR の設定は入れません。
    • depends_on に cp-zookeeper コンテナ名を列挙します。

docker-compose up -d で起動し、kafkacat で broker が追加されたことを確認する

kafka cluster は起動しても broker が追加されるまで少し時間がかかります。broker が追加されたことを確認できるよう confluentinc/cp-kafkacat コンテナを利用して kafkacat コマンドを使用できるようにします。

kafka cluster を起動する前に docker run --rm --name cp-kafkacat confluentinc/cp-kafkacat:5.3.0 コマンドを実行して cp-kafkacat の docker image をダウンロードしておきます。

f:id:ksby:20190813161520p:plain ※kafkacat の usage が表示されます。

kafka cluster を起動します。docker-compose up -d コマンドを実行した後、

f:id:ksby:20190813162354p:plain

docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L コマンドを実行して broker が追加されるまで待ちます。最初のうちは ERROR: Failed to acquire metadata: Local: Broker transport failure のメッセージが表示されますが、しばらくすると broker が表示されるようになります。topics の情報まで表示されるようになったら完了です。

f:id:ksby:20190813162545p:plain f:id:ksby:20190813162731p:plain

Topic1 を 3 partition and 3 replicas で作成する

cp-kafka1 コンテナに接続した後、以下のコマンドを実行して Topic1 を作成・確認します。

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --topic Topic1 --describe

f:id:ksby:20190813173008p:plain ※topic の各 partition の一番右側に表示されている isrs は in-sync replica(isr)+複数形の s です。

docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L コマンドを実行すると、こちらでも追加された Topic1 の情報が表示されます。

f:id:ksby:20190813173220p:plain

cp-kafka2 で kafka-console-producer を、cp-kafka3 で kafka-console-consumer を実行しメッセージを送受信してみる

cp-kafka2 コンテナに接続して kafka-console-producer --broker-list localhost:29092 --topic Topic1 コマンドを実行し、cp-kafka3 コンテナに接続して kafka-console-consumer --bootstrap-server localhost:39092 --topic Topic1 コマンドを実行して、メッセージの送受信ができることを確認してみます。

f:id:ksby:20190813202218p:plain

別の kafka に接続していますが、問題なくメッセージの送受信が出来ました。

またメッセージを送受信した後で docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L コマンドを実行すると topic "__consumer_offsets" with 50 partitions というものが作成されていました(しかもレプリケーションされています)。

f:id:ksby:20190813203130p:plain f:id:ksby:20190813203231p:plain

Spring Integration DSL のアプリケーションでメッセージの送受信ができるようにする

※Spring Integration DSL のアプリケーションは 前回 の内容から「メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、9 なら partition 2 へ送信する」の変更だけ元に戻しました。

ホスト名が localhost だと Docker Compose で起動した kafka cluster と通信ができないため、コンテナ名で通信できるよう以下の設定を hosts ファイルに記述します。

127.0.0.1   cp-kafka1 cp-kafka2 cp-kafka3 cp-kafka4 cp-kafka5

src/main/resources/application.properties を以下のように変更します。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.consumer.group-id=topic1-consumer-group1
  • spring.kafka.consumer.bootstrap-servers=localhost:9092 を削除し、spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 を追加します。producer、consumer で同じサーバを指定する場合には spring.kafka.bootstrap-servers で指定すればよいことに気づきました。。。

アプリケーションを起動すると最初に consumer が受信する topic の partition が調整された後、以前と同様にメッセージを送受信するようになりました。

f:id:ksby:20190814231810p:plain f:id:ksby:20190814231914p:plain

次回は broker(kafka サーバ)を停止した時にどのような動きをするのか見てみます。

履歴

2019/08/16
初版発行。