かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その13 )( FTP サーバからファイルをダウンロードして SFTP サーバへアップロードする2 )

概要

記事一覧はこちらです。

参照したサイト・書籍

目次

  1. メモ書き
    1. FtpInboundChannelAdapter ( s -> s.ftp(this.ftpSessionFactory).~ ) はファイルを FTP ダウンロードしていなくても in ディレクトリにファイルがあれば Message を次に送信する?
    2. .localFilter(new AcceptAllFileListFilter<>()) を指定しないとどうなるのか?
    3. Pollers.maxMessagesPerPoll(...) を指定しないとどうなるのか?
    4. マルチスレッドで並列処理しても spring-retry のリトライ回数はスレッドの処理毎に維持されるのか?

手順

メモ書き

FtpInboundChannelAdapter ( s -> s.ftp(this.ftpSessionFactory).~ ) はファイルを FTP ダウンロードしていなくても in ディレクトリにファイルがあれば Message を次に送信する?

FtpInboundChannelAdapter を MessageSource に指定している場合 FTP サーバからファイルをダウンロードした時だけ次に Message を送信するものと思っていたのですが、実は .localDirectory(...) に指定したディレクトリにファイルを置いても次に Message を送信します。

分かりやすくするために FlowConfig.java の ftp2SftpFlow Bean の .from(...) の後に .log() を追加します。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

まずは bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと以下のログが出力されます。Message が送信されて .log() により “GenericMessage [payload=…” のログが出力されています。

f:id:ksby:20170127092504p:plain

今度は FTP サーバ側ではなく .localDirectory(...) に指定している C:\eipapp\ksbysample-eipapp-ftp2sftp\recv に直接 2015.txt を置いてみます。

以下のログが出力されます。FTP ダウンロードはしていないのに Message が送信されます。

f:id:ksby:20170127093145p:plain

動作状況を見ている感じだと FtpInboundChannelAdapter とは FTP ダウンロード機能が付いた FileInboundAdaper で、FTP ダウンロードできなくてもローカルディレクトリにファイルがあれば次に Message が送信されます。

.localFilter(new AcceptAllFileListFilter<>()) を指定しないとどうなるのか?

指定しないと AcceptOnceFileListFilter<File> だけが適用されます。

org.springframework.integration.file.remote.synchronizer の AbstractInboundFileSynchronizingMessageSource.java を見ると private volatile FileListFilter<File> localFileListFilter = new AcceptOnceFileListFilter<File>(); の記述があります。

上で変更した FlowConfig.java の ftp2SftpFlow Bean の .from(...) の後に .log() を追加したものに、.localFilter(...) の記述を全てコメントアウトして動作確認してみます。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
//                                .localFilter(new AcceptAllFileListFilter<>())
//                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

bootRun で起動していたら一度停止した後、bootRun で起動します。

まずは FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと以下のログが出力されます。

f:id:ksby:20170127095404p:plain

次に再度 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置いてみます。

f:id:ksby:20170127095638p:plain

今度は FTP ダウンロードは行われますが、既に1度処理しているので AcceptOnceFileListFilter により次に Message は送信されません。ダウンロードされた 2014.txt は C:\eipapp\ksbysample-eipapp-ftp2sftp\recv に残ったままでした。

また .localFilter(new AcceptAllFileListFilter<>()) を記述して localDirectory(...) に指定したディレクトリからファイルを移動しないとどうなるのかも見てみます。以下のようにソースを変更します。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                .channel((nullChannel))
                .get();

bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと、以下のように FTP ダウンロードされたファイルが次々と Message で送信され続けます。

f:id:ksby:20170127100748p:plain

.localFilter(new AcceptAllFileListFilter<>()) を記述するなら、同じスレッドの処理で localDirectory(...) に指定したディレクトリからファイルを移動・削除する必要があります。

Pollers.maxMessagesPerPoll(...) を指定しないとどうなるのか?

Message を送信する処理で、1度に送信するのが1ファイルだけになります。例えば2ファイルある場合、最初に1つ目のファイルの Message を送信したら、次は poller で指定した時間が経過した後に2つ目のファイルの Message が送信されます。

.maxMessagesPerPoll(100)コメントアウトして動作確認してみます。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
//                                .maxMessagesPerPoll(100)))
                        ))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt の3ファイルを置くと以下のようにログが出力されました。

f:id:ksby:20170127103404p:plain

  • ファイルは最初に3ファイル全て FTP ダウンロードされます。
  • その後 Message を送信する処理のところで、5秒間隔で1ファイルずつ送信されます。

マルチスレッドで並列処理しても spring-retry のリトライ回数はスレッドの処理毎に維持されるのか?

org.springframework.retry.support の RetrySynchronizationManager.java をみると private static final ThreadLocal<RetryContext> context = new ThreadLocal<RetryContext>(); と RetryContext に ThreadLocal を利用しているので維持されそうです。確認してみます。

リトライ処理を行う SFTP アップロードの処理をマルチスレッドで並列に処理されるようにしたいので、.channel(c -> c.executor(Executors.newCachedThreadPool())) を追加します。また recv ディレクトリに残っているファイルが何度も Message で送信されてこないよう .filter(...) の2行をコメントアウトします。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
//                                .localFilter(new AcceptAllFileListFilter<>())
//                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                // 以降の処理をマルチスレッドで並列処理する
                .channel(c -> c.executor(Executors.newCachedThreadPool()))
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r

SFTP サーバを停止した後、bootRun で起動します。起動後、C:\Xlight\ftproot\recv01\out に 2014.txt を配置 → 2回リトライのログが出る → 2015.txt を配置 → 1回リトライのログが出る → 2016.txt を配置、の順にファイルを置きます。

以下のようにログが出力されました。

f:id:ksby:20170128005309p:plain

  • 2014.txt は pool-1-thread-1、2015.txt は pool-1-thread-2、2016.txt は pool-1-thread-3 とファイル毎に異なるスレッドで処理されています。
  • リトライ回数はスレッド毎に 1, 2, 3, 4 と増えており、特に回数がおかしくなることはありませんでした。

履歴

2017/01/28
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その12 )( FTP サーバからファイルをダウンロードして SFTP サーバへアップロードする )

概要

記事一覧はこちらです。

  • Spring Integration DSL のサンプルを作成します。
  • 以下の処理を行う常駐型アプリケーションを作成します。
    • FTPサーバに recv01 ユーザでログインし /out ディレクトリにファイルがあるかチェックします。ファイルがあれば D:\eipapp\ksbysample-eipapp-ftp2sftp\recv ディレクトリにダウンロードします。
    • ダウンロードしたファイルを SFTP サーバにアップロードします。SFTP サーバには send01 ユーザでログインし /in ディレクトリにアップロードします。ファイルはアップロードが成功したら send ディレクトリに、失敗したら error ディレクトリに移動します。
  • Spring Cloud Sleuth による zipkin 連携は今回も入れます。
  • リトライ処理に spring-retry を使用してみます。

参照したサイト・書籍

  1. Xlight ftp server
    https://www.xlightftpd.com/

    • Windows で利用可能な FTP/SFTP サーバです。「Personal edition is free for personal use.」とのことなので今回はこれを利用します。
    • Spring Integration の FTP/FTPS Adapters でダウンロード、アップロードしても特に問題は発生しませんでした。
  2. spring-integration-java-dsl/build.gradle
    https://github.com/spring-projects/spring-integration-java-dsl/blob/master/build.gradle

  3. javac - Java プログラミング言語コンパイラ
    http://docs.oracle.com/javase/jp/7/technotes/tools/solaris/javac.html

  4. Spring Integration Reference Manual - 15. FTP/FTPS Adapters http://docs.spring.io/spring-integration/reference/html/ftp.html

  5. Spring Integration Reference Manual - 27. SFTP Adapters http://docs.spring.io/spring-integration/reference/html/sftp.html

  6. Spring-Retry - シンプルで本質的なコードを汚さないリトライ処理フレームワーク http://tech.atware.co.jp/spring-retry/

  7. spring-projects/spring-retry
    https://github.com/spring-projects/spring-retry

目次

  1. Windows の FTP/SFTP サーバとして Xlight ftp server をインストール・設定する
    1. Xlight ftp server をインストールする
    2. FTP サーバと recv01 ユーザを作成して FTP サーバを起動する
    3. SFTP サーバと send01 ユーザを作成して SFTP サーバを起動する
  2. recv, send, error ディレクトリを作成する
  3. ksbysample-eipapp-ftp2sftp プロジェクトを作成する
  4. FTP サーバからファイルをダウンロードする処理を作成する
  5. SFTP サーバにファイルをアップロードする処理を作成する
  6. アップロード成功時には send ディレクリへ、失敗時には error ディレクトリへ移動する処理を作成する
  7. 動作確認

手順

WindowsFTP/SFTP サーバとして Xlight ftp server をインストール・設定する

Spring Boot + Spring Integration でいろいろ試してみる ( その1 )( SFTP でファイルアップロードするバッチを作成する )freeFTPd をインストールしましたが、Spring Integration の 15. FTP/FTPS Adapters でダウンロードすると ダウンロード中であることを示す ~.writing がファイルの末尾に付いたままダウンロードが完了しない問題がありました。

他に Windows で利用可能な FTP サーバを探したところ Xlight ftp server が personal use ならば free とのことなので、今回はこれをインストールして FTP/SFTP サーバとして使用します。

