かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その11 )( Spring Cloud Sleuth を使用して処理状況を Zipkin で表示する )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. Zipkin
    http://zipkin.io/

  2. Tracing Spring Integration Flow With Spring Cloud Sleuth
    https://dzone.com/articles/tracing-spring-integration-flow-with-spring-cloud

  3. LINE Engineers' Blog - LINEのマイクロサービス環境における分散トレーシング
    http://developers.linecorp.com/blog/ja/?p=3392

  4. Spring CloudとZipkinを利用した分散トレーシング
    http://www.slideshare.net/rakutentech/spring-cloudzipkin

  5. Spring Cloud Sleuth
    http://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.1.1.RELEASE/

  6. Spring Cloud
    http://projects.spring.io/spring-cloud/

  7. The logback manual - Chapter 6: Layouts
    http://logback.qos.ch/manual/layouts.html

目次

  1. org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?
  2. Zipkin サーバを起動して Spring Integration の処理状況を表示する
    1. Zipkin の jar ファイルをダウンロードして起動する
    2. build.gradle を変更する
    3. application.properties を作成して必要な設定を記述する
    4. logback-spring.xml でログのレイアウトを変更する
    5. 動作確認
  3. Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?
  4. urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?
  5. 最後に

手順

org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?

Spring Cloud のページを見ると Spring Cloud を利用する時は Spring IO Platform とは別の BOM を利用するようです。Spring IO Platform では io.spring.platform:platform-bom:Athens-SR2 ですが、Spring Cloud の場合には org.springframework.cloud:spring-cloud-dependencies:Camden.SR4 と書かれています。

ただし Spring Cloud のページと Spring Cloud Sleuth のページで書かれている BOM が異なっており、Spring Cloud のページでは org.springframework.cloud:spring-cloud-dependencies:Camden.SR4Spring Cloud Sleuth のページでは org.springframework.cloud:spring-cloud-dependencies:Brixton.RELEASE でした。

Spring Cloud のページに BOM と適用されるバージョンが記述されています。常駐型アプリケーションでは Spring Boot の 1.4.3.RELEASE を使用していて Camden.SR4 では1つ古い 1.4.2.RELEASE が記述されていますが、同じ 1.4 系なので Camden.SR4 を使用して試してみることにします。

BOM spring-cloud-sleuth spring-boot
Camden.SR4 1.1.1.RELEASE 1.4.2.RELEASE
Brixton.SR7 1.0.11.RELEASE 1.3.8.RELEASE

。。。と書きましたが、BOM の末尾は Camden.SR4 ではなく Camden.RELEASE のように .RELEASE と書かないと正常に動作しません。Camden.SR4 と書くと Zipkin のグラフがなぜか左揃えで表示されますし、数百 ms で終了するはずの処理がなぜか 4 秒もかかるようになりました。

f:id:ksby:20170120002738p:plain

ちなみに Camden.RELEASE と書くと spring-cloud-sleuth は 1.0.9.RELEASE が使用されます。今回は以下のバージョンになります。

BOM spring-cloud-sleuth spring-boot
Camden.RELEASE 1.0.9.RELEASE 1.4.3.RELEASE

Zipkin サーバを起動して Spring Integration の処理状況を表示する

Zipkin の jar ファイルをダウンロードして起動する

  1. Zipkin のページにアクセスします。ページ左側の「Quickstart」リンクをクリックした後、「Java」のところの「latest release」リンクをクリックして zipkin-server-1.19.2-exec.jar をダウンロードします。

    f:id:ksby:20170118180355p:plain

  2. C:\zipkin ディレクトリを作成し、その下に zipkin-server-1.19.2-exec.jar を保存します。

  3. コマンドプロンプトを起動し、java -jar zipkin-server-1.19.2-exec.jar コマンドを実行して Zipkin サーバを起動します。

    f:id:ksby:20170118182420p:plain

    Zipkin って Sprint Boot で作られているんですね。

    f:id:ksby:20170118182615p:plain

    Started ZipkinServer in ... のメッセージが出たら起動完了です。

  4. ブラウザで http://localhost:9411/ にアクセスすると Zipkin の画面が表示されました。

    f:id:ksby:20170118183151p:plain

build.gradle を変更する

IntelliJ IDEA で ksbysample-eipapp-urlchecker プロジェクトを開いた後、build.gradle を リンク先のその1の内容 に変更します。変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

gradlew dependencies で依存関係を確認してみます。

