かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その22 )( http でリクエストを受信 → JdbcChannelMessageStore を使用した QueueChannel に Message を送信 → Message を受信して bat ファイルを実行する )

概要

記事一覧はこちらです。

  • Spring Integration DSL で以下の処理を行う常駐型アプリケーションを作成します。
    • http://localhost:8080/batch?sleep=... でリクエストを受信します。17. HTTP Support の機能を利用して WebAPI を作成します。
    • 渡された sleep パラメータの値を payload に設定した Message を生成し、H2 Database を MessageStore とする QueueChannel に送信します。
    • H2 Database は今回作成する常駐型アプリケーションから組み込みモードで起動し、データは in-memory ではなくファイルに保存します。
    • QueueChannel を1秒間隔でチェックし、Message が送信されていたら受信して Windows の bat ファイルを実行します。
    • bat は同時に1つしか起動しないようにします。
    • bat ファイルは引数を1つ受け取り、引数で渡された値の秒数 sleep してから、引数の値を戻り値として返します。bat ファイル名は sleep.bat とし、sleep.bat [秒数] で起動します。sleep 機能は ActivePerl をインストールして Perlワンライナースクリプトで実装します。
  • 新しく使った機能としては、以下の2つです。
    • HTTP Support を使用した WebAPI の作成
    • データベースを MessageStore に利用した QueueChannel の作成
  • 2~3回に分けて書く予定です。

参照したサイト・書籍

  1. Spring Integration Java DSL Reference - Using Protocol Adapters
    https://github.com/spring-projects/spring-integration-java-dsl/wiki/spring-integration-java-dsl-reference#adapters

  2. Spring Integration Reference Manual - 17.4.6 URI Template Variables and Expressions
    http://docs.spring.io/spring-integration/docs/4.3.11.RELEASE/reference/htmlsingle/#_uri_template_variables_and_expressions

    • SpEL で #requestParams が使用できることはここに書かれています。
  3. Spring integration flow respond with HTTP 400
    https://stackoverflow.com/questions/42631872/spring-integration-flow-respond-with-http-400

    • 今回は使用していませんが、レスポンスの HTTP ステータスコードを変えたい場合の方法が書かれていたのを見つけたので、メモとして残しておきます。
  4. TransactionalPoller is SpringIntegration
    https://stackoverflow.com/questions/41184992/transactionalpoller-is-springintegration

  5. Jdbc initialize-database with Java config
    https://stackoverflow.com/questions/37971721/jdbc-initialize-database-with-java-config

  6. How can I run jdbc:initialize-database using condition?
    https://stackoverflow.com/questions/24308731/how-can-i-run-jdbcinitialize-database-using-condition

  7. Spring Integration Reference Manual - 18.4.2 Backing Message Channels
    http://docs.spring.io/spring-integration/docs/4.3.11.RELEASE/reference/htmlsingle/#jdbc-message-store-channels

    • INT_CHANNEL_MESSAGE の説明が記載されています。

目次

  1. ksbysample-eipapp-batchexecutor プロジェクトを作成する
  2. http://localhost:8080/batch?sleep=... でリクエストを受信する処理を作成する
  3. 渡された sleep パラメータの値を H2 Database の JdbcChannelMessageStore を使用した QueueChannel に送信する処理を作成する
  4. QueueChannel を1秒毎にチェックする処理を作成する
  5. 次回ヘ続く

手順

ksbysample-eipapp-batchexecutor プロジェクトを作成する

IntelliJ IDEA で Gradle プロジェクトを作成します。

  • プロジェクトは C:\project-springboot\ksbysample-boot-integration\ksbysample-eipapp-batchexecutor に作成します。
  • ArtifactId は ksbysample-eipapp-batchexecutor にします。

build.gradle を以下の内容に変更した後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.5.6.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "https://plugins.gradle.org/m2/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        // for Error Prone ( http://errorprone.info/ )
        classpath("net.ltgt.gradle:gradle-errorprone-plugin:0.0.11")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'groovy'
apply plugin: 'net.ltgt.errorprone'
apply plugin: 'checkstyle'
apply plugin: 'findbugs'
apply plugin: 'pmd'

sourceCompatibility = 1.8
targetCompatibility = 1.8

task wrapper(type: Wrapper) {
    gradleVersion = '3.5'
}

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing,-path']
compileJava.options.compilerArgs += ['-Xep:RemoveUnusedImports:WARN']

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