Xlight ftp server をインストールする

  1. ダウンロードのページ から「Setup with installer - 64-bit」の Download Link をクリックして setup-x64.exe をダウンロードします。

    f:id:ksby:20170125135603p:plain

  2. setup-x64.exe を実行します。

  3. 「Setup - Xlight FTP Server」ダイアログが表示されます。「Next >」ボタンをクリックします。

  4. 「License Agreement」画面が表示されます。「I accept the agreement」を選択した後、「Next >」ボタンをクリックします。

  5. 「Select Destination Location」画面が表示されます。インストール先を “C:\Xlight” へ変更した後、「Next >」ボタンをクリックします。

  6. 「Select Start Menu Folder」画面が表示されます。「Next >」ボタンをクリックします。

  7. 「Select Additional Tasks」画面が表示されます。「Create desktop icon」のチェックを外した後、「Next >」ボタンをクリックします。

  8. 「Ready to Install」画面が表示されます。「Install」ボタンをクリックします。

  9. インストールが実行されます。

  10. インストールが完了すると「Completing the Xlight FTP Server Setup Wizard」画面が表示されます。「Launch application」のチェックを外した後、「Finish」ボタンをクリックします。

FTP サーバと recv01 ユーザを作成して FTP サーバを起動する

  1. C:\Xlight\xlight.exe を実行します。

  2. 「Xlight FTP Server」画面が表示されます。「New Virtual Server」ボタンをクリックします。

    f:id:ksby:20170125141928p:plain

  3. 「New Virtual Server」ダイアログが表示されます。何も変更せずに「OK」ボタンをクリックします。

    f:id:ksby:20170125142438p:plain

  4. 元の画面に戻り FTP サーバが追加されていることが確認できます。次に「User List」ボタンをクリックします。

    f:id:ksby:20170125142647p:plain

  5. 「User List」ダイアログが表示されます。「Add」ボタンをクリックします。

    f:id:ksby:20170125143421p:plain

  6. ユーザ登録用のダイアログが表示されます。以下の値を入力した後、「OK」ボタンをクリックします。

    • 「Username」「Password」に “recv01” を入力します。
    • 「Home Directory」に “C:\Xlight\ftproot\recv01” を入力します。

    f:id:ksby:20170125144137p:plain

  7. 「User List」ダイアログに戻り recv01 ユーザが登録されていることが確認できます。recv01 を選択した後、「Edit」ボタンをクリックします。

    f:id:ksby:20170125144435p:plain

  8. 「Username: recv01」ダイアログが表示されます。画面左側の「User Path」を選択した後、画面右側のリストから Virtual Path = “/” のデータを選択して「Edit」ボタンをクリックします。

    f:id:ksby:20170125145016p:plain

  9. 「Virtual Path」ダイアログが表示されます。全ての操作を行えるようにしたいので、以下の画像の赤枠内のチェックボックスをチェックした後、「OK」ボタンをクリックします。

    f:id:ksby:20170125145521p:plain

    「Username: recv01」ダイアログに戻ると Permission に “-” の部分がなくなっています。「OK」ボタンをクリックしてダイアログを閉じます。

    f:id:ksby:20170125145846p:plain

  10. C:\Xlight\ftproot\recv01\out ディレクトリを作成します。

  11. 「Xlight FTP Server」画面に戻ったら Port = 21 のサーバを選択した後、「Start Server」ボタンをクリックしてサーバを起動します。起動すると「Status」に “Running” と表示されます。

    f:id:ksby:20170125153551p:plain

SFTP サーバと send01 ユーザを作成して SFTP サーバを起動する

基本的には FTP サーバの時と同じです。異なる部分のみ記載します。

  1. Virtual Server 作成時に「Protocol」を “SSH2” にします。Port も 22 に変わります。

    f:id:ksby:20170125165244p:plain

  2. 以下のユーザを作成します。ディレクトリの操作権限は recv01 の時と同様に全てチェックします。

    • 「Username」「Password」に “send01” を入力します。
    • 「Home Directory」に “C:\Xlight\sftproot\send01” を入力します。

    f:id:ksby:20170125170413p:plain

  3. C:\Xlight\sftproot\send01\in ディレクトリを作成します。

  4. 最後に「Xlight FTP Server」画面で Port = 22 のサーバを選択した後、「Start Server」ボタンをクリックしてサーバを起動します。起動すると「Status」に “Running” と表示されます。

    f:id:ksby:20170125171552p:plain

C:\Xlight のディレクトリ構成は以下のようになります。

C:\Xlight
├ ftproot
│ └ recv01
│    └ out
└ sftproot
   └ send01
      └ in

recv, send, error ディレクトリを作成する

常駐型アプリケーション用として以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-ftp2sftp
├ error
├ recv
└ send

ksbysample-eipapp-ftp2sftp プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.ftp2sftp パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/ftp2sftp の下に Application.java を作成し、リンク先の内容 を記述します。

  4. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  5. src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

FTP サーバからファイルをダウンロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下に ApplicationConfig.java を作成し、リンク先のその1の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/ftp2sftp の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

SFTP サーバにファイルをアップロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下の ApplicationConfig.javaリンク先のその2の内容 に変更します。

  2. src/main/java/ksbysample/eipapp/ftp2sftp の下に SftpUploadMessageHandler.java を作成し、リンク先の内容 を記述します。

  3. src/main/java/ksbysample/eipapp/ftp2sftp の下の FlowConfig.javaリンク先のその2の内容 に変更します。

アップロード成功時には send ディレクリへ、失敗時には error ディレクトリへ移動する処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下の FlowConfig.javaリンク先のその3の内容 に変更します。

動作確認

  1. clean タスク実行 → Rebuild Project 実行をした後、build タスクを実行して “BUILD SUCCESSFUL” のメッセージが表示されることを確認します。

    f:id:ksby:20170127004240p:plain

  2. テストでは Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する ) で作成した 2014.txt, 2015.txt, 2016.txt を使用します。

  3. まずは FTP サーバ、SFTP サーバを起動して正常に処理される場合を確認します。ディレクトリにファイルがないことを確認してから bootRun タスクを実行します。

  4. C:\Xlight\ftproot\recv01\out に 2014.txt を入れます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、 C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127005635p:plain

  5. send, in のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt の3ファイルを入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt, 2015.txt, 2016.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127010005p:plain

    • 最初に 2014.txt, 2015.txt, 2016.txt の全てのファイルをダウンロードしています。
    • その後で1ファイルずつ Message 送信 → SFTP アップロード が行われています。
  6. 今度は SFTP サーバを停止してアップロードに失敗する場合を確認します。

  7. send, in のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\error に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127010946p:plain

    • リトライは4回 ( 初回と合わせて実行されたのは計5回 ) 行われています。

    Zipkin のグラフを見てもリトライしている状況は分かりませんでした。途中で途切れたりはせず、単に時間がかかっているので横に長いグラフが出るだけのようです。

    f:id:ksby:20170127011600p:plain

  8. error のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\error に 2014.txt, 2015.txt, 2016.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127011953p:plain

    • 並列処理にはしていないので、3ファイルダウンロードした後、1ファイルずつ Message 送信 → SFTP アップロードを5回リトライ → エラーとなり error ディレクトリへ移動 が行われています。
  9. 最後に SFTP サーバを停止した状態で out ディレクトリにファイルを置いた後、リトライの途中で SFTP サーバを起動して正常に処理される場合を確認します。

  10. error のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなりリトライが実行されますので、途中で SFTP サーバを起動します。更に少しすると C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127012807p:plain

    • リトライ中に SFTP サーバが起動すれば正常に処理が進みました。

長くなったので、メモ書きは次に書きます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-ftp")
    compile("org.springframework.integration:spring-integration-sftp")
    compile("org.springframework.retry:spring-retry")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}
  • compileJava.options.compilerArgs 等の compileArgs の記述を spring-integration-java-dsl/build.gradle をまねて1行にして、-options,-processing を追加してみました。
  • Spring Integration の FTP/FTPS Adapters, SFTP Adapters を使用するので以下の2行を記述します。
    • compile("org.springframework.integration:spring-integration-ftp")
    • compile("org.springframework.integration:spring-integration-sftp")
  • 今回 spring-retry を使用するので以下の1行を記述します。
    • compile("org.springframework.retry:spring-retry")

Application.java

package ksbysample.eipapp.ftp2sftp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;

@SpringBootApplication
@EnableRetry(proxyTargetClass = true)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
  • 今回は spring-retry を使用するので @EnableRetry(proxyTargetClass = true) を記述します。

application.properties

spring.application.name=ftp2sftp
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/>
    <logger name="com.jcraft.jsch" level="ERROR"/>
</configuration>

ApplicationConfig.java

■その1

package ksbysample.eipapp.ftp2sftp;

import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

@Configuration
public class ApplicationConfig {

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

}

■その2

@Configuration
public class ApplicationConfig {

    ..........

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

}
  • @Bean public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() { ... } を追加します。

■完成形

package ksbysample.eipapp.ftp2sftp;

import com.jcraft.jsch.ChannelSftp;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
public class ApplicationConfig {

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.ftp2sftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

@Slf4j
@Configuration
public class FlowConfig {

