Project Reactor support
Introduction
Support for Project Reactor's Flux and Mono types is provided by the CXF Project Reactor extension. It lets you develop applications in a reactive fashion on both the client and server side.
The org.apache.cxf/cxf-rt-rs-extension-reactor/3.2.3 and io.projectreactor/reactor-core/3.1.0.RELEASE dependencies are required.
Client
The following simple example uses ReactorInvoker, which is made available by registering ReactorInvokerProvider. Reviewing our reactive systests may help as well.
String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
List<HelloWorldBean> collector = new ArrayList<>();
ClientBuilder.newClient()
    .register(new JacksonJsonProvider())
    .register(new ReactorInvokerProvider())
    .target(address)
    .request("application/json")
    .rx(ReactorInvoker.class)
    .get(HelloWorldBean.class)
    .doOnNext(collector::add)
    .subscribe();
// The call is asynchronous: wait for the response to come back (for example with a latch,
// or a Thread.sleep in a quick test) before reading the collector.
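Waiting for the response can be sketched with a latch instead of a sleep. The example below is only an illustration and does not use CXF or Reactor: the JDK's java.util.concurrent.Flow API stands in for the reactive type so the sketch runs on a plain JDK, and the names AwaitCompletionSketch, asyncPublisher and collectAndWait are hypothetical helpers introduced here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class AwaitCompletionSketch {

    /** Hypothetical helper: emits the items on another thread, as a remote response would. */
    static <T> Flow.Publisher<T> asyncPublisher(List<T> items) {
        return subscriber -> CompletableFuture.runAsync(() -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { } // demand is ignored in this sketch
                @Override public void cancel() { }
            });
            items.forEach(subscriber::onNext);
            subscriber.onComplete();
        });
    }

    /** Subscribes, then blocks until the stream terminates or the timeout expires. */
    static <T> List<T> collectAndWait(Flow.Publisher<T> source, long timeoutSeconds) {
        List<T> collector = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { collector.add(item); }
            // A real client would also record the error; here we only stop waiting.
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        try {
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for the response");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return collector;
    }

    public static void main(String[] args) {
        System.out.println(collectAndWait(asyncPublisher(List.of("Hello", "Ciao")), 5));
    }
}
```

With Reactor itself the same effect is usually achieved without a hand-written latch, for example by calling block() on a Mono or collectList().block() on a Flux in test code.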
Server
As a method return value
One simply returns reactor.core.publisher.Mono or reactor.core.publisher.Flux from the method and the runtime will make sure the response is finalized once the Flux/Mono flow is complete.
The only requirement is that one has to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.reactor.server.ReactorInvoker. It does everything the default JAXRSInvoker does and additionally checks whether a Flux or Mono is returned; if so, it links it internally with the JAX-RS AsyncResponse. The invoker can be registered automatically, with sensible defaults, by using org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer.
The built-in invoker handles Flux and Mono according to their semantics. For instance, reading a JAX-RS Response object always yields a Mono (0 to 1 elements), whereas any method containing flux converts to a Flux operation meant to contain 0 to n elements. You would typically use a Flux on a GET that returns an arbitrary data set converted from a JSON array; Mono is likely the right choice whenever you want to read the Response object.
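The idea of checking the return value and linking reactive results to an asynchronous response can be pictured with a small sketch. It is an illustration under stated assumptions, not how ReactorInvoker is actually implemented, and it uses no CXF or Reactor classes: CompletionStage loosely stands in for Mono (at most one element), Flow.Publisher loosely stands in for Flux (0 to n elements), and AsyncSink, dispatch and just are hypothetical names, with AsyncSink as a placeholder for the JAX-RS AsyncResponse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

public class ReturnTypeDispatchSketch {

    /** Hypothetical stand-in for the JAX-RS AsyncResponse. */
    interface AsyncSink {
        void resume(Object entity);
    }

    /**
     * Returns ordinary values unchanged. Reactive values are linked to the sink
     * and null is returned to signal that the response completes asynchronously.
     */
    static Object dispatch(Object result, AsyncSink sink) {
        if (result instanceof CompletionStage) {
            // Mono-like: resume with the single element, or with the failure.
            ((CompletionStage<?>) result).whenComplete((value, error) -> {
                if (error != null) {
                    sink.resume(error);
                } else {
                    sink.resume(value);
                }
            });
            return null;
        }
        if (result instanceof Flow.Publisher) {
            // Flux-like: collect 0 to n elements and resume once the flow completes.
            ((Flow.Publisher<?>) result).subscribe(new CollectingSubscriber(sink));
            return null;
        }
        return result;
    }

    /** Gathers all elements and hands them to the sink on completion. */
    static final class CollectingSubscriber implements Flow.Subscriber<Object> {
        private final List<Object> items = new ArrayList<>();
        private final AsyncSink sink;

        CollectingSubscriber(AsyncSink sink) {
            this.sink = sink;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Object item) { items.add(item); }
        @Override public void onError(Throwable t) { sink.resume(t); }
        @Override public void onComplete() { sink.resume(items); }
    }

    /** Hypothetical helper: a synchronous publisher of fixed values (demand is ignored). */
    @SafeVarargs
    static <T> Flow.Publisher<T> just(T... values) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            for (T value : values) {
                subscriber.onNext(value);
            }
            subscriber.onComplete();
        };
    }
}
```

In this sketch a plain return value passes straight through, a single-valued result resumes the sink with one entity, and a multi-valued result resumes it with the collected list once the flow completes.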
Combining Flux/Mono with AsyncResponse
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider(new JacksonJsonProvider());
new ReactorCustomizer().customize(sf); // use a JAXRSServerFactoryCustomizationExtension to customize the server
sf.setResourceClasses(FluxService.class);
sf.setResourceProvider(FluxService.class,
    new SingletonResourceProvider(new FluxService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();
import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream")
public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
    Flux.just("Hello", "Ciao")
        .map(HelloWorldBean::new)
        .subscribeOn(Schedulers.parallel())
        .subscribe(new JsonStreamingAsyncSubscriber<>(ar));
}
// or you can just return the Flux
@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream2")
public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() {
    return Flux.just("Hello", "Ciao")
        .map(HelloWorldBean::new)
        .subscribeOn(Schedulers.parallel());
}