Get Started

Get Started

These guides demonstrate how to get started quickly with Hazelcast IMDG and Hazelcast Jet.

Hazelcast IMDG

Learn how to store and retrieve data from a distributed key-value store using Hazelcast IMDG. In this guide you’ll learn how to:

  • Create a cluster of 3 members.
  • Start Hazelcast Management Center
  • Add data to the cluster using a sample client in the language of your choice
  • Add and remove some cluster members to demonstrate data balancing capabilities of Hazelcast

Hazelcast Jet

Learn how to build a distributed data processing pipeline in Java using Hazelcast Jet. In this guide you’ll learn how to:

  • Install Hazelcast Jet and form a cluster on your computer
  • Build a simple pipeline that receives a stream of data, does some calculations and outputs some results
  • Submit the pipeline as a job to the cluster and observe the results
  • Scale the cluster up and down while the job is still running

Migrating from Imperative to Reactive

September 29, 2020
Migrating from Imperative to Reactive

From Wikipedia, Reactive Programming is “a declarative programming paradigm concerned with data streams and the propagation of change.”

The programming model is complex to master. It requires a lot of experience to feel comfortable with it. However, there’s no denying that it fits the cloud ecosystem perfectly. Since on-premises infrastructure is oversized, running a program that executes a couple of additional CPU cycles won’t change anything. On the other hand, you will pay for them if you host the same program on third-party infrastructure. Depending on the number of those cycles, and the number of nodes the program runs on, it can make a huge difference in your monthly bill.

In this post, I’d like to showcase how you can migrate a traditional sample Spring Boot application to the Reactive model one step at a time.

The application to migrate

The application to migrate is a Spring Boot application based on Java 14. It includes the following:

  1. Spring Web MVC to manage HTTP requests
  2. Spring Data JPA for database access

It’s a simple sample that offers 2 endpoints, one to read all entities of a database table and the other to load one by its primary key. In order to improve performance, Hazelcast is used as the 2nd-level Hibernate cache.

This caching requires com.hazelcast:hazelcast-hibernate53:2.1.0 as a dependency, as well as the following Spring Boot configuration:

spring:
 jpa:
  properties:
   hibernate:
    generate_statistics: true
    cache:
     use_second_level_cache: true
     use_query_cache: true
     region.factory_class: com.hazelcast.hibernate.HazelcastCacheRegionFactory
     hazelcast.instance_name: hazelcastInstance
   javax.persistence.sharedCache.mode: ENABLE_SELECTIVE

For exhaustive documentation on Hazelcast and Hibernate integration, please refer to the GitHub project.

Step 1: Migrating to WebMVC.fn

The traditional Spring WebMVC model is based on annotations. Controllers are annotated with @Controller. At startup time, the framework scans the classpath for such annotated classes, looks up methods that are annotated with @RequestMapping, and registers those accordingly.

Here’s a sample that uses the more specialized @RestController and @GetMapping annotations, but the intent is the same:

@RestController
public class PersonController {

  private final PersonRepository repository;

  public PersonController(PersonRepository repository) {
    this.repository = repository;
  }

  @GetMapping("/person")
  public List<Person> getAll() {
    return repository.findAll(Sort.by("lastName", "firstName"));
  }

  @GetMapping("/person/{id}")
  public Optional<Person> getOne(@PathVariable Long id) {
    return repository.findById(id);
  }
}

For the record, this annotation-based configuration is available since Spring WebMVC 2.5 – released in 2007! It has become the de facto standard. But version 5.2 introduces an alternative, WebMVC.fn.

WebMVC.fn eschews annotations, and the additional hurdle of classpath scanning and introspection that come with them, in favor of an explicit “functional” fluent API. Actually, it’s a clone of the Reactive API in a different package. For this reason, migrating to WebMvc.fn is a good first step.

A core concept in WebMVC.fn is the route. A route associates a path, and more exactly a path pattern, and an HTTP method pair to a function, which represents the code to execute when a call to that particular pair has been triggered.

The following diagram succinctly illustrates the API:

WebMVC.fn API summary

The getAll() method can be rewritten as:

public RouterFunction<ServerResponse> getAll(PersonRepository repo) {
  return route().GET(
    "/person",
    req -> ok().body(repo.findAll(Sort.by("lastName", "firstName")))
  ).build();
}

Likewise, getOne() can be rewritten as:

