Spring Boot + Spring Integration でいろいろ試してみる ( その35 )( Docker Compose でサーバを構築する、Kafka 編2 )
概要
記事一覧はこちらです。
Kafka+zookeeper の環境を Docker Compose で構築して Spring Integration を利用したサンプルを作成するだけのつもりでいたのですが、Kafka や Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読んでみたら結構面白いサーバであることが分かったので、しばらくいろいろ試してみることにします。
- docker-compose.yml を confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう書き直します。
- partition を複数作成すれば複数の consumer で分散して受信できるようなので試します。
参照したサイト・書籍
Kafka
- 作者: Neha Narkhede,Gwen Shapira,Todd Palino,上岡真也,笹井崇司
- 出版社/メーカー: オライリージャパン
- 発売日: 2018/08/03
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
Apache Kafka 分散メッセージングシステムの構築と活用
Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE)
- 作者: 株式会社NTTデータ,佐々木徹,岩崎正剛,猿田浩輔,都築正宜,吉田耕陽,下垣徹,土橋昌
- 出版社/メーカー: 翔泳社
- 発売日: 2018/10/30
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
confluentinc/cp-kafka(Docker Hub のページ)
https://hub.docker.com/r/confluentinc/cp-kafkaconfluentinc/cp-zookeeper(Docker Hub のページ)
https://hub.docker.com/r/confluentinc/cp-zookeeperconfluentinc/cp-docker-images
https://github.com/confluentinc/cp-docker-images- examples の下に Kafka を cluster 構成にする docker-compose.yml のサンプルがあります。
Confluent Platform Docker Images
https://docs.confluent.io/current/installation/docker/index.htmlConfluent Platform Quick Start (Docker)
https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html- control-center や ksql 等 confluentinc/cp-docker-images で入れられる機能を試したい場合の手順が記載されています(ただし今回の記事では試していません)。
Generic Spring Kafka Listener
https://stackoverflow.com/questions/42045033/generic-spring-kafka-listener- @KafkaListener による consumer で topicPartitions 属性で partition を指定したい場合の例が書かれています。
目次
- confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す
- topic に partition を複数作成すればメッセージの受信を分散できる
手順
confluentinc/cp-kafka、confluentinc/cp-zookeeper を利用するよう docker-compose.yml を書き直す
Spring Boot + Spring Integration でいろいろ試してみる ( その34 )( Docker Compose でサーバを構築する、Kafka 編 ) で Kafka の Docker コンテナを作成した時に Kafka は Official Images がないのかな?と思っていたのですが、Apache Kafka 分散メッセージングシステムの構築と活用 (NEXT ONE) を読むと Kafka のコミッタの半数以上が Confluent という会社に所属していることとインストールで Confluent が配布している Confluent Platform の OSS 版を利用することが記載されていました。
Docker Hub で "confluent" で検索してみると confluentinc/cp-kafka と confluentinc/cp-zookeeper を見つけたので、この2つを利用するよう docker-compose.yml を書き直すことにします。
cp-docker-images/examples/ の下にサンプルが多数ありますので、今回は kafka-single-node/docker-compose.yml を参考にします(というよりほぼコピペです)。
Conflulent の Docker Image で利用可能なオプションは Docker Configuration に記載があります。
version: '3' services: ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it cp-zookeeper /bin/bash ############################################################################# # 単体 zookeeper cp-zookeeper: image: confluentinc/cp-zookeeper:5.2.2 container_name: cp-zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ############################################################################# # 起動したコンテナに /bin/sh でアクセスする場合には以下のコマンドを実行する # docker exec -it cp-kafka /bin/bash ############################################################################# # 単体 Kafka # topic の作成・一覧取得・詳細表示 # kafka-topics --zookeeper cp-zookeeper:2181 --create --topic Topic1 --partitions 1 --replication-factor 1 --if-not-exists # kafka-topics --zookeeper cp-zookeeper:2181 --alter --topic Topic1 --partitions 3 # kafka-topics --zookeeper cp-zookeeper:2181 --list # kafka-topics --zookeeper cp-zookeeper:2181 --topic Topic1 --describe # kafka-console-producer --broker-list localhost:9092 --topic Topic1 # kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1 cp-kafka: image: confluentinc/cp-kafka:5.2.2 container_name: cp-kafka ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 depends_on: - cp-zookeeper
docker-compose up -d
コマンドを実行します。
IntelliJ IDEA の docker plugin を見ると kafka と zookeeper のコンテナが起動していることが確認できます。
topic を作成します。
kafka-console-producer
、kafka-console-consumer
コマンドでメッセージの送受信が出来ることを確認します。
wurstmeister/kafka と比較すると Topic を作成するのに KAFKA_CREATE_TOPICS が使用できなくなりますが(コンテナに接続して kafka-topics --create
コマンドを実行する必要があります)、cp-docker-images/examples/cp-all-in-one/docker-compose.yml を見ると control-center(管理画面らしい)や schema-registry(serializer, deserializer に Apache Avro を利用するのに必要らしい)、 ksql-server, ksql-cli(KSQLが試せる?)等もあるようなので、Kafka のいろいろな機能を試すならこちらの方が良さそうな気がします。
topic に partition を複数作成すればメッセージの受信を分散できる
partition が1つだけだと1つの consumer しか受信できませんでしたが、複数の partition を作成すれば複数の consumer で分散して受信できるようです。
Topic1 を partition = 3 で作成して試してみます。
src/test/java/ksbysample/eipapp/kafka/KafkaSendAndReceiveTest.java を以下のように変更してメッセージを 10件連続で送信できるようにする、かつ @KafkaListener による consumer は削除する、ようにします。またメッセージ送信時に正常に送信できていることを確認するよう kafkaTemplate.send(TOPIC_NAME, String.valueOf(i));
→ kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get();
に変更します(こうすると送信時にエラーが出た時には例外が throw されます)。
@Test void sendToKafkaTest() throws ExecutionException, InterruptedException { // kafkaTemplate.send(TOPIC_NAME, "test message"); for (int i = 1; i <= 10; i++) { kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get(); } // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する // SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { // @KafkaListener(topics = TOPIC_NAME) // public void listenByConsumerRecord(ConsumerRecord<?, ?> cr) { // log.warn(cr.toString()); // } }
コマンドプロンプトを2つ起動して、どちらも kafka-console-consumer --bootstrap-server localhost:9092 --topic Topic1 --group topic1-consumer-group1
を実行します。
テストを実行してメッセージを 10件送信すると、2つの consumer で重複せずにメッセージを受信するようになりました。
テストクラスで複数の consumer で分散して受信してみます。KafkaSendAndReceiveTest.java を以下のように変更して consumer を2つ作成してから、
@Test void sendToKafkaTest() throws ExecutionException, InterruptedException { // @KafkaListener の consumer が登録されるまで少し時間がかかるので 15秒 sleep する SleepUtils.sleep(15_000); // kafkaTemplate.send(TOPIC_NAME, "test message"); for (int i = 1; i <= 10; i++) { kafkaTemplate.send(TOPIC_NAME, String.valueOf(i)).get(); } // @KafkaListener でメッセージを受信するまで少し時間がかかるので 5秒 sleep する SleepUtils.sleep(5_000); } @TestConfiguration static class TestConfig { @KafkaListener(topics = TOPIC_NAME) public void listenByConsumerRecord1(ConsumerRecord<?, ?> cr) { log.warn(String.format("partition = %d, message = %s", cr.partition(), cr.value())); } @KafkaListener(topics = TOPIC_NAME) public void listenByConsumerRecord2(ConsumerRecord<?, ?> cr) { log.error(String.format("partition = %d, message = %s", cr.partition(), cr.value())); } }
テストを実行すると consumer が受信する partition がアサインされて、
それぞれの consumer でメッセージを受信します。この場合、partition = 0, 1 がアサインされたのが listenByConsumerRecord1 で、partition = 2 がアサインされたのが listenByConsumerRecord2 でした。
履歴
2019/07/17
初版発行。