かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その27 )( Spring Integration Extension for AWS で S3 へファイルをアップロード・ダウンロードする )

概要

記事一覧 はこちらです。
今回作成したソースの GitHub レポジトリ はこちらです。

Spring Integration のアプリケーションから AWS の S3 へファイルをアップロード・ダウンロードするのに使えそうな Spring のライブラリとして以下の2種類がありますが、

それぞれのサンプルを作成してみます。まずは Spring Integration Extension for Amazon Web Services (AWS) から。

ちなみにこの2つのライブラリの違いですが、Spring Integration Extension for Amazon Web Services (AWS)Note the Spring Integration AWS Extension is based on the Spring Cloud AWS project. と記載されていました。Spring Integration Extension for AWS は Spring Cloud for AWS をベースに Spring Integration の Adapter 等を作成したもののようです。

参照したサイト・書籍

目次

  1. 仕様を決める
  2. ksbysample-eipapp-integrationaws プロジェクトを作成する
  3. S3 Bucket とアップロード・ダウンロード用の IAM ユーザを作成する
  4. upload ディレクトリ → S3 へアップロードする処理を実装する
  5. S3 → download ディレクトリへダウンロード → upload ディレクトリへ移動する処理を実装する
  6. 動作確認

手順

仕様を決める

  • S3 Bucket を1つ作成します。
  • 作成した S3 Bucket へアップロード・ダウンロードするための IAM ユーザを1つ作成します。
  • アプリケーションは AWS 側ではなく PC 上で実行します。
  • 作成したアプリケーションを実行するディレクトリ構成を以下のようにします。
D:\eipapp\ksbysample-eipapp-integrationaws
├ download
├ upload
├ uploading
└ ksbysample-eipapp-integrationaws-0.0.1-SNAPSHOT.jar
  • 最初に upload ディレクトリの下に画像ファイルを 30ファイル配置します。1ファイル 3~4MB 程度のファイルです。
  • 一定時間毎に upload ディレクトリを監視し、ファイルがあれば S3 Buket へアップロードします。アップロードするファイルは upload ディレクトリから uploading ディレクトリへ移動してから S3 へアップロードします(ディレクトリ監視に引っかからないようにするため)。
  • S3 へのアップロードはマルチスレッドで並列処理します。
  • 一定時間毎に S3 Bucket を監視し、ファイルがあれば PC にダウンロードします。こちらはシングルスレッドで処理します。
  • upload ディレクトリ →(アップロードFlow)→ S3 Bucket
    →(ダウンロードFlow)→ download ディレクトリ → upload ディレクト
    →(続く)
    という流れになります。

ksbysample-eipapp-integrationaws プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成します。

f:id:ksby:20180909112018p:plain f:id:ksby:20180909112113p:plain f:id:ksby:20180909112211p:plain f:id:ksby:20180909112341p:plain

作成後、build.gradle を以下のように変更します。

buildscript {
    ext {
        springBootVersion = '2.0.4.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'ksbysample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES
    }
}

dependencies {
    def lombokVersion = "1.18.2"

    implementation('org.springframework.boot:spring-boot-starter-integration')
    implementation('org.springframework.integration:spring-integration-aws:2.0.0.RELEASE')
    implementation('org.springframework.integration:spring-integration-file')
    testImplementation('org.springframework.boot:spring-boot-starter-test')

    // for lombok
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}
  • dependencyManagement block を追加します。
  • dependencies block に以下の2行を追加します。
    • implementation('org.springframework.integration:spring-integration-aws:2.0.0.RELEASE')
    • implementation('org.springframework.integration:spring-integration-file')
  • lombok@Slf4j アノテーションを使いたいので、dependencies block に以下の3行を追加します。
    • def lombokVersion = "1.18.2"
    • annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    • compileOnly("org.projectlombok:lombok:${lombokVersion}")

メインクラス名を KsbysampleEipappIntegrationawsApplication → Application に変更した後、clean タスク実行 → Rebuild Project 実行 → build タスクを実行して "BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

