かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する )

概要

記事一覧はこちらです。

  • Spring Integration は DSL を使った時の方が XML ファイルや DSL なしの Java Config の時よりも面白いです。DSL に慣れるためにもいくつかサンプルを作っていこうと思います。
  • Spring Integration DSL を使用して、以下の処理を行う常駐型アプリケーションを作成します。
    1. in ディレクトリに URL 一覧が記入された拡張子が .txt のファイルが置かれているか 1 秒間隔でチェックします。
    2. ファイルが置かれていたら読み込んで行単位に分割します。
    3. URL が “http://” で始まる場合には、その URL にアクセス可能か ( HTTP ステータスコードの 200 が返るか ) チェックします。チェック処理はスレッドを生成して並行処理します。
    4. URL と HTTP ステータスコードを出力した結果ファイルを out ディレクトリに作成します。出力する時には元の .txt ファイルに書かれていた順になるようにします。
    5. 元の .txt ファイルを削除します。
  • 今回作る処理を Spring Integration のフロー図で描こうとすると channel がいちいち途中に挿入されて分かりにくいので、今回は省略します。というか、フロー図を描こうとして改めて DSL だと複雑な処理でも結構簡単に書けるんだなということに気づきましたね。。。

参照したサイト・書籍

  1. spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/
    https://github.com/spring-projects/spring-integration-java-dsl/tree/master/src/test/java/org/springframework/integration/dsl/test

目次

  1. ksbysample-eipapp-urlchecker プロジェクトを作成する
  2. in, out ディレクトリを作成する
  3. urlListFilePollerFlow を実装する
  4. urlCheckFlow を実装する
  5. writeFileFlow を実装する
  6. deleteFileFlow を実装する
  7. 動作確認
  8. メモ書き
    1. .channel(c -> c.executor(taskExecutor())) を入れるとどう処理が変わるのか?
    2. urlListFilePollerFlow でわざわざ sequenceSize header の値を付ける処理を入れている理由とは?

手順

ksbysample-eipapp-urlchecker プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.urlchecker パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/urlchecker の下に Application.java を作成し、リンク先の内容 を記述します。

  4. ログの出力は INFO レベルからにするため、src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

in, out ディレクトリを作成する

C:/eipapp/ksbysample-eipapp-dirpoller を作成し、その下に in, out ディレクトリを作成します。以下の構成になります。

C:/eipapp/ksbysample-eipapp-urlchecker
├ in
└ out

urlListFilePollerFlow を実装する

以降の実装中に動作確認をする際には、以下の内容を記述した sample.txt というファイルを使用します。

http://projects.spring.io/spring-boot/
http://projects.spring.io/spring-integration/
http://ksby.hatenablog.com/entry/2017/01/13/083621
http://ksby.hatenablog.com/entry/2017/01/11/054343
http://ksby.hatenablog.com/entry/2017/01/08/064129
  1. src/main/java/ksbysample/eipapp/urlchecker の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

urlCheckFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその2の内容 に変更します。

    この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されて、複数スレッドで並行処理されていることが分かります。

    f:id:ksby:20170115203712p:plain

writeFileFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその3の内容 に変更します。

    この時点で bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のようにログが出力されます。.resequence() の後でファイルに書かれていた順番に変更されていること、.aggregate() の後で 1 つの List の 1 Message に集約されていること、が確認できます。

    f:id:ksby:20170115210521p:plain

deleteFileFlow を実装する

  1. src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先のその4の内容 に変更します。

動作確認

動作確認します。以下のファイルを作成します。

■2014.txt

http://ksby.hatenablog.com/entry/2014/12/27/233427
http://ksby.hatenablog.com/entry/2014/12/29/175849

■2015.txt

http://ksby.hatenablog.com/entry/2015/04/18/194947
http://ksby.hatenablog.com/entry/2015/06/28/203752
http://ksby.hatenablog.com/entry/2015/07/22/232807
http://ksby.hatenablog.com/entry/2015/09/30/212728
http://ksby.hatenablog.com/entry/2015/12/27/211513

■2016.txt

http://ksby.hatenablog.com/entry/2016/02/17/103928
http://ksby.hatenablog.com/entry/2016/05/31/014201
http://ksby.hatenablog.com/entry/2016/12/29/230805

bootRun タスクを実行した後、in ディレクトリに 2014.txt, 2015.txt, 2016.txt を入れます。少しすると in ディレクトリからファイルが全てなくなり、out ディレクトリに 2014.txt, 2015.txt, 2016.txt が出来ました。

ログは以下のように出力されており、特に問題はありませんでした。

f:id:ksby:20170115225308p:plain f:id:ksby:20170115225517p:plain

out ディレクトリに出力された 2014.txt, 2015.txt, 2016.txt は以下の内容で、HTTPステータスコード付きで、元ファイルの順番通り出力されています。

■2014.txt

http://ksby.hatenablog.com/entry/2014/12/27/233427,200
http://ksby.hatenablog.com/entry/2014/12/29/175849,200

■2015.txt

http://ksby.hatenablog.com/entry/2015/04/18/194947,200
http://ksby.hatenablog.com/entry/2015/06/28/203752,200
http://ksby.hatenablog.com/entry/2015/07/22/232807,200
http://ksby.hatenablog.com/entry/2015/09/30/212728,200
http://ksby.hatenablog.com/entry/2015/12/27/211513,200

■2016.txt

http://ksby.hatenablog.com/entry/2016/02/17/103928,200
http://ksby.hatenablog.com/entry/2016/05/31/014201,200
http://ksby.hatenablog.com/entry/2016/12/29/230805,200

メモ書き

.channel(c -> c.executor(taskExecutor())) を入れるとどう処理が変わるのか?

.channel(c -> c.executor(taskExecutor())) を入れると、それ以降の処理が別スレッドで実行されるようになります。

urlListFilePollerFlow を以下のように変更して bootRun タスクを実行します。

  • .channel(c -> c.executor(taskExecutor())) を全てコメントアウトします。
  • .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE, ... 内に log.info(String.valueOf(lines.size())); を追加して、取得した行数をログに出力します。
    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
//                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            log.info(String.valueOf(lines.size()));
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して enrichHeaders 以降の処理を更に別のスレッドで並行処理する
//                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

sample.txt を in ディレクトリに置きます。

ログは以下のように出力され、処理が全て ask-scheduler-4 スレッド ( MessageSource を polling しているため main スレッドとは別に作成されるタスクスケジュール用のスレッド ) で実行されていることが確認できます。

f:id:ksby:20170115183038p:plain

次はコメントアウトしていた .channel(c -> c.executor(taskExecutor())) を元に戻して bootRun タスクを実行し直します。sample.txt を in ディレクトリに置くと以下のログが出力されて各処理がそれぞれ別のスレッドで実行されていることが確認できます。

f:id:ksby:20170115183610p:plain

urlListFilePollerFlow でわざわざ sequenceSize header の値を付ける処理を入れている理由とは?