public RouterFunction<ServerResponse> getOne(PersonRepository repo) {
  return route().GET(
    "/person/{id}",
      req -> ok().body(repo.findById(Long.valueOf(req.pathVariable("id"))))
   ).build();
}

Note that the two above methods should contribute their return value to the Spring context. Hence, they both need to be annotated with @Bean in a @Configuration-annotated class.

In Kotlin, you can use the Kotlin Bean DSL to achieve the same, without annotations.

Step 2: Extract business logic into a Handler

As its name suggests, Spring WebMVC follows the Model-View-Controller pattern.

The Controller should be dedicated to routing, while the Model should contain the business logic. Both extracted methods contain both routing and business logic – even though the sample code’s logic is very simple. In standard applications, this would probably be more complex.

Spring’s proposal is to extract this logic into a class called a Handler. There’s no specific requirement for a class to be a Handler. Here’s the extracted class:

public class PersonHandler {

  private final PersonRepository repository;

  public PersonHandler(PersonRepository repository) {
    this.repository = repository;
  }

  public ServerResponse getAll(ServerRequest req) {
    return ok().body(
      repository.findAll(Sort.by("lastName", "firstName"))
    );
  }

  public ServerResponse getOne(ServerRequest req) {
    return ok().body(
      repository.findById(Long.valueOf(req.pathVariable("id")))
    );
  }
}

Routing methods can now be rewritten to use the above Handler class:

@Bean
public RouterFunction<ServerResponse> getAll(PersonHandler handler) {
   return route().GET("/person", handler::getAll).build();
}

@Bean
public RouterFunction<ServerResponse> getOne(PersonHandler handler) {
   return route().GET("/person/{id}", handler::getOne).build();
}

Because those are @Bean-annotated methods, the framework will call them at startup-time. It will also provide the required PersonHandler bean, provided there is one. Hence, you need an additional method:

@Bean
public PersonHandler handler(PersonRepository repository) {
  return new PersonHandler(repository);
}

Step 3: Merge route-creating methods if necessary

Having a dedicated method per route can quickly lead to an explosion of such methods. The route-creating API allows us to define multiple routes.

Let’s merge the two methods above into a single one:

public RouterFunction<ServerResponse> routes(PersonHandler handler) {
  return route().path(
    "/person", builder -> builder
      .GET("/", handler::getAll)
      .GET("/{id}", handler::getOne)
  ).build();
}

Merging routes together, and deciding which ones, is directly related to how you want to architect your code. It can be one method per route, one per endpoint, one per module, etc.

Step 4: Migrate from WebMVC.fn to WebFlux

At this point, our efforts were directed at making the code functional, not reactive. However, as I mentioned before, the reactive API maps one-for-one to this functional approach. Now the time has come to start migrating to reactive. We will start with the Controller layer.

Let’s replace the WebMVC dependency with the reactive dependency, called WebFlux:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

The code doesn’t compile anymore. To fix it, remove the old imports from the org.springframework.web.servlet.function package, and replace them with imports from the org.springframework.web.reactive.function.server package.

WebMVC.fn WebFlux
o.s.w.servlet.function.RouterFunction o.s.w.reactive.function.server.RouterFunction
o.s.w.servlet.function.ServerRequest o.s.w.reactive.function.server.ServerRequest
o.s.w.servlet.function.ServerResponse o.s.w.reactive.function.server.ServerResponse

Note that the return type should be changed from plain ServerResponse to the reactive Mono<ServerResponse> type. Now, the code compiles again!

Mono comes from Reactive Streams, a specification that focuses on Reactive Programming, and completely agnostic to the underlying language. In the Java ecosystem, Project Reactor is an implementation of Reactive Streams. The API defines the following building blocks:

  • A Publisher emits objects. Mono is a Publisher that can emit at most one object, while Flux is a Publisher that can potentially emit an infinite number of objects – think a Java 8 Stream.
  • A Subscriber subscribes to a Publisher to be notified when objects are emitted. Depending on the type of signals that it receives e.g. an object, an error signal, or a successful completion signal, its behavior may differ.
  • A Subscription materializes the relationship between a Publisher and a Subscriber. It allows for backpressure, a mechanism to limit the publishing rate according to what the Subscriber can handle.

The following diagram illustrates Reactive Streams from a bird’s eye view.

Reactive Streams API

