かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その20 )( MessageChannel に Redis を使用する )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 24. Redis Support
    http://docs.spring.io/spring-integration/reference/html/redis.html

  2. RDB技術者のためのNoSQLガイド

    RDB技術者のためのNoSQLガイド

    RDB技術者のためのNoSQLガイド

  3. logback+MDCでWebアプリのリクエスト内容を簡単にログに出力する方法
    http://qiita.com/namutaka/items/c35c437b7246c5e4d729

目次

  1. ksbysample-eipapp-redisqueue プロジェクトを作成する
  2. サンプルの Flow を作成する
  3. 動作確認
  4. Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class というログが出力される理由とは?
  5. X-B3-TraceId, X-B3-SpanId がログに出力されない理由とは?
  6. 動作確認2
  7. 動作確認3 ( Message の送信側と受信・出力側を別プロセスにしてみる )
  8. 続く。。。

手順

ksbysample-eipapp-redisqueue プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. ksbysample-eipapp-wiretap プロジェクトのルート直下に config/checkstyle, config/findbugs ディレクトリを作成します。

  3. config/checkstyle の下に ksbysample-eipapp-messaginggateway プロジェクトの config/checkstyle の下にある google_checks.xml をコピーします。

  4. config/findbugs の下に findbugs-exclude.xml を新規作成し、リンク先の内容 の内容に変更します。

  5. src/main/java の下に ksbysample.eipapp.redisqueue パッケージを作成します。

  6. src/main/java/ksbysample/eipapp/redisqueue の下に Application.java を作成し、リンク先の内容 を記述します。

  7. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  8. src/main/resources の下に logback-spring.xml を作成し、リンク先のその1の内容 を記述します。

サンプルの Flow を作成する

  1. src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を新規作成し、リンク先の内容 を記述します。

動作確認

  1. まずは clean タスク → Rebuild Project → build タスクを実行して正常終了することを確認します。

    f:id:ksby:20170318091131p:plain

  2. http://zipkin.io/pages/quickstart.html から最新の ZipkinServer ( zipkin-server-1.21.0-exec.jar ) をダウンロードして C:\zipkin に保存した後、起動します。今回は起動するだけです。

  3. redis-cli コマンドで Redis の中にデータが入っていないことを確認します。

    f:id:ksby:20170318092849p:plain

  4. bootRun を実行して ksbysample-eipapp-redisqueue を起動します。

    起動後 sampleDataMessageSource() から Message を受信して出力するところまで動いてはいるようですが、その確認の前に以下2点の問題が出ていることに気づきました。

    • Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class という WARN ログが出力されています。
    • traceId, spanId がログに出力されていません。

    f:id:ksby:20170318133056p:plain

    先にこの2点の問題を解消します。

Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class というログが出力される理由とは?

Web で検索すると spring-cloud/spring-cloud-sleuth の Deprecated trace headers #467 の Issue がヒットしました。いろいろ書かれていますが、WARN なので気にしなくてよいようです。ログを出さないようにします。

src/main/resources/logback-spring.xmlリンク先のその2の内容 に変更します。

clean タスク → Rebuild Project → build タスクを実行した後、bootRun を実行すると今度はログは出ませんでした。

f:id:ksby:20170318161134p:plain

X-B3-TraceId, X-B3-SpanId がログに出力されない理由とは?

logback+MDCでWebアプリのリクエスト内容を簡単にログに出力する方法 の記事を見つけました。%X{...} は MDC のデータを出力するためのもののようなので、%X{X-B3-TraceId:-} で値が出力されないということは MDC に X-B3-TraceId がセットされていないということでしょうか?

MDC の値を出力してみます。src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。

    @Bean
    public IntegrationFlow printFlow() {
        return IntegrationFlows
                .from(redisQueue())
                .bridge(e -> e.poller(Pollers.fixedDelay(30000)))
                .<SampleData>handle((p, h) -> {
                    // MDC のデータを出力する
                    Map<String, String> mdcMap = MDC.getCopyOfContextMap();
                    mdcMap.entrySet().forEach(entry -> System.out.println("[MDC ] " + entry.getKey() + " = " + entry.getValue()));

                    System.out.println("★★★ " + p.getMessage());
                    return null;
                })
                .get();
    }