compile - Dependencies for source set 'main'.
+--- org.springframework.boot:spring-boot-starter-integration: -> 1.4.3.RELEASE
|    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE
|    |    +--- org.springframework.boot:spring-boot:1.4.3.RELEASE
|    |    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |    \--- org.springframework:spring-context:4.3.5.RELEASE
|    |    |         +--- org.springframework:spring-aop:4.3.5.RELEASE
|    |    |         |    +--- org.springframework:spring-beans:4.3.5.RELEASE
|    |    |         |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |         +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         \--- org.springframework:spring-expression:4.3.5.RELEASE
|    |    |              \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE
|    |    |    \--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-logging:1.4.3.RELEASE
|    |    |    +--- ch.qos.logback:logback-classic:1.1.8
|    |    |    |    +--- ch.qos.logback:logback-core:1.1.8
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.21 -> 1.7.22
|    |    |    +--- org.slf4j:jcl-over-slf4j:1.7.22
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.22
|    |    |    +--- org.slf4j:jul-to-slf4j:1.7.22
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.22
|    |    |    \--- org.slf4j:log4j-over-slf4j:1.7.22
|    |    |         \--- org.slf4j:slf4j-api:1.7.22
|    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    \--- org.yaml:snakeyaml:1.17
|    +--- org.springframework.boot:spring-boot-starter-aop:1.4.3.RELEASE
|    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|    |    \--- org.aspectj:aspectjweaver:1.8.9
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE
|    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|    |    +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    +--- org.springframework:spring-messaging:4.3.5.RELEASE
|    |    |    +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |    +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework:spring-tx:4.3.5.RELEASE
|    |    |    +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    \--- org.springframework.retry:spring-retry:1.1.3.RELEASE -> 1.1.5.RELEASE
|    +--- org.springframework.integration:spring-integration-java-dsl:1.1.4.RELEASE -> 1.2.1.RELEASE
|    |    +--- org.springframework.integration:spring-integration-core:4.3.5.RELEASE -> 4.3.6.RELEASE (*)
|    |    \--- org.reactivestreams:reactive-streams:1.0.0
|    \--- org.springframework.integration:spring-integration-jmx:4.3.6.RELEASE
|         \--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
+--- org.springframework.integration:spring-integration-file: -> 4.3.6.RELEASE
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
|    \--- commons-io:commons-io:2.4
+--- org.springframework.integration:spring-integration-http: -> 4.3.6.RELEASE
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
|    \--- org.springframework:spring-webmvc:4.3.5.RELEASE
|         +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-core:4.3.5.RELEASE
|         +--- org.springframework:spring-expression:4.3.5.RELEASE (*)
|         \--- org.springframework:spring-web:4.3.5.RELEASE
|              +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|              +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|              +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|              \--- org.springframework:spring-core:4.3.5.RELEASE
+--- org.codehaus.janino:janino: -> 2.7.8
|    \--- org.codehaus.janino:commons-compiler:2.7.8
+--- org.springframework.cloud:spring-cloud-starter-zipkin: -> 1.0.9.RELEASE
|    +--- org.springframework.cloud:spring-cloud-starter-sleuth:1.0.9.RELEASE
|    |    +--- org.springframework.cloud:spring-cloud-starter:1.1.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*)
|    |    |    +--- org.springframework.cloud:spring-cloud-context:1.1.3.RELEASE
|    |    |    |    \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE
|    |    |    +--- org.springframework.cloud:spring-cloud-commons:1.1.3.RELEASE
|    |    |    |    \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE
|    |    |    \--- org.springframework.security:spring-security-rsa:1.0.3.RELEASE
|    |    |         \--- org.bouncycastle:bcpkix-jdk15on:1.55 -> 1.54
|    |    |              \--- org.bouncycastle:bcprov-jdk15on:1.54
|    |    +--- org.springframework.boot:spring-boot-starter-web:1.3.7.RELEASE -> 1.4.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    |    +--- org.springframework.boot:spring-boot-starter-tomcat:1.4.3.RELEASE
|    |    |    |    +--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6
|    |    |    |    +--- org.apache.tomcat.embed:tomcat-embed-el:8.5.6
|    |    |    |    \--- org.apache.tomcat.embed:tomcat-embed-websocket:8.5.6
|    |    |    |         \--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6
|    |    |    +--- org.hibernate:hibernate-validator:5.2.4.Final
|    |    |    |    +--- javax.validation:validation-api:1.1.0.Final
|    |    |    |    +--- org.jboss.logging:jboss-logging:3.2.1.Final -> 3.3.0.Final
|    |    |    |    \--- com.fasterxml:classmate:1.1.0 -> 1.3.3
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.8.5
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.8.0 -> 2.8.5
|    |    |    |    \--- com.fasterxml.jackson.core:jackson-core:2.8.5
|    |    |    +--- org.springframework:spring-web:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-webmvc:4.3.5.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-actuator:1.3.7.RELEASE -> 1.4.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    |    \--- org.springframework.boot:spring-boot-actuator:1.4.3.RELEASE
|    |    |         +--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*)
|    |    |         +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE (*)
|    |    |         +--- com.fasterxml.jackson.core:jackson-databind:2.8.5 (*)
|    |    |         +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         \--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-aop:1.3.7.RELEASE -> 1.4.3.RELEASE (*)
|    |    \--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE
|    |         +--- org.springframework:spring-context:4.2.7.RELEASE -> 4.3.5.RELEASE (*)
|    |         \--- org.aspectj:aspectjrt:1.8.9
|    \--- org.springframework.cloud:spring-cloud-sleuth-zipkin:1.0.9.RELEASE
|         +--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE (*)
|         +--- io.zipkin.java:zipkin:1.11.1
|         +--- io.zipkin.reporter:zipkin-reporter:0.5.0
|         |    \--- io.zipkin.java:zipkin:1.11.1
|         \--- io.zipkin.reporter:zipkin-sender-urlconnection:0.5.0
|              +--- io.zipkin.reporter:zipkin-reporter:0.5.0 (*)
|              \--- io.zipkin.java:zipkin:1.11.1
+--- org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE (*)
\--- org.projectlombok:lombok:1.16.12