checkstyle {
    configFile = file("${rootProject.projectDir}/config/checkstyle/google_checks.xml")
    toolVersion = '8.1'
    sourceSets = [project.sourceSets.main]
}

findbugs {
    toolVersion = '3.0.1'
    sourceSets = [project.sourceSets.main]
    ignoreFailures = true
    effort = "max"
    excludeFilter = file("${rootProject.projectDir}/config/findbugs/findbugs-exclude.xml")
}

tasks.withType(FindBugs) {
    // Gradle 3.3以降 + FindBugs Gradle Plugin を組み合わせると、"The following errors occurred during analysis:"
    // の後に "Cannot open codebase filesystem:..." というメッセージが大量に出力されるので、以下の doFirst { ... }
    // のコードを入れることで出力されないようにする
    doFirst {
        def fc = classes
        if (fc == null) {
            return
        }
        fc.exclude '**/*.properties'
        fc.exclude '**/*.xml'
        fc.exclude '**/META-INF/**'
        fc.exclude '**/static/**'
        fc.exclude '**/templates/**'
        classes = files(fc.files)
    }
    reports {
        xml.enabled = false
        html.enabled = true
    }
}

pmd {
    toolVersion = "5.8.1"
    sourceSets = [project.sourceSets.main]
    ignoreFailures = true
    consoleOutput = true
    ruleSetFiles = rootProject.files("/config/pmd/pmd-project-rulesets.xml")
    ruleSets = []
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        // BOM は https://repo.spring.io/release/io/spring/platform/platform-bom/Brussels-SR4/
        // の下を見ること
        mavenBom("io.spring.platform:platform-bom:Brussels-SR4")
    }
}

bootRepackage {
    mainClass = 'ksbysample.eipapp.batchexecutor.Application'
}

dependencies {
    def errorproneVersion = "2.0.15"

    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.boot:spring-boot-starter-web")
    compile("org.springframework.boot:spring-boot-starter-data-jpa")
    compile("org.springframework.integration:spring-integration-http")
    compile("org.springframework.integration:spring-integration-jdbc")
    testCompile("org.springframework.boot:spring-boot-starter-test")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.2.RELEASE")
    compile("com.h2database:h2:1.4.192")

    // for Error Prone ( http://errorprone.info/ )
    errorprone("com.google.errorprone:error_prone_core:${errorproneVersion}")
    compileOnly("com.google.errorprone:error_prone_annotations:${errorproneVersion}")
}
  • Spring Boot は 1.5.6、Spring IO Platform は Brussels-SR4 を使用します。
  • CheckstyleFindBugs、PMD、Error Prone は全て入れます。config ディレクトリは ksbysample-webapp-lending プロジェクトの config ディレクトリ をそのまま持ってきます。
  • Gradle は 3.5 を使用します。
  • 17. HTTP Support の機能を利用して WebAPI を作成するので、以下の2行を記述します。
    • compile("org.springframework.boot:spring-boot-starter-web")
    • compile("org.springframework.integration:spring-integration-http")
  • H2 Database と JdbcChannelMessageStore を利用するので、以下の3行を記述します。作成したデータベースに IntelliJ IDEA の Database Tools から接続するために com.h2database:h2 は Database Tools が使用するクライアントライブラリのバージョンと同じバージョンを指定します。
    • compile("org.springframework.boot:spring-boot-starter-data-jpa")
    • compile("org.springframework.integration:spring-integration-jdbc")
    • compile("com.h2database:h2:1.4.192")

src/main/java の下に ksbysample.eipapp.batchexecutor パッケージを作成し、その下に Application.java を新規作成して以下の内容を記述します。

package ksbysample.eipapp.batchexecutor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.jpa.JpaRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;

import javax.sql.DataSource;

/**
 * Application 用メイン + JavaConfig 用クラス
 */