    private final SessionFactory<FTPFile> ftpSessionFactory;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory) {
        this.ftpSessionFactory = ftpSessionFactory;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .handle((p, h) -> {
                    // C:/eipapp/ksbysample-eipapp-ftp2sftp/recv にダウンロードされたファイルを削除しないと
                    // .localFilter(new AcceptAllFileListFilter<>()) を記述しているために毎回同じファイルが
                    // Message で流れてくるので、動作確認するためにここで削除する
                    try {
                        Files.delete(Paths.get(p.toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                })
                .get();
    }

}

■その2

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    private final SftpUploadMessageHandler sftpUploadMessageHandler;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory
            , SftpUploadMessageHandler sftpUploadMessageHandler) {
        this.ftpSessionFactory = ftpSessionFactory;
        this.sftpUploadMessageHandler = sftpUploadMessageHandler;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(...)
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                .handle((p, h) -> {
                    // C:/eipapp/ksbysample-eipapp-ftp2sftp/recv にダウンロードされたファイルを削除しないと
                    // .localFilter(new AcceptAllFileListFilter<>()) を記述しているために毎回同じファイルが
                    // Message で流れてくるので、動作確認するためにここで削除する
                    try {
                        Files.delete(Paths.get(p.toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                })
                .get();
    }

}
  • .from(...) の次に以下の処理を追加します。
    • .enrichHeaders(h -> h.header("sftpUploadError", false))
    • .handle(this.sftpUploadMessageHandler)

■その3

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(...)
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r
                        .recipientFlow("headers['sftpUploadError'] == false"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/send/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP 正常終了 ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                }))
                        .recipientFlow("headers['sftpUploadError'] == true"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/error/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP エラー ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                })))
                .get();
    }

}
  • .routeToRecipients(...) の処理を追加します。

■完成形

package ksbysample.eipapp.ftp2sftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

@Slf4j
@Configuration
public class FlowConfig {

    private final SessionFactory<FTPFile> ftpSessionFactory;

    private final SftpUploadMessageHandler sftpUploadMessageHandler;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory
            , SftpUploadMessageHandler sftpUploadMessageHandler) {
        this.ftpSessionFactory = ftpSessionFactory;
        this.sftpUploadMessageHandler = sftpUploadMessageHandler;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r
                        .recipientFlow("headers['sftpUploadError'] == false"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/send/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP 正常終了 ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                }))
                        .recipientFlow("headers['sftpUploadError'] == true"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/error/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP エラー ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                })))
                .get();
    }

}

SftpUploadMessageHandler.java

package ksbysample.eipapp.ftp2sftp;

import com.jcraft.jsch.ChannelSftp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.retry.RetryContext;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.support.RetrySynchronizationManager;

import java.io.File;
import java.util.Map;

@Slf4j
@MessageEndpoint
public class SftpUploadMessageHandler implements GenericHandler<File> {

    private final SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

    public SftpUploadMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
        this.sftpSessionFactory = sftpSessionFactory;
    }

    @Override
    @Retryable(value = {Exception.class}, maxAttempts = 5, backoff = @Backoff(delay = 10000))
    public Object handle(File payload, Map<String, Object> headers) {
        // リトライした場合にはリトライ回数をログに出力する
        RetryContext retryContext = RetrySynchronizationManager.getContext();
        if (retryContext.getRetryCount() > 0) {
            log.info("リトライ回数 = {}", retryContext.getRetryCount());
        }

        SftpRemoteFileTemplate sftpClient = new SftpRemoteFileTemplate(sftpSessionFactory);
        sftpClient.setRemoteDirectoryExpression(new LiteralExpression("/in"));
        sftpClient.send(MessageBuilder.withPayload(payload).build(), FileExistsMode.REPLACE);
        return payload;
    }

    @Recover
    public Object recoverException(Exception e, File payload, Map<String, Object> headers) {
        log.error("SFTPサーバにアップロードできませんでした ( {} )", payload.getAbsolutePath());
        return MessageBuilder.withPayload(payload)
                .setHeader("sftpUploadError", true)
                .build();
    }

}

履歴

2017/01/27
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その11 )( Spring Cloud Sleuth を使用して処理状況を Zipkin で表示する )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. Zipkin
    http://zipkin.io/

  2. Tracing Spring Integration Flow With Spring Cloud Sleuth
    https://dzone.com/articles/tracing-spring-integration-flow-with-spring-cloud

  3. LINE Engineers' Blog - LINEのマイクロサービス環境における分散トレーシング
    http://developers.linecorp.com/blog/ja/?p=3392

  4. Spring CloudとZipkinを利用した分散トレーシング
    http://www.slideshare.net/rakutentech/spring-cloudzipkin

  5. Spring Cloud Sleuth
    http://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.1.1.RELEASE/

  6. Spring Cloud
    http://projects.spring.io/spring-cloud/

  7. The logback manual - Chapter 6: Layouts
    http://logback.qos.ch/manual/layouts.html

目次

  1. org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?
  2. Zipkin サーバを起動して Spring Integration の処理状況を表示する
    1. Zipkin の jar ファイルをダウンロードして起動する
    2. build.gradle を変更する
    3. application.properties を作成して必要な設定を記述する
    4. logback-spring.xml でログのレイアウトを変更する
    5. 動作確認
  3. Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?
  4. urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?
  5. 最後に

手順

org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?

Spring Cloud のページを見ると Spring Cloud を利用する時は Spring IO Platform とは別の BOM を利用するようです。Spring IO Platform では io.spring.platform:platform-bom:Athens-SR2 ですが、Spring Cloud の場合には org.springframework.cloud:spring-cloud-dependencies:Camden.SR4 と書かれています。

ただし Spring Cloud のページと Spring Cloud Sleuth のページで書かれている BOM が異なっており、Spring Cloud のページでは org.springframework.cloud:spring-cloud-dependencies:Camden.SR4Spring Cloud Sleuth のページでは org.springframework.cloud:spring-cloud-dependencies:Brixton.RELEASE でした。

Spring Cloud のページに BOM と適用されるバージョンが記述されています。常駐型アプリケーションでは Spring Boot の 1.4.3.RELEASE を使用していて Camden.SR4 では1つ古い 1.4.2.RELEASE が記述されていますが、同じ 1.4 系なので Camden.SR4 を使用して試してみることにします。

BOM spring-cloud-sleuth spring-boot
Camden.SR4 1.1.1.RELEASE 1.4.2.RELEASE
Brixton.SR7 1.0.11.RELEASE 1.3.8.RELEASE

。。。と書きましたが、BOM の末尾は Camden.SR4 ではなく Camden.RELEASE のように .RELEASE と書かないと正常に動作しません。Camden.SR4 と書くと Zipkin のグラフがなぜか左揃えで表示されますし、数百 ms で終了するはずの処理がなぜか 4 秒もかかるようになりました。

f:id:ksby:20170120002738p:plain

ちなみに Camden.RELEASE と書くと spring-cloud-sleuth は 1.0.9.RELEASE が使用されます。今回は以下のバージョンになります。

BOM spring-cloud-sleuth spring-boot
Camden.RELEASE 1.0.9.RELEASE 1.4.3.RELEASE

Zipkin サーバを起動して Spring Integration の処理状況を表示する

Zipkin の jar ファイルをダウンロードして起動する

  1. Zipkin のページにアクセスします。ページ左側の「Quickstart」リンクをクリックした後、「Java」のところの「latest release」リンクをクリックして zipkin-server-1.19.2-exec.jar をダウンロードします。

    f:id:ksby:20170118180355p:plain

  2. C:\zipkin ディレクトリを作成し、その下に zipkin-server-1.19.2-exec.jar を保存します。

  3. コマンドプロンプトを起動し、java -jar zipkin-server-1.19.2-exec.jar コマンドを実行して Zipkin サーバを起動します。

    f:id:ksby:20170118182420p:plain

    Zipkin って Sprint Boot で作られているんですね。

    f:id:ksby:20170118182615p:plain

    Started ZipkinServer in ... のメッセージが出たら起動完了です。

  4. ブラウザで http://localhost:9411/ にアクセスすると Zipkin の画面が表示されました。

    f:id:ksby:20170118183151p:plain

build.gradle を変更する

IntelliJ IDEA で ksbysample-eipapp-urlchecker プロジェクトを開いた後、build.gradle を リンク先のその1の内容 に変更します。変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

gradlew dependencies で依存関係を確認してみます。