urlListFilePollerFlow を以下のように変更し、.split(new FileSplitter()) の前後で header の内容がどう変わるかみてみます。

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
//                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
//                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
//                        , m -> {
//                            List<String> lines;
//                            try {
//                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
//                            } catch (IOException e) {
//                                throw new RuntimeException(e);
//                            }
//                            return lines.size();
//                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })
                .split(new FileSplitter())
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })

bootRun タスクを実行して sample.txt を in ディレクトリに置くと以下のログが出力されます。

f:id:ksby:20170115190040p:plain

このログを見ると .split(new FileSplitter()) の後だと header に以下のキーが追加されています。

  • sequenceNumber
  • file_name
  • sequenceSize
  • correlationId
  • file_originalFile

sequenceSize header の値を付ける処理を入れている理由ですが、以下の通りです。

  • FileSplitter で行毎に分割している場合 sequenceNumber に値を割り当ててはくれますが sequenceSize は 0 です。
  • この後 writeFileFlow で .resequence(), .aggregate() の処理を行いますが、この2つの処理はデフォルトでは 同じ correlationId のデータを sequenceSize 分のデータを受信してから処理する という動作をします。
  • そのため、このままでは sequenceSize = 0 なので .resequence(), .aggregate() が動作しません。それで正しい値を付ける処理を入れています。

以下のように .split(new FileSplitter()) の後のログ出力処理の位置を変更して実行してみます。

                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                .handle((p, h) -> {
                    System.out.println("★★★ " + p);
                    h.entrySet().stream().forEach(System.out::println);
                    return p;
                })

今度は sequenceSize=5 とファイルの行数がセットされていることが確認できます。

f:id:ksby:20170115191820p:plain

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.springframework.integration:spring-integration-http")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • Spring Integration の File Support を使用するので compile('org.springframework.integration:spring-integration-file') を記述します。
  • Spring Integration の HTTP Support は使用しませんが、RestTemplate を使用するために compile('org.springframework.integration:spring-integration-http') を記述します。
  • Spring Integration DSL は最新の 1.2 を使いたいので compile('org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE') を記述します。

Application.java

package ksbysample.eipapp.urlchecker;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

FlowConfig.java

■その1

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.messaging.MessageChannel;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    @Bean
    public MessageChannel urlCheckChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                .channel(nullChannel)
                .get();
    }

}
  • urlListFilePollerFlow 内の各処理についてはコメントを参照。
  • この時点でも動作できるようにするために urlCheckFlow, writeFileFlow を仮実装で記述しておきます。

■その2

@Slf4j
@Configuration
public class FlowConfig {


    ..........
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";

    ..........

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    ..........

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    ..........

}

■その3

@Slf4j
@Configuration
public class FlowConfig {


    ..........
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";


    ..........

    @Bean
    public MessageChannel deleteFileChannel() {
        return new DirectChannel();
    }

    ..........

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS))
                        .build())
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                .channel(nullChannel)
                .get();
    }

}

■その4

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                })
                .get();
    }

}
  • 上記の記述を追加する以外に nullChannel のフィールドとコンストラクタを削除します。

■完成形

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";

    @Bean
    public MessageChannel urlCheckChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel deleteFileChannel() {
        return new DirectChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .enrichHeaders 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
                        , "headers['lines.size']", true))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS))
                        .build())
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(FileHeaders.FILENAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(FileHeaders.ORIGINAL_FILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                })
                .get();
    }

}

履歴