S3 Bucket とアップロード・ダウンロード用の IAM ユーザを作成する

AWS アカウントがすでに作成されていること、AWS CLI をインストール済であること、AWS CloudFormation を実行可能な環境を作成済であること、が前提で以下の手順を記載しています。

プロジェクトのルート直下に create-s3bucket.yaml を新規作成し、以下の内容を記述します。

AWSTemplateFormatVersion: 2010-09-09
Description: Spring Integration Test Stack

Resources:
  #########################
  # S3 Bucket
  #########################
  IntegrationTestS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: "s3bucket-integration-test-ksby"

  #########################
  # IAM User & IAM Policy
  #########################
  IntegrationTestUser:
    Type: AWS::IAM::User
    Properties:
      UserName: "iam-user-integration-test"
  IntegrationTestUserAccessKey:
    Type: AWS::IAM::AccessKey
    Properties:
      UserName: !Ref IntegrationTestUser
  IntegrationTestPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: "iam-policy-integration-test"
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - s3:ListBucket
              - s3:GetBucketLocation
            Resource: !Join [ "", [ "arn:aws:s3:::", !Ref IntegrationTestS3Bucket ] ]
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:PutObjectAcl
              - s3:GetObject
              - s3:GetObjectAcl
              - s3:DeleteObject
            Resource: !Join [ "", [ "arn:aws:s3:::", !Ref IntegrationTestS3Bucket, "/*" ] ]
      Users:
        - !Ref IntegrationTestUser

Outputs:
  AccessKeyId:
    Value: !Ref IntegrationTestUserAccessKey
  SecretAccessKey:
    Value: !GetAtt IntegrationTestUserAccessKey.SecretAccessKey

git-cmd.exe を起動した後、以下のコマンドを実行してテストで使用する S3 Bucket とアップロード・ダウンロード用の IAM ユーザを作成します。

f:id:ksby:20180909165121p:plain f:id:ksby:20180909165328p:plain

upload ディレクトリ → S3 へアップロードする処理を実装する

src/main/java/ksbysample/eipapp/integrationaws の下に flow パッケージを作成した後、その下に FlowConfig.java を新規作成して、以下の内容を記述します。

@Slf4j
@Configuration
public class FlowConfig {

    private static final String EIPAPP_ROOT_DIR_PATH = "D:/eipapp/ksbysample-eipapp-integrationaws";
    private static final String UPLOAD_DIR_PATH = EIPAPP_ROOT_DIR_PATH + "/upload";
    private static final String UPLOADING_DIR_PATH = EIPAPP_ROOT_DIR_PATH + "/uploading";
    private static final String S3_BUCKET = "s3bucket-integration-test-ksby";

    // リージョンは環境変数 AWS_REGION に(東京リージョンなら ap-northeast-1)、
    // AccessKeyId, SecretAccessKey はそれぞれ環境変数 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY にセットする
    @Bean
    public AmazonS3 amazonS3() {
        return AmazonS3ClientBuilder.standard().build();
    }

    /********************************************************
     * upload ディレクトリ --> S3 ファイルアップロード処理        *
     ********************************************************/

