かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その12 )( FTP サーバからファイルをダウンロードして SFTP サーバへアップロードする )

概要

記事一覧はこちらです。

  • Spring Integration DSL のサンプルを作成します。
  • 以下の処理を行う常駐型アプリケーションを作成します。
    • FTPサーバに recv01 ユーザでログインし /out ディレクトリにファイルがあるかチェックします。ファイルがあれば D:\eipapp\ksbysample-eipapp-ftp2sftp\recv ディレクトリにダウンロードします。
    • ダウンロードしたファイルを SFTP サーバにアップロードします。SFTP サーバには send01 ユーザでログインし /in ディレクトリにアップロードします。ファイルはアップロードが成功したら send ディレクトリに、失敗したら error ディレクトリに移動します。
  • Spring Cloud Sleuth による zipkin 連携は今回も入れます。
  • リトライ処理に spring-retry を使用してみます。

参照したサイト・書籍

  1. Xlight ftp server
    https://www.xlightftpd.com/

    • Windows で利用可能な FTP/SFTP サーバです。「Personal edition is free for personal use.」とのことなので今回はこれを利用します。
    • Spring Integration の FTP/FTPS Adapters でダウンロード、アップロードしても特に問題は発生しませんでした。
  2. spring-integration-java-dsl/build.gradle
    https://github.com/spring-projects/spring-integration-java-dsl/blob/master/build.gradle

  3. javac - Java プログラミング言語コンパイラ
    http://docs.oracle.com/javase/jp/7/technotes/tools/solaris/javac.html

  4. Spring Integration Reference Manual - 15. FTP/FTPS Adapters http://docs.spring.io/spring-integration/reference/html/ftp.html

  5. Spring Integration Reference Manual - 27. SFTP Adapters http://docs.spring.io/spring-integration/reference/html/sftp.html

  6. Spring-Retry - シンプルで本質的なコードを汚さないリトライ処理フレームワーク http://tech.atware.co.jp/spring-retry/

  7. spring-projects/spring-retry
    https://github.com/spring-projects/spring-retry

目次

  1. Windows の FTP/SFTP サーバとして Xlight ftp server をインストール・設定する
    1. Xlight ftp server をインストールする
    2. FTP サーバと recv01 ユーザを作成して FTP サーバを起動する
    3. SFTP サーバと send01 ユーザを作成して SFTP サーバを起動する
  2. recv, send, error ディレクトリを作成する
  3. ksbysample-eipapp-ftp2sftp プロジェクトを作成する
  4. FTP サーバからファイルをダウンロードする処理を作成する
  5. SFTP サーバにファイルをアップロードする処理を作成する
  6. アップロード成功時には send ディレクリへ、失敗時には error ディレクトリへ移動する処理を作成する
  7. 動作確認

手順

WindowsFTP/SFTP サーバとして Xlight ftp server をインストール・設定する

Spring Boot + Spring Integration でいろいろ試してみる ( その1 )( SFTP でファイルアップロードするバッチを作成する )freeFTPd をインストールしましたが、Spring Integration の 15. FTP/FTPS Adapters でダウンロードすると ダウンロード中であることを示す ~.writing がファイルの末尾に付いたままダウンロードが完了しない問題がありました。

他に Windows で利用可能な FTP サーバを探したところ Xlight ftp server が personal use ならば free とのことなので、今回はこれをインストールして FTP/SFTP サーバとして使用します。

Xlight ftp server をインストールする

  1. ダウンロードのページ から「Setup with installer - 64-bit」の Download Link をクリックして setup-x64.exe をダウンロードします。

    f:id:ksby:20170125135603p:plain

  2. setup-x64.exe を実行します。

  3. 「Setup - Xlight FTP Server」ダイアログが表示されます。「Next >」ボタンをクリックします。

  4. 「License Agreement」画面が表示されます。「I accept the agreement」を選択した後、「Next >」ボタンをクリックします。

  5. 「Select Destination Location」画面が表示されます。インストール先を “C:\Xlight” へ変更した後、「Next >」ボタンをクリックします。

  6. 「Select Start Menu Folder」画面が表示されます。「Next >」ボタンをクリックします。

  7. 「Select Additional Tasks」画面が表示されます。「Create desktop icon」のチェックを外した後、「Next >」ボタンをクリックします。

  8. 「Ready to Install」画面が表示されます。「Install」ボタンをクリックします。

  9. インストールが実行されます。

  10. インストールが完了すると「Completing the Xlight FTP Server Setup Wizard」画面が表示されます。「Launch application」のチェックを外した後、「Finish」ボタンをクリックします。

