かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その39 )( Docker Compose でサーバを構築する、Kafka 編6 - cp-schema-registry を追加し Apache Avro を使用する )

概要

記事一覧はこちらです。

今回は Schema Registry のサーバを起動して Apache Avro でメッセージを Serialize、Deserialize してみます。

参照したサイト・書籍

  1. cp-docker-images/examples/multi-datacenter/docker-compose.yml
    https://github.com/confluentinc/cp-docker-images/blob/5.3.0-post/examples/multi-datacenter/docker-compose.yml

  2. spring-integration-extensions/samples/kafka/build.gradle
    https://github.com/spring-projects/spring-integration-extensions/blob/master/samples/kafka/build.gradle

  3. commercehub-oss/gradle-avro-plugin
    https://github.com/commercehub-oss/gradle-avro-plugin

  4. Kafka Tutorial: Kafka, Avro Serialization and the Schema Registry
    http://cloudurable.com/blog/kafka-avro-schema-registry/index.html

  5. Install Clients
    https://docs.confluent.io/current/clients/install.html

  6. Schema Registry Tutorial
    https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html

  7. Spring Boot + Kafka + Schema Registry
    https://medium.com/@sunilvb/spring-boot-kafka-schema-registry-c6e32485a7c8

  8. Schema Registry Serializer and Formatter
    https://docs.confluent.io/current/schema-registry/serializer-formatter.html

  9. Schema Evolution and Compatibility
    https://docs.confluent.io/current/schema-registry/avro.html

  10. Kafka and Avro with Confluent Schema Registry
    https://www.slideshare.net/JeanPaulAzar1/kafka-and-avro-with-confluent-schema-registry

  11. KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord
    https://stackoverflow.com/questions/39606026/kafkaavrodeserializer-does-not-return-specificrecord-but-returns-genericrecord

目次

  1. cp-schema-registry を追加する
  2. build.gradle を変更する
  3. 今回使用するメッセージの avro スキーマの定義ファイルを作成する
  4. producer でメッセージを送信して consumer で受信する(スキーマは自動登録する)

手順

cp-schema-registry を追加する

cp-schema-registry を3サーバ立てて haproxy 経由でアクセスできるようにします。

f:id:ksby:20190831184848p:plain

まず haproxy の設定ファイルを作成します。プロジェクトのルートディレクトリ直下に docker/cp-schema-registry ディレクトリを新規作成した後、その下に haproxy.cfg というファイルを新規作成して以下の内容を記述します。

global
    log 127.0.0.1   local1
    maxconn 4096

defaults
    log     global
    mode    tcp
    option  tcplog
    retries 3
    option redispatch
    maxconn 2000
    timeout connect 5000
    timeout client 50000
    timeout server 50000

listen stats
    bind *:1936
    mode http
    stats enable
    stats hide-version
    stats realm Haproxy\ Statistics
    stats refresh 5s
    stats uri /haproxy-cp-schema-registry?stats

listen cp-schema-registry
    bind *:8081
    mode            tcp
    balance         roundrobin
    timeout client  3h
    timeout server  3h
    option          clitcpka
    server          cp-schema-registry1 cp-schema-registry1:8081  check inter 5s rise 2 fall 3
    server          cp-schema-registry2 cp-schema-registry2:8081  check inter 5s rise 2 fall 3
    server          cp-schema-registry3 cp-schema-registry3:8081  check inter 5s rise 2 fall 3

