かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その23 )( http でリクエストを受信 → JdbcChannelMessageStore を使用した QueueChannel に Message を送信 → Message を受信して bat ファイルを実行する2 )

概要

記事一覧はこちらです。

参照したサイト・書籍

目次

  1. sleep.bat を作成する
  2. bat ファイルを起動する処理を作成する
  3. 動作確認
  4. バッチを複数起動可能にするには?
  5. 最後に

手順

sleep.bat を作成する

C:\eipapp\ksbysample-eipapp-batchexecutor\bat の下に sleep.bat を新規作成し、以下の内容を記述します。

@echo off
perl -sle "sleep $sleep_time" -- -sleep_time=%1
exit /B %ERRORLEVEL%

例えば sleep.bat 10 で起動すると、バッチを起動後 10秒 sleep してから終了します。

bat ファイルを起動する処理を作成する

InputStream 等の close 処理に Apache Commons IO を使いたいのと、ログの出力に lombok を使いたいので、build.gradle を以下のように変更します。

dependencies {
    def lombokVersion = "1.16.18"
    def errorproneVersion = "2.0.15"

    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    ..........
    compile("org.springframework.integration:spring-integration-jdbc")
    compile("commons-io:commons-io")
    testCompile("org.springframework.boot:spring-boot-starter-test")

    ..........
    compile("com.h2database:h2:1.4.192")

    // for lombok
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
    testCompileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for Error Prone ( http://errorprone.info/ )
    ..........
}
  • compile("commons-io:commons-io") を追加します。Apache Commons IO は Spring IO Platform の Appendix A. Dependency versions に最新バージョンの 2.5 で定義されているので、バージョン番号の記述は不要です。
  • lombok をインストールするのに以下の3行を追加します。
    • def lombokVersion = "1.16.18"
    • compileOnly("org.projectlombok:lombok:${lombokVersion}")
    • testCompileOnly("org.projectlombok:lombok:${lombokVersion}")

変更した後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

src/main/java/ksbysample/eipapp/batchexecutor/FlowConfig.java を以下のように変更します。

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow httpBatchFlow() {
        return IntegrationFlows.from(
                // http://localhost:8080/batch?sleep=... でリクエストを受信して
                // sleep の値を payload にセットした Message を生成する
                Http.inboundGateway("/batch")
                        .requestMapping(r -> r
                                .methods(HttpMethod.GET)
                                .params("sleep"))
                        .payloadExpression("#requestParams.sleep[0]"))
                .handle((p, h) -> {
                    log.info("★★★ リクエストを受信しました ( sleep = {} )", p);
                    return p;
                })
                ..........
                .get();
    }

    @Bean
    public IntegrationFlow executeBatchFlow() {
        return IntegrationFlows.from(executeBatchQueueChannel())
                // QueueChannel に Message が送信されているか1秒毎にチェックする
                .bridge(e -> e.poller(Pollers.fixedDelay(1000)))
                // sleep.bat を実行する
                .handle((p, h) -> {
                    log.info("●●● sleep.bat {} を実行します", p);
                    ProcessBuilder builder = new ProcessBuilder();
                    builder.command(COMMAND_SLEEP_BAT, (String) p, ">", "NUL")
                            .directory(new File(WORKDIR_SLEEP_BAT))
                            .redirectErrorStream(true);

                    Process process = null;
                    try {
                        process = builder.start();
                        int exitCode = process.waitFor();
                        log.info("●●● sleep.bat {} が終了しました ( exitCode = {} )", p, exitCode);
                    } catch (IOException | InterruptedException e) {
                        throw new RuntimeException(e);
                    } finally {
                        if (process != null) {
                            IOUtils.closeQuietly(process.getInputStream());
                            IOUtils.closeQuietly(process.getOutputStream());
                            IOUtils.closeQuietly(process.getErrorStream());
                            process.destroy();
                        }
                    }

                    return null;
                })
                .get();
    }

}
  • クラスに @Slf4j アノテーションを付加します。
  • httpBatchFlow メソッド内に .handle((p, h) -> { log.info("★★★ リクエストを受信しました ( sleep = {} )", p); return p; }) を追加して、リクエストを受信したことが分かるようにします。
  • executeBatchFlow メソッド内の .handle((p, h) -> { ... }) に上記の sleep.bat を起動する処理を記述します。

動作確認

動作確認します。jar ファイルは作成せず、IntelliJ IDEA から起動して確認します。

アプリケーションを起動後、以下3つの URL で連続してアクセスします。

  • http://localhost:8080/batch?sleep=10
  • http://localhost:8080/batch?sleep=8
  • http://localhost:8080/batch?sleep=6

f:id:ksby:20170820172310p:plain

コンソールに出力されたログを見ると以下のことが分かります。想定通り動作しています。

  • http で送信したリクエストはバッチの実行中でも受信して処理されています。http 受信とバッチ実行は別のスレッドで処理されています。
  • リクエストは1件ずつ処理されてバッチを実行しています。実行されているバッチが終了しないと次のバッチは実行されていません。
  • バッチは起動した後、URL の sleep パラメータで指定された秒数待機して終了しています。

