かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する )

概要

記事一覧はこちらです。

  • Spring Integration は DSL を使った時の方が XML ファイルや DSL なしの Java Config の時よりも面白いです。DSL に慣れるためにもいくつかサンプルを作っていこうと思います。
  • Spring Integration DSL を使用して、以下の処理を行う常駐型アプリケーションを作成します。
    1. in ディレクトリに URL 一覧が記入された拡張子が .txt のファイルが置かれているか 1 秒間隔でチェックします。
    2. ファイルが置かれていたら読み込んで行単位に分割します。
    3. URL が “http://” で始まる場合には、その URL にアクセス可能か ( HTTP ステータスコードの 200 が返るか ) チェックします。チェック処理はスレッドを生成して並行処理します。
    4. URL と HTTP ステータスコードを出力した結果ファイルを out ディレクトリに作成します。出力する時には元の .txt ファイルに書かれていた順になるようにします。
    5. 元の .txt ファイルを削除します。
  • 今回作る処理を Spring Integration のフロー図で描こうとすると channel がいちいち途中に挿入されて分かりにくいので、今回は省略します。というか、フロー図を描こうとして改めて DSL だと複雑な処理でも結構簡単に書けるんだなということに気づきましたね。。。

参照したサイト・書籍

  1. spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/
    https://github.com/spring-projects/spring-integration-java-dsl/tree/master/src/test/java/org/springframework/integration/dsl/test

目次

  1. ksbysample-eipapp-urlchecker プロジェクトを作成する
  2. in, out ディレクトリを作成する
  3. urlListFilePollerFlow を実装する
  4. urlCheckFlow を実装する
  5. writeFileFlow を実装する
  6. deleteFileFlow を実装する
  7. 動作確認
  8. メモ書き
    1. .channel(c -> c.executor(taskExecutor())) を入れるとどう処理が変わるのか?
    2. urlListFilePollerFlow でわざわざ sequenceSize header の値を付ける処理を入れている理由とは?

手順

ksbysample-eipapp-urlchecker プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.urlchecker パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/urlchecker の下に Application.java を作成し、リンク先の内容 を記述します。

  4. ログの出力は INFO レベルからにするため、src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

in, out ディレクトリを作成する

C:/eipapp/ksbysample-eipapp-dirpoller を作成し、その下に in, out ディレクトリを作成します。以下の構成になります。

C:/eipapp/ksbysample-eipapp-urlchecker
├ in
└ out

urlListFilePollerFlow を実装する

以降の実装中に動作確認をする際には、以下の内容を記述した sample.txt というファイルを使用します。

http://projects.spring.io/spring-boot/
http://projects.spring.io/spring-integration/
http://ksby.hatenablog.com/entry/2017/01/13/083621
http://ksby.hatenablog.com/entry/2017/01/11/054343
http://ksby.hatenablog.com/entry/2017/01/08/064129
  1. src/main/java/ksbysample/eipapp/urlchecker の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

urlCheckFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその2の内容 に変更します。

    この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されて、複数スレッドで並行処理されていることが分かります。

    f:id:ksby:20170115203712p:plain

writeFileFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその3の内容 に変更します。

    この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されます。.resequence() の後でファイルに書かれていた順番に変更されていること、.aggregate() の後で 1 つの List の 1 Message に集約されていること、が確認できます。

    f:id:ksby:20170115210521p:plain

deleteFileFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその4の内容 に変更します。

動作確認

動作確認します。以下のファイルを作成します。

■2014.txt

http://ksby.hatenablog.com/entry/2014/12/27/233427
http://ksby.hatenablog.com/entry/2014/12/29/175849

■2015.txt

http://ksby.hatenablog.com/entry/2015/04/18/194947
http://ksby.hatenablog.com/entry/2015/06/28/203752
http://ksby.hatenablog.com/entry/2015/07/22/232807
http://ksby.hatenablog.com/entry/2015/09/30/212728
http://ksby.hatenablog.com/entry/2015/12/27/211513

■2016.txt

http://ksby.hatenablog.com/entry/2016/02/17/103928
http://ksby.hatenablog.com/entry/2016/05/31/014201
http://ksby.hatenablog.com/entry/2016/12/29/230805

bootRun タスクを実行した後、in ディレクトリに 2014.txt, 2015.txt, 2016.txt を入れます。少しすると in ディレクトリからファイルが全てなくなり、out ディレクトリに 2014.txt, 2015.txt, 2016.txt が出来ました。

ログは以下のように出力されており、特に問題はありませんでした。

f:id:ksby:20170115225308p:plain f:id:ksby:20170115225517p:plain

out ディレクトリに出力された 2014.txt, 2015.txt, 2016.txt は以下の内容で、HTTPステータスコード付きで、元ファイルの順番通り出力されています。

■2014.txt

http://ksby.hatenablog.com/entry/2014/12/27/233427,200
http://ksby.hatenablog.com/entry/2014/12/29/175849,200

■2015.txt

http://ksby.hatenablog.com/entry/2015/04/18/194947,200
http://ksby.hatenablog.com/entry/2015/06/28/203752,200
http://ksby.hatenablog.com/entry/2015/07/22/232807,200
http://ksby.hatenablog.com/entry/2015/09/30/212728,200
http://ksby.hatenablog.com/entry/2015/12/27/211513,200

■2016.txt

http://ksby.hatenablog.com/entry/2016/02/17/103928,200
http://ksby.hatenablog.com/entry/2016/05/31/014201,200
http://ksby.hatenablog.com/entry/2016/12/29/230805,200