FTP サーバと recv01 ユーザを作成して FTP サーバを起動する

  1. C:\Xlight\xlight.exe を実行します。

  2. 「Xlight FTP Server」画面が表示されます。「New Virtual Server」ボタンをクリックします。

    f:id:ksby:20170125141928p:plain

  3. 「New Virtual Server」ダイアログが表示されます。何も変更せずに「OK」ボタンをクリックします。

    f:id:ksby:20170125142438p:plain

  4. 元の画面に戻り FTP サーバが追加されていることが確認できます。次に「User List」ボタンをクリックします。

    f:id:ksby:20170125142647p:plain

  5. 「User List」ダイアログが表示されます。「Add」ボタンをクリックします。

    f:id:ksby:20170125143421p:plain

  6. ユーザ登録用のダイアログが表示されます。以下の値を入力した後、「OK」ボタンをクリックします。

    • 「Username」「Password」に “recv01” を入力します。
    • 「Home Directory」に “C:\Xlight\ftproot\recv01” を入力します。

    f:id:ksby:20170125144137p:plain

  7. 「User List」ダイアログに戻り recv01 ユーザが登録されていることが確認できます。recv01 を選択した後、「Edit」ボタンをクリックします。

    f:id:ksby:20170125144435p:plain

  8. 「Username: recv01」ダイアログが表示されます。画面左側の「User Path」を選択した後、画面右側のリストから Virtual Path = “/” のデータを選択して「Edit」ボタンをクリックします。

    f:id:ksby:20170125145016p:plain

  9. 「Virtual Path」ダイアログが表示されます。全ての操作を行えるようにしたいので、以下の画像の赤枠内のチェックボックスをチェックした後、「OK」ボタンをクリックします。

    f:id:ksby:20170125145521p:plain

    「Username: recv01」ダイアログに戻ると Permission に “-” の部分がなくなっています。「OK」ボタンをクリックしてダイアログを閉じます。

    f:id:ksby:20170125145846p:plain

  10. C:\Xlight\ftproot\recv01\out ディレクトリを作成します。

  11. 「Xlight FTP Server」画面に戻ったら Port = 21 のサーバを選択した後、「Start Server」ボタンをクリックしてサーバを起動します。起動すると「Status」に “Running” と表示されます。

    f:id:ksby:20170125153551p:plain

SFTP サーバと send01 ユーザを作成して SFTP サーバを起動する

基本的には FTP サーバの時と同じです。異なる部分のみ記載します。

  1. Virtual Server 作成時に「Protocol」を “SSH2” にします。Port も 22 に変わります。

    f:id:ksby:20170125165244p:plain

  2. 以下のユーザを作成します。ディレクトリの操作権限は recv01 の時と同様に全てチェックします。

    • 「Username」「Password」に “send01” を入力します。
    • 「Home Directory」に “C:\Xlight\sftproot\send01” を入力します。

    f:id:ksby:20170125170413p:plain

  3. C:\Xlight\sftproot\send01\in ディレクトリを作成します。

  4. 最後に「Xlight FTP Server」画面で Port = 22 のサーバを選択した後、「Start Server」ボタンをクリックしてサーバを起動します。起動すると「Status」に “Running” と表示されます。

    f:id:ksby:20170125171552p:plain

C:\Xlight のディレクトリ構成は以下のようになります。

C:\Xlight
├ ftproot
│ └ recv01
│    └ out
└ sftproot
   └ send01
      └ in

recv, send, error ディレクトリを作成する

常駐型アプリケーション用として以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-ftp2sftp
├ error
├ recv
└ send

ksbysample-eipapp-ftp2sftp プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.ftp2sftp パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/ftp2sftp の下に Application.java を作成し、リンク先の内容 を記述します。

  4. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  5. src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

FTP サーバからファイルをダウンロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下に ApplicationConfig.java を作成し、リンク先のその1の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/ftp2sftp の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

