Spring Boot + Spring Integration でいろいろ試してみる ( その40 )( Docker Compose でサーバを構築する、Kafka 編7 - cp-schema-registry を追加し Apache Avro を使用する2 )
概要
記事一覧はこちらです。
前回に続き Apache Avro を使用したメッセージ送受信についてもう少し試してみます。
参照したサイト・書籍
目次
- kafka-avro-console-consumer で受信する
- スキーマを手動登録する
- TopicNameStrategy、RecordNameStrategy、TopicRecordNameStrategy を変えて動作を確認する
手順
kafka-avro-console-consumer で受信する
confluentinc/cp-schema-registry には kafka-avro-console-consumer コマンドがインストールされており、Avro で Serialize されたメッセージを受信することができます。
src/main/java/ksbysample./eipapp/kafka/KafkaDslSampleConfig.java の topic1Consumer1Flow ~ topic1Consumer3Flow メソッドをコメントアウトしてからアプリケーションを起動してメッセージを送信します。
docker exec -it cp-schema-registry1 /bin/bash
コマンドで cp-schema-registry1 コンテナに接続した後、kafka-avro-console-consumer --topic Topic1 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081
コマンドを実行するとメッセージを受信して count, fullName のフィールドの値が表示されます。
ちなみに docker exec -it cp-kafka1 /bin/bash
コマンドで cp-kafka1 コンテナに接続してから kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic1
コマンドで受信すると以下の画像のように表示されました。
topic1Consumer1Flow ~ topic1Consumer3Flow メソッドのコメントアウトは元に戻します。
スキーマを手動登録する
スキーマを手動登録してメッセージの送受信をしてみます。
まずは src/main/resources/application.properties に spring.kafka.properties.auto.register.schemas=false
の設定を追加してスキーマを自動登録しないようにします。
spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 spring.kafka.properties.schema.registry.url=http://localhost:8081 spring.kafka.properties.auto.register.schemas=false spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.properties.specific.avro.reader=true spring.kafka.consumer.group-id=topic1-consumer-group1 logging.level.org.apache.kafka.clients.NetworkClient=OFF
docker-compose down
、docker-compose up -d
でコンテナを起動し直してから curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema":"{\"type\":\"record\",\"name\":\"Counter\",\"namespace\":\"ksbysample.eipapp.kafka.avro\",\"doc\":\"???\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"},{\"name\":\"fullName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"}' http://localhost:8081/subjects/Topic1-value/versions
コマンドを実行してスキーマを登録します。
--data
に指定する文字列は以下の手順で生成します。
Counter.avsc から生成した build/generated-main-avro-java/ksbysample/eipapp/kafka/avro/Counter.java の中に
SCHEMA$
という定数が定義されているので、org.apache.avro.Schema.Parser().parse
メソッドの引数の文字列をコピーします。public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Counter\",\"namespace\":\"ksbysample.eipapp.kafka.avro\",\"doc\":\"???\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"},{\"name\":\"fullName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
'{"schema":
+<org.apache.avro.Schema.Parser().parse メソッドの引数の文字列>+}'
のフォーマットの文字列を生成します。
アプリケーションを起動すると自動登録の時と同様にメッセージが送受信されました。
TopicNameStrategy、RecordNameStrategy、TopicRecordNameStrategy を変えて動作を確認する
TopicNameStrategy では Topic1-Value
という名前でスキーマが登録されますが、RecordNameStrategy、TopicRecordNameStrategy ではそれぞれどうなるか確認してみます。
まずは RecordNameStrategy から。src/main/resources/application.properties に spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
を追加します。
spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 spring.kafka.properties.schema.registry.url=http://localhost:8081 # スキーマを自動登録させない場合には以下の行のコメントアウトを解除する #spring.kafka.properties.auto.register.schemas=false spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy #spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.properties.specific.avro.reader=true spring.kafka.consumer.group-id=topic1-consumer-group1 logging.level.org.apache.kafka.clients.NetworkClient=OFF
コンテナを起動し直してからアプリケーションを起動してメッセージが送受信できることを確認します。
curl -X GET http://localhost:8081/subjects
コマンドを実行すると ksbysample.eipapp.kafka.avro.Counter
という名前でスキーマが登録されていました。
次は TopicRecordNameStrategy。src/main/resources/application.properties の設定を spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
に変更します。
spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 spring.kafka.properties.schema.registry.url=http://localhost:8081 # スキーマを自動登録させない場合には以下の行のコメントアウトを解除する #spring.kafka.properties.auto.register.schemas=false #spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.properties.specific.avro.reader=true spring.kafka.consumer.group-id=topic1-consumer-group1 logging.level.org.apache.kafka.clients.NetworkClient=OFF
コンテナを起動し直してからアプリケーションを起動してメッセージが送受信できることを確認した後、curl -X GET http://localhost:8081/subjects
コマンドを実行すると Topic1-ksbysample.eipapp.kafka.avro.Counter
という名前でスキーマが登録されていました。
履歴
2019/09/01
初版発行。