compile - Dependencies for source set 'main'.
+--- org.springframework.boot:spring-boot-starter-integration: -> 1.4.3.RELEASE
|    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE
|    |    +--- org.springframework.boot:spring-boot:1.4.3.RELEASE
|    |    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |    \--- org.springframework:spring-context:4.3.5.RELEASE
|    |    |         +--- org.springframework:spring-aop:4.3.5.RELEASE
|    |    |         |    +--- org.springframework:spring-beans:4.3.5.RELEASE
|    |    |         |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |         +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         \--- org.springframework:spring-expression:4.3.5.RELEASE
|    |    |              \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE
|    |    |    \--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-logging:1.4.3.RELEASE
|    |    |    +--- ch.qos.logback:logback-classic:1.1.8
|    |    |    |    +--- ch.qos.logback:logback-core:1.1.8
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.21 -> 1.7.22
|    |    |    +--- org.slf4j:jcl-over-slf4j:1.7.22
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.22
|    |    |    +--- org.slf4j:jul-to-slf4j:1.7.22
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.22
|    |    |    \--- org.slf4j:log4j-over-slf4j:1.7.22
|    |    |         \--- org.slf4j:slf4j-api:1.7.22
|    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    \--- org.yaml:snakeyaml:1.17
|    +--- org.springframework.boot:spring-boot-starter-aop:1.4.3.RELEASE
|    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|    |    \--- org.aspectj:aspectjweaver:1.8.9
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE
|    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|    |    +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    +--- org.springframework:spring-messaging:4.3.5.RELEASE
|    |    |    +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |    +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework:spring-tx:4.3.5.RELEASE
|    |    |    +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    \--- org.springframework.retry:spring-retry:1.1.3.RELEASE -> 1.1.5.RELEASE
|    +--- org.springframework.integration:spring-integration-java-dsl:1.1.4.RELEASE -> 1.2.1.RELEASE
|    |    +--- org.springframework.integration:spring-integration-core:4.3.5.RELEASE -> 4.3.6.RELEASE (*)
|    |    \--- org.reactivestreams:reactive-streams:1.0.0
|    \--- org.springframework.integration:spring-integration-jmx:4.3.6.RELEASE
|         \--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
+--- org.springframework.integration:spring-integration-file: -> 4.3.6.RELEASE
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
|    \--- commons-io:commons-io:2.4
+--- org.springframework.integration:spring-integration-http: -> 4.3.6.RELEASE
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
|    \--- org.springframework:spring-webmvc:4.3.5.RELEASE
|         +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-core:4.3.5.RELEASE
|         +--- org.springframework:spring-expression:4.3.5.RELEASE (*)
|         \--- org.springframework:spring-web:4.3.5.RELEASE
|              +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|              +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|              +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|              \--- org.springframework:spring-core:4.3.5.RELEASE
+--- org.codehaus.janino:janino: -> 2.7.8
|    \--- org.codehaus.janino:commons-compiler:2.7.8
+--- org.springframework.cloud:spring-cloud-starter-zipkin: -> 1.0.9.RELEASE
|    +--- org.springframework.cloud:spring-cloud-starter-sleuth:1.0.9.RELEASE
|    |    +--- org.springframework.cloud:spring-cloud-starter:1.1.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*)
|    |    |    +--- org.springframework.cloud:spring-cloud-context:1.1.3.RELEASE
|    |    |    |    \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE
|    |    |    +--- org.springframework.cloud:spring-cloud-commons:1.1.3.RELEASE
|    |    |    |    \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE
|    |    |    \--- org.springframework.security:spring-security-rsa:1.0.3.RELEASE
|    |    |         \--- org.bouncycastle:bcpkix-jdk15on:1.55 -> 1.54
|    |    |              \--- org.bouncycastle:bcprov-jdk15on:1.54
|    |    +--- org.springframework.boot:spring-boot-starter-web:1.3.7.RELEASE -> 1.4.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    |    +--- org.springframework.boot:spring-boot-starter-tomcat:1.4.3.RELEASE
|    |    |    |    +--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6
|    |    |    |    +--- org.apache.tomcat.embed:tomcat-embed-el:8.5.6
|    |    |    |    \--- org.apache.tomcat.embed:tomcat-embed-websocket:8.5.6
|    |    |    |         \--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6
|    |    |    +--- org.hibernate:hibernate-validator:5.2.4.Final
|    |    |    |    +--- javax.validation:validation-api:1.1.0.Final
|    |    |    |    +--- org.jboss.logging:jboss-logging:3.2.1.Final -> 3.3.0.Final
|    |    |    |    \--- com.fasterxml:classmate:1.1.0 -> 1.3.3
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.8.5
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.8.0 -> 2.8.5
|    |    |    |    \--- com.fasterxml.jackson.core:jackson-core:2.8.5
|    |    |    +--- org.springframework:spring-web:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-webmvc:4.3.5.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-actuator:1.3.7.RELEASE -> 1.4.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    |    \--- org.springframework.boot:spring-boot-actuator:1.4.3.RELEASE
|    |    |         +--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*)
|    |    |         +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE (*)
|    |    |         +--- com.fasterxml.jackson.core:jackson-databind:2.8.5 (*)
|    |    |         +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         \--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-aop:1.3.7.RELEASE -> 1.4.3.RELEASE (*)
|    |    \--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE
|    |         +--- org.springframework:spring-context:4.2.7.RELEASE -> 4.3.5.RELEASE (*)
|    |         \--- org.aspectj:aspectjrt:1.8.9
|    \--- org.springframework.cloud:spring-cloud-sleuth-zipkin:1.0.9.RELEASE
|         +--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE (*)
|         +--- io.zipkin.java:zipkin:1.11.1
|         +--- io.zipkin.reporter:zipkin-reporter:0.5.0
|         |    \--- io.zipkin.java:zipkin:1.11.1
|         \--- io.zipkin.reporter:zipkin-sender-urlconnection:0.5.0
|              +--- io.zipkin.reporter:zipkin-reporter:0.5.0 (*)
|              \--- io.zipkin.java:zipkin:1.11.1
+--- org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE (*)
\--- org.projectlombok:lombok:1.16.12

Camden の Spring Boot のバージョンは 1.4 系なのかと思ったら org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*) と 1.3 系が表示されますね ( 1.4 系に変更されていますが )。

また org.springframework.boot:spring-boot-starter-web が入っています。これがあると Tomcat が起動してしまうので、依存関係から除外します。build.gradle を リンク先のその2の内容 に変更した後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

application.properties を作成して必要な設定を記述する

  1. src/main/resources の下に application.properties を新規作成した後、リンク先の内容 を記述します。

logback-spring.xml でログのレイアウトを変更する

  1. src/main/resources の下の logback-spring.xmlリンク先の内容 に変更します。この変更を行うことで、ログに “boostrap” ではなく application.properties の spring.application.name に設定した文字列が出力されるようになります。

    f:id:ksby:20170118203648p:plain

    f:id:ksby:20170118204456p:plain

動作確認

  1. bootRun タスクを実行し、アプリケーションを起動します。

  2. in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。"urlchecker" の次が traceId で1回の処理で全て同じ値が出力されています。

    f:id:ksby:20170120010113p:plain

    Zipkin の画面には今回のデータが表示され、

    f:id:ksby:20170120010315p:plain

    クリックすると詳細な状況が表示されます。並列で処理されていることが分かります。

    f:id:ksby:20170120010524p:plain

    その中のバーの1つをクリックすると payload の情報や traceId, spanId が掲載されたダイアログが表示されます。

    f:id:ksby:20170120010655p:plain

  3. Zipkin のグラフを見た感想ですが、

    • 内部的に urllistfilepollerflow.channel#1urllistfilepollerflow.channel#2 といった channel が作成されていることに初めて気づきました。でもどれが何の処理なのか全然分かりませんね。。。
    • IntegrationFlows の .from(...).handle(...) 等のメソッド間のデータはどうも chennel 経由でやり取りされているようです。

Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?

起動時のログに channel と割り当てられている処理が出力されていますので、それを見れば分かります。

f:id:ksby:20170120012800p:plain

例えば urllistfilepollerflow.channel#1urllistfilepollerflow.channel#2 は以下の処理が該当します。

channel 処理
urllistfilepollerflow.channel#1 .split(new FileSplitter())
urlListFilePollerFlow.channel#2 .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false)

urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?

  1. DirectChannel ではなく QueueChannel でデータをやり取りするよう src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先の内容 に変更します。

  2. bootRun タスクを実行し、アプリケーションを起動します。

  3. in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。traceId は QueueChannel の時でも同じ値でした。

    f:id:ksby:20170120021319p:plain

    グラフは少し遅延が途中に入る程度のように見えます。

    f:id:ksby:20170120021543p:plain

Message の header を見ると traceId 等の情報が埋め込まれています。

GenericMessage [
payload=http://ksby.hatenablog.com/entry/2014/12/27/233427
, headers={
    sequenceNumber=1
    , file_name=2014.txt
    , sequenceSize=2
    , X-B3-ParentSpanId=553e9dfb0d0b90b3
    , X-Message-Sent=true
    , messageSent=true
    , file_originalFile=C:\eipapp\ksbysample-eipapp-urlchecker\in\2014.txt
    , spanName=message:urlListFilePollerFlow.channel#3
    , lines.size=2
    , spanTraceId=13b92684b6371460
    , spanId=4f65cf944c245ba
    , spanParentSpanId=553e9dfb0d0b90b3
    , X-Span-Name=message:urlListFilePollerFlow.channel#3
    , X-B3-SpanId=4f65cf944c245ba
    , currentSpan=[
        Trace: 13b92684b6371460
        , Span: 4f65cf944c245ba
        , Parent: 553e9dfb0d0b90b3
        , exportable:true
    ]
    , X-B3-Sampled=1
    , X-B3-TraceId=13b92684b6371460
    , correlationId=43f57730-e42f-74a8-dcbe-e28d549cd0f3
    , id=5db6324f-313f-df80-4d5b-61b93ef48096
    , X-Current-Span=[
        Trace: 13b92684b6371460
        , Span: 4f65cf944c245ba
        , Parent: 553e9dfb0d0b90b3
        , exportable:true
    ]
    , spanSampled=1
    , timestamp=1484848137184
}]

最後に