bootRun を実行して MDC にセットされているデータを出力してみましたが、X-B3-TraceId, X-B3-SpanId はセットされていますね。。。

f:id:ksby:20170318184923p:plain

.<SampleData>handle((p, h) -> {...} の中で log.info(...) でログを出力してみます。src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow printFlow() {
        return IntegrationFlows
                .from(redisQueue())
                .bridge(e -> e.poller(Pollers.fixedDelay(30000)))
                .<SampleData>handle((p, h) -> {
                    log.info("●●●");

                    System.out.println("★★★ " + p.getMessage());
                    return null;
                })
                .get();
    }

}

bootRun を実行すると、今度は X-B3-TraceId, X-B3-SpanId が出力されました。

f:id:ksby:20170318200509p:plain

MessageSource からデータを取得した時 ( GenericMessage [payload=... のログが出力される時 ) から X-B3-TraceId, X-B3-SpanId が出力されるものと思い込んでいましたが、このタイミングではまだ出力されないようです。FlowConfig#printFlow の方でもログを出力していなかったので、X-B3-TraceId, X-B3-SpanId が出力されないのは単に出力されない実装になっていたからでした。

今回は X-B3-TraceId, X-B3-SpanId を確認する必要はないので、このまま進めます。上で修正した内容は元に戻します。

動作確認2

動作確認に戻ります。

  1. redis-cli コマンドで Redis の中にデータが入っていないことを確認します。

    f:id:ksby:20170318203012p:plain

  2. bootRun を実行して ksbysample-eipapp-redisqueue を起動します。Redis にどのようにデータが格納されるのか確認したいので、Message が1~2個 Redis に格納されたと思われるタイミングで ksbysample-eipapp-redisqueue を停止します。

    f:id:ksby:20170318203439p:plain

    Message が2個 Redis に入ったタイミングで停止しました。Redis の状態を確認してみます。

    f:id:ksby:20170318204143p:plain

    “REDIS_QUEUE” という名称の list が作成されており、データが2件存在します。2件のデータを表示してみます。

    f:id:ksby:20170318212143p:plain

    Message が header, payload 全て入っているようですが、これでは内容がほとんど分かりませんね。。。

  3. 再び bootRun を実行して ksbysample-eipapp-redisqueue を起動します。Redis にメッセージが2個入っている状態で起動するので、起動時にデフォルトで出力される最初の1個と合わせて計3個出力されるはずです。

    f:id:ksby:20170318212911p:plain

    Redis に残っていた ID = 1, 2 のデータと、今回起動時に追加された ID = 0 のデータの計3個出力されました。

  4. Redis に残っているデータを削除します。

    f:id:ksby:20170318213508p:plain

動作確認3 ( Message の送信側と受信・出力側を別プロセスにしてみる )

Message を Redis に入れるようにしたので、Message の送信側と受信・出力側を別プロセスにしても動作するはずです。試してみます。

最初に jar ファイルを入れる C:\eipapp\ksbysample-eipapp-redisqueue ディレクトリを作成します。

次に送信側の jar ファイルを作成します。build.gradle を以下のように変更します。

group 'ksbysample'
version '1.0.0-RELEASE-send'

src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。FlowConfig#printFlow をコメントアウトし、送信タイミングを 200ミリ秒へ、受信タイミングを 1000ミリ秒へ変更します。

    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows
                .from(sampleDataMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(200)))
                .log()
                .channel(redisQueue())
                .get();
    }

//    @Bean
//    public IntegrationFlow printFlow() {
//        return IntegrationFlows
//                .from(redisQueue())
//                .bridge(e -> e.poller(Pollers.fixedDelay(1000)))
//                .<SampleData>handle((p, h) -> {
//                    System.out.println("★★★ " + p.getMessage());
//                    return null;
//                })
//                .get();
//    }