At this point, the application still relies on JPA, which is blocking. As soon as you introduce a single blocking call in a reactive processing pipeline, it makes it blocking.

Hence, the application is blocking despite our efforts. In order to go further, we need to make the whole processing pipeline reactive. This includes data access code.

Step 5: Migrating from JPA to R2DBC

As mentioned above, JPA is blocking. The specification was designed at a time when the cloud was not as widespread as it is now, so that a reactive API was not included.

Hibernate is the default JPA implementation in Spring Boot. There’s an on-going initiative to make Hibernate reactive. So far, Reactive Hibernate supports PostgreSQL, MySQL, and DB2. It has some limitations, for example, it does not support the query cache. Also, one should ensure that all calls to the 2nd-level cache implementation methods are non-blocking as well.

Another alternative is to forego JPA/Hibernate/JDBC entirely and rely on R2DBC instead. In its own words:

“The Reactive Relational Database Connectivity (R2DBC) project brings reactive programming APIs to relational databases.”

This involves several steps:

  1. Use a non-blocking RDB2 driver for the database.
    At the time of this writing, R2DBC drivers are available for PostgreSQL, MySQL, MariaDB, SQL Server, H2, and Google Cloud Spanner.
  2. Migrate from JPA to R2DBC
    • Replace the spring-boot-starter-data-jpa dependency with spring-data-r2dbc
    • Change the inherited super interface from JpaRepository to ReactiveSortingRepository

Using R2DBC changes the return type of our repository methods:

  • When the database returns a single object e.g. load by primary key, it wraps it into a Mono
  • When it returns a potential collection, it wraps elements into a Flux

For methods that are generated at runtime via Spring Data, we only need to adapt the Handler code.

Compared to WebMVC.fn, WebFlux introduces a BodyInserter interface. As its name implies, a BodyInserter is responsible for populating the body of an HTTP response. A utility class, BodyInserters, provides inserters for the most common use-case e.g. empty body, form data, publisher, etc.

Body Inserter class diagram

Because Mono and Flux are Publishers, the BodyInserters.fromPublisher() method can be put to good use. The updated code is:

public Mono<ServerResponse> getAll(ServerRequest req) {
  Flux<Person> all = repository.findAll(Sort.by("lastName", "firstName"));
  return ok().body(fromPublisher(all, Person.class));
}

public Mono<ServerResponse> getOne(ServerRequest req) {
  Mono<Person> mono = repository.findById(Long.valueOf(req.pathVariable("id")));
  return ok().body(fromPublisher(mono, Person.class));
}

By default, Spring Boot detects the presence of the JDBC driver and creates a default data source based on it. With R2DBC, Spring Boot populates a bean of type ConnectionFactory in the Spring context.

Also, Spring Boot configures Hibernate to automatically read JPA entities and create the SQL schema from it at startup time. Additionally, the Spring Boot Data JPA starter allows you to provide a data.sql file that’s executed at start-up time to populate the database. By migrating to R2DBC, we lost both features.

We need to provide a schema.sql file. We read it, as well as the previous data file, and execute both of them explicitly when the application starts.

@Bean
public CommandLineRunner initialize(DatabaseClient client) {
  return (args) -> {
    InputStream schemaIn = new ClassPathResource("/schema.sql").getInputStream();
    String schema = StreamUtils.copyToString(schemaIn, UTF_8);
    InputStream dataIn = new ClassPathResource("/data.sql").getInputStream();
    String data = StreamUtils.copyToString(dataIn, UTF_8);
    client.execute(schema)
          .then()
          .and(client.execute(data)
                     .then())
          .block();
   };
}

Step 5.1: Make sure no call is blocking

As stated above, a single blocking call in a reactive pipeline makes it… blocking.

To ensure that no call is blocking, there are several ways. The first step is of course code reviews. Healthy engineering teams practice code reviews to improve the code that is committed and to make less experienced team members learn from the more experienced ones. This remains a manual process, and like every manual process, it has varying degrees of success that depends on human and organizational factors.

To make this check bullet-proof, Project Reactor provides the BlockHound library. BlockHound is a Java agent that can be started along with the application, and that throws an exception when it detects a blocking call.

Using BlockHound is a two-step process:

  • Add the dependency:
    <dependency>
      <groupId>io.projectreactor.tools</groupId>
      <artifactId>blockhound</artifactId>
      <version>1.0.4.RELEASE</version>
    </dependency>
  • Start BlockHound when the application starts:
    public static void main(String[] args) {
      BlockHound.install();
      SpringApplication.run(
        ImperativeToReactiveApplication.class,
        args
      );
    }