並列処理の状況がこんな簡単に可視化できるとは、Spring Cloud Sleuth+Zipkin いいですね! Spring Cloud で分散処理させている時に可視化するのに便利なソフトウェアとは聞いていましたが、Spring Integration の処理状況も表示させることができるとは意外でした。

ソースコード

build.gradle

■その1

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.springframework.integration:spring-integration-http")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • dependencyManagement に mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE' を追加します。
  • dependencies に compile("org.springframework.cloud:spring-cloud-starter-zipkin") を追加します。

■その2

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }
  • dependencies の compile("org.springframework.cloud:spring-cloud-starter-zipkin") の後に { exclude module: "spring-boot-starter-web" } を追加します。

application.properties

spring.application.name=urlchecker
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0
  • 上の3行を追加します。spring.application.name に設定した文字列が Zipkin の画面に表示されます。

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
  • <springProperty scope="context" name="springAppName" source="spring.application.name"/> を追加します。
  • <property name="CONSOLE_LOG_PATTERN" value="..."/> を追加します。JSON Logback with Logstash の「Logback setup」には %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},... と書かれているのですが、これだと traceId が2ヶ所出力されてしまうので %clr(${LOG_LEVEL_PATTERN:-%5p})%clr(${level:-%5p}) へ変更しています。また左側に出力される情報が多く、ログの右側の文字列が見えなくなるため ,%X{X-B3-ParentSpanId:-} は削除しました。

FlowConfig.java

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";
    private final static String MESSAGE_HEADER_SEQUENCE_SIZE = "sequenceSize";
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";
    private final static String MESSAGE_HEADER_FILE_NAME = "file_name";
    private final static String MESSAGE_HEADER_FILE_ORIGINALFILE = "file_originalFile";

    @Bean
    public MessageChannel urlCheckChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel deleteFileChannel() {
        return new QueueChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .headerFilter 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header から "sequenceSize" というキーの header を削除する
                .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false)
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_SEQUENCE_SIZE
                        , m -> m.getHeaders().get(MESSAGE_HEADER_LINES_SIZE)))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    return p;
                }, e -> e.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((GenericHandler<Object>) (p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS)).build()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(MESSAGE_HEADER_FILE_NAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((GenericHandler<Object>) (p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(MESSAGE_HEADER_FILE_ORIGINALFILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                }, e -> e.poller(Pollers.fixedDelay(1000)))
                .get();
    }

}
  • urlCheckChannel Bean, writeFileChannel Bean, deleteFileChannel Bean を全て return new DirectChannel();return new QueueChannel(); へ変更します。
  • urlCheckFlow メソッドの .from(urlCheckChannel()) の後に .handle((GenericHandler<Object>) (p, h) -> { return p; }, e -> e.poller(Pollers.fixedDelay(1000))) を追加します。
  • writeFileFlow メソッドの最初の .handle(...) の第2引数に , e -> e.poller(Pollers.fixedDelay(1000)) を追加します。
  • deleteFileFlow メソッドの最初の .handle(...) の第2引数に , e -> e.poller(Pollers.fixedDelay(1000)) を追加します。

履歴

2017/01/20
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する )

概要

記事一覧はこちらです。

  • Spring Integration は DSL を使った時の方が XML ファイルや DSL なしの Java Config の時よりも面白いです。DSL に慣れるためにもいくつかサンプルを作っていこうと思います。
  • Spring Integration DSL を使用して、以下の処理を行う常駐型アプリケーションを作成します。
    1. in ディレクトリに URL 一覧が記入された拡張子が .txt のファイルが置かれているか 1 秒間隔でチェックします。
    2. ファイルが置かれていたら読み込んで行単位に分割します。
    3. URL が “http://” で始まる場合には、その URL にアクセス可能か ( HTTP ステータスコードの 200 が返るか ) チェックします。チェック処理はスレッドを生成して並行処理します。
    4. URL と HTTP ステータスコードを出力した結果ファイルを out ディレクトリに作成します。出力する時には元の .txt ファイルに書かれていた順になるようにします。
    5. 元の .txt ファイルを削除します。
  • 今回作る処理を Spring Integration のフロー図で描こうとすると channel がいちいち途中に挿入されて分かりにくいので、今回は省略します。というか、フロー図を描こうとして改めて DSL だと複雑な処理でも結構簡単に書けるんだなということに気づきましたね。。。

参照したサイト・書籍

  1. spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/
    https://github.com/spring-projects/spring-integration-java-dsl/tree/master/src/test/java/org/springframework/integration/dsl/test

目次

  1. ksbysample-eipapp-urlchecker プロジェクトを作成する
  2. in, out ディレクトリを作成する
  3. urlListFilePollerFlow を実装する
  4. urlCheckFlow を実装する
  5. writeFileFlow を実装する
  6. deleteFileFlow を実装する
  7. 動作確認
  8. メモ書き
    1. .channel(c -> c.executor(taskExecutor())) を入れるとどう処理が変わるのか?
    2. urlListFilePollerFlow でわざわざ sequenceSize header の値を付ける処理を入れている理由とは?

手順

ksbysample-eipapp-urlchecker プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.urlchecker パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/urlchecker の下に Application.java を作成し、リンク先の内容 を記述します。

  4. ログの出力は INFO レベルからにするため、src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

in, out ディレクトリを作成する

C:/eipapp/ksbysample-eipapp-dirpoller を作成し、その下に in, out ディレクトリを作成します。以下の構成になります。

C:/eipapp/ksbysample-eipapp-urlchecker
├ in
└ out

urlListFilePollerFlow を実装する

以降の実装中に動作確認をする際には、以下の内容を記述した sample.txt というファイルを使用します。

http://projects.spring.io/spring-boot/
http://projects.spring.io/spring-integration/
http://ksby.hatenablog.com/entry/2017/01/13/083621
http://ksby.hatenablog.com/entry/2017/01/11/054343
http://ksby.hatenablog.com/entry/2017/01/08/064129
  1. src/main/java/ksbysample/eipapp/urlchecker の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

urlCheckFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその2の内容 に変更します。

    この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されて、複数スレッドで並行処理されていることが分かります。

    f:id:ksby:20170115203712p:plain

writeFileFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその3の内容 に変更します。

    この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されます。.resequence() の後でファイルに書かれていた順番に変更されていること、.aggregate() の後で 1 つの List の 1 Message に集約されていること、が確認できます。

    f:id:ksby:20170115210521p:plain

deleteFileFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその4の内容 に変更します。

動作確認

動作確認します。以下のファイルを作成します。

■2014.txt

http://ksby.hatenablog.com/entry/2014/12/27/233427
http://ksby.hatenablog.com/entry/2014/12/29/175849

■2015.txt

http://ksby.hatenablog.com/entry/2015/04/18/194947
http://ksby.hatenablog.com/entry/2015/06/28/203752
http://ksby.hatenablog.com/entry/2015/07/22/232807
http://ksby.hatenablog.com/entry/2015/09/30/212728
http://ksby.hatenablog.com/entry/2015/12/27/211513

■2016.txt

http://ksby.hatenablog.com/entry/2016/02/17/103928
http://ksby.hatenablog.com/entry/2016/05/31/014201
http://ksby.hatenablog.com/entry/2016/12/29/230805

bootRun タスクを実行した後、in ディレクトリに 2014.txt, 2015.txt, 2016.txt を入れます。少しすると in ディレクトリからファイルが全てなくなり、out ディレクトリに 2014.txt, 2015.txt, 2016.txt が出来ました。

ログは以下のように出力されており、特に問題はありませんでした。

f:id:ksby:20170115225308p:plain f:id:ksby:20170115225517p:plain

out ディレクトリに出力された 2014.txt, 2015.txt, 2016.txt は以下の内容で、HTTPステータスコード付きで、元ファイルの順番通り出力されています。

■2014.txt

http://ksby.hatenablog.com/entry/2014/12/27/233427,200
http://ksby.hatenablog.com/entry/2014/12/29/175849,200

■2015.txt

http://ksby.hatenablog.com/entry/2015/04/18/194947,200
http://ksby.hatenablog.com/entry/2015/06/28/203752,200
http://ksby.hatenablog.com/entry/2015/07/22/232807,200
http://ksby.hatenablog.com/entry/2015/09/30/212728,200
http://ksby.hatenablog.com/entry/2015/12/27/211513,200

■2016.txt

http://ksby.hatenablog.com/entry/2016/02/17/103928,200
http://ksby.hatenablog.com/entry/2016/05/31/014201,200
http://ksby.hatenablog.com/entry/2016/12/29/230805,200

メモ書き

.channel(c -> c.executor(taskExecutor())) を入れるとどう処理が変わるのか?

.channel(c -> c.executor(taskExecutor())) を入れると、それ以降の処理が別スレッドで実行されるようになります。

urlListFilePollerFlow を以下のように変更して bootRun タスクを実行します。

  • .channel(c -> c.executor(taskExecutor())) を全てコメントアウトします。
  • .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE, ... 内に log.info(String.valueOf(lines.size())); を追加して、取得した行数をログに出力します。
    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
//                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            log.info(String.valueOf(lines.size()));
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して enrichHeaders 以降の処理を更に別のスレッドで並行処理する
//                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

sample.txt を in ディレクトリに置きます。