docker-compose.yml を以下のように変更します。

  ..........

  # Schema Registry
  cp-schema-registry1:
    image: confluentinc/cp-schema-registry:5.3.0
    container_name: cp-schema-registry1
    environment:
      SCHEMA_REGISTRY_HOST_NAME: cp-schema-registry1
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: ERROR
    depends_on:
      - cp-kafka1
      - cp-kafka2
      - cp-kafka3
      - cp-kafka4
      - cp-kafka5
    # broker の登録が完了する前にタイムアウトでコンテナが終了してしまう場合があるので restart: always を指定する
    restart: always
  cp-schema-registry2:
    image: confluentinc/cp-schema-registry:5.3.0
    container_name: cp-schema-registry2
    environment:
      SCHEMA_REGISTRY_HOST_NAME: cp-schema-registry2
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: ERROR
    depends_on:
      - cp-schema-registry1
    # broker の登録が完了する前にタイムアウトでコンテナが終了してしまう場合があるので restart: always を指定する
    restart: always
  cp-schema-registry3:
    image: confluentinc/cp-schema-registry:5.3.0
    container_name: cp-schema-registry3
    environment:
      SCHEMA_REGISTRY_HOST_NAME: cp-schema-registry3
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: ERROR
    depends_on:
      - cp-schema-registry1
    # broker の登録が完了する前にタイムアウトでコンテナが終了してしまう場合があるので restart: always を指定する
    restart: always
  # http://localhost:1936/haproxy-cp-schema-registry?stats
  haproxy-cp-schema-registry:
    image: haproxy:2.0.5-alpine
    container_name: haproxy-cp-schema-registry
    ports:
      - "1936:1936"
      - "8081:8081"
    environment:
      - TZ=Asia/Tokyo
    volumes:
      - ./docker/cp-schema-registry/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    depends_on:
      - cp-schema-registry1
      - cp-schema-registry2
      - cp-schema-registry3
  • 以下4つのコンテナを追加します。
    • cp-schema-registry1
    • cp-schema-registry2
    • cp-schema-registry3
    • haproxy-cp-schema-registry
  • docker-compose.yml に cp-kafka1~5 の5サーバ定義していて Kafka Cluster が利用可能になるまで時間がかかるため、cp-schema-registry が Kafka を利用可能と判断できる前にタイムアウトしてコンテナが終了してしまう場合がありました。restart: always の記述は必須です。コンテナが終了しても自動で起動し直します。

docker-compose up -d コマンドで起動します。

f:id:ksby:20190831192022p:plain

http://localhost:1936/haproxy-cp-schema-registry?stats にアクセスして cp-schema-registry1~3 が緑色になるまで待ちます。

f:id:ksby:20190831192211p:plain

curl -X GET http://localhost:8081/subjects コマンドを実行すると、まだ何もスキーマが登録されていないことが確認できます。

f:id:ksby:20190831192458p:plain

build.gradle を変更する

build.gradle に Apache Avro を利用するために必要な記述を追加します。

buildscript {
    repositories {
        mavenCentral()
        gradlePluginPortal()
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0"
    }
}

..........

apply plugin: "io.spring.dependency-management"
// com.commercehub.gradle.plugin.avro を追加すると src/main/avro, src/test/avro が作成される
apply plugin: "com.commercehub.gradle.plugin.avro"

..........

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

..........

dependencies {
    def lombokVersion = "1.18.8"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.boot:spring-boot-starter-json") {
        exclude group: "org.springframework", module: "spring-web"
    }
    implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")

    // for Apache Avro
    implementation("org.apache.avro:avro:1.9.0")
    implementation("io.confluent:kafka-avro-serializer:5.3.0")
}

avro {
    fieldVisibility = "PRIVATE"
}

..........
  • buildscript block を追加します。この中に commercehub-oss/gradle-avro-plugin を利用するための classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0" を記述します。
  • apply plugin: "com.commercehub.gradle.plugin.avro" を追加します。
  • repositories block に maven { url "https://packages.confluent.io/maven/" } を追加します。dependencies block に記述する implementation("io.confluent:kafka-avro-serializer:5.3.0") の取得先になります(Maven Repository には 5.2.2 までしかありませんでした)。
  • dependencies block に以下の2行を追加します。
    • implementation("org.apache.avro:avro:1.9.0")
    • implementation("io.confluent:kafka-avro-serializer:5.3.0")
  • avro block を追加します。