2017/01/15
初版発行。
2017/01/18
* build.gradle の dependencies の記述を ‘’ → “” へ統一しました。
* logback-spring.xml から使用していない記述を削除しました。
2017/01/22
* 以下の3つの定数文字列は Spring Integration で定義済だったので、それを使用するよう変更しました。
* “sequenceSize” を MESSAGE_HEADER_SEQUENCE_SIZE → IntegrationMessageHeaderAccessor.SEQUENCE_SIZE へ変更。
* “file_name” は MESSAGE_HEADER_FILE_NAME → FileHeaders.FILENAME へ変更。
* “file_originalFile” は MESSAGE_HEADER_FILE_ORIGINALFILE → FileHeaders.ORIGINAL_FILE へ変更。
* .enrichHeaders(h -> h.headerFunction.enrichHeaders(h -> h.headerExpression へ変更すれば値を .headerFilter で削除しなくても値を上書きできるので変更しました。

Spring Boot + Spring Integration でいろいろ試してみる ( その9 )( Pollers.fixedRate で待機時間を指定しても意味がない場合がある? )

概要

記事一覧はこちらです。

  • Spring Integration DSL の Pollers.fixedRate で次の処理までの待機時間をミリ秒で指定できますが、単純に「処理をする」→「指定されたミリ秒待機する」→「処理をする」→。。。、という処理だと漠然と思っていたら、QueueChannel からデータを取得する場合には少し違っていたというお話です。

参照したサイト・書籍

目次

  1. ksbysample-eipapp-pollertest プロジェクトを作成する
  2. poller の動作確認をするための実装をする
  3. 動作確認
  4. 結局どのように処理されているのか?

手順

ksbysample-eipapp-pollertest プロジェクトを作成する

まずは動作確認をするためのプロジェクトを作成します。

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

poller の動作確認をするための実装をする

  1. 以下の仕様のプログラムを実装します。

    f:id:ksby:20170113001933p:plain

    • MessageSource として呼び出されたら必ず Person オブジェクトの ArrayList を返すものを作成します。
    • getFlow では 1秒毎に MessageSource からデータを取得し、ログを出力した後 nextChannel にデータを渡します。
    • nextChannel は次の nextFlow でもポーリングをしたいので、QueueChannel として作成します。
    • nextFlow では nextChannel から 5秒毎にデータを取得してログを出力します。
  2. src/main/java の下に ksbysample.eipapp.pollertest パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/pollertest の下に Application.java を作成し、リンク先の内容 を記述します。

  4. src/main/java/ksbysample/eipapp/pollertest の下に Person.java を作成し、リンク先の内容 を記述します。

  5. src/main/java/ksbysample/eipapp/pollertest の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

動作確認

  1. bootRun タスクを実行します。

    f:id:ksby:20170113011005p:plain

    • nextFlow の poller の initialDelay (第2引数) に設定した 10秒後に nextFlow の処理が実行されて、getFlow が nextChannel に渡したデータ 11件が出力されます。
    • ただしその後は getFlow, nextFlow がほぼ同時に出力され続けており、nextFlow の poller の period (第1引数) に設定した 10秒がどうも機能していません???
  2. 起動したアプリケーションを停止します。

  3. src/main/java/ksbysample/eipapp/pollertest の下の FlowConfig.javaリンク先のその2の内容 に変更します。

  4. 再度 bootRun タスクを実行します。

    f:id:ksby:20170113012830p:plain

    • 今度は nextFlow はひたすら実行されたりはせずに、だいたい指定された 8秒毎に実行されているようです。

結局どのように処理されているのか?

  • MessageSource からデータを取得する場合には Pollers.fixedRate で指定された待機時間がそのまま機能する。
  • QueueChannel から取得する場合、QueueChannel をチェックしてデータがなくなって初めて待機するようになり、Pollers.fixedRate で指定された待機時間後にまた処理が実行される。QueueChannel にデータが次々と流れてくる状況では Pollers.fixedRate で待機時間を指定しても処理は待機することなく次々と実行されて実質意味がない。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.integration:spring-integration-java-dsl')
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}

Application.java

package ksbysample.eipapp.pollertest;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

Person.java

package ksbysample.eipapp.pollertest;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Person {

    private String name;

    private int age;

}

FlowConfig.java

■その1

package ksbysample.eipapp.pollertest;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Configuration
public class FlowConfig {

    @Bean
    public MessageSource<List<Person>> personListMessageSource() {
        return () -> {
            List<Person> personList = new ArrayList<>();
            personList.add(new Person("tanaka", 35));
            personList.add(new Person("suzuki", 28));
            personList.add(new Person("kimura", 41));
            return MessageBuilder.withPayload(personList).build();
        };
    }

    @Bean
    public MessageChannel nextChannel() {
        return new QueueChannel(100);
    }

    @Bean
    public IntegrationFlow getFlow() {
        // 指定された時間毎に personListMessageSource からデータを取得して
        // ログを出力した後、nextChannel にデータを渡す
        return IntegrationFlows.from(personListMessageSource()
                , c -> c.poller(Pollers.fixedRate(1000)))
                .handle((p, h) -> {
                    log.info("★★★ getFlow");
                    return p;
                })
                .channel(nextChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextFlow() {
        // 指定された時間毎に nextChannel からデータを取得してログを出力する
        return IntegrationFlows.from(nextChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    log.info("■■■ nextFlow");
                    return null;
                }, c -> c.poller(Pollers.fixedRate(5000, 5000)))
                .get();
    }

}

■その2

    @Bean
    public IntegrationFlow getFlow() {
        // 指定された時間毎に personListMessageSource からデータを取得して
        // ログを出力した後、nextChannel にデータを渡す
        return IntegrationFlows.from(personListMessageSource()
                , c -> c.poller(Pollers.fixedRate(2000)))
                .handle((p, h) -> {
                    log.info("★★★ getFlow");
                    return p;
                })
                .channel(nextChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextFlow() {
        // 指定された時間毎に nextChannel からデータを取得してログを出力する
        return IntegrationFlows.from(nextChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    log.info("■■■ nextFlow");
                    return null;
                }, c -> c.poller(Pollers.fixedRate(8000, 5000)))
                .get();
    }
  • getFlow 内の Pollers.fixedRate に指定するミリ秒数を 1000 → 2000 に変更します。
  • nextFlow 内の Pollers.fixedRate に指定するミリ秒数を 5000 → 8000 に変更します。

履歴

2017/01/13
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その8 )( MySQL のテーブルのデータを取得して PostgreSQL のテーブルへ登録する常駐型アプリケーションを作成する )

概要

記事一覧はこちらです。

  • Spring Integration の 18. JDBC Support の機能を利用して、以下の処理を行う常駐型アプリケーションを作成してみます。
    1. MySQL の orders テーブルにデータを登録します。データには status を持たせ、登録時には status = ‘00’ にします。
    2. 登録されたデータを常駐型アプリケーションで取得します。取得時に status = ‘01’ に更新します。
    3. 取得したデータを PostgreSQL の orders テーブルに登録します。
    4. 登録できたデータを MySQL の orders テーブルから削除します。
  • Spring Integration のフロー図で描くと以下の構成になります。 f:id:ksby:20170109102124p:plain
  • 今回 Spring Integration DSL も使用します。XML ファイルは使用しません。
  • MySQLPostgreSQL は以前インストールした 5.6、9.4 を使用します。
  • Spring Boot は 1.4.3 を、Spring Integration は 4.3.6 を使用します。どちらも Spring IO Platform の Athens-SR2 を利用して指定します。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 18. JDBC Support
    http://docs.spring.io/spring-integration/docs/4.3.6.RELEASE/reference/html/jdbc.html

  2. spring-integration/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java
    https://github.com/spring-projects/spring-integration/blob/master/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java

    • JdbcMessageHandler の使い方を参考にしました。
  3. Spring Integration Java DSL Reference
    https://github.com/spring-projects/spring-integration-java-dsl/wiki/spring-integration-java-dsl-reference

  4. Spring Integration Java DSL: Line by line tutorial
    https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

    • Spring Integration DSL の tutorial です。
  5. Spring Integration Java DSL sample - further simplification with Jms namespace factories
    http://www.java-allandsundry.com/2014/06/spring-integration-java-dsl-sample.html

  6. Spring Integration Java DSL sample
    https://dzone.com/articles/spring-integration-java-dsl

  7. JDBC Spring Integration with Annotations
    http://stackoverflow.com/questions/27247013/jdbc-spring-integration-with-annotations

目次

  1. ksbysample-eipapp-datacopy プロジェクトを作成する
  2. MySQL と PostgreSQL に orders テーブルを作成する
  3. データベースの設定を記述し、DataSource 等の Bean を定義する
  4. copyChannel, delChannel を作成する
  5. getFlow を作成する
  6. copyFlow を作成する
  7. delFlow を作成する
  8. 動作確認
  9. 感想

手順

ksbysample-eipapp-datacopy プロジェクトを作成する

  1. feature/6-issue ブランチを作成します。

  2. IntelliJ IDEA の「Welcome to IntelliJ IDEA」ダイアログを表示した後、「Create New Project」をクリックします。

    f:id:ksby:20170109165105p:plain

  3. 「New Project」ダイアログが表示されます。画面左側で「Gradle」を選択した後、画面右側は何も変更せずに「Next」ボタンをクリックします。

    f:id:ksby:20170109165235p:plain

  4. GroupId、ArtifactId を入力する画面が表示されます。以下の画像の文字列を入力した後、「Next」ボタンをクリックします。

    f:id:ksby:20170109165833p:plain

  5. 次の画面が表示されます。「Create directories for empty content roots automatically」をチェックした後、「Next」ボタンをクリックします。

    f:id:ksby:20170109170709p:plain

  6. Project name、Project location を入力する画面が表示されます。以下の画像の文字列を入力した後、「Finish」ボタンをクリックします。

    f:id:ksby:20170109171103p:plain

  7. build.gradle を リンク先の内容 に変更します。

  8. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

  9. src/main/java の下に ksbysample.eipapp.datacopy パッケージを作成します。

  10. src/main/java/ksbysample/eipapp/datacopy の下に Application.java を新規作成します。作成後、リンク先の内容 に変更します。

  11. 以下のディレクトリの下に .gitkeep ファイルを作成します。

    • src/main/groovy
    • src/main/resources
    • src/test/groovy
    • src/test/java
    • src/test/resources
  12. この時点で Project Tool Window は以下の状態になります。

    f:id:ksby:20170109182746p:plain

※この時点ではまだアプリケーションは起動できません。起動しようとしてもデータベースの接続設定を記述していないためエラーになります。

MySQLPostgreSQL に orders テーブルを作成する

  1. IntelliJ IDEA の Database Tool Window から MySQL の world データベース、PostgreSQL の ksbylending データベースにアクセスできるように設定します。

  2. まず MySQL の world データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「MySQL」を選択します。

    f:id:ksby:20170109190150p:plain

  3. 「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。

    f:id:ksby:20170109190831p:plain

  4. 次に PostgreSQL の ksbylending データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「PostgreSQL」を選択します。

    f:id:ksby:20170109191218p:plain

  5. 「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。

    f:id:ksby:20170109191501p:plain

  6. この時点で Database Tool Window は以下の状態です。

    f:id:ksby:20170109193208p:plain

  7. MySQL の world データベースに orders テーブルを作成します。Database Tool Window で world データベースを選択してコンテキストメニューを表示した後、「New」-「Table」を選択します。

    f:id:ksby:20170109200601p:plain

  8. 「Create New Table」ダイアログが表示されます。以下の画像のデータを入力後、「Execute」ボタンをクリックします。

    f:id:ksby:20170109202312p:plain

  9. PostgreSQL の ksbylending データベースの方でも「Create New Table」ダイアログを表示し、以下の画像のデータを入力後、「Execute」ボタンをクリックします。

    f:id:ksby:20170109203836p:plain

  10. orders テーブル作成後は以下の状態になります。

    f:id:ksby:20170109204800p:plain

データベースの設定を記述し、DataSource 等の Bean を定義する

  1. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy の下に config パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/datacopy/config の下に ApplicationConfig.java を作成し、リンク先の内容 を記述します。

  4. src/main/resources/.gitkeep を削除します。

  5. データベースの接続設定を記述しましたのでアプリケーションを起動してみます。Gradle Tool Window から bootRun タスクを実行してアプリケーションが起動してエラーが出ないことを確認します。

    f:id:ksby:20170109213305p:plain

    アプリケーションはエラーが出ずに起動しましたが、Channel も何も作成していないとすぐに終了するんですね。。。

copyChannel, delChannel を作成する

  1. src/main/java/ksbysample/eipapp/datacopy の下に integration.channel パッケージを作成します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/channel の下に ChannelConfig.java を作成し、リンク先の内容 を記述します。

getFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy の下に dto パッケージを作成します。

  2. src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDto.java を作成し、リンク先の内容 を記述します。

  3. src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDtoRowMapper.java を作成し、リンク先の内容 を記述します。

  4. src/main/java/ksbysample/eipapp/datacopy/integration の下に flow パッケージを作成します。

  5. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

copyFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に CopyOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.javaリンク先のその2の内容 に変更します。

delFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に DelOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.javaリンク先のその3の内容 に変更します。

動作確認

  1. bootRun タスクを実行してアプリケーションを起動します。各データベースの orders テーブルには何も登録されていません。

    f:id:ksby:20170110224012p:plain

  2. MySQL の world データベースの orders テーブルにデータを登録して commit してみます。

    f:id:ksby:20170110224226p:plain

    status が ‘01’ に更新されます。

    f:id:ksby:20170110224345p:plain

    PostgreSQL の ksbylending データベースの orders テーブルにデータが登録されます。

    f:id:ksby:20170110224502p:plain

    world データベースの orders テーブルのデータが削除されます。

    f:id:ksby:20170110224651p:plain

  3. 登録済の order_id = ‘00000001’ のデータを再度登録して commit してみます。

    f:id:ksby:20170110225011p:plain

    status = ‘01’ に更新されますが、ksbylending データベースの orders テーブルには変化はありません。

    f:id:ksby:20170110225136p:plain

    ログには一意性制約違反のエラーが出力されます。

    f:id:ksby:20170110225538p:plain

  4. bootRun で起動したアプリケーションを停止します。

  5. commit して、develop, master ブランチへマージします。

感想

  • Spring Integration DSL が使いやすいです。Web に出ているサンプルを見ていた時には分かりにくい印象があったのですが、実際に使ってみると書きやすいですね。XML ファイルを使わないなら DSL を使うことをお勧めします。poller や transaction も DSL を使わない場合と比較するとかなり定義しやすいです。
  • JDBC Support の機能も簡単な処理なら実装しやすいです。ただし使い方を調べるのにサンプルが少なくて手こずりました。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    def jdbcDriverMySQL = "mysql:mysql-connector-java:6.0.5"
    def jdbcDriverPgSQL = "org.postgresql:postgresql:9.4.1212"

    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.integration:spring-integration-jdbc')
    compile('org.springframework.integration:spring-integration-java-dsl')
    compile('org.springframework.boot:spring-boot-starter-jdbc')
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    runtime("${jdbcDriverMySQL}")
    runtime("${jdbcDriverPgSQL}")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • Spring Integration の JDBC Support を利用するので compile('org.springframework.integration:spring-integration-jdbc') を記述します。また Spring Data JDBC も利用するので compile('org.springframework.boot:spring-boot-starter-jdbc') を記述します。
  • Spring Integration DSL を使用するので compile('org.springframework.integration:spring-integration-java-dsl') を記述します。
  • MySQLPostgreSQL の両方のデータベースにアクセスするので、それぞれの JDBC Driver を記述します。

Application.java

package ksbysample.eipapp.datacopy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.properties

spring.datasource.world.url=jdbc:mysql://localhost/world?serverTimezone=JST
spring.datasource.world.username=root
spring.datasource.world.password=xxxxxxxx
spring.datasource.world.driverClassName=com.mysql.cj.jdbc.Driver

spring.datasource.ksbylending.url=jdbc:postgresql://localhost/ksbylending
spring.datasource.ksbylending.username=ksbylending_user
spring.datasource.ksbylending.password=xxxxxxxx
spring.datasource.ksbylending.driverClassName=org.postgresql.Driver
  • MySQL の world データベース、PostgreSQL の ksbylending データーベースへの接続情報を記述します。

ApplicationConfig.java

package ksbysample.eipapp.datacopy.config;

import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class ApplicationConfig {

    @Primary
    @Bean
    @ConfigurationProperties("spring.datasource.world")
    public DataSource dataSourceWorld() {
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public PlatformTransactionManager transactionManagerWorld() {
        return new DataSourceTransactionManager(dataSourceWorld());
    }

    @Bean
    @ConfigurationProperties("spring.datasource.ksbylending")
    public DataSource dataSourceKsbylending() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    public PlatformTransactionManager transactionManagerKsbylending() {
        return new DataSourceTransactionManager(dataSourceKsbylending());
    }

}
  • world データベース、ksbylending データベースの DataSource, PlatformTransactionManager Bean を定義します。

ChannelConfig.java

package ksbysample.eipapp.datacopy.integration.channel;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ChannelConfig {

    @Bean
    public MessageChannel copyChannel() {
        return new QueueChannel(100);
    }

    @Bean
    public MessageChannel delChannel() {
        return new QueueChannel(100);
    }

}
  • DirectChannel で作成すると MessageChannel の前後の処理が1トランザクションとして処理されてしまい、getFlow でデータを取得して copyChannel にデータを渡したら status をその時点で ‘01’ に更新したくても copyFlow の処理が正常終了しないと更新されなくなるので、QueueChannel で作成し getFlow だけでトランザクションが終了するようにします。
  • delChannel の方も copyFlow の処理だけでトランザクションが終了するようにしたいので、同様に QueueChannel で作成します。

OrdersDto.java

package ksbysample.eipapp.datacopy.dto;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class OrdersDto {

    private String orderId;

    private String status;

}

OrdersDtoRowMapper.java

package ksbysample.eipapp.datacopy.dto;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class OrdersDtoRowMapper implements RowMapper<OrdersDto> {

    @Override
    public OrdersDto mapRow(ResultSet rs, int rowNum) throws SQLException {
        OrdersDto ordersDto = new OrdersDto();
        ordersDto.setOrderId(rs.getString("order_id"));
        ordersDto.setStatus(rs.getString("status"));
        return ordersDto;
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersRowMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class FlowConfig {

    private final DataSource dataSourceWorld;

    private final PlatformTransactionManager transactionManagerWorld;

    private final MessageChannel copyChannel;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
    }

    @Bean
    public MessageSource<Object> ordersJdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'");
        adapter.setRowMapper(new OrdersRowMapper());
        adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)");
        return adapter;
    }

    @Bean
    public IntegrationFlow getFlow() {
        // MySQL の world データベースの orders テーブルから status = '00' のデータを
        // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム
        // は '00'→'01' に更新する。
        return IntegrationFlows.from(ordersJdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(1000)
                        .transactional(this.transactionManagerWorld)))
                .channel(this.copyChannel)
                .get();
    }

}

■その2

@Configuration
public class FlowConfig {

    ..........

    private final MessageChannel delChannel;

    private final CopyOrdersMessageHandler copyOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
    }


    ..........

    @Bean
    public IntegrationFlow copyFlow() {
        // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL
        // の ksbylending データベースの orders テーブルに insert する。insert に成功した
        // 場合にはデータをそのまま delChannel に渡す。
        return IntegrationFlows.from(this.copyChannel)
                .handle(this.copyOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .channel(this.delChannel)
                .get();
    }

}

■その3

@Configuration
public class FlowConfig {

    ..........

    private final DelOrdersMessageHandler delOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler
            , DelOrdersMessageHandler delOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
        this.delOrdersMessageHandler = delOrdersMessageHandler;
    }

    ..........

    @Bean
    public IntegrationFlow delFlow() {
        // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の
        // world データベースの orders テーブルのデータを delete する。
        return IntegrationFlows.from(this.delChannel)
                .handle(this.delOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .get();
    }

}

■完成版

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDtoRowMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class FlowConfig {

    private final DataSource dataSourceWorld;

    private final PlatformTransactionManager transactionManagerWorld;

    private final MessageChannel copyChannel;

    private final MessageChannel delChannel;

    private final CopyOrdersMessageHandler copyOrdersMessageHandler;

    private final DelOrdersMessageHandler delOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler
            , DelOrdersMessageHandler delOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
        this.delOrdersMessageHandler = delOrdersMessageHandler;
    }

    @Bean
    public MessageSource<Object> ordersJdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'");
        adapter.setRowMapper(new OrdersDtoRowMapper());
        adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)");
        return adapter;
    }

    @Bean
    public IntegrationFlow getFlow() {
        // MySQL の world データベースの orders テーブルから status = '00' のデータを
        // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム
        // は '00'→'01' に更新する。
        return IntegrationFlows.from(ordersJdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(1000)
                        .transactional(this.transactionManagerWorld)))
                .channel(this.copyChannel)
                .get();
    }

    @Bean
    public IntegrationFlow copyFlow() {
        // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL
        // の ksbylending データベースの orders テーブルに insert する。insert に成功した
        // 場合にはデータをそのまま delChannel に渡す。
        return IntegrationFlows.from(this.copyChannel)
                .handle(this.copyOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .channel(this.delChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delFlow() {
        // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の
        // world データベースの orders テーブルのデータを delete する。
        return IntegrationFlows.from(this.delChannel)
                .handle(this.delOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .get();
    }

}

CopyOrdersMessageHandler.java

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDto;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

@MessageEndpoint
public class CopyOrdersMessageHandler implements GenericHandler<List<OrdersDto>> {

    private final DataSource dataSource;

    public CopyOrdersMessageHandler(@Qualifier("dataSourceKsbylending") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    @Transactional(transactionManager = "transactionManagerKsbylending")
    public Object handle(List<OrdersDto> payload, Map<String, Object> headers) {
        JdbcMessageHandler insertHandler
                = new JdbcMessageHandler(this.dataSource, "insert into orders (order_id) values (:payload.orderId)");
        insertHandler.afterPropertiesSet();
        payload.stream()
                .forEach(dto -> {
                    Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build();
                    insertHandler.handleMessage(orderxMessage);
                });
        return payload;
    }
}

DelOrdersMessageHandler.java

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDto;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

@MessageEndpoint
public class DelOrdersMessageHandler implements GenericHandler<List<OrdersDto>> {

    private final DataSource dataSource;

    public DelOrdersMessageHandler(@Qualifier("dataSourceWorld") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    @Transactional(transactionManager = "transactionManagerWorld")
    public Object handle(List<OrdersDto> payload, Map<String, Object> headers) {
        JdbcMessageHandler deleteHandler
                = new JdbcMessageHandler(this.dataSource
                , "delete from orders where order_id = :payload.orderId and status = '01'");
        deleteHandler.afterPropertiesSet();
        payload.stream()
                .forEach(dto -> {
                    Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build();
                    deleteHandler.handleMessage(orderxMessage);
                });
        return null;
    }
}

履歴

2017/01/11
初版発行。

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( 感想 )

最後に感想です。

  • Nexus Repository Manager OSS 3.x はインストールが難しくないし、UI も分かりやすくて使いやすいと思います。また maven だけでなく OSS 版で npm や docker が使用できる ( Artifactory の Open Source 版では使えないらしい )、と書かれているのを見かけたことがあります。その辺も使えるようになりたいですね。

  • ライブラリのアップロードは build.gradle の設定が分かればあとは簡単な印象です。build.gradle の設定を調べるのが一番大変でした。

  • ライブラリの作成・アップロードと、別プロジェクトから利用してみて気づいた点としては、

  • ライブラリを作ってあらためて Spring Boot の auto-configuration の便利さに気付かされました。build.gradle にライブラリを使用するように記述するだけでいろいろ自動で設定してくれるというのが本当にすごい!、こんなものもライブラリ化できるようになるんだな、と思いました。

さて、次は何をしましょうか。考え中です。。。

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( 大目次 )

  1. その1 ( 概要 )
  2. その2 ( Windows 版の Nexus Repository Manager OSS をインストールする )
  3. その3 ( Nexus Repository Manager OSS の画面を一通り見てみる )
  4. その4 ( IntelliJ IDEA の Project Defaults を設定する )
  5. その5 ( 簡単なライブラリを作成して Nexus に登録してみる )
  6. 番外編 ( IntelliJ IDEA を 2016.1.3 → 2016.2 へバージョンアップ )
  7. その6 ( 登録した ksbysample-library-simpleutils ライブラリを Spring Boot のプロジェクトから利用してみる )
  8. その7 ( ksbysample-library-simpleutils ライブラリをアップロードし直した時の挙動と、1.0-SNAPSHOT で呼び出せるのかを確認する )
  9. 番外編 ( Java SE を 8u92 → 8u102 へ、Git for Windows を 2.9.0 → 2.9.2 へバージョンアップ )
  10. その8 ( build.gradle に 1.0-SNAPSHOT のバージョンで記述している時にライブラリを更新するとすぐに反映されるのか? )
  11. その9 ( 1.0-SNAPSHOT のバージョンのまま Nexus にアップロードできるのか? )
  12. その10 ( 1.0-RELEASE で Nexus にアップロードする )
  13. その11 ( Nexus を 3.0.0-03 → 3.0.1-01 へバージョンアップする )
  14. 番外編 ( build.gradle にコマンドを書いて実行する )
  15. 番外編 ( IntelliJ IDEA を 2016.2 → 2016.2.1 へバージョンアップ )
  16. その12 ( 他のライブラリに依存するライブラリを作成する )
  17. その13 ( 利用するライブラリが依存しているライブラリの別バージョンを build.gradle で指定したらどうなるのか? )
  18. 番外編 ( IntelliJ IDEA を 2016.2.1 → 2016.2.2 へバージョンアップ )
  19. その14 ( sources と javadoc をアップロードする )
  20. その15 ( pom ファイルに name, description を追加する )
  21. 番外編 ( Gradle のバージョンを 3.0 に上げてみようとしたら Spring Boot 1.4.0 以降が必要でした )
  22. 番外編 ( IntelliJ IDEA を 2016.2.2 → 2016.2.4 へバージョンアップ )
  23. 番外編 ( Java SE を 8u102 → 8u112 へ、Git for Windows を 2.9.2 → 2.10.2 へバージョンアップ )
  24. 番外編 ( IntelliJ IDEA を 2016.2.4 → 2016.3 へバージョンアップ )
  25. 番外編 ( IntelliJ IDEA 2016.3 の新機能を試してみる )
  26. その16 ( Nexus を 3.0.1-01 → 3.1.0-04 へバージョンアップする )
  27. その17 ( Nexus へコンポーネントを登録する専用 User を作成する )
  28. その18 ( Spring Framework に依存するライブラリを作成する )
  29. その19 ( Spring Framework に依存するライブラリを作成する2 )
  30. その20 ( Spring Framework に依存するライブラリを作成する3 )
  31. その21 ( Spring Framework に依存するライブラリを作成する4 )
  32. その22 ( Spring Framework に依存するライブラリを作成する5 )
  33. その23 ( Spring Framework に依存するライブラリを作成する6 )
  34. 番外編 ( IntelliJ IDEA を 2016.3 → 2016.3.2 へ、Git for Windows を 2.10.2 → 2.11.0 へバージョンアップ )
  35. その24 ( Nexus を 3.1.0-04 → 3.2.0-01 へバージョンアップする )
  36. その25 ( Windows 版 Nexus 3 インストール手順 )
  37. その26 ( Linux のサーバに Nexus Repository Manager OSS をインストールする )

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( その26 )( Linux のサーバに Nexus Repository Manager OSS をインストールする )

概要

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( その25 )( Windows 版 Nexus 3 インストール手順 ) の続きです。

参照したサイト・書籍

目次

  1. Oracle VM VirtualBox のインストール
  2. Vagrant のインストール
  3. CentOS の仮想サーバを構築する
  4. Unix 版 Nexus Repository Manager OSS をインストールする
  5. JDK をインストールする
  6. 動作確認
  7. サービスとして登録・起動する
  8. ksbysample-library-depend-spring ライブラリをアップロードしてみる
  9. 次回は。。。

手順

Oracle VM VirtualBox のインストール

  1. Download VirtualBox のページの「Windows hosts」リンクをクリックし、VirtualBox-5.1.12-112440-Win.exe をダウンロードします。

    f:id:ksby:20170103193035p:plain

  2. VirtualBox-5.1.12-112440-Win.exe を実行します。

  3. Oracle VM VirtualBox 5.1.12 Setup」ダイアログが表示されます。「Next >」ボタンをクリックします。

  4. 「Custom Setup」画面が表示されます。「Location」を C:\virtualbox\ に変更した後、「Next >」ボタンをクリックします。

    f:id:ksby:20170103194330p:plain

  5. 次の画面が表示されます。チェックの状態を以下の画像の通り変更した後、「Next >」ボタンをクリックします。

    f:id:ksby:20170103194512p:plain

    ※「Create start menu entries」だけチェックを残したのは、仮想サーバの情報を確認するために VirtualBox の管理画面を見たい場合があるためです。

  6. 「Warning: Network Interfaces」画面が表示されます。「Yes」ボタンをクリックします。

  7. 「Ready to Install」画面が表示されます。「Install」ボタンをクリックします。

    インストールの途中で以下のようなダイアログが3回表示されますので都度「インストール」ボタンをクリックします。

    f:id:ksby:20170103194918p:plain

  8. インストールが完了すると「Oracle VM VirtualBox 5.1.12 installation is complete」画面が表示されます。「Start Oracle VM VirtualBox 5.1.12 after installation」のチェックボックスを外した後、「Finish」ボタンをクリックします。

Vagrant のインストール

  1. DOWNLOAD VAGRANT のページの Windows の「Universal (32 and 64-bit)」のリンクをクリックし、vagrant_1.9.1.msi をダウンロードします。

  2. vagrant_1.9.1.msi を実行します。

  3. Vagrant Setup」ダイアログが表示されます。「Next」ボタンをクリックします。

  4. 「End-User License Agreement」画面が表示されます。「I accept the terms in the License Agreement」をチェックした後、「Next」ボタンをクリックします。

  5. Destination Folder」画面が表示されます。C:\vagrant\ に変更した後、「Next」ボタンをクリックします。

  6. 「Ready to install Vagrant」画面が表示されます。「Install」ボタンをクリックします。

  7. インストールが完了すると「Completed the Vagrant Setup Wizard」画面が表示されます。「Finish」ボタンをクリックします。

  8. PC の再起動を求めるダイアログが表示されるので、「YES」ボタンをクリックして再起動します。

CentOS の仮想サーバを構築する

  1. C:\work\vagrant フォルダを作成します。

  2. C:\work\vagrant の下に boxes フォルダを作成します。

  3. Vagrantbox.es のページの以下の画像の赤枠の box ファイルの URL をコピーします。

    f:id:ksby:20170103232741p:plain

  4. コピーした URL をブラウザのアドレスバーにペーストして box ファイル centos-7-x86_64.box をダウンロードし、C:\work\vagrant\boxes の下に保存します。

  5. vagrant box add コマンドでダウンロードした box ファイルを追加します。

    f:id:ksby:20170104112823p:plain

  6. C:\work\vagrant の下に nexus-server フォルダを作成します。

  7. vagrant init コマンドで Vagrantfile を作成します。作成された時点の Vagrantfile は リンク先のその1の内容 です。

    f:id:ksby:20170104114633p:plain

  8. Vagrantfile をエディタで開き リンク先のその2の内容 に変更します。今はメモリを 2GB に増やすだけです。

  9. vagrant up コマンドを実行して仮想サーバを起動します。

    f:id:ksby:20170104120332p:plain f:id:ksby:20170104120502p:plain

  10. 起動した仮想サーバに ssh でアクセスしたいので、ssh.exe が入っている C:\Git\usr\bin を環境変数 PATH に追加します。

  11. vagrant ssh コマンドで仮想サーバに接続できることを確認します。

    f:id:ksby:20170104121041p:plain

Unix 版 Nexus Repository Manager OSS をインストールする

  1. Download Nexus Repository OSS ( http://www.sonatype.com/download-oss-sonatype ) のページにアクセスします。

  2. ページ内の「nexus-3.2.0-01-unix.tar.gz」リンクをクリックしてダウンロードします。

    f:id:ksby:20170104131229p:plain

  3. ダウンロードした nexus-3.2.0-01-unix.tar.gz を C:\work\vagrant\nexus-server に移動します。

  4. vagrant ssh で接続して /opt の下にインストールします。

    f:id:ksby:20170104183419p:plain

JDK をインストールする

  1. nexus run を実行しようとしてみたところエラーメッセージが出ました。/opt/nexus-3.2.0-01/bin の下を見たところ Windows 版にあった jre ディレクトリが存在しませんでしたので、別途インストールします。

    f:id:ksby:20170104212858p:plain

  2. Java SE Development Kit 8 Downloads の「Java SE Development Kit 8u112」の「jdk-8u112-linux-x64.tar.gz」リンクをクリックしてダウンロードします。

  3. ダウンロードした jdk-8u112-linux-x64.tar.gz を C:\work\vagrant\nexus-server に移動します。

  4. 以下のコマンドを実行し、jdk-8u112-linux-x64.tar.gz を展開して /opt/jdk1.8.0_112 を作成した後、/opt/java というシンボリックリンクを作成します。

    f:id:ksby:20170105191147p:plain

  5. /etc/profile.d の下に java.sh を作成し、リンク先の内容 を記述します。記述後は以下のようになります。

    f:id:ksby:20170105191712p:plain

  6. Ctrl+D を押してログアウトした後、再度 vagrant ssh で接続して java -version でバージョン情報が表示されることを確認します。

    f:id:ksby:20170105192526p:plain

動作確認

※動作確認前に Windows 版 nexus のサービスは停止しています。

  1. http://localhost:8081/ でアクセスした時に仮想サーバの nexus の管理画面にアクセスできるよう Vagrantfile を リンク先のその3の内容 に変更します。

  2. vagrant haltvagrant up を実行して仮想サーバを再起動します。

    f:id:ksby:20170105195036p:plain

  3. nexus run を実行します。

    f:id:ksby:20170105195822p:plain

    (.....しばらくログが出力され続けます.....)

    f:id:ksby:20170105195931p:plain

  4. http://localhost:8081/ にアクセスして管理画面が表示されることを確認します。

    f:id:ksby:20170105202616p:plain

  5. Ctrl+C を押して nexus を停止します。

サービスとして登録・起動する

  1. /opt/nexus-3.2.0-01 → /opt/nexus のシンボリックリンクを作成します。

    f:id:ksby:20170105220855p:plain

  2. /etc/systemd/system/ の下に nexus.service を作成し、リンク先の内容 を記述します。

    f:id:ksby:20170105221506p:plain

  3. systemctl コマンドで nexus を起動します。

    f:id:ksby:20170105222140p:plain

  4. http://localhost:8081/ にアクセスして管理画面が表示されることを確認します。

    f:id:ksby:20170105222359p:plain

  5. サーバ起動後に nexus が自動起動されるか確認します。vagrant haltvagrant up を実行して仮想サーバを再起動します。

    f:id:ksby:20170105222713p:plain

  6. vagrant ssh で接続して nexus が起動していることを確認します。また http://localhost:8081/ にアクセスして管理画面が表示されることも確認します。

    f:id:ksby:20170105223018p:plain

ksbysample-library-depend-spring ライブラリをアップロードしてみる

  1. IntelliJ IDEA で ksbysample-library-depend-spring プロジェクトを開きます。

  2. uploadArchives タスクを実行して登録します。BUILD SUCCESSFUL が表示されることが確認できます。

    f:id:ksby:20170105225113p:plain

  3. nexus の管理画面を見ると ksbysample-library-depend-spring ライブラリが登録されていることが確認できます。

    f:id:ksby:20170105225334p:plain

次回は。。。

目次を作成して感想を書いて締める予定です。

ソースコード

Vagrantfile

■その1

# -*- mode: ruby -*-
# vi: set ft=ruby :

# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.
Vagrant.configure("2") do |config|
  # The most common configuration options are documented and commented below.
  # For a complete reference, please see the online documentation at
  # https://docs.vagrantup.com.

  # Every Vagrant development environment requires a box. You can search for
  # boxes at https://atlas.hashicorp.com/search.
  config.vm.box = "centos-7-x86_64"

  # Disable automatic box update checking. If you disable this, then
  # boxes will only be checked for updates when the user runs
  # `vagrant box outdated`. This is not recommended.
  # config.vm.box_check_update = false

  # Create a forwarded port mapping which allows access to a specific port
  # within the machine from a port on the host machine. In the example below,
  # accessing "localhost:8080" will access port 80 on the guest machine.
  # config.vm.network "forwarded_port", guest: 80, host: 8080

  # Create a private network, which allows host-only access to the machine
  # using a specific IP.
  # config.vm.network "private_network", ip: "192.168.33.10"

  # Create a public network, which generally matched to bridged network.
  # Bridged networks make the machine appear as another physical device on
  # your network.
  # config.vm.network "public_network"

  # Share an additional folder to the guest VM. The first argument is
  # the path on the host to the actual folder. The second argument is
  # the path on the guest to mount the folder. And the optional third
  # argument is a set of non-required options.
  # config.vm.synced_folder "../data", "/vagrant_data"

  # Provider-specific configuration so you can fine-tune various
  # backing providers for Vagrant. These expose provider-specific options.
  # Example for VirtualBox:
  #
  # config.vm.provider "virtualbox" do |vb|
  #   # Display the VirtualBox GUI when booting the machine
  #   vb.gui = true
  #
  #   # Customize the amount of memory on the VM:
  #   vb.memory = "1024"
  # end
  #
  # View the documentation for the provider you are using for more
  # information on available options.

  # Define a Vagrant Push strategy for pushing to Atlas. Other push strategies
  # such as FTP and Heroku are also available. See the documentation at
  # https://docs.vagrantup.com/v2/push/atlas.html for more information.
  # config.push.define "atlas" do |push|
  #   push.app = "YOUR_ATLAS_USERNAME/YOUR_APPLICATION_NAME"
  # end

  # Enable provisioning with a shell script. Additional provisioners such as
  # Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the
  # documentation for more information about their specific syntax and use.
  # config.vm.provision "shell", inline: <<-SHELL
  #   apt-get update
  #   apt-get install -y apache2
  # SHELL
end

■その2

  config.vm.provider "virtualbox" do |vb|
  #   # Display the VirtualBox GUI when booting the machine
  #   vb.gui = true
  #
    # Customize the amount of memory on the VM:
    vb.memory = "2048"
  end

■その3

  # Create a forwarded port mapping which allows access to a specific port
  # within the machine from a port on the host machine. In the example below,
  # accessing "localhost:8080" will access port 80 on the guest machine.
  # config.vm.network "forwarded_port", guest: 80, host: 8080
  config.vm.network "forwarded_port", guest: 8081, host: 8081
  • config.vm.network "forwarded_port", guest: 8081, host: 8081 を追加します。

java.sh

JAVA_HOME=/opt/java
PATH=$PATH:$JAVA_HOME/bin

nexus.service

[Unit]
Description=nexus service
After=network.target

[Service]
Type=forking
ExecStart=/opt/nexus/bin/nexus start
ExecStop=/opt/nexus/bin/nexus stop
User=nexus
Restart=on-abort

[Install]
WantedBy=multi-user.target

履歴

2017/01/05
初版発行。

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( その25 )( Windows 版 Nexus 3 インストール手順 )

概要

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( その24 )( Nexus を 3.1.0-04 → 3.2.0-01 へバージョンアップする ) の続きです。

  • 今回の手順で確認できるのは以下の内容です。
    • Windows 版 Nexus 3 ( 3.1以降 ) のインストール手順をまとめます。
    • 3.1 以降の正式バージョンがリリースされた後にクリーンインストールの手順をまとめていなかったので、今回はクリーンインストールの手順を記述します。
    • 2017/1/3 時点でダウンロード可能な最新版 nexus-3.2.0-01-win64.zip を使用します。
    • インストール先は C:\nexus です。

参照したサイト・書籍

  1. Documentation Nexus Repository Manager 3.2 - Chapter 2. Installation and Running
    https://books.sonatype.com/nexus-book/reference3/install.html

目次

  1. nexus-3.2.0-01-win64.zip をダウンロードする
  2. C:\nexus ディレクトリを作成する
  3. nexus-3.2.0-01-win64.zip を解凍し、ディレクトリ・ファイル一式を移動する
  4. nexus /run を実行する
  5. サービスを登録して起動する
  6. 動作確認
  7. メモ書き
    1. nexus 起動時の JavaVM のオプションを変更するには?
    2. nexus から Maven Central Repository へアクセスする際に Proxy サーバがある場合には?

手順

nexus-3.2.0-01-win64.zip をダウンロードする

  1. Download Nexus Repository OSS ( http://www.sonatype.com/download-oss-sonatype ) のページにアクセスします。

  2. ページ内の「nexus-3.2.0-01-win64.zip」リンクをクリックしてダウンロードします。

    f:id:ksby:20170103105534p:plain

C:\nexus ディレクトリを作成する

  1. C:\ の下に nexus ディレクトリを作成します。

nexus-3.2.0-01-win64.zip を解凍し、ディレクトリ・ファイル一式を移動する

  1. nexus-3.2.0-01-win64.zip を解凍します。解凍すると nexus-3.2.0-01, sonatype-work の2つのフォルダが作成されます。

  2. nexus-3.2.0-01, sonatype-work の2つのフォルダを C:\nexus の下に移動します。移動後の C:\nexus の下のディレクトリ構成は以下のようになります。

    f:id:ksby:20170103144513p:plain

nexus /run を実行する

  1. nexus /run を実行し、Started Sonatype Nexus OSS 3.2.0-01 が出力されることを確認します。

    f:id:ksby:20170103152740p:plain f:id:ksby:20170103152901p:plain f:id:ksby:20170103153026p:plain f:id:ksby:20170103153132p:plain f:id:ksby:20170103153245p:plain f:id:ksby:20170103153351p:plain

    nexus の初回起動時のログを見ていると、初めて見るソフトもあって面白いです。

    • Jetty が使用されている。
    • Apache Shiro というものが使用されている ( このソフトは初めて聞きました )。
    • OrientDB というものが使用されている ( このソフトも初めて聞きました )。
    • Ehcache が使用されている。
    • elasticsearch が使用されている。
    • Quartz Scheduler というものが使用されている ( このソフトも初めて聞きました )。
  2. Ctrl+C を押して、起動した nexus を停止します。

サービスを登録して起動する

  1. コマンドプロンプトを「管理者として実行...」で起動した後、nexus /install nexus を実行しサービスを登録します。

    f:id:ksby:20170103161710p:plain

    f:id:ksby:20170103161839p:plain

  2. 「サービスの開始」リンクをクリックして起動します。

    f:id:ksby:20170103162009p:plain

動作確認

  1. http://localhost:8081/ にアクセスして管理画面が表示されることを確認します。

    f:id:ksby:20170103162456p:plain

  2. admin / admin123 でログインできることを確認します。

    f:id:ksby:20170103162618p:plain

    f:id:ksby:20170103162749p:plain

メモ書き

nexus 起動時の JavaVM のオプションを変更するには?

C:\nexus\nexus-3.2.0-01\bin の下にある nexus.vmoptions を編集します。nexus /run 実行直後は以下の内容でした。

-Xms1200M
-Xmx1200M
-XX:MaxDirectMemorySize=2G
-XX:+UnlockDiagnosticVMOptions
-XX:+UnsyncloadClass
-XX:+LogVMOutput 
-XX:LogFile=../sonatype-work/nexus3/log/jvm.log
-Djava.net.preferIPv4Stack=true
-Dkaraf.home=.
-Dkaraf.base=.
-Dkaraf.etc=etc/karaf
-Djava.util.logging.config.file=etc/karaf/java.util.logging.properties
-Dkaraf.data=../sonatype-work/nexus3
-Djava.io.tmpdir=../sonatype-work/nexus3/tmp
-Dkaraf.startLocalConsole=false

nexus から Maven Central Repository へアクセスする際に Proxy サーバがある場合には?

管理画面から設定します。

f:id:ksby:20170103171759p:plain

ソースコード

履歴

2017/01/03
初版発行。