ログは以下のように出力され、処理が全て ask-scheduler-4 スレッド ( MessageSource を polling しているため main スレッドとは別に作成されるタスクスケジュール用のスレッド ) で実行されていることが確認できます。

f:id:ksby:20170115183038p:plain

次はコメントアウトしていた .channel(c -> c.executor(taskExecutor())) を元に戻して bootRun タスクを実行し直します。sample.txt を in ディレクトリに置くと以下のログが出力されて各処理がそれぞれ別のスレッドで実行されていることが確認できます。

f:id:ksby:20170115183610p:plain

urlListFilePollerFlow でわざわざ sequenceSize header の値を付ける処理を入れている理由とは?

urlListFilePollerFlow を以下のように変更し、.split(new FileSplitter()) の前後で header の内容がどう変わるかみてみます。

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
//                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
//                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
//                        , m -> {
//                            List<String> lines;
//                            try {
//                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
//                            } catch (IOException e) {
//                                throw new RuntimeException(e);
//                            }
//                            return lines.size();
//                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })
                .split(new FileSplitter())
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })

bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のログが出力されます。

f:id:ksby:20170115190040p:plain

このログを見ると .split(new FileSplitter()) の後だと header に以下のキーが追加されています。

  • sequenceNumber
  • file_name
  • sequenceSize
  • correlationId
  • file_originalFile

sequenceSize header の値を付ける処理を入れている理由ですが、以下の通りです。

  • FileSplitter で行毎に分割している場合 sequenceNumber に値を割り当ててはくれますが sequenceSize は 0 です。
  • この後 writeFileFlow で .resequence(), .aggregate() の処理を行いますが、この2つの処理はデフォルトでは 同じ correlationId のデータを sequenceSize 分のデータを受信してから処理する という動作をします。
  • そのため、このままでは sequenceSize = 0 なので .resequence(), .aggregate() が動作しません。それで正しい値を付ける処理を入れています。

以下のように .split(new FileSplitter()) の後のログ出力処理の位置を変更して実行してみます。

                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })

今度は sequenceSize=5 とファイルの行数がセットされていることが確認できます。

f:id:ksby:20170115191820p:plain

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.springframework.integration:spring-integration-http")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • Spring Integration の File Support を使用するので compile('org.springframework.integration:spring-integration-file') を記述します。
  • Spring Integration の HTTP Support は使用しませんが、RestTemplate を使用するために compile('org.springframework.integration:spring-integration-http') を記述します。
  • Spring Integration DSL は最新の 1.2 を使いたいので compile('org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE') を記述します。

Application.java

package ksbysample.eipapp.urlchecker;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

FlowConfig.java

■その1

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.messaging.MessageChannel;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    @Bean
    public MessageChannel urlCheckChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                .channel(nullChannel)
                .get();
    }

}
  • urlListFilePollerFlow 内の各処理についてはコメントを参照。
  • この時点でも動作できるようにするために urlCheckFlow, writeFileFlow を仮実装で記述しておきます。

■その2

@Slf4j
@Configuration
public class FlowConfig {


    ..........
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";

    ..........

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    ..........

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    ..........

}

■その3

@Slf4j
@Configuration
public class FlowConfig {


    ..........
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";


    ..........

    @Bean
    public MessageChannel deleteFileChannel() {
        return new DirectChannel();
    }

    ..........

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS))
                        .build())
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                .channel(nullChannel)
                .get();
    }

}

■その4

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                })
                .get();
    }

}
  • 上記の記述を追加する以外に nullChannel のフィールドとコンストラクタを削除します。

■完成形

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";

    @Bean
    public MessageChannel urlCheckChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel deleteFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS))
                        .build())
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                })
                .get();
    }

}

履歴

