Spring Boot + Spring Integration でいろいろ試してみる ( その20 )( MessageChannel に Redis を使用する )
概要
記事一覧はこちらです。
- MessageChannel は通常メモリ上に作成されますが、Redis を使用することもできます。
- Redis を使用すればアプリケーションが終了しても Message の内容を MessageChannel に保持しておくことができますので、サンプルを作成して試してみます。
- Redis は以下の記事でインストール、設定したものを使用します。単体の Redis ではなく Redis Sentinel の構成です。
参照したサイト・書籍
Spring Integration Reference Manual - 24. Redis Support
http://docs.spring.io/spring-integration/reference/html/redis.htmlRDB技術者のためのNoSQLガイド
- 作者: 渡部徹太郎,河村康爾,北沢匠,佐伯嘉康,佐藤直生,原沢滋,平山毅,李昌桓
- 出版社/メーカー: 秀和システム
- 発売日: 2016/02/24
- メディア: 単行本
- この商品を含むブログ (3件) を見る
logback+MDCでWebアプリのリクエスト内容を簡単にログに出力する方法
http://qiita.com/namutaka/items/c35c437b7246c5e4d729
目次
- ksbysample-eipapp-redisqueue プロジェクトを作成する
- サンプルの Flow を作成する
- 動作確認
Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class
というログが出力される理由とは?- X-B3-TraceId, X-B3-SpanId がログに出力されない理由とは?
- 動作確認2
- 動作確認3 ( Message の送信側と受信・出力側を別プロセスにしてみる )
- 続く。。。
手順
ksbysample-eipapp-redisqueue プロジェクトを作成する
IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。
ksbysample-eipapp-wiretap プロジェクトのルート直下に config/checkstyle, config/findbugs ディレクトリを作成します。
config/checkstyle の下に ksbysample-eipapp-messaginggateway プロジェクトの config/checkstyle の下にある google_checks.xml をコピーします。
config/findbugs の下に findbugs-exclude.xml を新規作成し、リンク先の内容 の内容に変更します。
src/main/java の下に ksbysample.eipapp.redisqueue パッケージを作成します。
src/main/java/ksbysample/eipapp/redisqueue の下に Application.java を作成し、リンク先の内容 を記述します。
src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。
src/main/resources の下に logback-spring.xml を作成し、リンク先のその1の内容 を記述します。
サンプルの Flow を作成する
動作確認
まずは clean タスク → Rebuild Project → build タスクを実行して正常終了することを確認します。
http://zipkin.io/pages/quickstart.html から最新の ZipkinServer ( zipkin-server-1.21.0-exec.jar ) をダウンロードして C:\zipkin に保存した後、起動します。今回は起動するだけです。
redis-cli コマンドで Redis の中にデータが入っていないことを確認します。
bootRun を実行して ksbysample-eipapp-redisqueue を起動します。
起動後
sampleDataMessageSource()
から Message を受信して出力するところまで動いてはいるようですが、その確認の前に以下2点の問題が出ていることに気づきました。Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class
という WARN ログが出力されています。- traceId, spanId がログに出力されていません。
先にこの2点の問題を解消します。
Deprecated trace headers detected. Please upgrade Sleuth to 1.1 or start sending headers present in the TraceMessageHeaders class
というログが出力される理由とは?
Web で検索すると spring-cloud/spring-cloud-sleuth の Deprecated trace headers #467 の Issue がヒットしました。いろいろ書かれていますが、WARN なので気にしなくてよいようです。ログを出さないようにします。
src/main/resources/logback-spring.xml を リンク先のその2の内容 に変更します。
clean タスク → Rebuild Project → build タスクを実行した後、bootRun を実行すると今度はログは出ませんでした。
X-B3-TraceId, X-B3-SpanId がログに出力されない理由とは?
logback+MDCでWebアプリのリクエスト内容を簡単にログに出力する方法 の記事を見つけました。%X{...}
は MDC のデータを出力するためのもののようなので、%X{X-B3-TraceId:-}
で値が出力されないということは MDC に X-B3-TraceId がセットされていないということでしょうか?
MDC の値を出力してみます。src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。
@Bean public IntegrationFlow printFlow() { return IntegrationFlows .from(redisQueue()) .bridge(e -> e.poller(Pollers.fixedDelay(30000))) .<SampleData>handle((p, h) -> { // MDC のデータを出力する Map<String, String> mdcMap = MDC.getCopyOfContextMap(); mdcMap.entrySet().forEach(entry -> System.out.println("[MDC ] " + entry.getKey() + " = " + entry.getValue())); System.out.println("★★★ " + p.getMessage()); return null; }) .get(); }
bootRun を実行して MDC にセットされているデータを出力してみましたが、X-B3-TraceId, X-B3-SpanId はセットされていますね。。。
.<SampleData>handle((p, h) -> {...}
の中で log.info(...)
でログを出力してみます。src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。
@Slf4j @Configuration public class FlowConfig { .......... @Bean public IntegrationFlow printFlow() { return IntegrationFlows .from(redisQueue()) .bridge(e -> e.poller(Pollers.fixedDelay(30000))) .<SampleData>handle((p, h) -> { log.info("●●●"); System.out.println("★★★ " + p.getMessage()); return null; }) .get(); } }
bootRun を実行すると、今度は X-B3-TraceId, X-B3-SpanId が出力されました。
MessageSource からデータを取得した時 ( GenericMessage [payload=...
のログが出力される時 ) から X-B3-TraceId, X-B3-SpanId が出力されるものと思い込んでいましたが、このタイミングではまだ出力されないようです。FlowConfig#printFlow の方でもログを出力していなかったので、X-B3-TraceId, X-B3-SpanId が出力されないのは単に出力されない実装になっていたからでした。
今回は X-B3-TraceId, X-B3-SpanId を確認する必要はないので、このまま進めます。上で修正した内容は元に戻します。
動作確認2
動作確認に戻ります。
redis-cli コマンドで Redis の中にデータが入っていないことを確認します。
bootRun を実行して ksbysample-eipapp-redisqueue を起動します。Redis にどのようにデータが格納されるのか確認したいので、Message が1~2個 Redis に格納されたと思われるタイミングで ksbysample-eipapp-redisqueue を停止します。
Message が2個 Redis に入ったタイミングで停止しました。Redis の状態を確認してみます。
“REDIS_QUEUE” という名称の list が作成されており、データが2件存在します。2件のデータを表示してみます。
Message が header, payload 全て入っているようですが、これでは内容がほとんど分かりませんね。。。
再び bootRun を実行して ksbysample-eipapp-redisqueue を起動します。Redis にメッセージが2個入っている状態で起動するので、起動時にデフォルトで出力される最初の1個と合わせて計3個出力されるはずです。
Redis に残っていた ID = 1, 2 のデータと、今回起動時に追加された ID = 0 のデータの計3個出力されました。
Redis に残っているデータを削除します。
動作確認3 ( Message の送信側と受信・出力側を別プロセスにしてみる )
Message を Redis に入れるようにしたので、Message の送信側と受信・出力側を別プロセスにしても動作するはずです。試してみます。
最初に jar ファイルを入れる C:\eipapp\ksbysample-eipapp-redisqueue ディレクトリを作成します。
次に送信側の jar ファイルを作成します。build.gradle を以下のように変更します。
group 'ksbysample' version '1.0.0-RELEASE-send'
src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。FlowConfig#printFlow をコメントアウトし、送信タイミングを 200ミリ秒へ、受信タイミングを 1000ミリ秒へ変更します。
@Bean public IntegrationFlow mainFlow() { return IntegrationFlows .from(sampleDataMessageSource() , e -> e.poller(Pollers.fixedDelay(200))) .log() .channel(redisQueue()) .get(); } // @Bean // public IntegrationFlow printFlow() { // return IntegrationFlows // .from(redisQueue()) // .bridge(e -> e.poller(Pollers.fixedDelay(1000))) // .<SampleData>handle((p, h) -> { // System.out.println("★★★ " + p.getMessage()); // return null; // }) // .get(); // }
clean タスク → Rebuild Project → build タスクを実行して ksbysample-eipapp-redisqueue-1.0.0-RELEASE-send.jar を作成した後、C:\eipapp\ksbysample-eipapp-redisqueue へコピーします。
受信・出力側の jar ファイルを作成します。build.gradle を以下のように変更します。
group 'ksbysample' version '1.0.0-RELEASE-recv'
src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。今度は FlowConfig#mainFlow をコメントアウトします。
// @Bean // public IntegrationFlow mainFlow() { // return IntegrationFlows // .from(sampleDataMessageSource() // , e -> e.poller(Pollers.fixedDelay(200))) // .log() // .channel(redisQueue()) // .get(); // } @Bean public IntegrationFlow printFlow() { return IntegrationFlows .from(redisQueue()) .bridge(e -> e.poller(Pollers.fixedDelay(1000))) .<SampleData>handle((p, h) -> { System.out.println("★★★ " + p.getMessage()); return null; }) .get(); }
clean タスク → Rebuild Project → build タスクを実行して ksbysample-eipapp-redisqueue-1.0.0-RELEASE-recv.jar を作成した後、C:\eipapp\ksbysample-eipapp-redisqueue へコピーします。
実行してみます。コマンドプロンプトを3つ開き、1つは送信側の jar ファイルを起動し、残りの2つは受信・出力側の jar ファイルを起動します。
一定数出力されたところで送信側の jar ファイルを止めます。結果を見ると、
- Message は重複して受信・出力されることはありませんでした。片方が受信した Message はもう片方では受信されません。
- 標準出力に出力するだけの処理で時間がほとんどかからないので、Message の受信は一方のプロセスに偏ってしまいました。
続く。。。
もう少しいろいろ試してみたいことがあるので、続きます。
ソースコード
build.gradle
group 'ksbysample' version '1.0.0-RELEASE' buildscript { ext { springBootVersion = '1.4.5.RELEASE' } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } maven { url "https://plugins.gradle.org/m2/" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE") // for Error Prone ( http://errorprone.info/ ) classpath("net.ltgt.gradle:gradle-errorprone-plugin:0.0.9") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'groovy' apply plugin: 'net.ltgt.errorprone' apply plugin: 'checkstyle' apply plugin: 'findbugs' sourceCompatibility = 1.8 targetCompatibility = 1.8 [compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing,-path'] compileJava.options.compilerArgs += ['-Xep:RemoveUnusedImports:WARN'] idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } checkstyle { configFile = file("${rootProject.projectDir}/config/checkstyle/google_checks.xml") toolVersion = '7.6' sourceSets = [project.sourceSets.main] } findbugs { toolVersion = '3.0.1' sourceSets = [project.sourceSets.main] ignoreFailures = true effort = "max" excludeFilter = file("${rootProject.projectDir}/config/findbugs/findbugs-exclude.xml") } tasks.withType(FindBugs) { reports { xml.enabled = false html.enabled = true } } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } } dependencyManagement { imports { // Spring Could の BOM を先に書かないと Spring IO Platform の bomProperty でのバージョン変更が適用されない mavenBom("org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE") mavenBom("io.spring.platform:platform-bom:Athens-SR4") { bomProperty 'commons-lang3.version', '3.5' bomProperty 'guava.version', '21.0' } } } dependencies { def spockVersion = "1.1-groovy-2.4-rc-3" def lombokVersion = "1.16.12" def errorproneVersion = '2.0.15' // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile("org.springframework.boot:spring-boot-starter-integration") compile("org.springframework.boot:spring-boot-starter-data-redis") compile("org.springframework.integration:spring-integration-redis") compile("org.codehaus.janino:janino") compile("org.apache.commons:commons-lang3") compile("com.google.guava:guava") testCompile("org.springframework.boot:spring-boot-starter-test") // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの // http://projects.spring.io/spring-cloud/ の「Release Trains」参照 compile("org.springframework.cloud:spring-cloud-starter-zipkin") { exclude module: 'spring-boot-starter-web' } // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE") testCompile("org.assertj:assertj-core:3.6.2") testCompile("org.spockframework:spock-core:${spockVersion}") testCompile("org.spockframework:spock-spring:${spockVersion}") // for lombok compileOnly("org.projectlombok:lombok:${lombokVersion}") testCompileOnly("org.projectlombok:lombok:${lombokVersion}") // for Error Prone ( http://errorprone.info/ ) errorprone("com.google.errorprone:error_prone_core:${errorproneVersion}") compileOnly("com.google.errorprone:error_prone_annotations:${errorproneVersion}") }
- 24. Redis Support を使用するため、以下の2行を記述します。
- compile(“org.springframework.boot:spring-boot-starter-data-redis”)
- compile(“org.springframework.integration:spring-integration-redis”)
findbugs-exclude.xml
<?xml version="1.0" encoding="UTF-8"?> <FindBugsFilter> </FindBugsFilter>
Application.java
package ksbysample.eipapp.redisqueue; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
application.properties
spring.application.name=redisqueue spring.zipkin.base-url=http://localhost:9411/ spring.sleuth.sampler.percentage=1.0 spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=localhost:6381,localhost:6382,localhost:6383
- Redis への接続設定として以下の2行を記述します。
- spring.redis.sentinel.master
- spring.redis.sentinel.nodes
logback-spring.xml
■その1
<?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml"/> <springProperty scope="context" name="springAppName" source="spring.application.name"/> <property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${CONSOLE_LOG_PATTERN}</pattern> <charset>UTF-8</charset> </encoder> </appender> <root level="INFO"> <appender-ref ref="CONSOLE"/> </root> </configuration>
■その2
<?xml version="1.0" encoding="UTF-8"?> <configuration> .......... <root level="INFO"> <appender-ref ref="CONSOLE"/> </root> <logger name="org.springframework.cloud.sleuth" level="ERROR"/> </configuration>
<logger name="org.springframework.cloud.sleuth" level="ERROR"/>
を追加します。
FlowConfig.java
package ksbysample.eipapp.redisqueue; import lombok.AllArgsConstructor; import lombok.Data; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.channel.MessageChannels; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.redis.store.RedisChannelMessageStore; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import java.io.Serializable; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.atomic.AtomicInteger; @Configuration public class FlowConfig { private final RedisConnectionFactory redisConnectionFactory; public FlowConfig(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } /** * Message の palyload にセットする SampleData クラス */ @Data @AllArgsConstructor public static class SampleData implements Serializable { private static final long serialVersionUID = 6103271054731633658L; private Integer id; private LocalDateTime createDateTime; private String message; } /** * SampleData オブジェクトを送信する MessageSource */ public static class SampleDataMessageSource implements MessageSource<SampleData> { private AtomicInteger idGenerator = new AtomicInteger(0); @Override public Message<SampleData> receive() { Integer id = idGenerator.getAndIncrement(); LocalDateTime createDateTime = LocalDateTime.now(); String message = String.format("idGenerator = %d は %s に生成されました" , id, createDateTime.format(DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss"))); SampleData sampleData = new SampleData(id, createDateTime, message); return MessageBuilder.withPayload(sampleData).build(); } } @Bean public MessageSource<SampleData> sampleDataMessageSource() { return new SampleDataMessageSource(); } @Bean public RedisChannelMessageStore redisMessageStore() { return new RedisChannelMessageStore(this.redisConnectionFactory); } @Bean public QueueChannel redisQueue() { return MessageChannels.queue("redisQueue", redisMessageStore(), "REDIS_QUEUE").get(); } /** * メインフロー * 10秒毎に {@link SampleDataMessageSource} から SampleData オブジェクトが payload にセットされた Message を受信し、 * {@link FlowConfig#redisQueue()} へ送信する * * @return {@link IntegrationFlow} オブジェクト */ @Bean public IntegrationFlow mainFlow() { return IntegrationFlows .from(sampleDataMessageSource() , e -> e.poller(Pollers.fixedDelay(10000))) .log() .channel(redisQueue()) .get(); } /** * {@link SampleData} オブジェクトが payload にセットされた Message を受信し、 * {@link SampleData#getMessage()} の出力結果を標準出力に出力する * * @return {@link IntegrationFlow} オブジェクト */ @Bean public IntegrationFlow printFlow() { return IntegrationFlows .from(redisQueue()) .bridge(e -> e.poller(Pollers.fixedDelay(30000))) .<SampleData>handle((p, h) -> { System.out.println("★★★ " + p.getMessage()); return null; }) .get(); } }
履歴
2017/03/18
初版発行。