Spring Boot + Spring Integration でいろいろ試してみる ( その33 )( 5.1 からの新機能 Java Functions Improvements を試してみる )
概要
記事一覧はこちらです。
- Spring Integration DSL でも RSocket が使えるようになるのか。。。と spring-integration/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java を見ていたのですが、その中に
return IntegrationFlows.from(Function.class)...
という見慣れない実装を見つけました。 - 調べたところ https://spring.io/blog/2018/10/29/spring-integration-5-1-goes-ga#java-functions-improvements に「Java Functions Improvements」という記述がありました。Spring Integration 5.1 からの新機能でした。興味が湧いたので試してみます。
参照したサイト・書籍
Spring Integration 5.1 goes GA! - Java Functions Improvements
https://spring.io/blog/2018/10/29/spring-integration-5-1-goes-ga#java-functions-improvementsspring-integration/src/reference/asciidoc/configuration.adoc
https://github.com/spring-projects/spring-integration/blob/master/src/reference/asciidoc/configuration.adoc
目次
- ksbysample-eipapp-functions プロジェクトを作成する
- Supplier&Function&Consumer の簡単なサンプルを作成する
- Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能
- Function#apply で IntegrationFlow Bean の処理を呼び出す
手順
ksbysample-eipapp-functions プロジェクトを作成する
Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。
plugins { id "org.springframework.boot" version "2.1.6.RELEASE" id "java" id "idea" } apply plugin: "io.spring.dependency-management" group = "ksbysample.eipapp" version = "0.0.1-SNAPSHOT" sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } configurations { // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする testAnnotationProcessor.extendsFrom annotationProcessor testImplementation.extendsFrom compileOnly } repositories { mavenCentral() } dependencies { def lombokVersion = "1.18.6" implementation("org.springframework.boot:spring-boot-starter-integration") testImplementation("org.springframework.boot:spring-boot-starter-test") // for lombok // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している annotationProcessor("org.projectlombok:lombok:${lombokVersion}") compileOnly("org.projectlombok:lombok:${lombokVersion}") }
ディレクトリ構成は以下のようにしています。
Supplier&Function&Consumer の簡単なサンプルを作成する
package ksbysample.eipapp.functions; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @Slf4j @Configuration public class SupplierAndFunctionAndConsumerFlow { private AtomicInteger count = new AtomicInteger(0); // Supplier<?> は ? 型のオブジェクトが payload にセットされた Message を返す // MessageSource として利用できる @Bean public Supplier<Integer> countSupplier() { return () -> count.addAndGet(1); } @Bean public IntegrationFlow countDisplayFlow() { return IntegrationFlows .from(countSupplier() , e -> e.poller(Pollers.fixedDelay(1000))) .transform(DoubleAndToStrFunc()) .handle(addDotFunc()) .handle(printFunc()) .get(); } // Function<?, ?> は .transform(...) で利用したり、 public Function<Integer, String> DoubleAndToStrFunc() { return v -> String.valueOf(v * 2); } // .handle(...) で利用できる public Function<String, String> addDotFunc() { return s -> s + "..."; } // Consumer<?> は .handle(...) で利用できる public Consumer<String> printFunc() { return s -> log.warn(s); } }
上のサンプルを実行すると "2..."、"4..."、"6..." と順に出力されます。
同じ処理を Supplier は MessageSource で置き換えて、Function と Consumer は .transform(...)、.handle(...) に直接記述すると以下のようになります。
package ksbysample.eipapp.functions; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.messaging.support.MessageBuilder; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @Slf4j @Configuration public class MessageSourceAndLambdaFlow { private AtomicInteger count = new AtomicInteger(0); @Bean public MessageSource<Integer> countMessageSource() { return () -> MessageBuilder .withPayload(count.addAndGet(1)) .build(); } @Bean public IntegrationFlow countDisplayFlow() { return IntegrationFlows .from(countMessageSource() , e -> e.poller(Pollers.fixedDelay(1000))) .<Integer, String>transform(v -> String.valueOf(v * 2)) .handle((Function<String, String>) s -> s + "...") .handle((Consumer<String>) s -> log.warn(s)) .get(); } }
lambda 式を使っているのに (Function<String, String>)
や (Consumer<String>)
とキャストしないと build 時にエラーが出るのがなんか今ひとつな気がします。。。
Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能
package ksbysample.eipapp.functions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.handler.LoggingHandler; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @Configuration public class SupplierAndFunction2Flow { @Autowired private NullChannel nullChannel; private AtomicInteger count = new AtomicInteger(0); // Supplier<Message<?>> という記述にして Message オブジェクトを返すようにすることも出来る // この場合 MessageSource の時のように header を追加することが可能。 @Bean public Supplier<Message<Integer>> countSupplier() { return () -> MessageBuilder .withPayload(count.addAndGet(1)) .setHeader("random", ThreadLocalRandom.current().nextInt(100)) .build(); } @Bean public IntegrationFlow countDisplayFlow() { return IntegrationFlows .from(countSupplier() , e -> e.poller(Pollers.fixedDelay(1000))) .log(LoggingHandler.Level.WARN) .channel(nullChannel) .get(); } }
実行すると headers の中に random というデータが追加されています。
Function や Consumer でも Function<Message<Integer>, Message<String>>
や Consumer<Message<String>>
のように試してみたのですが、こちらは実行時にエラーになりました。この書き方が出来るのは Supplier だけのようです。
Function#apply で IntegrationFlow Bean の処理を呼び出す
以前 @MessagingGateway アノテーションを付加した interface を用意して IntegrationFlow の処理を呼び出す方法を記載しましたが、Function<?, ?> 型のフィールドを用意して Function#apply を呼び出すと IntegrationFlow の処理を呼び出すことができます。
package ksbysample.eipapp.functions; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.handler.GenericHandler; import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.messaging.MessageHandlingException; import org.springframework.retry.RetryContext; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetrySynchronizationManager; import org.springframework.retry.support.RetryTemplate; import org.springframework.stereotype.Component; import java.util.function.Function; import static java.util.Collections.singletonMap; @Slf4j @Component public class FunctionSample implements CommandLineRunner { // IntegrationFlow で定義した処理を呼び出すには、 // * IntegrationFlows.from(Function.class)... と定義する // * Function<?, ?> 型の変数をフィールドに定義し // @Autowired, @Qualifier("<IntegrationFlow名>.gateway"), @Lazy アノテーションを付与する @Autowired @Qualifier("throwExceptionWithRetryFlow.gateway") @Lazy private Function<String, String> throwExceptionWithRetryFlowFunc; @Override public void run(String... args) throws Exception { System.out.println(throwExceptionWithRetryFlowFunc.apply("success???")); } @Configuration static class FlowConfig { /** * リトライ回数は最大5回、リトライ時は2秒待機する RetryTemplate を生成する * * @return {@link RetryTemplate} object */ @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // SimpleRetryPolicy retryTemplate.setRetryPolicy( new SimpleRetryPolicy(5, singletonMap(Exception.class, true))); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(2000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public RequestHandlerRetryAdvice retryAdvice() { RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice(); retryAdvice.setRetryTemplate(retryTemplate()); // リトライしても処理が成功しなかった場合には // MessageHandlingException#getCause#getMessage で取得したメッセージを payload // にセットした Message オブジェクトが次の処理に渡されるようにする retryAdvice.setRecoveryCallback(context -> { MessageHandlingException e = (MessageHandlingException) context.getLastThrowable(); String errMsg = e.getCause().getMessage(); log.error(errMsg); return errMsg; }); return retryAdvice; } @Bean public IntegrationFlow throwExceptionWithRetryFlow() { return IntegrationFlows .from(Function.class) .handle((GenericHandler<Object>) (p, h) -> { RetryContext retryContext = RetrySynchronizationManager.getContext(); log.warn("★★★ リトライ回数 = " + retryContext.getRetryCount()); // リトライ処理をさせたいので強制的に RuntimeException を throw する if (true) { throw new RuntimeException("error!!"); } return p; }, e -> e.advice(retryAdvice())) .get(); } } }
実行すると5回リトライして、最後に RuntimeException に渡した "error!!" のメッセージが表示されました。
started throwExceptionWithRetryFlow.gateway
と stopped throwExceptionWithRetryFlow.gateway
のメッセージが何度も出力されるのが気になります。
複数出力される理由を調べてみると、まず org.springframework.integration.gateway.GatewayProxyFactoryBean#onInit で7個の method 分 gatewayMap に gateway(throwExceptionWithRetryFlow.gateway)が put されて、
org.springframework.integration.gateway.GatewayProxyFactoryBean の doStart、doStop メソッドで gateway.start();
、gateway.stop();
が呼び出されるからでした。
1つ呼び出す仕組みをつくるだけで gateway が7個 start するとは。。。 あまり多用する仕組みではないのかもしれません。
履歴
2019/07/05
新規作成。