Project Reactor support
Introduction
Support for Project Reactor's Flux and Mono types is provided by the Project Reactor extension. It lets you develop applications in a reactive style on both the client and the server side.
The org.apache.cxf/cxf-rt-rs-extension-reactor/3.2.3 and io.projectreactor/reactor-core/3.1.0.RELEASE dependencies are required.
Client
The following simple example uses ReactorInvoker, which is enabled by registering ReactorInvokerProvider on the client. If you prefer RxJava 2 types, org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker can be used instead. Reviewing our reactive systests may help as well.
String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
List<HelloWorldBean> collector = new ArrayList<>();
ClientBuilder.newClient()
    .register(new JacksonJsonProvider())
    .register(new ReactorInvokerProvider())
    .target(address)
    .request("application/json")
    .rx(ReactorInvoker.class)
    .get(HelloWorldBean.class)
    .doOnNext(collector::add)
    .subscribe();
// make sure to do a Thread.sleep or wait for the response to come back if you're trying to collect
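The closing comment in the client example matters because subscribe() returns immediately, so the collector may still be empty when it is read. The following is a minimal, JDK-only sketch of waiting with a CountDownLatch and a timeout instead of a fixed Thread.sleep. The class AwaitAsyncResult and the method fetchGreeting are hypothetical, and a CompletableFuture stands in for the reactive client call; this is not CXF or Reactor API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitAsyncResult {

    // Hypothetical stand-in for the asynchronous client call; it completes on another thread.
    static CompletableFuture<String> fetchGreeting() {
        return CompletableFuture.supplyAsync(() -> "Hello");
    }

    // Collects the result and blocks until the callback has fired or the timeout expires.
    static List<String> collect(long timeoutSeconds) {
        List<String> collector = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        fetchGreeting().whenComplete((value, error) -> {
            if (error == null) {
                collector.add(value);
            }
            done.countDown(); // signal on success and on failure
        });

        try {
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no response within " + timeoutSeconds + "s");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return collector;
    }

    public static void main(String[] args) {
        System.out.println(collect(5));
    }
}
```

With Reactor itself, the same idea would usually be expressed by counting the latch down in a terminal callback such as doOnTerminate, or by calling block() on the returned Mono when blocking the calling thread is acceptable.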
Server
As a method return value
Simply return reactor.core.publisher.Mono or reactor.core.publisher.Flux from the resource method; the runtime makes sure the response is finalized once the Flux/Mono flow completes.
The only requirement is to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.reactor.server.ReactorInvoker. It does everything the default JAXRSInvoker does and additionally checks whether a Flux or Mono is returned; if so, it links it internally with the JAX-RS AsyncResponse. The invoker can be registered automatically, with sensible defaults, by using org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer.
The built-in invoker handles Flux and Mono according to their semantics. For instance, reading a JAX-RS Response object always yields a Mono (0 to 1 elements), whereas any method with flux in its name yields a Flux, which is meant to hold 0 to n elements. You would typically use a Flux for a GET that returns an arbitrary data set converted from a JSON array; Mono is likely the right choice whenever you want to read the Response object.
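To make the idea of linking a publisher to an asynchronous response more concrete, here is a conceptual, JDK-only sketch. It is not CXF's implementation: java.util.concurrent.Flow stands in for Reactor's Flux, a CompletableFuture stands in for the JAX-RS AsyncResponse, and the names ResumeOnCompleteSketch and CollectingSubscriber are hypothetical. The subscriber buffers the items and only finalizes the "response" once the flow completes or fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ResumeOnCompleteSketch {

    // Buffers every item and completes the response only when the flow terminates.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> items = new ArrayList<>();
        private final CompletableFuture<List<T>> response; // stand-in for AsyncResponse

        CollectingSubscriber(CompletableFuture<List<T>> response) {
            this.response = response;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand, for simplicity
        }

        @Override
        public void onNext(T item) {
            items.add(item);
        }

        @Override
        public void onError(Throwable error) {
            response.completeExceptionally(error); // comparable to resuming with an error
        }

        @Override
        public void onComplete() {
            response.complete(items); // comparable to resuming with the entity
        }
    }

    static List<String> run() {
        CompletableFuture<List<String>> response = new CompletableFuture<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new CollectingSubscriber<>(response));
            publisher.submit("Hello");
            publisher.submit("Ciao");
        } // close() signals onComplete once the submitted items are delivered
        return response.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the sketch is the ordering guarantee: the caller sees nothing until onComplete or onError fires, which mirrors the statement that the response is finalized once the flow is complete.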
Combining Flux/Mono with AsyncResponse
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider(new JacksonJsonProvider());
new ReactorCustomizer().customize(sf); // use a JAXRSServerFactoryCustomizationExtension to customize the server
sf.setResourceClasses(FluxService.class);
sf.setResourceProvider(FluxService.class, new SingletonResourceProvider(new FluxService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();

import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream")
public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
    Flux.just("Hello", "Ciao")
        .map(HelloWorldBean::new)
        .subscribeOn(Schedulers.parallel())
        .subscribe(new JsonStreamingAsyncSubscriber<>(ar));
}

// or you can just return the Flux
@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream2")
public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() {
    return Flux.just("Hello", "Ciao")
        .map(HelloWorldBean::new)
        .subscribeOn(Schedulers.parallel());
}