    @Bean
    public FileReadingMessageSource uploadFileMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(UPLOAD_DIR_PATH));
        source.setFilter(new AcceptAllFileListFilter<>());
        return source;
    }

    @Bean
    public MessageHandler uploadToS3MessageHandler() {
        return new S3MessageHandler(amazonS3(), S3_BUCKET);
    }

    @Bean
    public IntegrationFlow uploadToS3Flow() {
        return IntegrationFlows.from(
                // 200ミリ秒毎に upload ディレクトリを監視し、ファイルがあれば処理を進める
                uploadFileMessageSource(), c -> c.poller(Pollers.fixedDelay(200)))
                // ファイルを uploading ディレクトリへ移動する
                .<File>handle((p, h) -> {
                    try {
                        Path movedFilePath = Files.move(p.toPath(), Paths.get(UPLOADING_DIR_PATH, p.getName())
                                , StandardCopyOption.REPLACE_EXISTING);
                        return new GenericMessage<>(movedFilePath.toFile(), h);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                // ここから下はマルチスレッドで並列処理する
                .channel(c -> c.executor(Executors.newFixedThreadPool(5)))
                // 処理開始のログを出力する
                // 上の .channel(...) の直後に .log(...) を書くと並列処理されないため、.handle(...) を書いてその中でログに出力する
                .<File>handle((p, h) -> {
                    log.warn(String.format("☆☆☆ %s を S3 にアップロードします", p.getName()));
                    return new GenericMessage<>(p, h);
                })
                // S3 へアップロードする
                // S3MessageHandler は Outbound Channel Adapter で .handle(...) メソッドに渡しただけでは次の処理に行かないので、
                // .wireTap(...) で呼び出す
                .wireTap(sf -> sf
                        .handle(uploadToS3MessageHandler()))
                // アップロードしたファイルを削除し、処理終了のログを出力する
                .<File>handle((p, h) -> {
                    p.delete();
                    log.warn(String.format("★★★ %s を S3 にアップロードしました", p.getName()));
                    return null;
                })
                .get();
    }

}

upload ディレクトリに5ファイルだけ配置して動作確認してみます。最初に IntelliJ IDEA のメインメニューから「Run」-「Edit Configurations...」を選択して「Run/Debug Configurations」ダイアログを表示後、AWS_REGION、AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY を設定します。

f:id:ksby:20180909195452p:plain

s3bucket-integration-test-ksby バケットが空の状態であることを確認します。

f:id:ksby:20180909194942p:plain

アプリケーションを起動し upload ディレクトリに5ファイル配置すると、ファイルがアップロードされます。

f:id:ksby:20180909195733p:plain

再び s3bucket-integration-test-ksby バケットを見ると5ファイルアップロードされていました。ダウンロードして元の画像と比較すると同じファイルで問題ありませんでした。

f:id:ksby:20180909195903p:plain

S3 → download ディレクトリへダウンロード → upload ディレクトリへ移動する処理を実装する

src/main/java/ksbysample/eipapp/integrationaws/flow/FlowConfig.java に以下の処理を追加します。

@Slf4j
@Configuration
public class FlowConfig {

    ..........
    private static final String DOWNLOAD_DIR_PATH = EIPAPP_ROOT_DIR_PATH + "/download";
    ..........

    /********************************************************
     * S3 --> download ディレクトリ ファイルダウンロード処理      *
     ********************************************************/

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(amazonS3());
        synchronizer.setDeleteRemoteFiles(true);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setRemoteDirectory(S3_BUCKET);
        return synchronizer;
    }

    @Bean
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
        messageSource.setLocalDirectory(new File(DOWNLOAD_DIR_PATH));
        messageSource.setLocalFilter(new AcceptAllFileListFilter<>());
        return messageSource;
    }

    @Bean
    public IntegrationFlow downloadFromS3Flow() {
        return IntegrationFlows.from(
                // 1秒毎に S3 Bucket を監視し、ファイルがあれば処理を進める
                s3InboundFileSynchronizingMessageSource(), c -> c.poller(Pollers
                        .fixedDelay(1000)
                        // 1度に最大100ファイルダウンロードする
                        // .maxMessagesPerPoll(...) を書かないと 1ファイルずつダウンロードされる
                        .maxMessagesPerPoll(100)))
                // download ディレクトリに保存されたファイルを upload ディレクトリに移動する
                // ちなみに download ディレクトリからファイルを移動か削除しないと s3InboundFileSynchronizingMessageSource()
                // から Message が延々と送信され続けるので、必ず移動か削除する必要がある
                .<File>handle((p, h) -> {
                    try {
                        Files.move(p.toPath(), Paths.get(UPLOAD_DIR_PATH, p.getName())
                                , StandardCopyOption.REPLACE_EXISTING);
                        log.error(String.format("◎◎◎ %s をダウンロードしました", p.getName()));
                        return null;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .get();
    }

}

アップロードの処理をコメントアウトして、先程アップロードしたファイルをダウンロードしてみます。

アプリケーションを起動すると S3 Bucket にあるファイルが download ディレクトリにダウンロード → upload ディレクトリへ移動されて、

f:id:ksby:20180909211846p:plain

S3 Bucket の中は空になっていました。

f:id:ksby:20180909212049p:plain

ダウンロードされたファイルは元々アップロードしたファイルと全く同じで問題ありませんでした。

動作確認

アプリケーションを起動後、最初は upload ディレクトリにファイルを1つだけ配置してみます。

f:id:ksby:20180909232800p:plain

ファイルのアップロードとダウンロードを繰り返し、エラーも発生しませんでした。

今度は 30ファイル配置してみます。

f:id:ksby:20180909233613p:plain f:id:ksby:20180909233710p:plain f:id:ksby:20180909233828p:plain f:id:ksby:20180909233931p:plain f:id:ksby:20180909234027p:plain

なんか想定と違う動きをしています。。。

  • アップロード処理がマルチスレッドなので当然処理は多くて当たり前なのですが、ダウンロード処理がほとんど動きません。
  • org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory; nested exception is org.springframework.messaging.MessagingException: Failure occurred while copying 's3bucket-integration-test-ksby/P7250021.JPG' from the remote to the local directory; nested exception is java.net.SocketTimeoutException: Read timed out というエラーも出ています。
  • エラーの後に一気にダウンロードが実行されています。

1CPU2Core の PC で動かしているのですが、おそらく 5スレッドだと重いのと、ダウロード処理がどうも S3 Bucket 内に見つけたファイルを全て PC にダウンロードしてから処理を進めているようなので(ダウンロードのログが出ていないのに download ディレクトリにファイルが出来ていました)、以下の点を変更します。

    @Bean
    public IntegrationFlow uploadToS3Flow() {
                ..........
                // ここから下はマルチスレッドで並列処理する
                .channel(c -> c.executor(Executors.newFixedThreadPool(2)))
                ..........
    }

    ..........

    @Bean
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
        messageSource.setLocalDirectory(new File(DOWNLOAD_DIR_PATH));
        messageSource.setLocalFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    public IntegrationFlow downloadFromS3Flow() {
        return IntegrationFlows.from(
                // 1秒毎に S3 Bucket を監視し、ファイルがあれば処理を進める
                s3InboundFileSynchronizingMessageSource(), c -> c.poller(Pollers
                        .fixedDelay(200)))
                ..........
    }

}
  • uploadToS3Flow 内のマルチスレッド数を 5 → 2 に変更します。
  • s3InboundFileSynchronizingMessageSource 内に messageSource.setMaxFetchSize(1); を追加し、S3 Bucket に見つけたファイルを全てダウンロードするのではなく 1ファイルだけダウンロードするようにします。
  • downloadFromS3Flow のポーリング間隔を uploadToS3Flow と同じ 200ミリ秒にし、.maxMessagesPerPoll(100) を削除します。

変更後に再度実行すると、最初はアップロードばかりが動きますが、途中からアップロードとダウンロードがバランスよく動くようになりました。

f:id:ksby:20180910001644p:plain

jar ファイルを作成してコマンドラインから実行してみます。まずは jar ファイルを生成して D:\eipapp\ksbysample-eipapp-integrationaws の下にコピーします。

コピー後、環境変数 AWS_REGION、AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY を設定してからアプリケーションを実行します。

f:id:ksby:20180911043628p:plain

ksbysample-eipapp-integrationaws.log を IntelliJ IDEA のメインメニューの「Tools」-「Tail File in Console...」で開いておきます。

f:id:ksby:20180911043731p:plain

upload ディレクトリに 30ファイル配置すると、ダウンロード・アップロードが実行されました。

f:id:ksby:20180911044230p:plain f:id:ksby:20180911044329p:plain f:id:ksby:20180911044431p:plain f:id:ksby:20180911044553p:plain

Ctrl+C でアプリケーションを終了させた後、S3 にアップロードされたファイルを削除しておきます。

履歴

2018/09/11
初版発行。