Camden の Spring Boot のバージョンは 1.4 系なのかと思ったら org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*) と 1.3 系が表示されますね ( 1.4 系に変更されていますが )。

また org.springframework.boot:spring-boot-starter-web が入っています。これがあると Tomcat が起動してしまうので、依存関係から除外します。build.gradle を リンク先のその2の内容 に変更した後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

application.properties を作成して必要な設定を記述する

  1. src/main/resources の下に application.properties を新規作成した後、リンク先の内容 を記述します。

logback-spring.xml でログのレイアウトを変更する

  1. src/main/resources の下の logback-spring.xmlリンク先の内容 に変更します。この変更を行うことで、ログに “boostrap” ではなく application.properties の spring.application.name に設定した文字列が出力されるようになります。

    f:id:ksby:20170118203648p:plain

    f:id:ksby:20170118204456p:plain

動作確認

  1. bootRun タスクを実行し、アプリケーションを起動します。

  2. in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。"urlchecker" の次が traceId で1回の処理で全て同じ値が出力されています。

    f:id:ksby:20170120010113p:plain

    Zipkin の画面には今回のデータが表示され、

    f:id:ksby:20170120010315p:plain

    クリックすると詳細な状況が表示されます。並列で処理されていることが分かります。

    f:id:ksby:20170120010524p:plain

    その中のバーの1つをクリックすると payload の情報や traceId, spanId が掲載されたダイアログが表示されます。

    f:id:ksby:20170120010655p:plain

  3. Zipkin のグラフを見た感想ですが、

    • 内部的に urllistfilepollerflow.channel#1urllistfilepollerflow.channel#2 といった channel が作成されていることに初めて気づきました。でもどれが何の処理なのか全然分かりませんね。。。
    • IntegrationFlows の .from(...).handle(...) 等のメソッド間のデータはどうも chennel 経由でやり取りされているようです。

Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?

起動時のログに channel と割り当てられている処理が出力されていますので、それを見れば分かります。

f:id:ksby:20170120012800p:plain

例えば urllistfilepollerflow.channel#1urllistfilepollerflow.channel#2 は以下の処理が該当します。

channel 処理
urllistfilepollerflow.channel#1 .split(new FileSplitter())
urlListFilePollerFlow.channel#2 .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false)

urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?

  1. DirectChannel ではなく QueueChannel でデータをやり取りするよう src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先の内容 に変更します。

  2. bootRun タスクを実行し、アプリケーションを起動します。

  3. in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。traceId は QueueChannel の時でも同じ値でした。

    f:id:ksby:20170120021319p:plain

    グラフは少し遅延が途中に入る程度のように見えます。

    f:id:ksby:20170120021543p:plain

Message の header を見ると traceId 等の情報が埋め込まれています。