バッチを複数起動可能にするには?

バッチを複数起動可能にするには src/main/java/ksbysample/eipapp/batchexecutor/FlowConfig.java を以下のように変更します。

    @Bean
    public IntegrationFlow executeBatchFlow() {
        return IntegrationFlows.from(executeBatchQueueChannel())
                // QueueChannel に Message が送信されているか1秒毎にチェックする
                .bridge(e -> e.poller(Pollers.fixedDelay(1000)))
                // バッチを最大3スレッドで起動する
                // 最大数の制限を設けないのであれば Executors.newCachedThreadPool() にする 
                .channel(c -> c.executor(Executors.newFixedThreadPool(3)))
                // sleep.bat を実行する
                .handle((p, h) -> {
                    ..........
  • executeBatchFlow メソッドで Message を受信した後に .channel(c -> c.executor(Executors.newFixedThreadPool(3))) を追加すると最大3スレッドでバッチを実行するようになります。Executors#newFixedThreadPool メソッドに渡す値を変更すれば起動するバッチ数を変更できます。また .channel(c -> c.executor(Executors.newCachedThreadPool())) と記述すればリクエストが送信された分だけバッチが起動します。

アプリケーションを起動し直して sleep パラメータの値を 10, 9, 8, 7, 6 の5つに変えて連続してアクセスすると、

f:id:ksby:20170820180001p:plain

  • sleep = 10, 9, 8 の3つはリクエスト受信後にバッチが起動されています(3つ同時に実行されています)。またこの3つはそれぞれ別のスレッドで実行されています。
  • sleep = 7 は sleep = 9 のバッチが終了した後に起動され、sleep = 6 のバッチは sleep = 10 のバッチが終了した後に起動されています。

最後に

HTTP Support の方はサンプル通りなので難しくありませんでしたが、JdbcChannelMessageStore の方が少し分かりづらいですね。出力されたエラーメッセージと stackoverflow の QA を見てやっと実装できました。ちょっとした永続的な QueueChannel が使いたい時には、データベースの QueueChannel が使えると便利かもしれません。

最後に最終形のソースを掲載しておきます。

ソースコード

Application.java

package ksbysample.eipapp.batchexecutor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.jpa.JpaRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ImportResource;

import javax.sql.DataSource;

/**
 * Application 用メイン + JavaConfig 用クラス
 */
@SpringBootApplication(exclude = {JpaRepositoriesAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
@ImportResource("classpath:applicationContext.xml")
public class Application {

    /**
     * メインクラス
     * @param args アプリケーション起動時のオプション
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /**
     * @return Tomcat JDBC Connection Pool の DataSource オブジェクト
     */
    @Bean
    @ConfigurationProperties("spring.datasource.tomcat")
    public DataSource dataSource() {
        return DataSourceBuilder.create()
                .type(org.apache.tomcat.jdbc.pool.DataSource.class)
                .build();
    }

}

application.properties

spring.datasource.tomcat.url=jdbc:h2:file:C:/eipapp/ksbysample-eipapp-batchexecutor/db/batchexecutordb
spring.datasource.tomcat.username=sa
spring.datasource.tomcat.password=
spring.datasource.tomcat.driverClassName=org.h2.Driver
spring.datasource.tomcat.initialSize=2
spring.datasource.tomcat.maxActive=2
spring.datasource.tomcat.maxIdle=2
spring.datasource.tomcat.minIdle=2
spring.datasource.tomcat.testOnBorrow=true
spring.datasource.tomcat.validationQuery=select 1
spring.datasource.tomcat.validationQueryTimeout=5
spring.datasource.tomcat.removeAbandoned=true
spring.datasource.tomcat.removeAbandonedTimeout=30
spring.datasource.tomcat.jdbc-interceptors=SlowQueryReport(threshold=5000)
# spring.datasource.jmx-enabled は spring.datasource.tomcat.jmx-enabled と書かないこと。
# spring.datasource.tomcat.jmx-enabled だと機能しない。
spring.datasource.jmx-enabled=true

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:jdbc="http://www.springframework.org/schema/jdbc"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
                                 http://www.springframework.org/schema/beans/spring-beans.xsd
                                 http://www.springframework.org/schema/integration
                                 http://www.springframework.org/schema/integration/spring-integration.xsd
                                 http://www.springframework.org/schema/jdbc
                                 http://www.springframework.org/schema/jdbc/spring-jdbc.xsd">

    <!--
        アプリケーション起動時に schema-h2.sql を実行して JdbcChannelMessageStore で使用する
        INT_CHANNEL_MESSAGE テーブルを新規作成する
        また、既に INT_CHANNEL_MESSAGE テーブルがあると schema-h2.sql 実行時にエラーが出るが、
        ignore-failures="ALL" を付与して無視するようにする
    -->
    <jdbc:initialize-database ignore-failures="ALL">
        <jdbc:script location="classpath:org/springframework/integration/jdbc/store/channel/schema-h2.sql"/>
    </jdbc:initialize-database>

</beans:beans>

FlowConfig.java

package ksbysample.eipapp.batchexecutor;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.http.Http;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.H2ChannelMessageStoreQueryProvider;
import org.springframework.integration.store.PriorityCapableChannelMessageStore;
import org.springframework.messaging.MessageChannel;

import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;

/**
 * バッチ起動リクエスト受信+バッチ起動 Flow 設定用 JavaConfig クラス
 */
@Slf4j
@Configuration
public class FlowConfig {

    private static final String WORKDIR_SLEEP_BAT = "C:/eipapp/ksbysample-eipapp-batchexecutor/bat";
    private static final String COMMAND_SLEEP_BAT = WORKDIR_SLEEP_BAT + "/sleep.bat";

    private final DataSource dataSource;

    /**
     * コンストラクタ

     * @param dataSource {@link DataSource} オブジェクト
     */
    public FlowConfig(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * {@link FlowConfig#executeBatchQueueChannel()} の ChannelMessageStore を H2 Database
     * にするための {@link JdbcChannelMessageStore}
     *
     * @return {@link PriorityCapableChannelMessageStore} オブジェクト
     */
    @Bean
    public PriorityCapableChannelMessageStore jdbcChannelMessageStore() {
        JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(this.dataSource);
        messageStore.setChannelMessageStoreQueryProvider(new H2ChannelMessageStoreQueryProvider());
        messageStore.setPriorityEnabled(true);
        return messageStore;
    }

    /**
     * bat ファイル実行指示用の QueueChannel
     *
     * @return {@link MessageChannel} オブジェクト
     */
    @Bean
    public MessageChannel executeBatchQueueChannel() {
        return MessageChannels.queue(jdbcChannelMessageStore(), "EXECUTE_BATCH")
                .get();
    }

    /**
     * http://localhost:8080/batch?sleep=... でリクエストを受信して、sleep パラメータの値を
     * payload にセットした Message を bat ファイル実行指示用の QueueChannel に送信した後、
     * sleep パラメータの文字列をそのままテキストとしてレスポンスとして返す
     *
     * @return {@link IntegrationFlow} オブジェクト
     */
    @Bean
    public IntegrationFlow httpBatchFlow() {
        return IntegrationFlows.from(
                // http://localhost:8080/batch?sleep=... でリクエストを受信して
                // sleep の値を payload にセットした Message を生成する
                Http.inboundGateway("/batch")
                        .requestMapping(r -> r
                                .methods(HttpMethod.GET)
                                .params("sleep"))
                        .payloadExpression("#requestParams.sleep[0]"))
                .handle((p, h) -> {
                    log.info("★★★ リクエストを受信しました ( sleep = {} )", p);
                    return p;
                })
                // message を bat ファイル実行指示用の QueueChannel に送信する
                // .wireTap(...) を利用することで、処理をここで中断せず次の .handle(...) が実行されるようにする
                .wireTap(f -> f.channel(executeBatchQueueChannel()))
                // http のレスポンスには sleep パラメータの文字列をそのままテキストで返す
                .handle((p, h) -> p)
                .get();
    }

    /**
     * bat ファイル実行指示用の QueueChannel に Message が送信されているか1秒毎にチェックし、
     * 送信されている場合には受信して sleep.bat を実行する。連続して Message を送信しても
     * 多重化していないので前のバッチが終了しないと次のバッチは実行されない。
     *
     * @return {@link IntegrationFlow} オブジェクト
     */
    @Bean
    public IntegrationFlow executeBatchFlow() {
        return IntegrationFlows.from(executeBatchQueueChannel())
                // QueueChannel に Message が送信されているか1秒毎にチェックする
                .bridge(e -> e.poller(Pollers.fixedDelay(1000)))
                // バッチを最大3スレッドで起動する
                // 最大数の制限を設けないのであれば Executors.newCachedThreadPool() にする
                // .channel(c -> c.executor(Executors.newFixedThreadPool(3)))
                // sleep.bat を実行する
                .handle((p, h) -> {
                    log.info("●●● sleep.bat {} を実行します", p);
                    ProcessBuilder builder = new ProcessBuilder();
                    builder.command(COMMAND_SLEEP_BAT, (String) p, ">", "NUL")
                            .directory(new File(WORKDIR_SLEEP_BAT))
                            .redirectErrorStream(true);

                    Process process = null;
                    try {
                        process = builder.start();
                        int exitCode = process.waitFor();
                        log.info("●●● sleep.bat {} が終了しました ( exitCode = {} )", p, exitCode);
                    } catch (IOException | InterruptedException e) {
                        throw new RuntimeException(e);
                    } finally {
                        if (process != null) {
                            IOUtils.closeQuietly(process.getInputStream());
                            IOUtils.closeQuietly(process.getOutputStream());
                            IOUtils.closeQuietly(process.getErrorStream());
                            process.destroy();
                        }
                    }

                    return null;
                })
                .get();
    }

}

履歴

2017/08/20
初版発行。