かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その28 )( Spring Cloud for AWS で S3 へファイルをアップロード・ダウンロードする )

概要

記事一覧 はこちらです。
今回作成したソースの GitHub レポジトリ はこちらです。

Spring Boot + Spring Integration でいろいろ試してみる ( その27 )( Spring Integration Extension for AWS で S3 へファイルをアップロード・ダウンロードする ) からの続きです。

今回は Spring Cloud for Amazon Web Services のサンプルを作成します。仕様や、S3 Bucket とアップロード・ダウンロード用の IAM ユーザは前回と同じにして、実装だけ変えます。

参照したサイト・書籍

目次

  1. ksbysample-eipapp-cloudaws プロジェクトを作成する
  2. upload ディレクトリ → S3 へアップロードする処理を実装する
  3. S3 → download ディレクトリへダウンロード → upload ディレクトリへ移動する処理を実装する
  4. 動作確認
  5. S3 Bucket とアップロード・ダウンロード用の IAM ユーザを削除する
  6. Spring Integration Extension for AWS と Spring Cloud for AWS を使用してみた感想

手順

ksbysample-eipapp-cloudaws プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成します。

f:id:ksby:20180912011527p:plain f:id:ksby:20180912011619p:plain f:id:ksby:20180912011849p:plain f:id:ksby:20180912011947p:plain

作成後、build.gradle を以下のように変更します。

buildscript {
    ext {
        springBootVersion = '2.0.4.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'ksbysample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES
        mavenBom 'org.springframework.cloud:spring-cloud-aws:2.0.0.RELEASE'
    }
}

dependencies {
    def lombokVersion = "1.18.2"

    implementation('org.springframework.boot:spring-boot-starter-integration')
    implementation('org.springframework.cloud:spring-cloud-aws-context')
    implementation('org.springframework.integration:spring-integration-file')
    testImplementation('org.springframework.boot:spring-boot-starter-test')

    // for lombok
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}
  • dependencyManagement block を追加します。
  • dependencies block に以下の2行を追加します。
    • implementation('org.springframework.cloud:spring-cloud-aws-context')
    • implementation('org.springframework.integration:spring-integration-file')
  • lombok@Slf4j アノテーションを使いたいので、dependencies block に以下の3行を追加します。
    • def lombokVersion = "1.18.2"
    • annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    • compileOnly("org.projectlombok:lombok:${lombokVersion}")

メインクラス名を KsbysampleEipappCloudawsApplication → Application に変更した後、clean タスク実行 → Rebuild Project 実行 → build タスクを実行して "BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

※最初 build.gradle の dependencies block には Spring Cloud for Amazon Web Services に記載されている implementation('org.springframework.cloud:spring-cloud-starter-aws') を記載したのですが、テストを実行した時に java.lang.IllegalStateException: There is no EC2 meta data available, because the application is not running in the EC2 environment. Region detection is only possible if the application is running on a EC2 instance というエラーが出たので implementation('org.springframework.cloud:spring-cloud-aws-context') に変更しました。

upload ディレクトリ → S3 へアップロードする処理を実装する

src/main/java/ksbysample/eipapp/cloudaws の下に flow パッケージを作成した後、その下に FlowConfig.java を新規作成して、以下の内容を記述します。

@Slf4j
@Configuration
public class FlowConfig {

    private static final String EIPAPP_ROOT_DIR_PATH = "D:/eipapp/ksbysample-eipapp-cloudaws";
    private static final String UPLOAD_DIR_PATH = EIPAPP_ROOT_DIR_PATH + "/upload";
    private static final String UPLOADING_DIR_PATH = EIPAPP_ROOT_DIR_PATH + "/uploading";
    private static final String S3_BUCKET = "s3bucket-integration-test-ksby";

    // リージョンは環境変数 AWS_REGION に(東京リージョンなら ap-northeast-1)、
    // AccessKeyId, SecretAccessKey はそれぞれ環境変数 AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY にセットする
    @Bean
    public TransferManager transferManager() {
        return TransferManagerBuilder.standard().build();
    }