SFTP サーバにファイルをアップロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下の ApplicationConfig.javaリンク先のその2の内容 に変更します。

  2. src/main/java/ksbysample/eipapp/ftp2sftp の下に SftpUploadMessageHandler.java を作成し、リンク先の内容 を記述します。

  3. src/main/java/ksbysample/eipapp/ftp2sftp の下の FlowConfig.javaリンク先のその2の内容 に変更します。

アップロード成功時には send ディレクリへ、失敗時には error ディレクトリへ移動する処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下の FlowConfig.javaリンク先のその3の内容 に変更します。

動作確認

  1. clean タスク実行 → Rebuild Project 実行をした後、build タスクを実行して “BUILD SUCCESSFUL” のメッセージが表示されることを確認します。

    f:id:ksby:20170127004240p:plain

  2. テストでは Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する ) で作成した 2014.txt, 2015.txt, 2016.txt を使用します。

  3. まずは FTP サーバ、SFTP サーバを起動して正常に処理される場合を確認します。ディレクトリにファイルがないことを確認してから bootRun タスクを実行します。

  4. C:\Xlight\ftproot\recv01\out に 2014.txt を入れます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、 C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127005635p:plain

  5. send, in のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt の3ファイルを入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt, 2015.txt, 2016.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127010005p:plain

    • 最初に 2014.txt, 2015.txt, 2016.txt の全てのファイルをダウンロードしています。
    • その後で1ファイルずつ Message 送信 → SFTP アップロード が行われています。
  6. 今度は SFTP サーバを停止してアップロードに失敗する場合を確認します。

  7. send, in のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\error に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127010946p:plain

    • リトライは4回 ( 初回と合わせて実行されたのは計5回 ) 行われています。

    Zipkin のグラフを見てもリトライしている状況は分かりませんでした。途中で途切れたりはせず、単に時間がかかっているので横に長いグラフが出るだけのようです。

    f:id:ksby:20170127011600p:plain

  8. error のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\error に 2014.txt, 2015.txt, 2016.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127011953p:plain

    • 並列処理にはしていないので、3ファイルダウンロードした後、1ファイルずつ Message 送信 → SFTP アップロードを5回リトライ → エラーとなり error ディレクトリへ移動 が行われています。
  9. 最後に SFTP サーバを停止した状態で out ディレクトリにファイルを置いた後、リトライの途中で SFTP サーバを起動して正常に処理される場合を確認します。

  10. error のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなりリトライが実行されますので、途中で SFTP サーバを起動します。更に少しすると C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127012807p:plain

    • リトライ中に SFTP サーバが起動すれば正常に処理が進みました。

長くなったので、メモ書きは次に書きます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-ftp")
    compile("org.springframework.integration:spring-integration-sftp")
    compile("org.springframework.retry:spring-retry")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}
  • compileJava.options.compilerArgs 等の compileArgs の記述を spring-integration-java-dsl/build.gradle をまねて1行にして、-options,-processing を追加してみました。
  • Spring Integration の FTP/FTPS Adapters, SFTP Adapters を使用するので以下の2行を記述します。
    • compile("org.springframework.integration:spring-integration-ftp")
    • compile("org.springframework.integration:spring-integration-sftp")
  • 今回 spring-retry を使用するので以下の1行を記述します。
    • compile("org.springframework.retry:spring-retry")

Application.java

package ksbysample.eipapp.ftp2sftp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;

@SpringBootApplication
@EnableRetry(proxyTargetClass = true)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
  • 今回は spring-retry を使用するので @EnableRetry(proxyTargetClass = true) を記述します。

application.properties

spring.application.name=ftp2sftp
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/>
    <logger name="com.jcraft.jsch" level="ERROR"/>
</configuration>

ApplicationConfig.java

■その1

package ksbysample.eipapp.ftp2sftp;

import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

@Configuration
public class ApplicationConfig {

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

}

■その2

@Configuration
public class ApplicationConfig {

    ..........

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

}
  • @Bean public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() { ... } を追加します。

■完成形

package ksbysample.eipapp.ftp2sftp;

