Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する )
概要
記事一覧はこちらです。
- Spring Integration は DSL を使った時の方が XML ファイルや DSL なしの Java Config の時よりも面白いです。DSL に慣れるためにもいくつかサンプルを作っていこうと思います。
- Spring Integration DSL を使用して、以下の処理を行う常駐型アプリケーションを作成します。
- 今回作る処理を Spring Integration のフロー図で描こうとすると channel がいちいち途中に挿入されて分かりにくいので、今回は省略します。というか、フロー図を描こうとして改めて DSL だと複雑な処理でも結構簡単に書けるんだなということに気づきましたね。。。
参照したサイト・書籍
spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/
https://github.com/spring-projects/spring-integration-java-dsl/tree/master/src/test/java/org/springframework/integration/dsl/test- Spring Integration DSL のテストコードです。DSL の使い方を学ぶにはこれと Spring Integration Java DSL Reference をまず見るのが良いです。
目次
- ksbysample-eipapp-urlchecker プロジェクトを作成する
- in, out ディレクトリを作成する
- urlListFilePollerFlow を実装する
- urlCheckFlow を実装する
- writeFileFlow を実装する
- deleteFileFlow を実装する
- 動作確認
- メモ書き
手順
ksbysample-eipapp-urlchecker プロジェクトを作成する
IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。
src/main/java の下に ksbysample.eipapp.urlchecker パッケージを作成します。
src/main/java/ksbysample/eipapp/urlchecker の下に Application.java を作成し、リンク先の内容 を記述します。
ログの出力は INFO レベルからにするため、src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。
in, out ディレクトリを作成する
C:/eipapp/ksbysample-eipapp-dirpoller を作成し、その下に in, out ディレクトリを作成します。以下の構成になります。
C:/eipapp/ksbysample-eipapp-urlchecker ├ in └ out
urlListFilePollerFlow を実装する
以降の実装中に動作確認をする際には、以下の内容を記述した sample.txt というファイルを使用します。
http://projects.spring.io/spring-boot/ http://projects.spring.io/spring-integration/ http://ksby.hatenablog.com/entry/2017/01/13/083621 http://ksby.hatenablog.com/entry/2017/01/11/054343 http://ksby.hatenablog.com/entry/2017/01/08/064129
- src/main/java/ksbysample/eipapp/urlchecker の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。
urlCheckFlow を実装する
src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.java を リンク先のその2の内容 に変更します。
この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されて、複数スレッドで並行処理されていることが分かります。
writeFileFlow を実装する
src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.java を リンク先のその3の内容 に変更します。
この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されます。
.resequence()
の後でファイルに書かれていた順番に変更されていること、.aggregate()
の後で 1 つの List の 1 Message に集約されていること、が確認できます。
deleteFileFlow を実装する
- src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.java を リンク先のその4の内容 に変更します。
動作確認
動作確認します。以下のファイルを作成します。
■2014.txt
http://ksby.hatenablog.com/entry/2014/12/27/233427 http://ksby.hatenablog.com/entry/2014/12/29/175849
■2015.txt
http://ksby.hatenablog.com/entry/2015/04/18/194947 http://ksby.hatenablog.com/entry/2015/06/28/203752 http://ksby.hatenablog.com/entry/2015/07/22/232807 http://ksby.hatenablog.com/entry/2015/09/30/212728 http://ksby.hatenablog.com/entry/2015/12/27/211513
■2016.txt
http://ksby.hatenablog.com/entry/2016/02/17/103928 http://ksby.hatenablog.com/entry/2016/05/31/014201 http://ksby.hatenablog.com/entry/2016/12/29/230805
bootRun タスクを実行した後、in ディレクトリに 2014.txt, 2015.txt, 2016.txt を入れます。少しすると in ディレクトリからファイルが全てなくなり、out ディレクトリに 2014.txt, 2015.txt, 2016.txt が出来ました。
ログは以下のように出力されており、特に問題はありませんでした。
out ディレクトリに出力された 2014.txt, 2015.txt, 2016.txt は以下の内容で、HTTPステータスコード付きで、元ファイルの順番通り出力されています。
■2014.txt
http://ksby.hatenablog.com/entry/2014/12/27/233427,200 http://ksby.hatenablog.com/entry/2014/12/29/175849,200
■2015.txt
http://ksby.hatenablog.com/entry/2015/04/18/194947,200 http://ksby.hatenablog.com/entry/2015/06/28/203752,200 http://ksby.hatenablog.com/entry/2015/07/22/232807,200 http://ksby.hatenablog.com/entry/2015/09/30/212728,200 http://ksby.hatenablog.com/entry/2015/12/27/211513,200
■2016.txt
http://ksby.hatenablog.com/entry/2016/02/17/103928,200 http://ksby.hatenablog.com/entry/2016/05/31/014201,200 http://ksby.hatenablog.com/entry/2016/12/29/230805,200
メモ書き
.channel(c -> c.executor(taskExecutor()))
を入れるとどう処理が変わるのか?
.channel(c -> c.executor(taskExecutor()))
を入れると、それ以降の処理が別スレッドで実行されるようになります。
urlListFilePollerFlow を以下のように変更して bootRun タスクを実行します。
.channel(c -> c.executor(taskExecutor()))
を全てコメントアウトします。.enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE, ...
内にlog.info(String.valueOf(lines.size()));
を追加して、取得した行数をログに出力します。
@Bean public IntegrationFlow urlListFilePollerFlow() { return IntegrationFlows // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN) , c -> c.poller(Pollers.fixedDelay(1000))) // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する // .channel(c -> c.executor(taskExecutor())) // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate // の処理のために "sequenceSize" header に正しい行数をセットするためのものである .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE , m -> { List<String> lines; try { lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8); } catch (IOException e) { throw new RuntimeException(e); } log.info(String.valueOf(lines.size())); return lines.size(); })) // FileSplitter クラスを利用して、ファイルを行毎に分解する .split(new FileSplitter()) // スレッドを生成して enrichHeaders 以降の処理を更に別のスレッドで並行処理する // .channel(c -> c.executor(taskExecutor())) // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize" // というキーの header の値をセットする .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE , "headers['lines.size']", true)) // Message の内容をログに出力する .log() // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、 // そうでない場合には writeFileChannel へ Message を送信する .<String, Boolean>route(p -> p.startsWith("http://") , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel())) .subFlowMapping(false, sf -> sf.channel(writeFileChannel()))) .get(); }
sample.txt を in ディレクトリに置きます。
ログは以下のように出力され、処理が全て ask-scheduler-4 スレッド ( MessageSource を polling しているため main スレッドとは別に作成されるタスクスケジュール用のスレッド ) で実行されていることが確認できます。
次はコメントアウトしていた .channel(c -> c.executor(taskExecutor()))
を元に戻して bootRun タスクを実行し直します。sample.txt を in ディレクトリに置くと以下のログが出力されて各処理がそれぞれ別のスレッドで実行されていることが確認できます。
urlListFilePollerFlow でわざわざ sequenceSize header の値を付ける処理を入れている理由とは?
urlListFilePollerFlow を以下のように変更し、.split(new FileSplitter())
の前後で header の内容がどう変わるかみてみます。
@Bean public IntegrationFlow urlListFilePollerFlow() { return IntegrationFlows // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN) , c -> c.poller(Pollers.fixedDelay(1000))) // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する // .channel(c -> c.executor(taskExecutor())) // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate // の処理のために "sequenceSize" header に正しい行数をセットするためのものである // .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE // , m -> { // List<String> lines; // try { // lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8); // } catch (IOException e) { // throw new RuntimeException(e); // } // return lines.size(); // })) // FileSplitter クラスを利用して、ファイルを行毎に分解する .handle((p, h) -> { System.out.println("★★★ " + p); h.entrySet().stream().forEach(System.out::println); return p; }) .split(new FileSplitter()) .handle((p, h) -> { System.out.println("★★★ " + p); h.entrySet().stream().forEach(System.out::println); return p; })
bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のログが出力されます。
このログを見ると .split(new FileSplitter())
の後だと header に以下のキーが追加されています。
- sequenceNumber
- file_name
- sequenceSize
- correlationId
- file_originalFile
sequenceSize header の値を付ける処理を入れている理由ですが、以下の通りです。
- FileSplitter で行毎に分割している場合 sequenceNumber に値を割り当ててはくれますが sequenceSize は 0 です。
- この後 writeFileFlow で
.resequence()
,.aggregate()
の処理を行いますが、この2つの処理はデフォルトでは同じ correlationId のデータを sequenceSize 分のデータを受信してから処理する
という動作をします。 - そのため、このままでは sequenceSize = 0 なので
.resequence()
,.aggregate()
が動作しません。それで正しい値を付ける処理を入れています。
以下のように .split(new FileSplitter())
の後のログ出力処理の位置を変更して実行してみます。
// FileSplitter クラスを利用して、ファイルを行毎に分解する .handle((p, h) -> { System.out.println("★★★ " + p); h.entrySet().stream().forEach(System.out::println); return p; }) .split(new FileSplitter()) // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する .channel(c -> c.executor(taskExecutor())) // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize" // というキーの header の値をセットする .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE , "headers['lines.size']", true)) .handle((p, h) -> { System.out.println("★★★ " + p); h.entrySet().stream().forEach(System.out::println); return p; })
今度は sequenceSize=5 とファイルの行数がセットされていることが確認できます。
ソースコード
build.gradle
group 'ksbysample' version '1.0.0-RELEASE' buildscript { ext { springBootVersion = '1.4.3.RELEASE' } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'groovy' sourceCompatibility = 1.8 targetCompatibility = 1.8 compileJava.options.compilerArgs = ['-Xlint:all'] compileTestGroovy.options.compilerArgs = ['-Xlint:all'] compileTestJava.options.compilerArgs = ['-Xlint:all'] eclipse { classpath { containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER') containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8' } } idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { jcenter() } dependencyManagement { imports { mavenBom 'io.spring.platform:platform-bom:Athens-SR2' } } dependencies { // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile("org.springframework.boot:spring-boot-starter-integration") compile("org.springframework.integration:spring-integration-file") compile("org.springframework.integration:spring-integration-http") compile("org.codehaus.janino:janino") testCompile("org.springframework.boot:spring-boot-starter-test") testCompile("org.spockframework:spock-core") testCompile("org.spockframework:spock-spring") // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE") compile("org.projectlombok:lombok:1.16.12") testCompile("org.assertj:assertj-core:3.6.1") }
- Spring Integration の File Support を使用するので
compile('org.springframework.integration:spring-integration-file')
を記述します。 - Spring Integration の HTTP Support は使用しませんが、RestTemplate を使用するために
compile('org.springframework.integration:spring-integration-http')
を記述します。 - Spring Integration DSL は最新の 1.2 を使いたいので
compile('org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE')
を記述します。
Application.java
package ksbysample.eipapp.urlchecker; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${CONSOLE_LOG_PATTERN}</pattern> <charset>UTF-8</charset> </encoder> </appender> <root level="INFO"> <appender-ref ref="CONSOLE"/> </root> </configuration>
FlowConfig.java
■その1
package ksbysample.eipapp.urlchecker; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.file.FileHeaders; import org.springframework.integration.file.splitter.FileSplitter; import org.springframework.messaging.MessageChannel; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @Slf4j @Configuration public class FlowConfig { private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in"; private final static String URLLISTFILE_EXT_PATTERN = "*.txt"; private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size"; private final NullChannel nullChannel; public FlowConfig(NullChannel nullChannel) { this.nullChannel = nullChannel; } @Bean public MessageChannel urlCheckChannel() { return new DirectChannel(); } @Bean public MessageChannel writeFileChannel() { return new DirectChannel(); } @Bean public Executor taskExecutor() { return Executors.newCachedThreadPool(); } @Bean public IntegrationFlow urlListFilePollerFlow() { return IntegrationFlows // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN) , c -> c.poller(Pollers.fixedDelay(1000))) // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する .channel(c -> c.executor(taskExecutor())) // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate // の処理のために "sequenceSize" header に正しい行数をセットするためのものである .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE , m -> { List<String> lines; try { lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8); } catch (IOException e) { throw new RuntimeException(e); } return lines.size(); })) // FileSplitter クラスを利用して、ファイルを行毎に分解する .split(new FileSplitter()) // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する .channel(c -> c.executor(taskExecutor())) // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize" // というキーの header の値をセットする .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE , "headers['lines.size']", true)) // Message の内容をログに出力する .log() // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、 // そうでない場合には writeFileChannel へ Message を送信する .<String, Boolean>route(p -> p.startsWith("http://") , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel())) .subFlowMapping(false, sf -> sf.channel(writeFileChannel()))) .get(); } @Bean public IntegrationFlow urlCheckFlow() { return IntegrationFlows.from(urlCheckChannel()) .channel(nullChannel) .get(); } @Bean public IntegrationFlow writeFileFlow() { return IntegrationFlows.from(writeFileChannel()) .channel(nullChannel) .get(); } }
- urlListFilePollerFlow 内の各処理についてはコメントを参照。
- この時点でも動作できるようにするために urlCheckFlow, writeFileFlow を仮実装で記述しておきます。
■その2
@Slf4j @Configuration public class FlowConfig { .......... private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus"; .......... @Bean public RestTemplate restTemplate() { return new RestTemplate(); } .......... @Bean public IntegrationFlow urlCheckFlow() { return IntegrationFlows.from(urlCheckChannel()) // スレッドを生成して、以降の処理を別のスレッドで並行処理する .channel(c -> c.executor(taskExecutor())) // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする .handle((p, h) -> { String statusCode; try { ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class); statusCode = response.getStatusCode().toString(); } catch (HttpClientErrorException e) { statusCode = e.getStatusCode().toString(); } catch (Exception e) { statusCode = e.getMessage(); } log.info(statusCode + " : " + p.toString()); return MessageBuilder.withPayload(p) .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode) .build(); }) // writeFileChannel へ Message を送信する .channel(writeFileChannel()) .get(); } .......... }
■その3
@Slf4j @Configuration public class FlowConfig { .......... private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out"; .......... @Bean public MessageChannel deleteFileChannel() { return new DirectChannel(); } .......... @Bean public IntegrationFlow writeFileFlow() { return IntegrationFlows.from(writeFileChannel()) // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS)) .build()) // Message が流れる順番をファイルに書かれている順番にする // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている .resequence() .log() // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、 // 1つの List に集約して 1 Message に変更する .aggregate() .log() // out ディレクトリに結果ファイルを出力する // 結果ファイルには URL と HTTP ステータスコード を出力する .handle((p, h) -> { Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString()); @SuppressWarnings("unchecked") List<String> lines = (List<String>) p; try { Files.write(outPath, lines, StandardCharsets.UTF_8 , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); } catch (IOException e) { throw new RuntimeException(e); } return p; }) // deleteFileChannel へ Message を送信する .channel(deleteFileChannel()) .get(); } @Bean public IntegrationFlow deleteFileFlow() { return IntegrationFlows.from(deleteFileChannel()) .channel(nullChannel) .get(); } }
■その4
@Slf4j @Configuration public class FlowConfig { .......... @Bean public IntegrationFlow deleteFileFlow() { return IntegrationFlows.from(deleteFileChannel()) // in ディレクトリのファイルを削除する .handle((p, h) -> { try { Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString())); } catch (IOException e) { throw new RuntimeException(e); } // ここで処理が終了するので null を返す return null; }) .get(); } }
- 上記の記述を追加する以外に nullChannel のフィールドとコンストラクタを削除します。
■完成形
package ksbysample.eipapp.urlchecker; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.ResponseEntity; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.file.FileHeaders; import org.springframework.integration.file.splitter.FileSplitter; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.RestTemplate; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @Slf4j @Configuration public class FlowConfig { private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in"; private final static String URLLISTFILE_EXT_PATTERN = "*.txt"; private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out"; private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size"; private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus"; @Bean public MessageChannel urlCheckChannel() { return new DirectChannel(); } @Bean public MessageChannel writeFileChannel() { return new DirectChannel(); } @Bean public MessageChannel deleteFileChannel() { return new DirectChannel(); } @Bean public Executor taskExecutor() { return Executors.newCachedThreadPool(); } @Bean public RestTemplate restTemplate() { return new RestTemplate(); } @Bean public IntegrationFlow urlListFilePollerFlow() { return IntegrationFlows // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN) , c -> c.poller(Pollers.fixedDelay(1000))) // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する .channel(c -> c.executor(taskExecutor())) // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate // の処理のために "sequenceSize" header に正しい行数をセットするためのものである .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE , m -> { List<String> lines; try { lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8); } catch (IOException e) { throw new RuntimeException(e); } return lines.size(); })) // FileSplitter クラスを利用して、ファイルを行毎に分解する .split(new FileSplitter()) // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する .channel(c -> c.executor(taskExecutor())) // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize" // というキーの header の値をセットする .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE , "headers['lines.size']", true)) // Message の内容をログに出力する .log() // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、 // そうでない場合には writeFileChannel へ Message を送信する .<String, Boolean>route(p -> p.startsWith("http://") , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel())) .subFlowMapping(false, sf -> sf.channel(writeFileChannel()))) .get(); } @Bean public IntegrationFlow urlCheckFlow() { return IntegrationFlows.from(urlCheckChannel()) // スレッドを生成して、以降の処理を別のスレッドで並行処理する .channel(c -> c.executor(taskExecutor())) // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする .handle((p, h) -> { String statusCode; try { ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class); statusCode = response.getStatusCode().toString(); } catch (HttpClientErrorException e) { statusCode = e.getStatusCode().toString(); } catch (Exception e) { statusCode = e.getMessage(); } log.info(statusCode + " : " + p.toString()); return MessageBuilder.withPayload(p) .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode) .build(); }) // writeFileChannel へ Message を送信する .channel(writeFileChannel()) .get(); } @Bean public IntegrationFlow writeFileFlow() { return IntegrationFlows.from(writeFileChannel()) // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS)) .build()) // Message が流れる順番をファイルに書かれている順番にする // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている .resequence() .log() // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、 // 1つの List に集約して 1 Message に変更する .aggregate() .log() // out ディレクトリに結果ファイルを出力する // 結果ファイルには URL と HTTP ステータスコード を出力する .handle((p, h) -> { Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString()); @SuppressWarnings("unchecked") List<String> lines = (List<String>) p; try { Files.write(outPath, lines, StandardCharsets.UTF_8 , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); } catch (IOException e) { throw new RuntimeException(e); } return p; }) // deleteFileChannel へ Message を送信する .channel(deleteFileChannel()) .get(); } @Bean public IntegrationFlow deleteFileFlow() { return IntegrationFlows.from(deleteFileChannel()) // in ディレクトリのファイルを削除する .handle((p, h) -> { try { Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString())); } catch (IOException e) { throw new RuntimeException(e); } // ここで処理が終了するので null を返す return null; }) .get(); } }
履歴
2017/01/15
初版発行。
2017/01/18
* build.gradle の dependencies の記述を ‘’ → “” へ統一しました。
* logback-spring.xml から使用していない記述を削除しました。
2017/01/22
* 以下の3つの定数文字列は Spring Integration で定義済だったので、それを使用するよう変更しました。
* “sequenceSize” を MESSAGE_HEADER_SEQUENCE_SIZE → IntegrationMessageHeaderAccessor.SEQUENCE_SIZE へ変更。
* “file_name” は MESSAGE_HEADER_FILE_NAME → FileHeaders.FILENAME へ変更。
* “file_originalFile” は MESSAGE_HEADER_FILE_ORIGINALFILE → FileHeaders.ORIGINAL_FILE へ変更。
* .enrichHeaders(h -> h.headerFunction
→ .enrichHeaders(h -> h.headerExpression
へ変更すれば値を .headerFilter
で削除しなくても値を上書きできるので変更しました。