    /********************************************************
     * upload ディレクトリ --> S3 ファイルアップロード処理        *
     ********************************************************/

    @Bean
    public FileReadingMessageSource uploadFileMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(UPLOAD_DIR_PATH));
        source.setFilter(new AcceptAllFileListFilter<>());
        return source;
    }

    @Bean
    public IntegrationFlow uploadToS3Flow() {
        return IntegrationFlows.from(
                // 200ミリ秒毎に upload ディレクトリを監視し、ファイルがあれば処理を進める
                uploadFileMessageSource(), c -> c.poller(Pollers.fixedDelay(200)))
                // ファイルを uploading ディレクトリへ移動する
                .<File>handle((p, h) -> {
                    try {
                        Path movedFilePath = Files.move(p.toPath(), Paths.get(UPLOADING_DIR_PATH, p.getName())
                                , StandardCopyOption.REPLACE_EXISTING);
                        return new GenericMessage<>(movedFilePath.toFile(), h);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                // ここから下はマルチスレッドで並列処理する
                .channel(c -> c.executor(Executors.newFixedThreadPool(2)))
                // 処理開始のログを出力し、S3 へアップロードする
                .<File>handle((p, h) -> {
                    log.warn(String.format("☆☆☆ %s を S3 にアップロードします", p.getName()));
                    try {
                        // .waitForUploadResult() も呼び出してアップロード完了を待たないとファイルはアップロードされない
                        transferManager()
                                .upload(S3_BUCKET, p.getName(), p)
                                .waitForUploadResult();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return new GenericMessage<>(p, h);
                })
                // アップロードしたファイルを削除し、処理終了のログを出力する
                .<File>handle((p, h) -> {
                    p.delete();
                    log.warn(String.format("★★★ %s を S3 にアップロードしました", p.getName()));
                    return null;
                })
                .get();
    }

}

upload ディレクトリに5ファイルだけ配置して動作確認してみます。最初に IntelliJ IDEA のメインメニューから「Run」-「Edit Configurations...」を選択して「Run/Debug Configurations」ダイアログを表示後、AWS_REGION、AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY を設定します。

f:id:ksby:20180912070220p:plain

s3bucket-integration-test-ksby バケットが空の状態であることを確認します。

f:id:ksby:20180912070414p:plain

アプリケーションを起動し upload ディレクトリに5ファイル配置すると、ファイルがアップロードされます。

f:id:ksby:20180912070651p:plain

再び s3bucket-integration-test-ksby バケットを見ると5ファイルアップロードされていました。ダウンロードして元の画像と比較すると同じファイルで問題ありませんでした。

S3 → download ディレクトリへダウンロード → upload ディレクトリへ移動する処理を実装する

src/main/java/ksbysample/eipapp/cloudaws/flow/FlowConfig.java に以下の処理を追加します。

@Slf4j
@Configuration
public class FlowConfig {

    ..........
    private static final String DOWNLOAD_DIR_PATH = EIPAPP_ROOT_DIR_PATH + "/download";
    ..........

    /********************************************************
     * S3 --> download ディレクトリ ファイルダウンロード処理      *
     ********************************************************/

    @Bean
    public MessageSource<File> downloadFileFromS3MessageSource() {
        return () -> {
            try {
                File downloadFile = null;
                String key = null;
                ObjectListing objectListing = amazonS3().listObjects(S3_BUCKET);
                if (objectListing.getObjectSummaries().size() > 0) {
                    S3ObjectSummary summary = objectListing.getObjectSummaries().iterator().next();
                    key = summary.getKey();
                    downloadFile = Paths.get(DOWNLOAD_DIR_PATH, key).toFile();
                    transferManager().download(S3_BUCKET, key, downloadFile)
                            .waitForCompletion();
                }
                return downloadFile != null
                        ? MessageBuilder.withPayload(downloadFile).setHeader("s3Path", key).build()
                        : null;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Bean
    public IntegrationFlow downloadFromS3Flow() {
        return IntegrationFlows.from(
                // 200ミリ秒毎に S3 Bucket を監視し、ファイルがあれば処理を進める
                downloadFileFromS3MessageSource(), c -> c.poller(Pollers
                        .fixedDelay(200)))
                // download ディレクトリに保存されたファイルを upload ディレクトリに移動する
                // ちなみに download ディレクトリからファイルを移動か削除しないと s3InboundFileSynchronizingMessageSource()
                // から Message が延々と送信され続けるので、必ず移動か削除する必要がある
                .<File>handle((p, h) -> {
                    try {
                        Files.move(p.toPath(), Paths.get(UPLOAD_DIR_PATH, p.getName())
                                , StandardCopyOption.REPLACE_EXISTING);
                        log.error(String.format("◎◎◎ %s をダウンロードしました", p.getName()));
                        return new GenericMessage<>(p, h);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .<File>handle((p, h) -> {
                    amazonS3().deleteObject(S3_BUCKET, (String) h.get("s3Path"));
                    return null;
                })
                .get();
    }

}

アップロードの処理をコメントアウトして、アップロードしたファイルをダウンロードしてみます。

アプリケーションを起動すると S3 Bucket にあるファイルが download ディレクトリにダウンロード → upload ディレクトリへ移動されて、

f:id:ksby:20180916154326p:plain

S3 Bucket の中は空になっていました。

f:id:ksby:20180916154455p:plain

ダウンロードされたファイルは元々アップロードしたファイルと全く同じで問題ありませんでした。

動作確認

アプリケーションを起動後、最初は upload ディレクトリにファイルを1つだけ配置してみます。

f:id:ksby:20180916160000p:plain

ファイルのアップロードとダウンロードを繰り返し、エラーも発生しませんでした。

今度は 30ファイル配置してみます。

f:id:ksby:20180916160507p:plain f:id:ksby:20180916160609p:plain f:id:ksby:20180916160738p:plain f:id:ksby:20180916160909p:plain

ダウンロード・アップロードが実行されました。全然制御していないので、同じファイルばかりダウンロードされていますが。。。

S3 Bucket とアップロード・ダウンロード用の IAM ユーザを削除する

以下のコマンドを実行して、作成した S3 Bucket とアップロード・ダウンロード用の IAM ユーザを削除します。

f:id:ksby:20180916170109p:plain

Spring Integration Extension for AWS と Spring Cloud for AWS を使用してみた感想

  • Spring Integration Extension for AWS を使用すると Spring Integration らしく実装できて、かつ便利になったという感じがします。
  • Spring Cloud for AWSAWS SDK for Java の知識がないとやりたいことが実装できなくて、S3 を使うだけの場合、あまりメリットを感じられませんでした。build.gradle に implementation('org.springframework.cloud:spring-cloud-aws-context') を記述すれば依存性解決をしてくれることくらいのような気がします。
  • implementation('org.springframework.cloud:spring-cloud-starter-aws') を記述すると、裏で何かいろいろ設定されるのか、PC 上で起動できなくなる(起動するためにいろいろ調べないといけない)ので、やり過ぎの感じがします。EC2 インスタンス上で起動すれば便利に感じるのかもしれませんが、今回は PC 上で実行していたので分かりません。implementation('org.springframework.integration:spring-integration-aws:2.0.0.RELEASE') はそんなことはなかったので、Spring Integration Extension for AWS の方で不要な自動設定を無効にしてくれているのでしょうか。。。
  • Spring Cloud for AWS のダウンロード処理を実装しようとしていて思ったのが、Spring Integration の MessageSource の作成方法の理解が全然足りないということでした。Spring Integration Extension for AWS だと Spring Integration に既にある AbstractInboundFileSynchronizer や AbstractInboundFileSynchronizingMessageSource を利用して実装されているのですが、それらのクラスは今回初めて知りました。Spring Integration、まだまだ奥が深いです。

履歴

2018/09/16
初版発行。