clean タスク → Rebuild Project → build タスクを実行して ksbysample-eipapp-redisqueue-1.0.0-RELEASE-send.jar を作成した後、C:\eipapp\ksbysample-eipapp-redisqueue へコピーします。

受信・出力側の jar ファイルを作成します。build.gradle を以下のように変更します。

group 'ksbysample'
version '1.0.0-RELEASE-recv'

src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。今度は FlowConfig#mainFlow をコメントアウトします。

//    @Bean
//    public IntegrationFlow mainFlow() {
//        return IntegrationFlows
//                .from(sampleDataMessageSource()
//                        , e -> e.poller(Pollers.fixedDelay(200)))
//                .log()
//                .channel(redisQueue())
//                .get();
//    }

    @Bean
    public IntegrationFlow printFlow() {
        return IntegrationFlows
                .from(redisQueue())
                .bridge(e -> e.poller(Pollers.fixedDelay(1000)))
                .<SampleData>handle((p, h) -> {
                    System.out.println("★★★ " + p.getMessage());
                    return null;
                })
                .get();
    }

clean タスク → Rebuild Project → build タスクを実行して ksbysample-eipapp-redisqueue-1.0.0-RELEASE-recv.jar を作成した後、C:\eipapp\ksbysample-eipapp-redisqueue へコピーします。

実行してみます。コマンドプロンプトを3つ開き、1つは送信側の jar ファイルを起動し、残りの2つは受信・出力側の jar ファイルを起動します。

f:id:ksby:20170318222520p:plain

一定数出力されたところで送信側の jar ファイルを止めます。結果を見ると、

  • Message は重複して受信・出力されることはありませんでした。片方が受信した Message はもう片方では受信されません。
  • 標準出力に出力するだけの処理で時間がほとんどかからないので、Message の受信は一方のプロセスに偏ってしまいました。

f:id:ksby:20170318223243p:plain

続く。。。

