Spring Boot + Spring Integration でいろいろ試してみる ( その39 )( Docker Compose でサーバを構築する、Kafka 編6 - cp-schema-registry を追加し Apache Avro を使用する )
概要
記事一覧はこちらです。
今回は Schema Registry のサーバを起動して Apache Avro でメッセージを Serialize、Deserialize してみます。
参照したサイト・書籍
cp-docker-images/examples/multi-datacenter/docker-compose.yml
https://github.com/confluentinc/cp-docker-images/blob/5.3.0-post/examples/multi-datacenter/docker-compose.ymlspring-integration-extensions/samples/kafka/build.gradle
https://github.com/spring-projects/spring-integration-extensions/blob/master/samples/kafka/build.gradlecommercehub-oss/gradle-avro-plugin
https://github.com/commercehub-oss/gradle-avro-pluginKafka Tutorial: Kafka, Avro Serialization and the Schema Registry
http://cloudurable.com/blog/kafka-avro-schema-registry/index.htmlInstall Clients
https://docs.confluent.io/current/clients/install.htmlSchema Registry Tutorial
https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.htmlSpring Boot + Kafka + Schema Registry
https://medium.com/@sunilvb/spring-boot-kafka-schema-registry-c6e32485a7c8Schema Registry Serializer and Formatter
https://docs.confluent.io/current/schema-registry/serializer-formatter.htmlSchema Evolution and Compatibility
https://docs.confluent.io/current/schema-registry/avro.htmlKafka and Avro with Confluent Schema Registry
https://www.slideshare.net/JeanPaulAzar1/kafka-and-avro-with-confluent-schema-registryKafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord
https://stackoverflow.com/questions/39606026/kafkaavrodeserializer-does-not-return-specificrecord-but-returns-genericrecord
目次
- cp-schema-registry を追加する
- build.gradle を変更する
- 今回使用するメッセージの avro スキーマの定義ファイルを作成する
- producer でメッセージを送信して consumer で受信する(スキーマは自動登録する)
手順
cp-schema-registry を追加する
cp-schema-registry を3サーバ立てて haproxy 経由でアクセスできるようにします。
まず haproxy の設定ファイルを作成します。プロジェクトのルートディレクトリ直下に docker/cp-schema-registry ディレクトリを新規作成した後、その下に haproxy.cfg というファイルを新規作成して以下の内容を記述します。
global log 127.0.0.1 local1 maxconn 4096 defaults log global mode tcp option tcplog retries 3 option redispatch maxconn 2000 timeout connect 5000 timeout client 50000 timeout server 50000 listen stats bind *:1936 mode http stats enable stats hide-version stats realm Haproxy\ Statistics stats refresh 5s stats uri /haproxy-cp-schema-registry?stats listen cp-schema-registry bind *:8081 mode tcp balance roundrobin timeout client 3h timeout server 3h option clitcpka server cp-schema-registry1 cp-schema-registry1:8081 check inter 5s rise 2 fall 3 server cp-schema-registry2 cp-schema-registry2:8081 check inter 5s rise 2 fall 3 server cp-schema-registry3 cp-schema-registry3:8081 check inter 5s rise 2 fall 3
docker-compose.yml を以下のように変更します。
.......... # Schema Registry cp-schema-registry1: image: confluentinc/cp-schema-registry:5.3.0 container_name: cp-schema-registry1 environment: SCHEMA_REGISTRY_HOST_NAME: cp-schema-registry1 SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092" SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081" SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: ERROR depends_on: - cp-kafka1 - cp-kafka2 - cp-kafka3 - cp-kafka4 - cp-kafka5 # broker の登録が完了する前にタイムアウトでコンテナが終了してしまう場合があるので restart: always を指定する restart: always cp-schema-registry2: image: confluentinc/cp-schema-registry:5.3.0 container_name: cp-schema-registry2 environment: SCHEMA_REGISTRY_HOST_NAME: cp-schema-registry2 SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092" SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081" SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: ERROR depends_on: - cp-schema-registry1 # broker の登録が完了する前にタイムアウトでコンテナが終了してしまう場合があるので restart: always を指定する restart: always cp-schema-registry3: image: confluentinc/cp-schema-registry:5.3.0 container_name: cp-schema-registry3 environment: SCHEMA_REGISTRY_HOST_NAME: cp-schema-registry3 SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092" SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081" SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: ERROR depends_on: - cp-schema-registry1 # broker の登録が完了する前にタイムアウトでコンテナが終了してしまう場合があるので restart: always を指定する restart: always # http://localhost:1936/haproxy-cp-schema-registry?stats haproxy-cp-schema-registry: image: haproxy:2.0.5-alpine container_name: haproxy-cp-schema-registry ports: - "1936:1936" - "8081:8081" environment: - TZ=Asia/Tokyo volumes: - ./docker/cp-schema-registry/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro depends_on: - cp-schema-registry1 - cp-schema-registry2 - cp-schema-registry3
- 以下4つのコンテナを追加します。
- cp-schema-registry1
- cp-schema-registry2
- cp-schema-registry3
- haproxy-cp-schema-registry
- docker-compose.yml に cp-kafka1~5 の5サーバ定義していて Kafka Cluster が利用可能になるまで時間がかかるため、cp-schema-registry が Kafka を利用可能と判断できる前にタイムアウトしてコンテナが終了してしまう場合がありました。
restart: always
の記述は必須です。コンテナが終了しても自動で起動し直します。
docker-compose up -d
コマンドで起動します。
http://localhost:1936/haproxy-cp-schema-registry?stats にアクセスして cp-schema-registry1~3 が緑色になるまで待ちます。
curl -X GET http://localhost:8081/subjects
コマンドを実行すると、まだ何もスキーマが登録されていないことが確認できます。
build.gradle を変更する
build.gradle に Apache Avro を利用するために必要な記述を追加します。
buildscript { repositories { mavenCentral() gradlePluginPortal() } dependencies { classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0" } } .......... apply plugin: "io.spring.dependency-management" // com.commercehub.gradle.plugin.avro を追加すると src/main/avro, src/test/avro が作成される apply plugin: "com.commercehub.gradle.plugin.avro" .......... repositories { mavenCentral() maven { url "https://packages.confluent.io/maven/" } } .......... dependencies { def lombokVersion = "1.18.8" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.boot:spring-boot-starter-json") { exclude group: "org.springframework", module: "spring-web" } implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE") testImplementation("org.springframework.boot:spring-boot-starter-test") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") // for JUnit 5 testCompile("org.junit.jupiter:junit-jupiter") testRuntime("org.junit.platform:junit-platform-launcher") // for Apache Avro implementation("org.apache.avro:avro:1.9.0") implementation("io.confluent:kafka-avro-serializer:5.3.0") } avro { fieldVisibility = "PRIVATE" } ..........
- buildscript block を追加します。この中に commercehub-oss/gradle-avro-plugin を利用するための
classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0"
を記述します。 apply plugin: "com.commercehub.gradle.plugin.avro"
を追加します。- repositories block に
maven { url "https://packages.confluent.io/maven/" }
を追加します。dependencies block に記述するimplementation("io.confluent:kafka-avro-serializer:5.3.0")
の取得先になります(Maven Repository には 5.2.2 までしかありませんでした)。 - dependencies block に以下の2行を追加します。
implementation("org.apache.avro:avro:1.9.0")
implementation("io.confluent:kafka-avro-serializer:5.3.0")
- avro block を追加します。
変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新すると src/main/avro、src/test/avro が追加されます。
今回使用するメッセージの avro スキーマの定義ファイルを作成する
src/main/avro の下に Counter.avsc というファイルを新規作成し、以下の内容を記述します。フォーマットは JSON です。今回は count と fullName という2つのフィールドを持つメッセージを送受信します。
{ "namespace": "ksbysample.eipapp.kafka.avro", "type": "record", "name": "Counter", "doc": "???", "fields" : [ { "name": "count", "type": "int" }, { "name": "fullName", "type": "string" } ] }
ファイル作成後、gradle に追加された generateAvroJava タスクを実行します。
build/generated-main-avro-java/ksbysample/eipapp/kafka/avro/Counter.java が生成されます。.java ファイルはここに生成されますが、src/main/java の下のソースファイルから参照可能です(import して利用できます)。
producer でメッセージを送信して consumer で受信する(スキーマは自動登録する)
application.properties を以下のように変更します。
spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 spring.kafka.properties.schema.registry.url=http://localhost:8081 spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer spring.kafka.consumer.properties.specific.avro.reader=true spring.kafka.consumer.group-id=topic1-consumer-group1 logging.level.org.apache.kafka.clients.NetworkClient=OFF
- 以下の6行を追加します。
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.specific.avro.reader=true
の設定がないと受信したメッセージを Counter クラスに変換してくれません。java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class ksbysample.eipapp.kafka.avro.Counter
のエラーメッセージが出力されます。
src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java の以下の点を変更します。
import ksbysample.eipapp.kafka.avro.Counter; .......... @Slf4j @Configuration public class KafkaDslSampleConfig { .......... // 設定は application.properties の spring.kafka.producer.~ で行う private final ProducerFactory<Integer, Counter> kafkaProducerFactory; // 設定は application.properties の spring.kafka.consumer.~ で行う private final ConsumerFactory<Integer, Counter> kafkaConsumerFactory; .......... private static final String[] FULL_NAMES = new String[]{"田中 太郎", "鈴木 花子", "木村 さくら"}; public KafkaDslSampleConfig(ProducerFactory<Integer, Counter> kafkaProducerFactory , ConsumerFactory<Integer, Counter> kafkaConsumerFactory , MessageChannel errorChannel) { .......... } @Bean public Supplier<Integer> countSupplier() { return () -> this.count.addAndGet(1); } @Bean public IntegrationFlow topic1ProducerFlow() { return IntegrationFlows .from(countSupplier() , e -> e.poller(Pollers.fixedDelay(1000))) // メッセージの kafka_topic ヘッダに topic 名をセットすると // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく // kafka_topic ヘッダの topic に送信される // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME)) .<Integer, Counter>transform(p -> Counter.newBuilder() .setCount(p) .setFullName(FULL_NAMES[ThreadLocalRandom.current().nextInt(3)]) .build()) .log(LoggingHandler.Level.WARN) .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME) , e -> e.advice(retryAdvice())) .get(); } .......... private KafkaProducerMessageHandlerSpec<Integer, Counter, ?> kafkaMessageHandler( ProducerFactory<Integer, Counter> producerFactory, String topic) { .......... } @Bean public IntegrationFlow topic1Consumer1Flow() { return IntegrationFlows .from(createKafkaMessageDrivenChannelAdapter()) .<Counter>handle((p, h) -> { log.error(String.format("★★★ partition = %s, count = %s, fullName = %s" , h.get("kafka_receivedPartitionId"), p.getCount(), p.getFullName())); return null; }) .get(); } .......... private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, Counter> createKafkaMessageDrivenChannelAdapter() { .......... } .......... }
大半は Value の型の変更(String → Counter)です。
- フィールドで定義している ProducerFactory, ConsumerFactory の Value の型を String → Counter に変更します。
ProducerFactory<Integer, String> kafkaProducerFactory;
→ProducerFactory<Integer, Counter> kafkaProducerFactory;
ConsumerFactory<Integer, Counter> kafkaConsumerFactory;
→ConsumerFactory<Integer, Counter> kafkaConsumerFactory;
- fullName にセットするためのデータを定義した
private static final String[] FULL_NAMES = new String[]{"田中 太郎", "鈴木 花子", "木村 さくら"};
を追加します。 - コンストラクタの引数の ProducerFactory, ConsumerFactory の Value の型を String → Counter に変更します。
ProducerFactory<Integer, String> kafkaProducerFactory
→ProducerFactory<Integer, Counter> kafkaProducerFactory
ConsumerFactory<Integer, String> kafkaConsumerFactory
→ConsumerFactory<Integer, Counter> kafkaConsumerFactory
- countSupplier メソッドの以下の点を変更します。
- 戻り値の型を
Supplier<String>
→Supplier<Integer>
に変更します。 return () -> String.valueOf(this.count.addAndGet(1));
→return () -> this.count.addAndGet(1);
に変更します。
- 戻り値の型を
- topic1ProducerFlow メソッドの以下の点を変更します。
.<Integer, Counter>transform(...)
を追加します。ここで送信する Counter クラスのメッセージを生成します。
- kafkaMessageHandler メソッドの以下の点を変更します。
- 戻り値の型を
KafkaProducerMessageHandlerSpec<Integer, String, ?>
→KafkaProducerMessageHandlerSpec<Integer, Counter, ?>
に変更します。 - 第1引数を
ProducerFactory<Integer, String> producerFactory
→ProducerFactory<Integer, Counter> producerFactory
に変更します。
- 戻り値の型を
- topic1Consumer1Flow ~ topic1Consumer3Flow メソッドの以下の点を変更します。
.handle((p, h) -> { ... })
→.<Counter>handle((p, h) -> { ... })
に変更します。- p.getCount(), p.getFullName() をログに出力するようにします。
- createKafkaMessageDrivenChannelAdapter メソッドの戻り値の型を
<Integer, String>
→<Integer, Counter>
に変更します。
src/test/java/ksbysample/eipapp/kafka/KafkaSendAndReceiveTest.java のテストはもう動かなくなっているので、クラスに @Disabled アノテーションを付加してテストが実行されないようにします。
@Disabled @Slf4j @SpringBootTest public class KafkaSendAndReceiveTest {
以上で変更は終了です。build タスクを実行した後、
アプリケーションを実行するとメッセージの送受信が正常に行われていることが確認できます。
またアプリケーション実行直後に ConsumerConfig、ProducerConfig 以外に KafkaAvroDeserializerConfig、KafkaAvroSerializerConfig の設定が出力されていました。
2019-09-01 00:24:17.210 INFO 7592 --- [ main] i.c.k.s.KafkaAvroDeserializerConfig : KafkaAvroDeserializerConfig values: bearer.auth.token = [hidden] schema.registry.url = [http://localhost:8081] basic.auth.user.info = [hidden] auto.register.schemas = true max.schemas.per.subject = 1000 basic.auth.credentials.source = URL schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN specific.avro.reader = true value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy .......... 2019-09-01 00:24:17.251 INFO 7592 --- [ask-scheduler-1] i.c.k.s.KafkaAvroSerializerConfig : KafkaAvroSerializerConfig values: bearer.auth.token = [hidden] schema.registry.url = [http://localhost:8081] basic.auth.user.info = [hidden] auto.register.schemas = true max.schemas.per.subject = 1000 basic.auth.credentials.source = URL schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
curl -X GET http://localhost:8081/subjects
コマンドを実行して登録されているスキーマを見ると Topic1-Value が登録されていました。
スキーマ名が Counter ではなく Topic1-Value なのは Schema Registry Serializer and Formatter に記載されている TopicNameStrategy によるものです。ログに出力されている KafkaAvroSerializerConfig、KafkaAvroDeserializerConfig の設定にも value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
が出力されています。
履歴
2019/09/01
初版発行。