Spring Boot + Spring Integration でいろいろ試してみる ( その23 )( http でリクエストを受信 → JdbcChannelMessageStore を使用した QueueChannel に Message を送信 → Message を受信して bat ファイルを実行する2 )
概要
記事一覧はこちらです。
参照したサイト・書籍
目次
手順
sleep.bat を作成する
C:\eipapp\ksbysample-eipapp-batchexecutor\bat の下に sleep.bat を新規作成し、以下の内容を記述します。
@echo off perl -sle "sleep $sleep_time" -- -sleep_time=%1 exit /B %ERRORLEVEL%
例えば sleep.bat 10
で起動すると、バッチを起動後 10秒 sleep してから終了します。
bat ファイルを起動する処理を作成する
InputStream 等の close 処理に Apache Commons IO を使いたいのと、ログの出力に lombok を使いたいので、build.gradle を以下のように変更します。
dependencies { def lombokVersion = "1.16.18" def errorproneVersion = "2.0.15" // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 .......... compile("org.springframework.integration:spring-integration-jdbc") compile("commons-io:commons-io") testCompile("org.springframework.boot:spring-boot-starter-test") .......... compile("com.h2database:h2:1.4.192") // for lombok compileOnly("org.projectlombok:lombok:${lombokVersion}") testCompileOnly("org.projectlombok:lombok:${lombokVersion}") // for Error Prone ( http://errorprone.info/ ) .......... }
compile("commons-io:commons-io")
を追加します。Apache Commons IO は Spring IO Platform の Appendix A. Dependency versions に最新バージョンの 2.5 で定義されているので、バージョン番号の記述は不要です。- lombok をインストールするのに以下の3行を追加します。
def lombokVersion = "1.16.18"
compileOnly("org.projectlombok:lombok:${lombokVersion}")
testCompileOnly("org.projectlombok:lombok:${lombokVersion}")
変更した後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。
src/main/java/ksbysample/eipapp/batchexecutor/FlowConfig.java を以下のように変更します。
@Slf4j @Configuration public class FlowConfig { .......... @Bean public IntegrationFlow httpBatchFlow() { return IntegrationFlows.from( // http://localhost:8080/batch?sleep=... でリクエストを受信して // sleep の値を payload にセットした Message を生成する Http.inboundGateway("/batch") .requestMapping(r -> r .methods(HttpMethod.GET) .params("sleep")) .payloadExpression("#requestParams.sleep[0]")) .handle((p, h) -> { log.info("★★★ リクエストを受信しました ( sleep = {} )", p); return p; }) .......... .get(); } @Bean public IntegrationFlow executeBatchFlow() { return IntegrationFlows.from(executeBatchQueueChannel()) // QueueChannel に Message が送信されているか1秒毎にチェックする .bridge(e -> e.poller(Pollers.fixedDelay(1000))) // sleep.bat を実行する .handle((p, h) -> { log.info("●●● sleep.bat {} を実行します", p); ProcessBuilder builder = new ProcessBuilder(); builder.command(COMMAND_SLEEP_BAT, (String) p, ">", "NUL") .directory(new File(WORKDIR_SLEEP_BAT)) .redirectErrorStream(true); Process process = null; try { process = builder.start(); int exitCode = process.waitFor(); log.info("●●● sleep.bat {} が終了しました ( exitCode = {} )", p, exitCode); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } finally { if (process != null) { IOUtils.closeQuietly(process.getInputStream()); IOUtils.closeQuietly(process.getOutputStream()); IOUtils.closeQuietly(process.getErrorStream()); process.destroy(); } } return null; }) .get(); } }
- クラスに
@Slf4j
アノテーションを付加します。 - httpBatchFlow メソッド内に
.handle((p, h) -> { log.info("★★★ リクエストを受信しました ( sleep = {} )", p); return p; })
を追加して、リクエストを受信したことが分かるようにします。 - executeBatchFlow メソッド内の
.handle((p, h) -> { ... })
に上記の sleep.bat を起動する処理を記述します。
動作確認
動作確認します。jar ファイルは作成せず、IntelliJ IDEA から起動して確認します。
アプリケーションを起動後、以下3つの URL で連続してアクセスします。
http://localhost:8080/batch?sleep=10
http://localhost:8080/batch?sleep=8
http://localhost:8080/batch?sleep=6
コンソールに出力されたログを見ると以下のことが分かります。想定通り動作しています。
- http で送信したリクエストはバッチの実行中でも受信して処理されています。http 受信とバッチ実行は別のスレッドで処理されています。
- リクエストは1件ずつ処理されてバッチを実行しています。実行されているバッチが終了しないと次のバッチは実行されていません。
- バッチは起動した後、URL の sleep パラメータで指定された秒数待機して終了しています。
バッチを複数起動可能にするには?
バッチを複数起動可能にするには src/main/java/ksbysample/eipapp/batchexecutor/FlowConfig.java を以下のように変更します。
@Bean public IntegrationFlow executeBatchFlow() { return IntegrationFlows.from(executeBatchQueueChannel()) // QueueChannel に Message が送信されているか1秒毎にチェックする .bridge(e -> e.poller(Pollers.fixedDelay(1000))) // バッチを最大3スレッドで起動する // 最大数の制限を設けないのであれば Executors.newCachedThreadPool() にする .channel(c -> c.executor(Executors.newFixedThreadPool(3))) // sleep.bat を実行する .handle((p, h) -> { ..........
- executeBatchFlow メソッドで Message を受信した後に
.channel(c -> c.executor(Executors.newFixedThreadPool(3)))
を追加すると最大3スレッドでバッチを実行するようになります。Executors#newFixedThreadPool メソッドに渡す値を変更すれば起動するバッチ数を変更できます。また.channel(c -> c.executor(Executors.newCachedThreadPool()))
と記述すればリクエストが送信された分だけバッチが起動します。
アプリケーションを起動し直して sleep パラメータの値を 10, 9, 8, 7, 6 の5つに変えて連続してアクセスすると、
- sleep = 10, 9, 8 の3つはリクエスト受信後にバッチが起動されています(3つ同時に実行されています)。またこの3つはそれぞれ別のスレッドで実行されています。
- sleep = 7 は sleep = 9 のバッチが終了した後に起動され、sleep = 6 のバッチは sleep = 10 のバッチが終了した後に起動されています。
最後に
HTTP Support の方はサンプル通りなので難しくありませんでしたが、JdbcChannelMessageStore の方が少し分かりづらいですね。出力されたエラーメッセージと stackoverflow の QA を見てやっと実装できました。ちょっとした永続的な QueueChannel が使いたい時には、データベースの QueueChannel が使えると便利かもしれません。
最後に最終形のソースを掲載しておきます。
ソースコード
Application.java
package ksbysample.eipapp.batchexecutor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.data.jpa.JpaRepositoriesAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ImportResource; import javax.sql.DataSource; /** * Application 用メイン + JavaConfig 用クラス */ @SpringBootApplication(exclude = {JpaRepositoriesAutoConfiguration.class, HibernateJpaAutoConfiguration.class}) @ImportResource("classpath:applicationContext.xml") public class Application { /** * メインクラス * @param args アプリケーション起動時のオプション */ public static void main(String[] args) { SpringApplication.run(Application.class, args); } /** * @return Tomcat JDBC Connection Pool の DataSource オブジェクト */ @Bean @ConfigurationProperties("spring.datasource.tomcat") public DataSource dataSource() { return DataSourceBuilder.create() .type(org.apache.tomcat.jdbc.pool.DataSource.class) .build(); } }
application.properties
spring.datasource.tomcat.url=jdbc:h2:file:C:/eipapp/ksbysample-eipapp-batchexecutor/db/batchexecutordb spring.datasource.tomcat.username=sa spring.datasource.tomcat.password= spring.datasource.tomcat.driverClassName=org.h2.Driver spring.datasource.tomcat.initialSize=2 spring.datasource.tomcat.maxActive=2 spring.datasource.tomcat.maxIdle=2 spring.datasource.tomcat.minIdle=2 spring.datasource.tomcat.testOnBorrow=true spring.datasource.tomcat.validationQuery=select 1 spring.datasource.tomcat.validationQueryTimeout=5 spring.datasource.tomcat.removeAbandoned=true spring.datasource.tomcat.removeAbandonedTimeout=30 spring.datasource.tomcat.jdbc-interceptors=SlowQueryReport(threshold=5000) # spring.datasource.jmx-enabled は spring.datasource.tomcat.jmx-enabled と書かないこと。 # spring.datasource.tomcat.jmx-enabled だと機能しない。 spring.datasource.jmx-enabled=true
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd"> <!-- アプリケーション起動時に schema-h2.sql を実行して JdbcChannelMessageStore で使用する INT_CHANNEL_MESSAGE テーブルを新規作成する また、既に INT_CHANNEL_MESSAGE テーブルがあると schema-h2.sql 実行時にエラーが出るが、 ignore-failures="ALL" を付与して無視するようにする --> <jdbc:initialize-database ignore-failures="ALL"> <jdbc:script location="classpath:org/springframework/integration/jdbc/store/channel/schema-h2.sql"/> </jdbc:initialize-database> </beans:beans>
FlowConfig.java
package ksbysample.eipapp.batchexecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpMethod; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.channel.MessageChannels; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.dsl.http.Http; import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.integration.jdbc.store.channel.H2ChannelMessageStoreQueryProvider; import org.springframework.integration.store.PriorityCapableChannelMessageStore; import org.springframework.messaging.MessageChannel; import javax.sql.DataSource; import java.io.File; import java.io.IOException; /** * バッチ起動リクエスト受信+バッチ起動 Flow 設定用 JavaConfig クラス */ @Slf4j @Configuration public class FlowConfig { private static final String WORKDIR_SLEEP_BAT = "C:/eipapp/ksbysample-eipapp-batchexecutor/bat"; private static final String COMMAND_SLEEP_BAT = WORKDIR_SLEEP_BAT + "/sleep.bat"; private final DataSource dataSource; /** * コンストラクタ * @param dataSource {@link DataSource} オブジェクト */ public FlowConfig(DataSource dataSource) { this.dataSource = dataSource; } /** * {@link FlowConfig#executeBatchQueueChannel()} の ChannelMessageStore を H2 Database * にするための {@link JdbcChannelMessageStore} * * @return {@link PriorityCapableChannelMessageStore} オブジェクト */ @Bean public PriorityCapableChannelMessageStore jdbcChannelMessageStore() { JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(this.dataSource); messageStore.setChannelMessageStoreQueryProvider(new H2ChannelMessageStoreQueryProvider()); messageStore.setPriorityEnabled(true); return messageStore; } /** * bat ファイル実行指示用の QueueChannel * * @return {@link MessageChannel} オブジェクト */ @Bean public MessageChannel executeBatchQueueChannel() { return MessageChannels.queue(jdbcChannelMessageStore(), "EXECUTE_BATCH") .get(); } /** * http://localhost:8080/batch?sleep=... でリクエストを受信して、sleep パラメータの値を * payload にセットした Message を bat ファイル実行指示用の QueueChannel に送信した後、 * sleep パラメータの文字列をそのままテキストとしてレスポンスとして返す * * @return {@link IntegrationFlow} オブジェクト */ @Bean public IntegrationFlow httpBatchFlow() { return IntegrationFlows.from( // http://localhost:8080/batch?sleep=... でリクエストを受信して // sleep の値を payload にセットした Message を生成する Http.inboundGateway("/batch") .requestMapping(r -> r .methods(HttpMethod.GET) .params("sleep")) .payloadExpression("#requestParams.sleep[0]")) .handle((p, h) -> { log.info("★★★ リクエストを受信しました ( sleep = {} )", p); return p; }) // message を bat ファイル実行指示用の QueueChannel に送信する // .wireTap(...) を利用することで、処理をここで中断せず次の .handle(...) が実行されるようにする .wireTap(f -> f.channel(executeBatchQueueChannel())) // http のレスポンスには sleep パラメータの文字列をそのままテキストで返す .handle((p, h) -> p) .get(); } /** * bat ファイル実行指示用の QueueChannel に Message が送信されているか1秒毎にチェックし、 * 送信されている場合には受信して sleep.bat を実行する。連続して Message を送信しても * 多重化していないので前のバッチが終了しないと次のバッチは実行されない。 * * @return {@link IntegrationFlow} オブジェクト */ @Bean public IntegrationFlow executeBatchFlow() { return IntegrationFlows.from(executeBatchQueueChannel()) // QueueChannel に Message が送信されているか1秒毎にチェックする .bridge(e -> e.poller(Pollers.fixedDelay(1000))) // バッチを最大3スレッドで起動する // 最大数の制限を設けないのであれば Executors.newCachedThreadPool() にする // .channel(c -> c.executor(Executors.newFixedThreadPool(3))) // sleep.bat を実行する .handle((p, h) -> { log.info("●●● sleep.bat {} を実行します", p); ProcessBuilder builder = new ProcessBuilder(); builder.command(COMMAND_SLEEP_BAT, (String) p, ">", "NUL") .directory(new File(WORKDIR_SLEEP_BAT)) .redirectErrorStream(true); Process process = null; try { process = builder.start(); int exitCode = process.waitFor(); log.info("●●● sleep.bat {} が終了しました ( exitCode = {} )", p, exitCode); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } finally { if (process != null) { IOUtils.closeQuietly(process.getInputStream()); IOUtils.closeQuietly(process.getOutputStream()); IOUtils.closeQuietly(process.getErrorStream()); process.destroy(); } } return null; }) .get(); } }
履歴
2017/08/20
初版発行。