Step 6: Re-add caching

Schema creation and data population are not the only features that we let go of. By leaving Hibernate behind, we also lost its 2nd-level cache at the same time! It’s time to add it in a more manual way.

Let’s first add a cache bean to the Spring context:

@Bean
public IMap<Long, Person> cache(HazelcastInstance instance) {
  return instance.getMap("persons");
}

Note that the HazelcastInstance is already existing in the context, because Spring Boot takes care of starting it for us, either using a hazelcast.xml configuration file, or a Config bean that we provide. For more detailed information, please refer to the Spring Boot documentation.

Let’s now introduce a CachingService between the controller and the repository.

public class CachingService {

  private final IMap<Long, Person> cache;
  private final PersonRepository repository;

  public CachingService(IMap<Long, Person> cache,
                        PersonRepository repository) {
    this.repository = repository;
    this.cache = cache;
  }

  Mono<Person> findById(Long id) {
    Person person = cache.get(id);                  // 1
    if (person == null) {                           // 2
      Mono<Person> mono = repository.findById(id);  // 3
      return mono.doOnSuccess(p -> {                // 4
        cache.put(id, p);                           // 5
      });
    } else {
      return Mono.just(person);                     // 6
    }
  }
}
  1. Try to get the Person from the cache by its id
  2. No Person with the corresponding id is found
  3. Load the Person from the database
  4. Only when the Person will be successfully loaded from the database
  5. Put the Person in the cache
  6. If the Person is in the cache, just return it wrapped by a Mono

Caching is back.

There’s a catch, though. The get() and put() methods from Hazelcast are blocking: we introduced blocking calls again! If you’ve installed BlockHound in step 5.1, launch the application, and check call the endpoint, this will be apparent:

curl http://localhost:8080/person/1

reactor.blockhound.BlockingOperationError: Blocking call! jdk.internal.misc.Unsafe#park
at java.base/jdk.internal.misc.Unsafe.park(Unsafe.java) ~[na:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ HTTP GET “/person/1” [ExceptionHandlingWebHandler]

The next step is about fixing that.

Step 7: Use non-blocking methods

Hazelcast API offers traditional blocking methods, but non-blocking one as well, in the form of the getAsync() and putAsync() methods. However, it’s not enough as we need to know whether an entity is in the cache to decide to return it or load it from the database.

Mono offers the switchIfEmpty() method to design this behavior into the Reactor API. Its usage is the following:

Mono<Person> findById(Long id) {
  return Mono.fromCompletionStage(() -> cache.getAsync(id))
    .switchIfEmpty(repository.findById(id))
    .doOnNext(p -> cache.putAsync(p.getId(), p));
}

Conclusion

Reactive Programming allows you to optimize the usage of a server’s resources. It comes at a cost though: learning a new specific API.

In this post, we described how you can mitigate the risks of migrating to Project Reactor by following a step-by-step process:

  1. Use Functional Programming instead of annotations
  2. Migrate from WebMVC.fn to WebFlux
  3. Migrate from Spring Data JPA to Spring R2DBC
  4. Move from annotation-based synchronous caching to explicit asynchronous method calls

We also saw how to ensure that the whole call stack is entirely reactive with BlockHound.

The demo application for this blog post is available on GitHub.

Many thanks to my friend Oleh Dokuka for his detailed and relevant review!

About the Author

About the Author

Nicolas Frankel

Nicolas Frankel

Developer Advocate, Hazelcast

Nicolas Fränkel is a Developer Advocate with 15+ years experience consulting for many different customers, in a wide range of contexts (such as telecoms, banking, insurances, large retail and public sector). Usually working on Java/Java EE and Spring technologies, but with focused interests like Rich Internet Applications, Testing, CI/CD and DevOps. Currently working for Hazelcast. Also double as a teacher in universities and higher education schools, a trainer and triples as a book author.

Follow me on

Latest Blogs

Hazelcast Joins Hacktoberfest

Google Summer of Code 2020: A Mentor's Perspective

Google Summer of Code 2020: A Mentor’s Perspective

Designing an Evergreen Cache with Change Data Capture

Designing an Evergreen Cache with Change Data Capture

View all blogs by the author
Join Us On Slack