Spring Boot + Spring Integration でいろいろ試してみる ( その37 )( Docker Compose でサーバを構築する、Kafka 編4 - zookeeper と kafka を cluster 構成に変更する )
概要
記事一覧はこちらです。
zookeeper, kafka をどちらも単体サーバで構築していましたが、zookeeper x 3、kafka x 5 の cluster 構成に変更してみます。Topic1 も 3 partition and 3 replicas で作成します。
参照したサイト・書籍
cp-docker-images/examples/kafka-cluster/docker-compose.yml
https://github.com/confluentinc/cp-docker-images/blob/5.3.0-post/examples/kafka-cluster/docker-compose.ymlBuilding Apache Kafka cluster using docker-compose and VirtualBox
https://better-coding.com/building-apache-kafka-cluster-using-docker-compose-and-virtualbox/confluentinc/cp-kafkacat
https://hub.docker.com/r/confluentinc/cp-kafkacatKafka __consumer_offsets topic
https://medium.com/@felipedutratine/kafka-consumer-offsets-topic-3d5483cda4a6
目次
- docker-compose.yml を変更する
docker-compose up -d
で起動し、kafkacat で broker が追加されたことを確認する- Topic1 を 3 partition and 3 replicas で作成する
- cp-kafka2 で kafka-console-producer を、cp-kafka3 で kafka-console-consumer を実行しメッセージを送受信してみる
- Spring Integration DSL のアプリケーションでメッセージの送受信ができるようにする
手順
docker-compose.yml を変更する
docker-compose.yml を以下のように変更します。
version: '3' services: .......... # zookeeper cluster cp-zookeeper1: image: confluentinc/cp-zookeeper:5.3.0 container_name: cp-zookeeper1 ports: - "12181:12181" environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_PORT: 12181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888 cp-zookeeper2: image: confluentinc/cp-zookeeper:5.3.0 container_name: cp-zookeeper2 ports: - "22181:22181" environment: ZOOKEEPER_SERVER_ID: 2 ZOOKEEPER_CLIENT_PORT: 22181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888 cp-zookeeper3: image: confluentinc/cp-zookeeper:5.3.0 container_name: cp-zookeeper3 ports: - "32181:32181" environment: ZOOKEEPER_SERVER_ID: 3 ZOOKEEPER_CLIENT_PORT: 32181 ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_INIT_LIMIT: 5 ZOOKEEPER_SYNC_LIMIT: 2 ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888 .......... # Kafka cluster # docker exec -it cp-kafka1 /bin/bash # docker exec -it cp-kafka2 /bin/bash # docker exec -it cp-kafka3 /bin/bash # topic の作成・一覧取得・詳細表示 # kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists # kafka-topics --zookeeper cp-zookeeper1:12181 --list # kafka-topics --zookeeper cp-zookeeper1:12181 --topic Topic1 --describe # kafka-console-producer --broker-list localhost:19092 --topic Topic1 # kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic1 cp-kafka1: image: confluentinc/cp-kafka:5.3.0 container_name: cp-kafka1 ports: - "19092:19092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka1:19092 depends_on: - cp-zookeeper1 - cp-zookeeper2 - cp-zookeeper3 cp-kafka2: image: confluentinc/cp-kafka:5.3.0 container_name: cp-kafka2 ports: - "29092:29092" environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka2:29092 depends_on: - cp-zookeeper1 - cp-zookeeper2 - cp-zookeeper3 cp-kafka3: image: confluentinc/cp-kafka:5.3.0 container_name: cp-kafka3 ports: - "39092:39092" environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka3:39092 depends_on: - cp-zookeeper1 - cp-zookeeper2 - cp-zookeeper3 cp-kafka4: image: confluentinc/cp-kafka:5.3.0 container_name: cp-kafka4 ports: - "49092:49092" environment: KAFKA_BROKER_ID: 4 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka4:49092 depends_on: - cp-zookeeper1 - cp-zookeeper2 - cp-zookeeper3 cp-kafka5: image: confluentinc/cp-kafka:5.3.0 container_name: cp-kafka5 ports: - "59092:59092" environment: KAFKA_BROKER_ID: 5 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka5:59092 depends_on: - cp-zookeeper1 - cp-zookeeper2 - cp-zookeeper3
- confluentinc/cp-zookeeper, confluentinc/cp-kafka の最新バージョンが 5.3.0 になっていたので、このバージョンを使用します。
- zookeeper の cluster 構成は以下のように設定します。
- コンテナ名は cp-zookeeper1, cp-zookeeper2, cp-zookeeper3 と末尾に連番の数字(1~3)を付けた名前にします。
- ポート番号は単体サーバの時は 2181 を使用しましたが、cluster 構成では cp-zookeeper1(12181番), cp-zookeeper2(22181番), cp-zookeeper3(32181番)を使用します。
- ZOOKEEPER_SERVER_ID には各コンテナで一意の数字になるよう 1~3 の値を設定します。
- ZOOKEEPER_INIT_LIMIT, ZOOKEEPER_SYNC_LIMIT, ZOOKEEPER_SERVERS は cp-docker-images/examples/kafka-cluster/docker-compose.yml を参考に設定します。
- kafka の cluster 構成は以下のように設定します。
- コンテナ名は cp-kafka1, cp-kafka2, cp-kafka3, cp-kafka4, cp-kafka5 と末尾に連番の数字(1~5)を付けた名前にします。
- ポート番号は単体サーバの時は 9092 を使用しましたが、cluster 構成では cp-kafka1(19092番), cp-kafka2(29092番), cp-kafka3(39092番), cp-kafka4(49092番), cp-kafka5(59092番)を使用します。
- KAFKA_BROKER_ID には各コンテナで一意の数字になるよう 1~5 の値を設定します。
- KAFKA_ZOOKEEPER_CONNECT には cluster 構成に変更した cp-zookeeper コンテナ名+ポート番号をカンマ区切りで列挙します。
- KAFKA_ADVERTISED_LISTENERS には
PLAINTEXT://<cp-kafka コンテナ名>:<コンテナで使用するポート番号>
を記述します。 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP、KAFKA_INTER_BROKER_LISTENER_NAME、KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR の設定は入れません。
- depends_on に cp-zookeeper コンテナ名を列挙します。
docker-compose up -d
で起動し、kafkacat で broker が追加されたことを確認する
kafka cluster は起動しても broker が追加されるまで少し時間がかかります。broker が追加されたことを確認できるよう confluentinc/cp-kafkacat コンテナを利用して kafkacat コマンドを使用できるようにします。
kafka cluster を起動する前に docker run --rm --name cp-kafkacat confluentinc/cp-kafkacat:5.3.0
コマンドを実行して cp-kafkacat の docker image をダウンロードしておきます。
※kafkacat の usage が表示されます。
kafka cluster を起動します。docker-compose up -d
コマンドを実行した後、
docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L
コマンドを実行して broker が追加されるまで待ちます。最初のうちは ERROR: Failed to acquire metadata: Local: Broker transport failure
のメッセージが表示されますが、しばらくすると broker が表示されるようになります。topics の情報まで表示されるようになったら完了です。
Topic1 を 3 partition and 3 replicas で作成する
cp-kafka1 コンテナに接続した後、以下のコマンドを実行して Topic1 を作成・確認します。
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
kafka-topics --zookeeper cp-zookeeper1:12181 --topic Topic1 --describe
※topic の各 partition の一番右側に表示されている isrs は in-sync replica(isr)+複数形の s です。
docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L
コマンドを実行すると、こちらでも追加された Topic1 の情報が表示されます。
cp-kafka2 で kafka-console-producer を、cp-kafka3 で kafka-console-consumer を実行しメッセージを送受信してみる
cp-kafka2 コンテナに接続して kafka-console-producer --broker-list localhost:29092 --topic Topic1
コマンドを実行し、cp-kafka3 コンテナに接続して kafka-console-consumer --bootstrap-server localhost:39092 --topic Topic1
コマンドを実行して、メッセージの送受信ができることを確認してみます。
別の kafka に接続していますが、問題なくメッセージの送受信が出来ました。
またメッセージを送受信した後で docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L
コマンドを実行すると topic "__consumer_offsets" with 50 partitions
というものが作成されていました(しかもレプリケーションされています)。
Spring Integration DSL のアプリケーションでメッセージの送受信ができるようにする
※Spring Integration DSL のアプリケーションは 前回 の内容から「メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、9 なら partition 2 へ送信する」の変更だけ元に戻しました。
ホスト名が localhost だと Docker Compose で起動した kafka cluster と通信ができないため、コンテナ名で通信できるよう以下の設定を hosts ファイルに記述します。
127.0.0.1 cp-kafka1 cp-kafka2 cp-kafka3 cp-kafka4 cp-kafka5
src/main/resources/application.properties を以下のように変更します。
spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 spring.kafka.consumer.group-id=topic1-consumer-group1
spring.kafka.consumer.bootstrap-servers=localhost:9092
を削除し、spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
を追加します。producer、consumer で同じサーバを指定する場合にはspring.kafka.bootstrap-servers
で指定すればよいことに気づきました。。。
アプリケーションを起動すると最初に consumer が受信する topic の partition が調整された後、以前と同様にメッセージを送受信するようになりました。
次回は broker(kafka サーバ)を停止した時にどのような動きをするのか見てみます。
履歴
2019/08/16
初版発行。