GenericMessage [
payload=http://ksby.hatenablog.com/entry/2014/12/27/233427
, headers={
    sequenceNumber=1
    , file_name=2014.txt
    , sequenceSize=2
    , X-B3-ParentSpanId=553e9dfb0d0b90b3
    , X-Message-Sent=true
    , messageSent=true
    , file_originalFile=C:\eipapp\ksbysample-eipapp-urlchecker\in\2014.txt
    , spanName=message:urlListFilePollerFlow.channel#3
    , lines.size=2
    , spanTraceId=13b92684b6371460
    , spanId=4f65cf944c245ba
    , spanParentSpanId=553e9dfb0d0b90b3
    , X-Span-Name=message:urlListFilePollerFlow.channel#3
    , X-B3-SpanId=4f65cf944c245ba
    , currentSpan=[
        Trace: 13b92684b6371460
        , Span: 4f65cf944c245ba
        , Parent: 553e9dfb0d0b90b3
        , exportable:true
    ]
    , X-B3-Sampled=1
    , X-B3-TraceId=13b92684b6371460
    , correlationId=43f57730-e42f-74a8-dcbe-e28d549cd0f3
    , id=5db6324f-313f-df80-4d5b-61b93ef48096
    , X-Current-Span=[
        Trace: 13b92684b6371460
        , Span: 4f65cf944c245ba
        , Parent: 553e9dfb0d0b90b3
        , exportable:true
    ]
    , spanSampled=1
    , timestamp=1484848137184
}]

最後に

並列処理の状況がこんな簡単に可視化できるとは、Spring Cloud Sleuth+Zipkin いいですね! Spring Cloud で分散処理させている時に可視化するのに便利なソフトウェアとは聞いていましたが、Spring Integration の処理状況も表示させることができるとは意外でした。

ソースコード

build.gradle

■その1

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.springframework.integration:spring-integration-http")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • dependencyManagement に mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE' を追加します。
  • dependencies に compile("org.springframework.cloud:spring-cloud-starter-zipkin") を追加します。

■その2

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }
  • dependencies の compile("org.springframework.cloud:spring-cloud-starter-zipkin") の後に { exclude module: "spring-boot-starter-web" } を追加します。

application.properties

spring.application.name=urlchecker
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0
  • 上の3行を追加します。spring.application.name に設定した文字列が Zipkin の画面に表示されます。

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
  • <springProperty scope="context" name="springAppName" source="spring.application.name"/> を追加します。
  • <property name="CONSOLE_LOG_PATTERN" value="..."/> を追加します。JSON Logback with Logstash の「Logback setup」には %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},... と書かれているのですが、これだと traceId が2ヶ所出力されてしまうので %clr(${LOG_LEVEL_PATTERN:-%5p})%clr(${level:-%5p}) へ変更しています。また左側に出力される情報が多く、ログの右側の文字列が見えなくなるため ,%X{X-B3-ParentSpanId:-} は削除しました。

FlowConfig.java

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";
    private final static String MESSAGE_HEADER_SEQUENCE_SIZE = "sequenceSize";
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";
    private final static String MESSAGE_HEADER_FILE_NAME = "file_name";
    private final static String MESSAGE_HEADER_FILE_ORIGINALFILE = "file_originalFile";

    @Bean
    public MessageChannel urlCheckChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel deleteFileChannel() {
        return new QueueChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .headerFilter 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header から "sequenceSize" というキーの header を削除する
                .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false)
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_SEQUENCE_SIZE
                        , m -> m.getHeaders().get(MESSAGE_HEADER_LINES_SIZE)))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    return p;
                }, e -> e.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((GenericHandler<Object>) (p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS)).build()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(MESSAGE_HEADER_FILE_NAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((GenericHandler<Object>) (p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(MESSAGE_HEADER_FILE_ORIGINALFILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                }, e -> e.poller(Pollers.fixedDelay(1000)))
                .get();
    }

}
  • urlCheckChannel Bean, writeFileChannel Bean, deleteFileChannel Bean を全て return new DirectChannel();return new QueueChannel(); へ変更します。
  • urlCheckFlow メソッドの .from(urlCheckChannel()) の後に .handle((GenericHandler<Object>) (p, h) -> { return p; }, e -> e.poller(Pollers.fixedDelay(1000))) を追加します。
  • writeFileFlow メソッドの最初の .handle(...) の第2引数に , e -> e.poller(Pollers.fixedDelay(1000)) を追加します。
  • deleteFileFlow メソッドの最初の .handle(...) の第2引数に , e -> e.poller(Pollers.fixedDelay(1000)) を追加します。

履歴

2017/01/20
初版発行。