import com.jcraft.jsch.ChannelSftp;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
public class ApplicationConfig {

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.ftp2sftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

@Slf4j
@Configuration
public class FlowConfig {

    private final SessionFactory<FTPFile> ftpSessionFactory;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory) {
        this.ftpSessionFactory = ftpSessionFactory;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .handle((p, h) -> {
                    // C:/eipapp/ksbysample-eipapp-ftp2sftp/recv にダウンロードされたファイルを削除しないと
                    // .localFilter(new AcceptAllFileListFilter<>()) を記述しているために毎回同じファイルが
                    // Message で流れてくるので、動作確認するためにここで削除する
                    try {
                        Files.delete(Paths.get(p.toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                })
                .get();
    }

}

■その2

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    private final SftpUploadMessageHandler sftpUploadMessageHandler;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory
            , SftpUploadMessageHandler sftpUploadMessageHandler) {
        this.ftpSessionFactory = ftpSessionFactory;
        this.sftpUploadMessageHandler = sftpUploadMessageHandler;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(...)
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                .handle((p, h) -> {
                    // C:/eipapp/ksbysample-eipapp-ftp2sftp/recv にダウンロードされたファイルを削除しないと
                    // .localFilter(new AcceptAllFileListFilter<>()) を記述しているために毎回同じファイルが
                    // Message で流れてくるので、動作確認するためにここで削除する
                    try {
                        Files.delete(Paths.get(p.toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                })
                .get();
    }

}
  • .from(...) の次に以下の処理を追加します。
    • .enrichHeaders(h -> h.header("sftpUploadError", false))
    • .handle(this.sftpUploadMessageHandler)

■その3

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(...)
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r
                        .recipientFlow("headers['sftpUploadError'] == false"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/send/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP 正常終了 ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                }))
                        .recipientFlow("headers['sftpUploadError'] == true"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/error/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP エラー ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                })))
                .get();
    }

}
  • .routeToRecipients(...) の処理を追加します。

■完成形

package ksbysample.eipapp.ftp2sftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

@Slf4j
@Configuration
public class FlowConfig {

    private final SessionFactory<FTPFile> ftpSessionFactory;

    private final SftpUploadMessageHandler sftpUploadMessageHandler;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory
            , SftpUploadMessageHandler sftpUploadMessageHandler) {
        this.ftpSessionFactory = ftpSessionFactory;
        this.sftpUploadMessageHandler = sftpUploadMessageHandler;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r
                        .recipientFlow("headers['sftpUploadError'] == false"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/send/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP 正常終了 ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                }))
                        .recipientFlow("headers['sftpUploadError'] == true"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/error/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP エラー ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                })))
                .get();
    }

}

SftpUploadMessageHandler.java

package ksbysample.eipapp.ftp2sftp;

import com.jcraft.jsch.ChannelSftp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.retry.RetryContext;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.support.RetrySynchronizationManager;

import java.io.File;
import java.util.Map;

@Slf4j
@MessageEndpoint
public class SftpUploadMessageHandler implements GenericHandler<File> {

    private final SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

    public SftpUploadMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
        this.sftpSessionFactory = sftpSessionFactory;
    }

    @Override
    @Retryable(value = {Exception.class}, maxAttempts = 5, backoff = @Backoff(delay = 10000))
    public Object handle(File payload, Map<String, Object> headers) {
        // リトライした場合にはリトライ回数をログに出力する
        RetryContext retryContext = RetrySynchronizationManager.getContext();
        if (retryContext.getRetryCount() > 0) {
            log.info("リトライ回数 = {}", retryContext.getRetryCount());
        }

        SftpRemoteFileTemplate sftpClient = new SftpRemoteFileTemplate(sftpSessionFactory);
        sftpClient.setRemoteDirectoryExpression(new LiteralExpression("/in"));
        sftpClient.send(MessageBuilder.withPayload(payload).build(), FileExistsMode.REPLACE);
        return payload;
    }

    @Recover
    public Object recoverException(Exception e, File payload, Map<String, Object> headers) {
        log.error("SFTPサーバにアップロードできませんでした ( {} )", payload.getAbsolutePath());
        return MessageBuilder.withPayload(payload)
                .setHeader("sftpUploadError", true)
                .build();
    }

}

履歴

2017/01/27
初版発行。