読者です 読者をやめる 読者になる 読者になる

かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その13 )( FTP サーバからファイルをダウンロードして SFTP サーバへアップロードする2 )

概要

記事一覧はこちらです。

参照したサイト・書籍

目次

  1. メモ書き
    1. FtpInboundChannelAdapter ( s -> s.ftp(this.ftpSessionFactory).~ ) はファイルを FTP ダウンロードしていなくても in ディレクトリにファイルがあれば Message を次に送信する?
    2. .localFilter(new AcceptAllFileListFilter<>()) を指定しないとどうなるのか?
    3. Pollers.maxMessagesPerPoll(...) を指定しないとどうなるのか?
    4. マルチスレッドで並列処理しても spring-retry のリトライ回数はスレッドの処理毎に維持されるのか?

手順

メモ書き

FtpInboundChannelAdapter ( s -> s.ftp(this.ftpSessionFactory).~ ) はファイルを FTP ダウンロードしていなくても in ディレクトリにファイルがあれば Message を次に送信する?

FtpInboundChannelAdapter を MessageSource に指定している場合 FTP サーバからファイルをダウンロードした時だけ次に Message を送信するものと思っていたのですが、実は .localDirectory(...) に指定したディレクトリにファイルを置いても次に Message を送信します。

分かりやすくするために FlowConfig.java の ftp2SftpFlow Bean の .from(...) の後に .log() を追加します。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

まずは bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと以下のログが出力されます。Message が送信されて .log() により “GenericMessage [payload=…” のログが出力されています。

f:id:ksby:20170127092504p:plain

今度は FTP サーバ側ではなく .localDirectory(...) に指定している C:\eipapp\ksbysample-eipapp-ftp2sftp\recv に直接 2015.txt を置いてみます。

以下のログが出力されます。FTP ダウンロードはしていないのに Message が送信されます。

f:id:ksby:20170127093145p:plain

動作状況を見ている感じだと FtpInboundChannelAdapter とは FTP ダウンロード機能が付いた FileInboundAdaper で、FTP ダウンロードできなくてもローカルディレクトリにファイルがあれば次に Message が送信されます。

.localFilter(new AcceptAllFileListFilter<>()) を指定しないとどうなるのか?

指定しないと AcceptOnceFileListFilter<File> だけが適用されます。

org.springframework.integration.file.remote.synchronizer の AbstractInboundFileSynchronizingMessageSource.java を見ると private volatile FileListFilter<File> localFileListFilter = new AcceptOnceFileListFilter<File>(); の記述があります。

上で変更した FlowConfig.java の ftp2SftpFlow Bean の .from(...) の後に .log() を追加したものに、.localFilter(...) の記述を全てコメントアウトして動作確認してみます。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
//                                .localFilter(new AcceptAllFileListFilter<>())
//                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

bootRun で起動していたら一度停止した後、bootRun で起動します。

まずは FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと以下のログが出力されます。

f:id:ksby:20170127095404p:plain

次に再度 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置いてみます。

f:id:ksby:20170127095638p:plain

今度は FTP ダウンロードは行われますが、既に1度処理しているので AcceptOnceFileListFilter により次に Message は送信されません。ダウンロードされた 2014.txt は C:\eipapp\ksbysample-eipapp-ftp2sftp\recv に残ったままでした。

また .localFilter(new AcceptAllFileListFilter<>()) を記述して localDirectory(...) に指定したディレクトリからファイルを移動しないとどうなるのかも見てみます。以下のようにソースを変更します。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                .channel((nullChannel))
                .get();

bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと、以下のように FTP ダウンロードされたファイルが次々と Message で送信され続けます。

f:id:ksby:20170127100748p:plain

.localFilter(new AcceptAllFileListFilter<>()) を記述するなら、同じスレッドの処理で localDirectory(...) に指定したディレクトリからファイルを移動・削除する必要があります。

Pollers.maxMessagesPerPoll(...) を指定しないとどうなるのか?

Message を送信する処理で、1度に送信するのが1ファイルだけになります。例えば2ファイルある場合、最初に1つ目のファイルの Message を送信したら、次は poller で指定した時間が経過した後に2つ目のファイルの Message が送信されます。

.maxMessagesPerPoll(100)コメントアウトして動作確認してみます。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
//                                .maxMessagesPerPoll(100)))
                        ))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt の3ファイルを置くと以下のようにログが出力されました。

f:id:ksby:20170127103404p:plain

  • ファイルは最初に3ファイル全て FTP ダウンロードされます。
  • その後 Message を送信する処理のところで、5秒間隔で1ファイルずつ送信されます。

マルチスレッドで並列処理しても spring-retry のリトライ回数はスレッドの処理毎に維持されるのか?

org.springframework.retry.support の RetrySynchronizationManager.java をみると private static final ThreadLocal<RetryContext> context = new ThreadLocal<RetryContext>(); と RetryContext に ThreadLocal を利用しているので維持されそうです。確認してみます。

リトライ処理を行う SFTP アップロードの処理をマルチスレッドで並列に処理されるようにしたいので、.channel(c -> c.executor(Executors.newCachedThreadPool())) を追加します。また recv ディレクトリに残っているファイルが何度も Message で送信されてこないよう .filter(...) の2行をコメントアウトします。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
//                                .localFilter(new AcceptAllFileListFilter<>())
//                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                // 以降の処理をマルチスレッドで並列処理する
                .channel(c -> c.executor(Executors.newCachedThreadPool()))
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r

SFTP サーバを停止した後、bootRun で起動します。起動後、C:\Xlight\ftproot\recv01\out に 2014.txt を配置 → 2回リトライのログが出る → 2015.txt を配置 → 1回リトライのログが出る → 2016.txt を配置、の順にファイルを置きます。

以下のようにログが出力されました。

f:id:ksby:20170128005309p:plain

  • 2014.txt は pool-1-thread-1、2015.txt は pool-1-thread-2、2016.txt は pool-1-thread-3 とファイル毎に異なるスレッドで処理されています。
  • リトライ回数はスレッド毎に 1, 2, 3, 4 と増えており、特に回数がおかしくなることはありませんでした。

履歴

2017/01/28
初版発行。