もう少しいろいろ試してみたいことがあるので、続きます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.5.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
        maven { url "https://plugins.gradle.org/m2/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
        // for Error Prone ( http://errorprone.info/ )
        classpath("net.ltgt.gradle:gradle-errorprone-plugin:0.0.9")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'
apply plugin: 'net.ltgt.errorprone'
apply plugin: 'checkstyle'
apply plugin: 'findbugs'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing,-path']
compileJava.options.compilerArgs += ['-Xep:RemoveUnusedImports:WARN']

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

checkstyle {
    configFile = file("${rootProject.projectDir}/config/checkstyle/google_checks.xml")
    toolVersion = '7.6'
    sourceSets = [project.sourceSets.main]
}

findbugs {
    toolVersion = '3.0.1'
    sourceSets = [project.sourceSets.main]
    ignoreFailures = true
    effort = "max"
    excludeFilter = file("${rootProject.projectDir}/config/findbugs/findbugs-exclude.xml")
}

tasks.withType(FindBugs) {
    reports {
        xml.enabled = false
        html.enabled = true
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        // Spring Could の BOM を先に書かないと Spring IO Platform の bomProperty でのバージョン変更が適用されない
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE")
        mavenBom("io.spring.platform:platform-bom:Athens-SR4") {
            bomProperty 'commons-lang3.version', '3.5'
            bomProperty 'guava.version', '21.0'
        }
    }
}

dependencies {
    def spockVersion = "1.1-groovy-2.4-rc-3"
    def lombokVersion = "1.16.12"
    def errorproneVersion = '2.0.15'

    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.boot:spring-boot-starter-data-redis")
    compile("org.springframework.integration:spring-integration-redis")
    compile("org.codehaus.janino:janino")
    compile("org.apache.commons:commons-lang3")
    compile("com.google.guava:guava")
    testCompile("org.springframework.boot:spring-boot-starter-test")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    testCompile("org.assertj:assertj-core:3.6.2")
    testCompile("org.spockframework:spock-core:${spockVersion}")
    testCompile("org.spockframework:spock-spring:${spockVersion}")

    // for lombok
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
    testCompileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for Error Prone ( http://errorprone.info/ )
    errorprone("com.google.errorprone:error_prone_core:${errorproneVersion}")
    compileOnly("com.google.errorprone:error_prone_annotations:${errorproneVersion}")
}
  • 24. Redis Support を使用するため、以下の2行を記述します。
    • compile(“org.springframework.boot:spring-boot-starter-data-redis”)
    • compile(“org.springframework.integration:spring-integration-redis”)

findbugs-exclude.xml

<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
</FindBugsFilter>

Application.java

package ksbysample.eipapp.redisqueue;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.properties

spring.application.name=redisqueue
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=localhost:6381,localhost:6382,localhost:6383
  • Redis への接続設定として以下の2行を記述します。
    • spring.redis.sentinel.master
    • spring.redis.sentinel.nodes

logback-spring.xml

■その1

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

■その2

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    ..........

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.cloud.sleuth" level="ERROR"/>
</configuration>
  • <logger name="org.springframework.cloud.sleuth" level="ERROR"/> を追加します。

FlowConfig.java

package ksbysample.eipapp.redisqueue;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.redis.store.RedisChannelMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
public class FlowConfig {

    private final RedisConnectionFactory redisConnectionFactory;

    public FlowConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    /**
     * Message の palyload にセットする SampleData クラス
     */
    @Data
    @AllArgsConstructor
    public static class SampleData implements Serializable {

        private static final long serialVersionUID = 6103271054731633658L;

        private Integer id;

        private LocalDateTime createDateTime;

        private String message;

    }

    /**
     * SampleData オブジェクトを送信する MessageSource
     */
    public static class SampleDataMessageSource implements MessageSource<SampleData> {

        private AtomicInteger idGenerator = new AtomicInteger(0);

        @Override
        public Message<SampleData> receive() {
            Integer id = idGenerator.getAndIncrement();
            LocalDateTime createDateTime = LocalDateTime.now();
            String message = String.format("idGenerator = %d は %s に生成されました"
                    , id, createDateTime.format(DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss")));

            SampleData sampleData = new SampleData(id, createDateTime, message);
            return MessageBuilder.withPayload(sampleData).build();
        }

    }

    @Bean
    public MessageSource<SampleData> sampleDataMessageSource() {
        return new SampleDataMessageSource();
    }

    @Bean
    public RedisChannelMessageStore redisMessageStore() {
        return new RedisChannelMessageStore(this.redisConnectionFactory);
    }

    @Bean
    public QueueChannel redisQueue() {
        return MessageChannels.queue("redisQueue", redisMessageStore(), "REDIS_QUEUE").get();
    }

    /**
     * メインフロー
     * 10秒毎に {@link SampleDataMessageSource} から SampleData オブジェクトが payload にセットされた Message を受信し、
     * {@link FlowConfig#redisQueue()} へ送信する
     *
     * @return {@link IntegrationFlow} オブジェクト
     */
    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows
                .from(sampleDataMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(10000)))
                .log()
                .channel(redisQueue())
                .get();
    }

    /**
     * {@link SampleData} オブジェクトが payload にセットされた Message を受信し、
     * {@link SampleData#getMessage()} の出力結果を標準出力に出力する
     *
     * @return {@link IntegrationFlow} オブジェクト
     */
    @Bean
    public IntegrationFlow printFlow() {
        return IntegrationFlows
                .from(redisQueue())
                .bridge(e -> e.poller(Pollers.fixedDelay(30000)))
                .<SampleData>handle((p, h) -> {
                    System.out.println("★★★ " + p.getMessage());
                    return null;
                })
                .get();
    }

}

履歴

2017/03/18
初版発行。