変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新すると src/main/avro、src/test/avro が追加されます。

f:id:ksby:20190831203538p:plain

今回使用するメッセージの avro スキーマの定義ファイルを作成する

src/main/avro の下に Counter.avsc というファイルを新規作成し、以下の内容を記述します。フォーマットは JSON です。今回は count と fullName という2つのフィールドを持つメッセージを送受信します。

{
    "namespace": "ksbysample.eipapp.kafka.avro",
    "type": "record",
    "name": "Counter",
    "doc": "???",
    "fields" : [
        { "name": "count", "type": "int" },
        { "name": "fullName", "type": "string" }
    ]
}

ファイル作成後、gradle に追加された generateAvroJava タスクを実行します。

f:id:ksby:20190831205406p:plain

build/generated-main-avro-java/ksbysample/eipapp/kafka/avro/Counter.java が生成されます。.java ファイルはここに生成されますが、src/main/java の下のソースファイルから参照可能です(import して利用できます)。

f:id:ksby:20190831205539p:plain

producer でメッセージを送信して consumer で受信する(スキーマは自動登録する)

application.properties を以下のように変更します。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.group-id=topic1-consumer-group1

logging.level.org.apache.kafka.clients.NetworkClient=OFF
  • 以下の6行を追加します。
    • spring.kafka.properties.schema.registry.url=http://localhost:8081
    • spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    • spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    • spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
    • spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
    • spring.kafka.consumer.properties.specific.avro.reader=true
  • spring.kafka.consumer.properties.specific.avro.reader=true の設定がないと受信したメッセージを Counter クラスに変換してくれません。java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class ksbysample.eipapp.kafka.avro.Counter のエラーメッセージが出力されます。

src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java の以下の点を変更します。

import ksbysample.eipapp.kafka.avro.Counter;

..........

@Slf4j
@Configuration
public class KafkaDslSampleConfig {

    ..........

    // 設定は application.properties の spring.kafka.producer.~ で行う
    private final ProducerFactory<Integer, Counter> kafkaProducerFactory;
    // 設定は application.properties の spring.kafka.consumer.~ で行う
    private final ConsumerFactory<Integer, Counter> kafkaConsumerFactory;

    ..........

    private static final String[] FULL_NAMES = new String[]{"田中 太郎", "鈴木 花子", "木村 さくら"};

    public KafkaDslSampleConfig(ProducerFactory<Integer, Counter> kafkaProducerFactory
            , ConsumerFactory<Integer, Counter> kafkaConsumerFactory
            , MessageChannel errorChannel) {
        ..........
    }

    @Bean
    public Supplier<Integer> countSupplier() {
        return () -> this.count.addAndGet(1);
    }

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // メッセージの kafka_topic ヘッダに topic 名をセットすると
                // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく
                // kafka_topic ヘッダの topic に送信される
                // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME))
                .<Integer, Counter>transform(p -> Counter.newBuilder()
                        .setCount(p)
                        .setFullName(FULL_NAMES[ThreadLocalRandom.current().nextInt(3)])
                        .build())
                .log(LoggingHandler.Level.WARN)
                .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME)
                        , e -> e.advice(retryAdvice()))
                .get();
    }

    ..........

    private KafkaProducerMessageHandlerSpec<Integer, Counter, ?> kafkaMessageHandler(
            ProducerFactory<Integer, Counter> producerFactory, String topic) {
        ..........
    }

    @Bean
    public IntegrationFlow topic1Consumer1Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .<Counter>handle((p, h) -> {
                    log.error(String.format("★★★ partition = %s, count = %s, fullName = %s"
                            , h.get("kafka_receivedPartitionId"), p.getCount(), p.getFullName()));
                    return null;
                })
                .get();
    }

    ..........

    private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, Counter>
    createKafkaMessageDrivenChannelAdapter() {
        ..........
    }

    ..........

}