@SpringBootApplication(exclude = {JpaRepositoriesAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /**
     * @return Tomcat JDBC Connection Pool の DataSource オブジェクト
     */
    @Bean
    @ConfigurationProperties("spring.datasource.tomcat")
    public DataSource dataSource() {
        return DataSourceBuilder.create()
                .type(org.apache.tomcat.jdbc.pool.DataSource.class)
                .build();
    }

}

src/main/resources の下に application.properties を新規作成し、以下の内容を記述します。

spring.datasource.tomcat.url=jdbc:h2:file:C:/eipapp/ksbysample-eipapp-batchexecutor/db/batchexecutordb
spring.datasource.tomcat.username=sa
spring.datasource.tomcat.password=
spring.datasource.tomcat.driverClassName=org.h2.Driver
spring.datasource.tomcat.initialSize=2
spring.datasource.tomcat.maxActive=2
spring.datasource.tomcat.maxIdle=2
spring.datasource.tomcat.minIdle=2
spring.datasource.tomcat.testOnBorrow=true
spring.datasource.tomcat.validationQuery=select 1
spring.datasource.tomcat.validationQueryTimeout=5
spring.datasource.tomcat.removeAbandoned=true
spring.datasource.tomcat.removeAbandonedTimeout=30
spring.datasource.tomcat.jdbc-interceptors=SlowQueryReport(threshold=5000)
# spring.datasource.jmx-enabled は spring.datasource.tomcat.jmx-enabled と書かないこと。
# spring.datasource.tomcat.jmx-enabled だと機能しない。
spring.datasource.jmx-enabled=true

アプリケーションの実行環境として以下のディレクトリを作成し、H2 Database のデータベースファイルは C:\eipapp\batch-executor\db の下に作成する想定です。

C:\eipapp\ksbysample-eipapp-batchexecutor
├ bat
│ └ sleep.bat
└ db
   ├ batchexecutordb.mv.db
   └ batchexecutordb.trace.db

現在のプロジェクトの構成は以下のようになります。

f:id:ksby:20170820020957p:plain

http://localhost:8080/batch?sleep=... でリクエストを受信する処理を作成する

src/main/java/ksbysample/eipapp/batchexecutor の下に FlowConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.batchexecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.http.Http;

@Configuration
public class FlowConfig {

    /**
     * http://localhost:8080/batch?sleep=... でリクエストを受信して、sleep パラメータの値を 
     * payload にセットした Message を bat ファイル実行指示用の QueueChannel に送信した後、
     * sleep パラメータの文字列をそのままテキストとしてレスポンスとして返す
     * 
     * @return {@link IntegrationFlow} オブジェクト
     */
    @Bean
    public IntegrationFlow httpBatchFlow() {
        return IntegrationFlows.from(
                // http://localhost:8080/batch?sleep=... でリクエストを受信して
                // sleep の値を payload にセットした Message を生成する
                Http.inboundGateway("/batch")
                        .requestMapping(r -> r
                                .methods(HttpMethod.GET)
                                .params("sleep"))
                        .payloadExpression("#requestParams.sleep[0]"))
                // http のレスポンスには sleep パラメータの文字列をそのままテキストで返す
                .handle((p, h) -> p)
                .get();
    }

}

アプリケーションを実行して http://localhost:8080/batch?sleep=10 でアクセスすると、画面上に 10 の文字が表示されます。

f:id:ksby:20170820035004p:plain

渡された sleep パラメータの値を H2 Database の JdbcChannelMessageStore を使用した QueueChannel に送信する処理を作成する

まず H2 Database を JdbcChannelMessageStore とする QueueChannel を定義します。src/main/java/ksbysample/eipapp/batchexecutor/FlowConfig.java に以下の記述を追加します。

@Configuration
public class FlowConfig {

    private final DataSource dataSource;

    public FlowConfig(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * {@link FlowConfig#executeBatchQueueChannel()} の ChannelMessageStore を H2 Database
     * にするための {@link JdbcChannelMessageStore}
     *
     * @return {@link PriorityCapableChannelMessageStore} オブジェクト
     */
    @Bean
    public PriorityCapableChannelMessageStore jdbcChannelMessageStore() {
        JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(this.dataSource);
        messageStore.setChannelMessageStoreQueryProvider(new H2ChannelMessageStoreQueryProvider());
        messageStore.setPriorityEnabled(true);
        return messageStore;
    }

    /**
     * bat ファイル実行指示用の QueueChannel
     *
     * @return {@link MessageChannel} オブジェクト
     */
    @Bean
    public MessageChannel executeBatchQueueChannel() {
        return MessageChannels.queue(jdbcChannelMessageStore(), "EXECUTE_BATCH")
                .get();
    }

    ..........
  • jdbcChannelMessageStore メソッド内で messageStore.setPriorityEnabled(true); と記述していますが、この行は必須です。引数に false は指定できません。この行を書かなかったり、false を指定した場合には、アプリケーション起動時に java.lang.IllegalArgumentException: When using priority, the 'PriorityCapableChannelMessageStore' must have priority enabled. というエラーが出てアプリケーションが起動しません。

次に FlowConfig#httpBatchFlow メソッドを以下のように変更します。

    @Bean
    public IntegrationFlow httpBatchFlow() {
        return IntegrationFlows.from(
                // http://localhost:8080/batch?sleep=... でリクエストを受信して
                // sleep の値を payload にセットした Message を生成する
                Http.inboundGateway("/batch")
                        .requestMapping(r -> r
                                .methods(HttpMethod.GET)
                                .params("sleep"))
                        .payloadExpression("#requestParams.sleep[0]"))
                // message を bat ファイル実行指示用の QueueChannel に送信する
                // .wireTap(...) を利用することで、処理をここで中断せず次の .handle(...) が実行されるようにする
                .wireTap(f -> f.channel(executeBatchQueueChannel()))
                // http のレスポンスには sleep パラメータの文字列をそのままテキストで返す
                .handle((p, h) -> p)
                .get();
    }
  • .wireTap(f -> f.channel(executeBatchQueueChannel())) を追加します。

アプリケーションを起動し直して http://localhost:8080/batch?sleep=10 の URL にアクセスするとエラーが発生しました。。。 画面のエラーメッセージを見ると Table "INT_CHANNEL_MESSAGE" not found と表示されています。INT_CHANNEL_MESSAGE というテーブルが必要なようですが、自動では作成してくれないようです。

f:id:ksby:20170820041438p:plain

INT_CHANNEL_MESSAGE を作成する方法を調べてみると、stackoverflow の以下の QA がヒットしました。

また Spring Integration Reference Manual では以下の場所に INT_CHANNEL_MESSAGE の記載がありました。

内容をまとめると、

  • classpath:org/springframework/integration/jdbc/store/channel/schema-h2.sql を実行すれば INT_CHANNEL_MESSAGE テーブルを作成してくれます。
  • XML ベースの Spring Integration の場合、XML ファイルに <jdbc:initialize-database><jdbc:script location="..."/></jdbc:initialize-database> を記述すると、アプリケーションの起動時に SQL ファイルを実行してくれます。
  • 1度アプリケーションを実行して INT_CHANNEL_MESSAGE テーブルを作成した後アプリケーションを落として再度起動すると、 INT_CHANNEL_MESSAGE テーブルが既に存在するため schema-h2.sql 実行時にエラーが出るのですが(アプリケーション自体は起動します)、<jdbc:initialize-database>ignore-failures="ALL" 属性を追加するとエラーが出なくなります(無視されます)。

今回は以下のように対応します。

  • classpath:org/springframework/integration/jdbc/store/channel/schema-h2.sqlXML ファイルを作成して実行するようにします。
  • XML ファイルは src/main/resources の下に applicationContext.xml というファイル名で作成し、Application クラスに @ImportResource("classpath:applicationContext.xml") アノテーションを付加して読み込みます。

src/main/resources の下に applicationContext.xml を新規作成し、以下の内容を記述します。

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:jdbc="http://www.springframework.org/schema/jdbc"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
                                 http://www.springframework.org/schema/beans/spring-beans.xsd
                                 http://www.springframework.org/schema/integration
                                 http://www.springframework.org/schema/integration/spring-integration.xsd
                                 http://www.springframework.org/schema/jdbc
                                 http://www.springframework.org/schema/jdbc/spring-jdbc.xsd">

    <!--
        アプリケーション起動時に schema-h2.sql を実行して JdbcChannelMessageStore で使用する 
        INT_CHANNEL_MESSAGE テーブルを新規作成する 
        また、既に INT_CHANNEL_MESSAGE テーブルがあると schema-h2.sql 実行時にエラーが出るが、 
        ignore-failures="ALL" を付与して無視するようにする
    -->
    <jdbc:initialize-database ignore-failures="ALL">
        <jdbc:script location="classpath:org/springframework/integration/jdbc/store/channel/schema-h2.sql"/>
    </jdbc:initialize-database>

</beans:beans>

src/main/java/ksbysample/eipapp/batchexecutor/Application.java を以下のように変更します。

@SpringBootApplication(exclude = {JpaRepositoriesAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
@ImportResource("classpath:applicationContext.xml")
public class Application {
  • @ImportResource("classpath:applicationContext.xml") アノテーションを Application クラスに付加します。

アプリケーションを起動し直して http://localhost:8080/batch?sleep=10 の URL にアクセスすると、今度はエラーにならず画面上に 10 と表示されました。

f:id:ksby:20170820090351p:plain

Tomcat を停止して、データベースに Message が入っていることを確認します。

IntelliJ IDEA の Database Tools で「+」-「Data Source」-「H2」を選択して、

f:id:ksby:20170820090618p:plain

「Data Sources and Drivers」ダイアログが表示されたら以下の値を入力して「OK」ボタンを押します。

f:id:ksby:20170820090934p:plain

  • 「URL」の一番右側のドロップダウンリストで「URL only」を選択します。
  • 「Name」に file:batchexecutordb を入力します。file: を付けているのは in-memory や tcp 接続と区別するためです。
  • 「User」に sa を入力します。
  • 「URL」に jdbc:h2:file:C:/eipapp/ksbysample-eipapp-batchexecutor/db/batchexecutordb を入力します。
  • 「Test Connection」ボタンをクリックして Successful が表示されることを確認します。

f:id:ksby:20170820091331p:plain

  • 「Schemas」タブをクリックした後、「All schemas」をチェックします。

Database Tools に戻ると「file:batchexecutordb」が追加されており、展開すると INT_CHANNEL_MESSAGE テーブルも表示されています。

f:id:ksby:20170820091536p:plain

INT_CHANNEL_MESSAGE テーブルの中のデータを表示すると、1件データが登録されていました。

f:id:ksby:20170820093547p:plain

登録されていたデータは一旦削除します。

QueueChannel を1秒毎にチェックする処理を作成する

src/main/java/ksbysample/eipapp/batchexecutor/FlowConfig.java に以下の記述を追加します。

@Configuration
public class FlowConfig {

    ..........

    /**
     * bat ファイル実行指示用の QueueChannel に Message が送信されているか1秒毎にチェックし、
     * 送信されている場合には受信して sleep.bat を実行する。連続して Message を送信しても
     * 多重化していないので前のバッチが終了しないと次のバッチは実行されない。
     *
     * @return {@link IntegrationFlow} オブジェクト
     */
    @Bean
    public IntegrationFlow executeBatchFlow() {
        return IntegrationFlows.from(executeBatchQueueChannel())
                // QueueChannel に Message が送信されているか1秒毎にチェックする
                .bridge(e -> e.poller(Pollers.fixedDelay(1000)))
                // まだバッチ実行処理は実装しない
                // メッセージを受信したことが分かるようログを出力する
                .log()
                // sleep.bat を実行する
                .handle((p, h) -> {
                    return null;
                })
                .get();
    }

}
  • executeBatchFlow メソッドを追加します。

アプリケーションを起動し直して http://localhost:8080/batch?sleep=10http://localhost:8080/batch?sleep=9http://localhost:8080/batch?sleep=8 の URL に連続してアクセスしてみると、payload=10、payload=9、payload=8 の3つの Message のログが出力されました。

f:id:ksby:20170820143350p:plain

送信された Message を見ると payload 以外に header にもいろいろ情報をセットしており、http://localhost:8080/batch?sleep=10 の URL にアクセスした時に送信されたメッセージを見ると以下の値がセットされていました。

GenericMessage [
    payload=10
    , headers={
        http_requestMethod=GET
        , accept-language=ja-JP,en;q=0.5
        , cookie=Idea-283d915c=f5d44479-0894-4fff-95d8-ec599ffec940
        , accept=[text/html, application/xhtml+xml, */*]
        , JdbcChannelMessageStore.CREATED_DATE=1503207113688
        , host=localhost:8080
        , http_requestUrl=http://localhost:8080/batch?sleep=10
        , connection=Keep-Alive
        , id=c30d9152-90d9-a804-3c58-ca489111d6aa
        , JdbcChannelMessageStore.SAVED=true
        , accept-encoding=gzip
        , deflate
        , user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko
        , timestamp=1503207113688
    }
]

次回ヘ続く

長くなったので一旦ここで止めて、次回へ続きます。

履歴

2017/08/20
初版発行。