Spring Boot + Spring Integration でいろいろ試してみる ( その11 )( Spring Cloud Sleuth を使用して処理状況を Zipkin で表示する )
概要
記事一覧はこちらです。
- Spring Cloud Sleuth を利用すると Spring Integration の処理状況を Zipkin で表示させることができるらしいので、Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する ) で作成した常駐型アプリケーションで試してみたいと思います。
参照したサイト・書籍
Zipkin
http://zipkin.io/Tracing Spring Integration Flow With Spring Cloud Sleuth
https://dzone.com/articles/tracing-spring-integration-flow-with-spring-cloudLINE Engineers' Blog - LINEのマイクロサービス環境における分散トレーシング
http://developers.linecorp.com/blog/ja/?p=3392Spring CloudとZipkinを利用した分散トレーシング
http://www.slideshare.net/rakutentech/spring-cloudzipkinSpring Cloud Sleuth
http://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.1.1.RELEASE/Spring Cloud
http://projects.spring.io/spring-cloud/The logback manual - Chapter 6: Layouts
http://logback.qos.ch/manual/layouts.html
目次
- org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?
- Zipkin サーバを起動して Spring Integration の処理状況を表示する
- Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?
- urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?
- 最後に
手順
org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?
Spring Cloud のページを見ると Spring Cloud を利用する時は Spring IO Platform とは別の BOM を利用するようです。Spring IO Platform では io.spring.platform:platform-bom:Athens-SR2
ですが、Spring Cloud の場合には org.springframework.cloud:spring-cloud-dependencies:Camden.SR4
と書かれています。
ただし Spring Cloud のページと Spring Cloud Sleuth のページで書かれている BOM が異なっており、Spring Cloud のページでは org.springframework.cloud:spring-cloud-dependencies:Camden.SR4
、Spring Cloud Sleuth のページでは org.springframework.cloud:spring-cloud-dependencies:Brixton.RELEASE
でした。
Spring Cloud のページに BOM と適用されるバージョンが記述されています。常駐型アプリケーションでは Spring Boot の 1.4.3.RELEASE を使用していて Camden.SR4 では1つ古い 1.4.2.RELEASE が記述されていますが、同じ 1.4 系なので Camden.SR4 を使用して試してみることにします。
BOM | spring-cloud-sleuth | spring-boot |
---|---|---|
Camden.SR4 | 1.1.1.RELEASE | 1.4.2.RELEASE |
Brixton.SR7 | 1.0.11.RELEASE | 1.3.8.RELEASE |
。。。と書きましたが、BOM の末尾は Camden.SR4
ではなく Camden.RELEASE
のように .RELEASE
と書かないと正常に動作しません。Camden.SR4
と書くと Zipkin のグラフがなぜか左揃えで表示されますし、数百 ms で終了するはずの処理がなぜか 4 秒もかかるようになりました。
ちなみに Camden.RELEASE
と書くと spring-cloud-sleuth は 1.0.9.RELEASE が使用されます。今回は以下のバージョンになります。
BOM | spring-cloud-sleuth | spring-boot |
---|---|---|
Camden.RELEASE | 1.0.9.RELEASE | 1.4.3.RELEASE |
Zipkin サーバを起動して Spring Integration の処理状況を表示する
Zipkin の jar ファイルをダウンロードして起動する
Zipkin のページにアクセスします。ページ左側の「Quickstart」リンクをクリックした後、「Java」のところの「latest release」リンクをクリックして zipkin-server-1.19.2-exec.jar をダウンロードします。
C:\zipkin ディレクトリを作成し、その下に zipkin-server-1.19.2-exec.jar を保存します。
コマンドプロンプトを起動し、
java -jar zipkin-server-1.19.2-exec.jar
コマンドを実行して Zipkin サーバを起動します。Zipkin って Sprint Boot で作られているんですね。
Started ZipkinServer in ...
のメッセージが出たら起動完了です。ブラウザで http://localhost:9411/ にアクセスすると Zipkin の画面が表示されました。
build.gradle を変更する
IntelliJ IDEA で ksbysample-eipapp-urlchecker プロジェクトを開いた後、build.gradle を リンク先のその1の内容 に変更します。変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。
gradlew dependencies
で依存関係を確認してみます。
compile - Dependencies for source set 'main'. +--- org.springframework.boot:spring-boot-starter-integration: -> 1.4.3.RELEASE | +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE | | +--- org.springframework.boot:spring-boot:1.4.3.RELEASE | | | +--- org.springframework:spring-core:4.3.5.RELEASE | | | \--- org.springframework:spring-context:4.3.5.RELEASE | | | +--- org.springframework:spring-aop:4.3.5.RELEASE | | | | +--- org.springframework:spring-beans:4.3.5.RELEASE | | | | | \--- org.springframework:spring-core:4.3.5.RELEASE | | | | \--- org.springframework:spring-core:4.3.5.RELEASE | | | +--- org.springframework:spring-beans:4.3.5.RELEASE (*) | | | +--- org.springframework:spring-core:4.3.5.RELEASE | | | \--- org.springframework:spring-expression:4.3.5.RELEASE | | | \--- org.springframework:spring-core:4.3.5.RELEASE | | +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE | | | \--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*) | | +--- org.springframework.boot:spring-boot-starter-logging:1.4.3.RELEASE | | | +--- ch.qos.logback:logback-classic:1.1.8 | | | | +--- ch.qos.logback:logback-core:1.1.8 | | | | \--- org.slf4j:slf4j-api:1.7.21 -> 1.7.22 | | | +--- org.slf4j:jcl-over-slf4j:1.7.22 | | | | \--- org.slf4j:slf4j-api:1.7.22 | | | +--- org.slf4j:jul-to-slf4j:1.7.22 | | | | \--- org.slf4j:slf4j-api:1.7.22 | | | \--- org.slf4j:log4j-over-slf4j:1.7.22 | | | \--- org.slf4j:slf4j-api:1.7.22 | | +--- org.springframework:spring-core:4.3.5.RELEASE | | \--- org.yaml:snakeyaml:1.17 | +--- org.springframework.boot:spring-boot-starter-aop:1.4.3.RELEASE | | +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*) | | +--- org.springframework:spring-aop:4.3.5.RELEASE (*) | | \--- org.aspectj:aspectjweaver:1.8.9 | +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE | | +--- org.springframework:spring-core:4.3.5.RELEASE | | +--- org.springframework:spring-aop:4.3.5.RELEASE (*) | | +--- org.springframework:spring-context:4.3.5.RELEASE (*) | | +--- org.springframework:spring-messaging:4.3.5.RELEASE | | | +--- org.springframework:spring-beans:4.3.5.RELEASE (*) | | | +--- org.springframework:spring-context:4.3.5.RELEASE (*) | | | \--- org.springframework:spring-core:4.3.5.RELEASE | | +--- org.springframework:spring-tx:4.3.5.RELEASE | | | +--- org.springframework:spring-beans:4.3.5.RELEASE (*) | | | \--- org.springframework:spring-core:4.3.5.RELEASE | | \--- org.springframework.retry:spring-retry:1.1.3.RELEASE -> 1.1.5.RELEASE | +--- org.springframework.integration:spring-integration-java-dsl:1.1.4.RELEASE -> 1.2.1.RELEASE | | +--- org.springframework.integration:spring-integration-core:4.3.5.RELEASE -> 4.3.6.RELEASE (*) | | \--- org.reactivestreams:reactive-streams:1.0.0 | \--- org.springframework.integration:spring-integration-jmx:4.3.6.RELEASE | \--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*) +--- org.springframework.integration:spring-integration-file: -> 4.3.6.RELEASE | +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*) | \--- commons-io:commons-io:2.4 +--- org.springframework.integration:spring-integration-http: -> 4.3.6.RELEASE | +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*) | \--- org.springframework:spring-webmvc:4.3.5.RELEASE | +--- org.springframework:spring-aop:4.3.5.RELEASE (*) | +--- org.springframework:spring-beans:4.3.5.RELEASE (*) | +--- org.springframework:spring-context:4.3.5.RELEASE (*) | +--- org.springframework:spring-core:4.3.5.RELEASE | +--- org.springframework:spring-expression:4.3.5.RELEASE (*) | \--- org.springframework:spring-web:4.3.5.RELEASE | +--- org.springframework:spring-aop:4.3.5.RELEASE (*) | +--- org.springframework:spring-beans:4.3.5.RELEASE (*) | +--- org.springframework:spring-context:4.3.5.RELEASE (*) | \--- org.springframework:spring-core:4.3.5.RELEASE +--- org.codehaus.janino:janino: -> 2.7.8 | \--- org.codehaus.janino:commons-compiler:2.7.8 +--- org.springframework.cloud:spring-cloud-starter-zipkin: -> 1.0.9.RELEASE | +--- org.springframework.cloud:spring-cloud-starter-sleuth:1.0.9.RELEASE | | +--- org.springframework.cloud:spring-cloud-starter:1.1.3.RELEASE | | | +--- org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*) | | | +--- org.springframework.cloud:spring-cloud-context:1.1.3.RELEASE | | | | \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE | | | +--- org.springframework.cloud:spring-cloud-commons:1.1.3.RELEASE | | | | \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE | | | \--- org.springframework.security:spring-security-rsa:1.0.3.RELEASE | | | \--- org.bouncycastle:bcpkix-jdk15on:1.55 -> 1.54 | | | \--- org.bouncycastle:bcprov-jdk15on:1.54 | | +--- org.springframework.boot:spring-boot-starter-web:1.3.7.RELEASE -> 1.4.3.RELEASE | | | +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*) | | | +--- org.springframework.boot:spring-boot-starter-tomcat:1.4.3.RELEASE | | | | +--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6 | | | | +--- org.apache.tomcat.embed:tomcat-embed-el:8.5.6 | | | | \--- org.apache.tomcat.embed:tomcat-embed-websocket:8.5.6 | | | | \--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6 | | | +--- org.hibernate:hibernate-validator:5.2.4.Final | | | | +--- javax.validation:validation-api:1.1.0.Final | | | | +--- org.jboss.logging:jboss-logging:3.2.1.Final -> 3.3.0.Final | | | | \--- com.fasterxml:classmate:1.1.0 -> 1.3.3 | | | +--- com.fasterxml.jackson.core:jackson-databind:2.8.5 | | | | +--- com.fasterxml.jackson.core:jackson-annotations:2.8.0 -> 2.8.5 | | | | \--- com.fasterxml.jackson.core:jackson-core:2.8.5 | | | +--- org.springframework:spring-web:4.3.5.RELEASE (*) | | | \--- org.springframework:spring-webmvc:4.3.5.RELEASE (*) | | +--- org.springframework.boot:spring-boot-starter-actuator:1.3.7.RELEASE -> 1.4.3.RELEASE | | | +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*) | | | \--- org.springframework.boot:spring-boot-actuator:1.4.3.RELEASE | | | +--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*) | | | +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE (*) | | | +--- com.fasterxml.jackson.core:jackson-databind:2.8.5 (*) | | | +--- org.springframework:spring-core:4.3.5.RELEASE | | | \--- org.springframework:spring-context:4.3.5.RELEASE (*) | | +--- org.springframework.boot:spring-boot-starter-aop:1.3.7.RELEASE -> 1.4.3.RELEASE (*) | | \--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE | | +--- org.springframework:spring-context:4.2.7.RELEASE -> 4.3.5.RELEASE (*) | | \--- org.aspectj:aspectjrt:1.8.9 | \--- org.springframework.cloud:spring-cloud-sleuth-zipkin:1.0.9.RELEASE | +--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE (*) | +--- io.zipkin.java:zipkin:1.11.1 | +--- io.zipkin.reporter:zipkin-reporter:0.5.0 | | \--- io.zipkin.java:zipkin:1.11.1 | \--- io.zipkin.reporter:zipkin-sender-urlconnection:0.5.0 | +--- io.zipkin.reporter:zipkin-reporter:0.5.0 (*) | \--- io.zipkin.java:zipkin:1.11.1 +--- org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE (*) \--- org.projectlombok:lombok:1.16.12
Camden の Spring Boot のバージョンは 1.4 系なのかと思ったら org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*)
と 1.3 系が表示されますね ( 1.4 系に変更されていますが )。
また org.springframework.boot:spring-boot-starter-web
が入っています。これがあると Tomcat が起動してしまうので、依存関係から除外します。build.gradle を リンク先のその2の内容 に変更した後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。
application.properties を作成して必要な設定を記述する
- src/main/resources の下に application.properties を新規作成した後、リンク先の内容 を記述します。
logback-spring.xml でログのレイアウトを変更する
src/main/resources の下の logback-spring.xml を リンク先の内容 に変更します。この変更を行うことで、ログに “boostrap” ではなく application.properties の spring.application.name に設定した文字列が出力されるようになります。
動作確認
bootRun タスクを実行し、アプリケーションを起動します。
in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。"urlchecker" の次が traceId で1回の処理で全て同じ値が出力されています。
Zipkin の画面には今回のデータが表示され、
クリックすると詳細な状況が表示されます。並列で処理されていることが分かります。
その中のバーの1つをクリックすると payload の情報や traceId, spanId が掲載されたダイアログが表示されます。
Zipkin のグラフを見た感想ですが、
- 内部的に
urllistfilepollerflow.channel#1
、urllistfilepollerflow.channel#2
といった channel が作成されていることに初めて気づきました。でもどれが何の処理なのか全然分かりませんね。。。 - IntegrationFlows の
.from(...)
、.handle(...)
等のメソッド間のデータはどうも chennel 経由でやり取りされているようです。
- 内部的に
Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?
起動時のログに channel と割り当てられている処理が出力されていますので、それを見れば分かります。
例えば urllistfilepollerflow.channel#1
、urllistfilepollerflow.channel#2
は以下の処理が該当します。
channel | 処理 |
---|---|
urllistfilepollerflow.channel#1 | .split(new FileSplitter()) |
urlListFilePollerFlow.channel#2 | .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false) |
urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?
DirectChannel ではなく QueueChannel でデータをやり取りするよう src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.java を リンク先の内容 に変更します。
bootRun タスクを実行し、アプリケーションを起動します。
in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。traceId は QueueChannel の時でも同じ値でした。
グラフは少し遅延が途中に入る程度のように見えます。
Message の header を見ると traceId 等の情報が埋め込まれています。
GenericMessage [ payload=http://ksby.hatenablog.com/entry/2014/12/27/233427 , headers={ sequenceNumber=1 , file_name=2014.txt , sequenceSize=2 , X-B3-ParentSpanId=553e9dfb0d0b90b3 , X-Message-Sent=true , messageSent=true , file_originalFile=C:\eipapp\ksbysample-eipapp-urlchecker\in\2014.txt , spanName=message:urlListFilePollerFlow.channel#3 , lines.size=2 , spanTraceId=13b92684b6371460 , spanId=4f65cf944c245ba , spanParentSpanId=553e9dfb0d0b90b3 , X-Span-Name=message:urlListFilePollerFlow.channel#3 , X-B3-SpanId=4f65cf944c245ba , currentSpan=[ Trace: 13b92684b6371460 , Span: 4f65cf944c245ba , Parent: 553e9dfb0d0b90b3 , exportable:true ] , X-B3-Sampled=1 , X-B3-TraceId=13b92684b6371460 , correlationId=43f57730-e42f-74a8-dcbe-e28d549cd0f3 , id=5db6324f-313f-df80-4d5b-61b93ef48096 , X-Current-Span=[ Trace: 13b92684b6371460 , Span: 4f65cf944c245ba , Parent: 553e9dfb0d0b90b3 , exportable:true ] , spanSampled=1 , timestamp=1484848137184 }]
最後に
並列処理の状況がこんな簡単に可視化できるとは、Spring Cloud Sleuth+Zipkin いいですね! Spring Cloud で分散処理させている時に可視化するのに便利なソフトウェアとは聞いていましたが、Spring Integration の処理状況も表示させることができるとは意外でした。
ソースコード
build.gradle
■その1
dependencyManagement { imports { mavenBom 'io.spring.platform:platform-bom:Athens-SR2' mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE' } } dependencies { // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile("org.springframework.boot:spring-boot-starter-integration") compile("org.springframework.integration:spring-integration-file") compile("org.springframework.integration:spring-integration-http") compile("org.codehaus.janino:janino") testCompile("org.springframework.boot:spring-boot-starter-test") testCompile("org.spockframework:spock-core") testCompile("org.spockframework:spock-spring") // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの // http://projects.spring.io/spring-cloud/ の「Release Trains」参照 compile("org.springframework.cloud:spring-cloud-starter-zipkin") // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE") compile("org.projectlombok:lombok:1.16.12") testCompile("org.assertj:assertj-core:3.6.1") }
- dependencyManagement に
mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
を追加します。 - dependencies に
compile("org.springframework.cloud:spring-cloud-starter-zipkin")
を追加します。
■その2
// org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの // http://projects.spring.io/spring-cloud/ の「Release Trains」参照 compile("org.springframework.cloud:spring-cloud-starter-zipkin") { exclude module: 'spring-boot-starter-web' }
- dependencies の
compile("org.springframework.cloud:spring-cloud-starter-zipkin")
の後に{ exclude module: "spring-boot-starter-web" }
を追加します。
application.properties
spring.application.name=urlchecker spring.zipkin.base-url=http://localhost:9411/ spring.sleuth.sampler.percentage=1.0
- 上の3行を追加します。
spring.application.name
に設定した文字列が Zipkin の画面に表示されます。
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml"/> <springProperty scope="context" name="springAppName" source="spring.application.name"/> <property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${CONSOLE_LOG_PATTERN}</pattern> <charset>UTF-8</charset> </encoder> </appender> <root level="INFO"> <appender-ref ref="CONSOLE"/> </root> </configuration>
<springProperty scope="context" name="springAppName" source="spring.application.name"/>
を追加します。<property name="CONSOLE_LOG_PATTERN" value="..."/>
を追加します。JSON Logback with Logstash の「Logback setup」には%clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},...
と書かれているのですが、これだと traceId が2ヶ所出力されてしまうので%clr(${LOG_LEVEL_PATTERN:-%5p})
→%clr(${level:-%5p})
へ変更しています。また左側に出力される情報が多く、ログの右側の文字列が見えなくなるため,%X{X-B3-ParentSpanId:-}
は削除しました。
FlowConfig.java
package ksbysample.eipapp.urlchecker; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.ResponseEntity; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.dsl.support.GenericHandler; import org.springframework.integration.file.splitter.FileSplitter; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.RestTemplate; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @Slf4j @Configuration public class FlowConfig { private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in"; private final static String URLLISTFILE_EXT_PATTERN = "*.txt"; private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out"; private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size"; private final static String MESSAGE_HEADER_SEQUENCE_SIZE = "sequenceSize"; private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus"; private final static String MESSAGE_HEADER_FILE_NAME = "file_name"; private final static String MESSAGE_HEADER_FILE_ORIGINALFILE = "file_originalFile"; @Bean public MessageChannel urlCheckChannel() { return new QueueChannel(); } @Bean public MessageChannel writeFileChannel() { return new QueueChannel(); } @Bean public MessageChannel deleteFileChannel() { return new QueueChannel(); } @Bean public Executor taskExecutor() { return Executors.newCachedThreadPool(); } @Bean public RestTemplate restTemplate() { return new RestTemplate(); } @Bean public IntegrationFlow urlListFilePollerFlow() { return IntegrationFlows // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN) , c -> c.poller(Pollers.fixedDelay(1000))) // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する .channel(c -> c.executor(taskExecutor())) // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate // の処理のために "sequenceSize" header に正しい行数をセットするためのものである .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE , m -> { List<String> lines; try { lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8); } catch (IOException e) { throw new RuntimeException(e); } return lines.size(); })) // FileSplitter クラスを利用して、ファイルを行毎に分解する .split(new FileSplitter()) // スレッドを生成して .headerFilter 以降の処理を更に別のスレッドで並行処理する .channel(c -> c.executor(taskExecutor())) // Message の header から "sequenceSize" というキーの header を削除する .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false) // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize" // というキーの header の値をセットする .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_SEQUENCE_SIZE , m -> m.getHeaders().get(MESSAGE_HEADER_LINES_SIZE))) // Message の内容をログに出力する .log() // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、 // そうでない場合には writeFileChannel へ Message を送信する .<String, Boolean>route(p -> p.startsWith("http://") , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel())) .subFlowMapping(false, sf -> sf.channel(writeFileChannel()))) .get(); } @Bean public IntegrationFlow urlCheckFlow() { return IntegrationFlows.from(urlCheckChannel()) .handle((GenericHandler<Object>) (p, h) -> { return p; }, e -> e.poller(Pollers.fixedDelay(1000))) // スレッドを生成して、以降の処理を別のスレッドで並行処理する .channel(c -> c.executor(taskExecutor())) // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする .handle((p, h) -> { String statusCode; try { ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class); statusCode = response.getStatusCode().toString(); } catch (HttpClientErrorException e) { statusCode = e.getStatusCode().toString(); } catch (Exception e) { statusCode = e.getMessage(); } log.info(statusCode + " : " + p.toString()); return MessageBuilder.withPayload(p) .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode) .build(); }) // writeFileChannel へ Message を送信する .channel(writeFileChannel()) .get(); } @Bean public IntegrationFlow writeFileFlow() { return IntegrationFlows.from(writeFileChannel()) // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する .handle((GenericHandler<Object>) (p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS)).build() , e -> e.poller(Pollers.fixedDelay(1000))) // Message が流れる順番をファイルに書かれている順番にする // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている .resequence() .log() // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、 // 1つの List に集約して 1 Message に変更する .aggregate() .log() // out ディレクトリに結果ファイルを出力する // 結果ファイルには URL と HTTP ステータスコード を出力する .handle((p, h) -> { Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(MESSAGE_HEADER_FILE_NAME).toString()); @SuppressWarnings("unchecked") List<String> lines = (List<String>) p; try { Files.write(outPath, lines, StandardCharsets.UTF_8 , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); } catch (IOException e) { throw new RuntimeException(e); } return p; }) // deleteFileChannel へ Message を送信する .channel(deleteFileChannel()) .get(); } @Bean public IntegrationFlow deleteFileFlow() { return IntegrationFlows.from(deleteFileChannel()) // in ディレクトリのファイルを削除する .handle((GenericHandler<Object>) (p, h) -> { try { Files.delete(Paths.get(h.get(MESSAGE_HEADER_FILE_ORIGINALFILE).toString())); } catch (IOException e) { throw new RuntimeException(e); } // ここで処理が終了するので null を返す return null; }, e -> e.poller(Pollers.fixedDelay(1000))) .get(); } }
- urlCheckChannel Bean, writeFileChannel Bean, deleteFileChannel Bean を全て
return new DirectChannel();
→return new QueueChannel();
へ変更します。 - urlCheckFlow メソッドの
.from(urlCheckChannel())
の後に.handle((GenericHandler<Object>) (p, h) -> { return p; }, e -> e.poller(Pollers.fixedDelay(1000)))
を追加します。 - writeFileFlow メソッドの最初の
.handle(...)
の第2引数に, e -> e.poller(Pollers.fixedDelay(1000))
を追加します。 - deleteFileFlow メソッドの最初の
.handle(...)
の第2引数に, e -> e.poller(Pollers.fixedDelay(1000))
を追加します。
履歴
2017/01/20
初版発行。