2017/01/15
初版発行。
2017/01/18
* build.gradle の dependencies の記述を ‘’ → “” へ統一しました。
* logback-spring.xml から使用していない記述を削除しました。
2017/01/22
* 以下の3つの定数文字列は Spring Integration で定義済だったので、それを使用するよう変更しました。
* “sequenceSize” を MESSAGE_HEADER_SEQUENCE_SIZE → IntegrationMessageHeaderAccessor.SEQUENCE_SIZE へ変更。
* “file_name” は MESSAGE_HEADER_FILE_NAME → FileHeaders.FILENAME へ変更。
* “file_originalFile” は MESSAGE_HEADER_FILE_ORIGINALFILE → FileHeaders.ORIGINAL_FILE へ変更。
* .enrichHeaders(h -> h.headerFunction.enrichHeaders(h -> h.headerExpression へ変更すれば値を .headerFilter で削除しなくても値を上書きできるので変更しました。

Spring Boot + Spring Integration でいろいろ試してみる ( その9 )( Pollers.fixedRate で待機時間を指定しても意味がない場合がある? )

概要

記事一覧はこちらです。

  • Spring Integration DSL の Pollers.fixedRate で次の処理までの待機時間をミリ秒で指定できますが、単純に「処理をする」→「指定されたミリ秒待機する」→「処理をする」→。。。、という処理だと漠然と思っていたら、QueueChannel からデータを取得する場合には少し違っていたというお話です。

参照したサイト・書籍

目次

  1. ksbysample-eipapp-pollertest プロジェクトを作成する
  2. poller の動作確認をするための実装をする
  3. 動作確認
  4. 結局どのように処理されているのか?

手順

ksbysample-eipapp-pollertest プロジェクトを作成する

まずは動作確認をするためのプロジェクトを作成します。

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

poller の動作確認をするための実装をする

  1. 以下の仕様のプログラムを実装します。

    f:id:ksby:20170113001933p:plain

    • MessageSource として呼び出されたら必ず Person オブジェクトの ArrayList を返すものを作成します。
    • getFlow では 1秒毎に MessageSource からデータを取得し、ログを出力した後 nextChannel にデータを渡します。
    • nextChannel は次の nextFlow でもポーリングをしたいので、QueueChannel として作成します。
    • nextFlow では nextChannel から 5秒毎にデータを取得してログを出力します。
  2. src/main/java の下に ksbysample.eipapp.pollertest パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/pollertest の下に Application.java を作成し、リンク先の内容 を記述します。

  4. src/main/java/ksbysample/eipapp/pollertest の下に Person.java を作成し、リンク先の内容 を記述します。

  5. src/main/java/ksbysample/eipapp/pollertest の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

動作確認

  1. bootRun タスクを実行します。

    f:id:ksby:20170113011005p:plain

    • nextFlow の poller の initialDelay (第2引数) に設定した 10秒後に nextFlow の処理が実行されて、getFlow が nextChannel に渡したデータ 11件が出力されます。
    • ただしその後は getFlow, nextFlow がほぼ同時に出力され続けており、nextFlow の poller の period (第1引数) に設定した 10秒がどうも機能していません???
  2. 起動したアプリケーションを停止します。

  3. src/main/java/ksbysample/eipapp/pollertest の下の FlowConfig.javaリンク先のその2の内容 に変更します。

  4. 再度 bootRun タスクを実行します。

    f:id:ksby:20170113012830p:plain

    • 今度は nextFlow はひたすら実行されたりはせずに、だいたい指定された 8秒毎に実行されているようです。

結局どのように処理されているのか?

  • MessageSource からデータを取得する場合には Pollers.fixedRate で指定された待機時間がそのまま機能する。
  • QueueChannel から取得する場合、QueueChannel をチェックしてデータがなくなって初めて待機するようになり、Pollers.fixedRate で指定された待機時間後にまた処理が実行される。QueueChannel にデータが次々と流れてくる状況では Pollers.fixedRate で待機時間を指定しても処理は待機することなく次々と実行されて実質意味がない。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.integration:spring-integration-java-dsl')
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}

Application.java

package ksbysample.eipapp.pollertest;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

Person.java

package ksbysample.eipapp.pollertest;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Person {

    private String name;

    private int age;

}

FlowConfig.java

■その1

package ksbysample.eipapp.pollertest;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Configuration
public class FlowConfig {

    @Bean
    public MessageSource<List<Person>> personListMessageSource() {
        return () -> {
            List<Person> personList = new ArrayList<>();
            personList.add(new Person("tanaka", 35));
            personList.add(new Person("suzuki", 28));
            personList.add(new Person("kimura", 41));
            return MessageBuilder.withPayload(personList).build();
        };
    }

    @Bean
    public MessageChannel nextChannel() {
        return new QueueChannel(100);
    }

    @Bean
    public IntegrationFlow getFlow() {
        // 指定された時間毎に personListMessageSource からデータを取得して
        // ログを出力した後、nextChannel にデータを渡す
        return IntegrationFlows.from(personListMessageSource()
                , c -> c.poller(Pollers.fixedRate(1000)))
                .handle((p, h) -> {
                    log.info("★★★ getFlow");
                    return p;
                })
                .channel(nextChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextFlow() {
        // 指定された時間毎に nextChannel からデータを取得してログを出力する
        return IntegrationFlows.from(nextChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    log.info("■■■ nextFlow");
                    return null;
                }, c -> c.poller(Pollers.fixedRate(5000, 5000)))
                .get();
    }

}

■その2

    @Bean
    public IntegrationFlow getFlow() {
        // 指定された時間毎に personListMessageSource からデータを取得して
        // ログを出力した後、nextChannel にデータを渡す
        return IntegrationFlows.from(personListMessageSource()
                , c -> c.poller(Pollers.fixedRate(2000)))
                .handle((p, h) -> {
                    log.info("★★★ getFlow");
                    return p;
                })
                .channel(nextChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextFlow() {
        // 指定された時間毎に nextChannel からデータを取得してログを出力する
        return IntegrationFlows.from(nextChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    log.info("■■■ nextFlow");
                    return null;
                }, c -> c.poller(Pollers.fixedRate(8000, 5000)))
                .get();
    }
  • getFlow 内の Pollers.fixedRate に指定するミリ秒数を 1000 → 2000 に変更します。
  • nextFlow 内の Pollers.fixedRate に指定するミリ秒数を 5000 → 8000 に変更します。

履歴

2017/01/13
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その8 )( MySQL のテーブルのデータを取得して PostgreSQL のテーブルへ登録する常駐型アプリケーションを作成する )

概要

記事一覧はこちらです。

  • Spring Integration の 18. JDBC Support の機能を利用して、以下の処理を行う常駐型アプリケーションを作成してみます。
    1. MySQL の orders テーブルにデータを登録します。データには status を持たせ、登録時には status = ‘00’ にします。
    2. 登録されたデータを常駐型アプリケーションで取得します。取得時に status = ‘01’ に更新します。
    3. 取得したデータを PostgreSQL の orders テーブルに登録します。
    4. 登録できたデータを MySQL の orders テーブルから削除します。
  • Spring Integration のフロー図で描くと以下の構成になります。 f:id:ksby:20170109102124p:plain
  • 今回 Spring Integration DSL も使用します。XML ファイルは使用しません。
  • MySQLPostgreSQL は以前インストールした 5.6、9.4 を使用します。
  • Spring Boot は 1.4.3 を、Spring Integration は 4.3.6 を使用します。どちらも Spring IO Platform の Athens-SR2 を利用して指定します。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 18. JDBC Support
    http://docs.spring.io/spring-integration/docs/4.3.6.RELEASE/reference/html/jdbc.html

  2. spring-integration/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java
    https://github.com/spring-projects/spring-integration/blob/master/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java

    • JdbcMessageHandler の使い方を参考にしました。
  3. Spring Integration Java DSL Reference
    https://github.com/spring-projects/spring-integration-java-dsl/wiki/spring-integration-java-dsl-reference

  4. Spring Integration Java DSL: Line by line tutorial
    https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

    • Spring Integration DSL の tutorial です。
  5. Spring Integration Java DSL sample - further simplification with Jms namespace factories
    http://www.java-allandsundry.com/2014/06/spring-integration-java-dsl-sample.html

  6. Spring Integration Java DSL sample
    https://dzone.com/articles/spring-integration-java-dsl

  7. JDBC Spring Integration with Annotations
    http://stackoverflow.com/questions/27247013/jdbc-spring-integration-with-annotations

目次

  1. ksbysample-eipapp-datacopy プロジェクトを作成する
  2. MySQL と PostgreSQL に orders テーブルを作成する
  3. データベースの設定を記述し、DataSource 等の Bean を定義する
  4. copyChannel, delChannel を作成する
  5. getFlow を作成する
  6. copyFlow を作成する
  7. delFlow を作成する
  8. 動作確認
  9. 感想

手順

ksbysample-eipapp-datacopy プロジェクトを作成する

  1. feature/6-issue ブランチを作成します。

  2. IntelliJ IDEA の「Welcome to IntelliJ IDEA」ダイアログを表示した後、「Create New Project」をクリックします。

    f:id:ksby:20170109165105p:plain

  3. 「New Project」ダイアログが表示されます。画面左側で「Gradle」を選択した後、画面右側は何も変更せずに「Next」ボタンをクリックします。

    f:id:ksby:20170109165235p:plain

  4. GroupId、ArtifactId を入力する画面が表示されます。以下の画像の文字列を入力した後、「Next」ボタンをクリックします。

    f:id:ksby:20170109165833p:plain

  5. 次の画面が表示されます。「Create directories for empty content roots automatically」をチェックした後、「Next」ボタンをクリックします。

    f:id:ksby:20170109170709p:plain

  6. Project name、Project location を入力する画面が表示されます。以下の画像の文字列を入力した後、「Finish」ボタンをクリックします。

    f:id:ksby:20170109171103p:plain

  7. build.gradle を リンク先の内容 に変更します。

  8. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

  9. src/main/java の下に ksbysample.eipapp.datacopy パッケージを作成します。

  10. src/main/java/ksbysample/eipapp/datacopy の下に Application.java を新規作成します。作成後、リンク先の内容 に変更します。

  11. 以下のディレクトリの下に .gitkeep ファイルを作成します。

    • src/main/groovy
    • src/main/resources
    • src/test/groovy
    • src/test/java
    • src/test/resources
  12. この時点で Project Tool Window は以下の状態になります。

    f:id:ksby:20170109182746p:plain

※この時点ではまだアプリケーションは起動できません。起動しようとしてもデータベースの接続設定を記述していないためエラーになります。

MySQLPostgreSQL に orders テーブルを作成する

  1. IntelliJ IDEA の Database Tool Window から MySQL の world データベース、PostgreSQL の ksbylending データベースにアクセスできるように設定します。

  2. まず MySQL の world データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「MySQL」を選択します。

    f:id:ksby:20170109190150p:plain

  3. 「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。

    f:id:ksby:20170109190831p:plain

  4. 次に PostgreSQL の ksbylending データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「PostgreSQL」を選択します。

    f:id:ksby:20170109191218p:plain

  5. 「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。

    f:id:ksby:20170109191501p:plain

  6. この時点で Database Tool Window は以下の状態です。

    f:id:ksby:20170109193208p:plain

  7. MySQL の world データベースに orders テーブルを作成します。Database Tool Window で world データベースを選択してコンテキストメニューを表示した後、「New」-「Table」を選択します。

    f:id:ksby:20170109200601p:plain

  8. 「Create New Table」ダイアログが表示されます。以下の画像のデータを入力後、「Execute」ボタンをクリックします。

    f:id:ksby:20170109202312p:plain

  9. PostgreSQL の ksbylending データベースの方でも「Create New Table」ダイアログを表示し、以下の画像のデータを入力後、「Execute」ボタンをクリックします。

    f:id:ksby:20170109203836p:plain

  10. orders テーブル作成後は以下の状態になります。

    f:id:ksby:20170109204800p:plain

データベースの設定を記述し、DataSource 等の Bean を定義する

  1. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy の下に config パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/datacopy/config の下に ApplicationConfig.java を作成し、リンク先の内容 を記述します。

  4. src/main/resources/.gitkeep を削除します。

  5. データベースの接続設定を記述しましたのでアプリケーションを起動してみます。Gradle Tool Window から bootRun タスクを実行してアプリケーションが起動してエラーが出ないことを確認します。

    f:id:ksby:20170109213305p:plain

    アプリケーションはエラーが出ずに起動しましたが、Channel も何も作成していないとすぐに終了するんですね。。。

copyChannel, delChannel を作成する

  1. src/main/java/ksbysample/eipapp/datacopy の下に integration.channel パッケージを作成します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/channel の下に ChannelConfig.java を作成し、リンク先の内容 を記述します。

getFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy の下に dto パッケージを作成します。

  2. src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDto.java を作成し、リンク先の内容 を記述します。

  3. src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDtoRowMapper.java を作成し、リンク先の内容 を記述します。

  4. src/main/java/ksbysample/eipapp/datacopy/integration の下に flow パッケージを作成します。

  5. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

copyFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に CopyOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.javaリンク先のその2の内容 に変更します。

delFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に DelOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.javaリンク先のその3の内容 に変更します。

動作確認

  1. bootRun タスクを実行してアプリケーションを起動します。各データベースの orders テーブルには何も登録されていません。

    f:id:ksby:20170110224012p:plain

  2. MySQL の world データベースの orders テーブルにデータを登録して commit してみます。

    f:id:ksby:20170110224226p:plain

    status が ‘01’ に更新されます。

    f:id:ksby:20170110224345p:plain

    PostgreSQL の ksbylending データベースの orders テーブルにデータが登録されます。

    f:id:ksby:20170110224502p:plain

    world データベースの orders テーブルのデータが削除されます。

    f:id:ksby:20170110224651p:plain

  3. 登録済の order_id = ‘00000001’ のデータを再度登録して commit してみます。

    f:id:ksby:20170110225011p:plain

    status = ‘01’ に更新されますが、ksbylending データベースの orders テーブルには変化はありません。

    f:id:ksby:20170110225136p:plain

    ログには一意性制約違反のエラーが出力されます。

    f:id:ksby:20170110225538p:plain

  4. bootRun で起動したアプリケーションを停止します。

  5. commit して、develop, master ブランチへマージします。

感想

  • Spring Integration DSL が使いやすいです。Web に出ているサンプルを見ていた時には分かりにくい印象があったのですが、実際に使ってみると書きやすいですね。XML ファイルを使わないなら DSL を使うことをお勧めします。poller や transaction も DSL を使わない場合と比較するとかなり定義しやすいです。
  • JDBC Support の機能も簡単な処理なら実装しやすいです。ただし使い方を調べるのにサンプルが少なくて手こずりました。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    def jdbcDriverMySQL = "mysql:mysql-connector-java:6.0.5"
    def jdbcDriverPgSQL = "org.postgresql:postgresql:9.4.1212"

    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.integration:spring-integration-jdbc')
    compile('org.springframework.integration:spring-integration-java-dsl')
    compile('org.springframework.boot:spring-boot-starter-jdbc')
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    runtime("${jdbcDriverMySQL}")
    runtime("${jdbcDriverPgSQL}")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • Spring Integration の JDBC Support を利用するので compile('org.springframework.integration:spring-integration-jdbc') を記述します。また Spring Data JDBC も利用するので compile('org.springframework.boot:spring-boot-starter-jdbc') を記述します。
  • Spring Integration DSL を使用するので compile('org.springframework.integration:spring-integration-java-dsl') を記述します。
  • MySQLPostgreSQL の両方のデータベースにアクセスするので、それぞれの JDBC Driver を記述します。

Application.java

package ksbysample.eipapp.datacopy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.properties

spring.datasource.world.url=jdbc:mysql://localhost/world?serverTimezone=JST
spring.datasource.world.username=root
spring.datasource.world.password=xxxxxxxx
spring.datasource.world.driverClassName=com.mysql.cj.jdbc.Driver

spring.datasource.ksbylending.url=jdbc:postgresql://localhost/ksbylending
spring.datasource.ksbylending.username=ksbylending_user
spring.datasource.ksbylending.password=xxxxxxxx
spring.datasource.ksbylending.driverClassName=org.postgresql.Driver
  • MySQL の world データベース、PostgreSQL の ksbylending データーベースへの接続情報を記述します。

ApplicationConfig.java

package ksbysample.eipapp.datacopy.config;

import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class ApplicationConfig {

    @Primary
    @Bean
    @ConfigurationProperties("spring.datasource.world")
    public DataSource dataSourceWorld() {
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public PlatformTransactionManager transactionManagerWorld() {
        return new DataSourceTransactionManager(dataSourceWorld());
    }

    @Bean
    @ConfigurationProperties("spring.datasource.ksbylending")
    public DataSource dataSourceKsbylending() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    public PlatformTransactionManager transactionManagerKsbylending() {
        return new DataSourceTransactionManager(dataSourceKsbylending());
    }

}
  • world データベース、ksbylending データベースの DataSource, PlatformTransactionManager Bean を定義します。

ChannelConfig.java

package ksbysample.eipapp.datacopy.integration.channel;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ChannelConfig {

    @Bean
    public MessageChannel copyChannel() {
        return new QueueChannel(100);
    }

    @Bean
    public MessageChannel delChannel() {
        return new QueueChannel(100);
    }

}
  • DirectChannel で作成すると MessageChannel の前後の処理が1トランザクションとして処理されてしまい、getFlow でデータを取得して copyChannel にデータを渡したら status をその時点で ‘01’ に更新したくても copyFlow の処理が正常終了しないと更新されなくなるので、QueueChannel で作成し getFlow だけでトランザクションが終了するようにします。
  • delChannel の方も copyFlow の処理だけでトランザクションが終了するようにしたいので、同様に QueueChannel で作成します。

OrdersDto.java

package ksbysample.eipapp.datacopy.dto;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class OrdersDto {

    private String orderId;

    private String status;

}

OrdersDtoRowMapper.java

package ksbysample.eipapp.datacopy.dto;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class OrdersDtoRowMapper implements RowMapper<OrdersDto> {

    @Override
    public OrdersDto mapRow(ResultSet rs, int rowNum) throws SQLException {
        OrdersDto ordersDto = new OrdersDto();
        ordersDto.setOrderId(rs.getString("order_id"));
        ordersDto.setStatus(rs.getString("status"));
        return ordersDto;
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersRowMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class FlowConfig {

    private final DataSource dataSourceWorld;

    private final PlatformTransactionManager transactionManagerWorld;

    private final MessageChannel copyChannel;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
    }

    @Bean
    public MessageSource<Object> ordersJdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'");
        adapter.setRowMapper(new OrdersRowMapper());
        adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)");
        return adapter;
    }

    @Bean
    public IntegrationFlow getFlow() {
        // MySQL の world データベースの orders テーブルから status = '00' のデータを
        // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム
        // は '00'→'01' に更新する。
        return IntegrationFlows.from(ordersJdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(1000)
                        .transactional(this.transactionManagerWorld)))
                .channel(this.copyChannel)
                .get();
    }

}

■その2

@Configuration
public class FlowConfig {

    ..........

    private final MessageChannel delChannel;

    private final CopyOrdersMessageHandler copyOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
    }


    ..........

    @Bean
    public IntegrationFlow copyFlow() {
        // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL
        // の ksbylending データベースの orders テーブルに insert する。insert に成功した
        // 場合にはデータをそのまま delChannel に渡す。
        return IntegrationFlows.from(this.copyChannel)
                .handle(this.copyOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .channel(this.delChannel)
                .get();
    }

}

■その3

@Configuration
public class FlowConfig {

    ..........

    private final DelOrdersMessageHandler delOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler
            , DelOrdersMessageHandler delOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
        this.delOrdersMessageHandler = delOrdersMessageHandler;
    }

    ..........

    @Bean
    public IntegrationFlow delFlow() {
        // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の
        // world データベースの orders テーブルのデータを delete する。
        return IntegrationFlows.from(this.delChannel)
                .handle(this.delOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .get();
    }

}

■完成版

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDtoRowMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class FlowConfig {

    private final DataSource dataSourceWorld;

    private final PlatformTransactionManager transactionManagerWorld;

    private final MessageChannel copyChannel;

    private final MessageChannel delChannel;

    private final CopyOrdersMessageHandler copyOrdersMessageHandler;

    private final DelOrdersMessageHandler delOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler
            , DelOrdersMessageHandler delOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
        this.delOrdersMessageHandler = delOrdersMessageHandler;
    }

    @Bean
    public MessageSource<Object> ordersJdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'");
        adapter.setRowMapper(new OrdersDtoRowMapper());
        adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)");
        return adapter;
    }

    @Bean
    public IntegrationFlow getFlow() {
        // MySQL の world データベースの orders テーブルから status = '00' のデータを
        // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム
        // は '00'→'01' に更新する。
        return IntegrationFlows.from(ordersJdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(1000)
                        .transactional(this.transactionManagerWorld)))
                .channel(this.copyChannel)
                .get();
    }

    @Bean
    public IntegrationFlow copyFlow() {
        // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL
        // の ksbylending データベースの orders テーブルに insert する。insert に成功した
        // 場合にはデータをそのまま delChannel に渡す。
        return IntegrationFlows.from(this.copyChannel)
                .handle(this.copyOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .channel(this.delChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delFlow() {
        // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の
        // world データベースの orders テーブルのデータを delete する。
        return IntegrationFlows.from(this.delChannel)
                .handle(this.delOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .get();
    }

}

CopyOrdersMessageHandler.java

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDto;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

@MessageEndpoint
public class CopyOrdersMessageHandler implements GenericHandler<List<OrdersDto>> {

    private final DataSource dataSource;

    public CopyOrdersMessageHandler(@Qualifier("dataSourceKsbylending") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    @Transactional(transactionManager = "transactionManagerKsbylending")
    public Object handle(List<OrdersDto> payload, Map<String, Object> headers) {
        JdbcMessageHandler insertHandler
                = new JdbcMessageHandler(this.dataSource, "insert into orders (order_id) values (:payload.orderId)");
        insertHandler.afterPropertiesSet();
        payload.stream()
                .forEach(dto -> {
                    Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build();
                    insertHandler.handleMessage(orderxMessage);
                });
        return payload;
    }
}

DelOrdersMessageHandler.java

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDto;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

@MessageEndpoint
public class DelOrdersMessageHandler implements GenericHandler<List<OrdersDto>> {

    private final DataSource dataSource;

    public DelOrdersMessageHandler(@Qualifier("dataSourceWorld") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    @Transactional(transactionManager = "transactionManagerWorld")
    public Object handle(List<OrdersDto> payload, Map<String, Object> headers) {
        JdbcMessageHandler deleteHandler
                = new JdbcMessageHandler(this.dataSource
                , "delete from orders where order_id = :payload.orderId and status = '01'");
        deleteHandler.afterPropertiesSet();
        payload.stream()
                .forEach(dto -> {
                    Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build();
                    deleteHandler.handleMessage(orderxMessage);
                });
        return null;
    }
}

履歴

2017/01/11
初版発行。

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( 感想 )

最後に感想です。

  • Nexus Repository Manager OSS 3.x はインストールが難しくないし、UI も分かりやすくて使いやすいと思います。また maven だけでなく OSS 版で npm や docker が使用できる ( Artifactory の Open Source 版では使えないらしい )、と書かれているのを見かけたことがあります。その辺も使えるようになりたいですね。

  • ライブラリのアップロードは build.gradle の設定が分かればあとは簡単な印象です。build.gradle の設定を調べるのが一番大変でした。

  • ライブラリの作成・アップロードと、別プロジェクトから利用してみて気づいた点としては、

  • ライブラリを作ってあらためて Spring Boot の auto-configuration の便利さに気付かされました。build.gradle にライブラリを使用するように記述するだけでいろいろ自動で設定してくれるというのが本当にすごい!、こんなものもライブラリ化できるようになるんだな、と思いました。

さて、次は何をしましょうか。考え中です。。。