大半は Value の型の変更(String → Counter)です。

  • フィールドで定義している ProducerFactory, ConsumerFactory の Value の型を String → Counter に変更します。
    • ProducerFactory<Integer, String> kafkaProducerFactory;ProducerFactory<Integer, Counter> kafkaProducerFactory;
    • ConsumerFactory<Integer, Counter> kafkaConsumerFactory;ConsumerFactory<Integer, Counter> kafkaConsumerFactory;
  • fullName にセットするためのデータを定義した private static final String[] FULL_NAMES = new String[]{"田中 太郎", "鈴木 花子", "木村 さくら"}; を追加します。
  • コンストラクタの引数の ProducerFactory, ConsumerFactory の Value の型を String → Counter に変更します。
    • ProducerFactory<Integer, String> kafkaProducerFactoryProducerFactory<Integer, Counter> kafkaProducerFactory
    • ConsumerFactory<Integer, String> kafkaConsumerFactoryConsumerFactory<Integer, Counter> kafkaConsumerFactory
  • countSupplier メソッドの以下の点を変更します。
    • 戻り値の型を Supplier<String>Supplier<Integer> に変更します。
    • return () -> String.valueOf(this.count.addAndGet(1));return () -> this.count.addAndGet(1); に変更します。
  • topic1ProducerFlow メソッドの以下の点を変更します。
    • .<Integer, Counter>transform(...) を追加します。ここで送信する Counter クラスのメッセージを生成します。
  • kafkaMessageHandler メソッドの以下の点を変更します。
    • 戻り値の型を KafkaProducerMessageHandlerSpec<Integer, String, ?>KafkaProducerMessageHandlerSpec<Integer, Counter, ?> に変更します。
    • 第1引数を ProducerFactory<Integer, String> producerFactoryProducerFactory<Integer, Counter> producerFactory に変更します。
  • topic1Consumer1Flow ~ topic1Consumer3Flow メソッドの以下の点を変更します。
    • .handle((p, h) -> { ... }).<Counter>handle((p, h) -> { ... }) に変更します。
    • p.getCount(), p.getFullName() をログに出力するようにします。
  • createKafkaMessageDrivenChannelAdapter メソッドの戻り値の型を <Integer, String><Integer, Counter> に変更します。

src/test/java/ksbysample/eipapp/kafka/KafkaSendAndReceiveTest.java のテストはもう動かなくなっているので、クラスに @Disabled アノテーションを付加してテストが実行されないようにします。

@Disabled
@Slf4j
@SpringBootTest
public class KafkaSendAndReceiveTest {

以上で変更は終了です。build タスクを実行した後、

f:id:ksby:20190901002343p:plain

アプリケーションを実行するとメッセージの送受信が正常に行われていることが確認できます。

f:id:ksby:20190901002702p:plain

またアプリケーション実行直後に ConsumerConfig、ProducerConfig 以外に KafkaAvroDeserializerConfig、KafkaAvroSerializerConfig の設定が出力されていました。

2019-09-01 00:24:17.210  INFO 7592 --- [           main] i.c.k.s.KafkaAvroDeserializerConfig      : KafkaAvroDeserializerConfig values: 
    bearer.auth.token = [hidden]
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    specific.avro.reader = true
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

..........

2019-09-01 00:24:17.251  INFO 7592 --- [ask-scheduler-1] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: 
    bearer.auth.token = [hidden]
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

curl -X GET http://localhost:8081/subjects コマンドを実行して登録されているスキーマを見ると Topic1-Value が登録されていました。

f:id:ksby:20190901003310p:plain

スキーマ名が Counter ではなく Topic1-Value なのは Schema Registry Serializer and Formatter に記載されている TopicNameStrategy によるものです。ログに出力されている KafkaAvroSerializerConfig、KafkaAvroDeserializerConfig の設定にも value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy が出力されています。

履歴

2019/09/01
初版発行。