かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その40 )( Docker Compose でサーバを構築する、Kafka 編7 - cp-schema-registry を追加し Apache Avro を使用する2 )

概要

記事一覧はこちらです。

前回に続き Apache Avro を使用したメッセージ送受信についてもう少し試してみます。

参照したサイト・書籍

目次

  1. kafka-avro-console-consumer で受信する
  2. スキーマを手動登録する
  3. TopicNameStrategy、RecordNameStrategy、TopicRecordNameStrategy を変えて動作を確認する

手順

kafka-avro-console-consumer で受信する

confluentinc/cp-schema-registry には kafka-avro-console-consumer コマンドがインストールされており、Avro で Serialize されたメッセージを受信することができます。

src/main/java/ksbysample./eipapp/kafka/KafkaDslSampleConfig.java の topic1Consumer1Flow ~ topic1Consumer3Flow メソッドをコメントアウトしてからアプリケーションを起動してメッセージを送信します。

f:id:ksby:20190901152429p:plain

docker exec -it cp-schema-registry1 /bin/bash コマンドで cp-schema-registry1 コンテナに接続した後、kafka-avro-console-consumer --topic Topic1 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081 コマンドを実行するとメッセージを受信して count, fullName のフィールドの値が表示されます。

f:id:ksby:20190901152638p:plain

ちなみに docker exec -it cp-kafka1 /bin/bash コマンドで cp-kafka1 コンテナに接続してから kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic1 コマンドで受信すると以下の画像のように表示されました。

f:id:ksby:20190901152859p:plain

topic1Consumer1Flow ~ topic1Consumer3Flow メソッドのコメントアウトは元に戻します。

スキーマを手動登録する

スキーマを手動登録してメッセージの送受信をしてみます。

まずは src/main/resources/application.properties に spring.kafka.properties.auto.register.schemas=false の設定を追加してスキーマを自動登録しないようにします。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.properties.auto.register.schemas=false
spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.group-id=topic1-consumer-group1

logging.level.org.apache.kafka.clients.NetworkClient=OFF

docker-compose downdocker-compose up -d でコンテナを起動し直してから curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema":"{\"type\":\"record\",\"name\":\"Counter\",\"namespace\":\"ksbysample.eipapp.kafka.avro\",\"doc\":\"???\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"},{\"name\":\"fullName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"}' http://localhost:8081/subjects/Topic1-value/versions コマンドを実行してスキーマを登録します。

f:id:ksby:20190901155330p:plain

--data に指定する文字列は以下の手順で生成します。

  1. Counter.avsc から生成した build/generated-main-avro-java/ksbysample/eipapp/kafka/avro/Counter.java の中に SCHEMA$ という定数が定義されているので、org.apache.avro.Schema.Parser().parse メソッドの引数の文字列をコピーします。

    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Counter\",\"namespace\":\"ksbysample.eipapp.kafka.avro\",\"doc\":\"???\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"},{\"name\":\"fullName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");

  2. '{"schema":+<org.apache.avro.Schema.Parser().parse メソッドの引数の文字列>+}' のフォーマットの文字列を生成します。

アプリケーションを起動すると自動登録の時と同様にメッセージが送受信されました。

f:id:ksby:20190901155450p:plain

TopicNameStrategy、RecordNameStrategy、TopicRecordNameStrategy を変えて動作を確認する

TopicNameStrategy では Topic1-Value という名前でスキーマが登録されますが、RecordNameStrategy、TopicRecordNameStrategy ではそれぞれどうなるか確認してみます。

まずは RecordNameStrategy から。src/main/resources/application.properties に spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy を追加します。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.properties.schema.registry.url=http://localhost:8081
# スキーマを自動登録させない場合には以下の行のコメントアウトを解除する
#spring.kafka.properties.auto.register.schemas=false

spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.group-id=topic1-consumer-group1

logging.level.org.apache.kafka.clients.NetworkClient=OFF

コンテナを起動し直してからアプリケーションを起動してメッセージが送受信できることを確認します。

f:id:ksby:20190901161722p:plain

curl -X GET http://localhost:8081/subjects コマンドを実行すると ksbysample.eipapp.kafka.avro.Counter という名前でスキーマが登録されていました。

f:id:ksby:20190901161903p:plain

次は TopicRecordNameStrategy。src/main/resources/application.properties の設定を spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy に変更します。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.properties.schema.registry.url=http://localhost:8081
# スキーマを自動登録させない場合には以下の行のコメントアウトを解除する
#spring.kafka.properties.auto.register.schemas=false

#spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.group-id=topic1-consumer-group1

logging.level.org.apache.kafka.clients.NetworkClient=OFF

コンテナを起動し直してからアプリケーションを起動してメッセージが送受信できることを確認した後、curl -X GET http://localhost:8081/subjects コマンドを実行すると Topic1-ksbysample.eipapp.kafka.avro.Counter という名前でスキーマが登録されていました。

f:id:ksby:20190901162608p:plain

履歴

2019/09/01
初版発行。