メモ書き

.channel(c -> c.executor(taskExecutor())) を入れるとどう処理が変わるのか?

.channel(c -> c.executor(taskExecutor())) を入れると、それ以降の処理が別スレッドで実行されるようになります。

urlListFilePollerFlow を以下のように変更して bootRun タスクを実行します。

  • .channel(c -> c.executor(taskExecutor())) を全てコメントアウトします。
  • .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE, ... 内に log.info(String.valueOf(lines.size())); を追加して、取得した行数をログに出力します。
    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
//                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            log.info(String.valueOf(lines.size()));
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して enrichHeaders 以降の処理を更に別のスレッドで並行処理する
//                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

sample.txt を in ディレクトリに置きます。

ログは以下のように出力され、処理が全て ask-scheduler-4 スレッド ( MessageSource を polling しているため main スレッドとは別に作成されるタスクスケジュール用のスレッド ) で実行されていることが確認できます。

f:id:ksby:20170115183038p:plain

次はコメントアウトしていた .channel(c -> c.executor(taskExecutor())) を元に戻して bootRun タスクを実行し直します。sample.txt を in ディレクトリに置くと以下のログが出力されて各処理がそれぞれ別のスレッドで実行されていることが確認できます。

f:id:ksby:20170115183610p:plain

urlListFilePollerFlow でわざわざ sequenceSize header の値を付ける処理を入れている理由とは?

urlListFilePollerFlow を以下のように変更し、.split(new FileSplitter()) の前後で header の内容がどう変わるかみてみます。

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
//                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
//                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
//                        , m -> {
//                            List<String> lines;
//                            try {
//                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
//                            } catch (IOException e) {
//                                throw new RuntimeException(e);
//                            }
//                            return lines.size();
//                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })
                .split(new FileSplitter())
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })

bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のログが出力されます。

f:id:ksby:20170115190040p:plain

このログを見ると .split(new FileSplitter()) の後だと header に以下のキーが追加されています。

  • sequenceNumber
  • file_name
  • sequenceSize
  • correlationId
  • file_originalFile

sequenceSize header の値を付ける処理を入れている理由ですが、以下の通りです。

  • FileSplitter で行毎に分割している場合 sequenceNumber に値を割り当ててはくれますが sequenceSize は 0 です。
  • この後 writeFileFlow で .resequence(), .aggregate() の処理を行いますが、この2つの処理はデフォルトでは 同じ correlationId のデータを sequenceSize 分のデータを受信してから処理する という動作をします。
  • そのため、このままでは sequenceSize = 0 なので .resequence(), .aggregate() が動作しません。それで正しい値を付ける処理を入れています。

以下のように .split(new FileSplitter()) の後のログ出力処理の位置を変更して実行してみます。

                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })

今度は sequenceSize=5 とファイルの行数がセットされていることが確認できます。

f:id:ksby:20170115191820p:plain

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.springframework.integration:spring-integration-http")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • Spring Integration の File Support を使用するので compile('org.springframework.integration:spring-integration-file') を記述します。
  • Spring Integration の HTTP Support は使用しませんが、RestTemplate を使用するために compile('org.springframework.integration:spring-integration-http') を記述します。
  • Spring Integration DSL は最新の 1.2 を使いたいので compile('org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE') を記述します。

Application.java

package ksbysample.eipapp.urlchecker;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

FlowConfig.java

■その1

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.messaging.MessageChannel;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    @Bean
    public MessageChannel urlCheckChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                .channel(nullChannel)
                .get();
    }

}
  • urlListFilePollerFlow 内の各処理についてはコメントを参照。
  • この時点でも動作できるようにするために urlCheckFlow, writeFileFlow を仮実装で記述しておきます。

■その2

@Slf4j
@Configuration
public class FlowConfig {


    ..........
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";

    ..........

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    ..........

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    ..........

}

■その3

@Slf4j
@Configuration
public class FlowConfig {


    ..........
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";


    ..........

    @Bean
    public MessageChannel deleteFileChannel() {
        return new DirectChannel();
    }

    ..........

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS))
                        .build())
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                .channel(nullChannel)
                .get();
    }

}

■その4

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                })
                .get();
    }

}
  • 上記の記述を追加する以外に nullChannel のフィールドとコンストラクタを削除します。

■完成形

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";

    @Bean
    public MessageChannel urlCheckChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel deleteFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS))
                        .build())
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                })
                .get();
    }

}

履歴

2017/01/15
初版発行。
2017/01/18
* build.gradle の dependencies の記述を ‘’ → “” へ統一しました。
* logback-spring.xml から使用していない記述を削除しました。
2017/01/22
* 以下の3つの定数文字列は Spring Integration で定義済だったので、それを使用するよう変更しました。
* “sequenceSize” を MESSAGE_HEADER_SEQUENCE_SIZE → IntegrationMessageHeaderAccessor.SEQUENCE_SIZE へ変更。
* “file_name” は MESSAGE_HEADER_FILE_NAME → FileHeaders.FILENAME へ変更。
* “file_originalFile” は MESSAGE_HEADER_FILE_ORIGINALFILE → FileHeaders.ORIGINAL_FILE へ変更。
* .enrichHeaders(h -> h.headerFunction.enrichHeaders(h -> h.headerExpression へ変更すれば値を .headerFilter で削除しなくても値を上書きできるので変更しました。