
Developing Reactive Applications with Spring Reactive MongoDB

March 1, 2023

We can build reactive applications on MongoDB with Spring Data Reactive MongoDB. In my experience, NoSQL stores seem better suited to reactive programming in complex production environments than relational databases accessed through R2DBC.

Spring Data Reactive MongoDB provides a reactive MongoDB connector designed for building reactive NoSQL applications. As a result, our MongoDB operations run asynchronously and without blocking, which can improve performance and scalability.

Besides Reactive MongoDB, I recommend reading my earlier articles on the building blocks used here: the theory of reactive programming, reactive endpoints with Mono and Flux, writing a reactive web client (for testing purposes), and, if you are interested, reactive relational databases.


In this article, we'll build a complete user service with Spring Data Reactive MongoDB. We'll use functional endpoints in Spring WebFlux for the controller layer, and in the service layer we'll combine custom queries via ReactiveMongoTemplate with the ready-made methods of ReactiveMongoRepository. You can access all the code in the GitHub repository.


Project Dependencies and Environment Setup

As we always emphasize, if you are developing a reactive application, all of its components should be reactive. The Gradle dependencies for this project are:

implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'

With a docker-compose.yaml file, we can easily prepare MongoDB and start a MongoDB instance that is ready for use.

version: '3.7'
services:
  mongodb_container:
    image: mongo:latest
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: mongopw
    ports:
      - 27017:27017
    volumes:
      - mongodb_data_container:/data/db
volumes:
  mongodb_data_container:

Getting MongoDB up and running for our project is straightforward. The following command starts the container in the background, after which our MongoDB instance is ready for use.

docker compose up -d

Developing Reactive Mongo Application

To connect our reactive application to the MongoDB instance we've set up, we need some configuration. We create a MongoConfiguration class that extends AbstractReactiveMongoConfiguration, and annotate it with @EnableMongoRepositories to tell Spring that we'll be working with MongoDB repositories. Within it, we override two methods: one supplies the database name and the other supplies the MongoClient. The class also exposes a ReactiveMongoTemplate bean and a CommandLineRunner that seeds a few sample users at startup, unless job.autorun.enabled is set to false.

package dev.gokhana.userservice.configuration;

import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import dev.gokhana.userservice.model.User;
import dev.gokhana.userservice.repository.UserRepository;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import reactor.core.publisher.Flux;

import java.util.concurrent.ThreadLocalRandom;

@Configuration
@EnableMongoRepositories
public class MongoConfiguration extends AbstractReactiveMongoConfiguration {

    @Override
    protected String getDatabaseName() {
        return "users";
    }

    @Override
    @Bean
    public MongoClient reactiveMongoClient() {
        return MongoClients.create("mongodb://root:mongopw@localhost:27017");
    }

    @Bean
    public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
    }

    @Bean
    @ConditionalOnProperty(prefix = "job.autorun", name = "enabled", havingValue = "true", matchIfMissing = true)
    public CommandLineRunner loadData(UserRepository repository) {
        return (args) -> {
            // save a couple of users
            var users = Flux.just(
                    new User("Gökhan", ThreadLocalRandom.current().nextInt(1, 100)),
                    new User("Betül", ThreadLocalRandom.current().nextInt(1, 100)),
                    new User("Zühtü", ThreadLocalRandom.current().nextInt(1, 100))
            );
            repository.saveAll(users).subscribe();
        };
    }
}
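The connection string in reactiveMongoClient() is hardcoded, which is fine for a local demo. If the credentials ever contain reserved characters such as @ or :, they have to be percent-encoded inside a MongoDB connection string. The following is a minimal sketch of how such a string could be assembled. MongoUriSketch and its connectionString method are hypothetical names for illustration and are not part of the repository.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the article's repository.
final class MongoUriSketch {

    private MongoUriSketch() {
    }

    // Builds mongodb://user:password@host:port with percent-encoded credentials.
    static String connectionString(String user, String password, String host, int port) {
        return "mongodb://" + encode(user) + ":" + encode(password) + "@" + host + ":" + port;
    }

    private static String encode(String value) {
        // URLEncoder produces form encoding and writes a space as '+',
        // so replace that with %20 to keep the result valid inside a URI.
        // A literal '+' in the input has already become %2B at this point.
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    public static void main(String[] args) {
        // The credentials from the docker-compose file need no escaping.
        System.out.println(connectionString("root", "mongopw", "localhost", 27017));
        // Reserved characters in a password are escaped.
        System.out.println(connectionString("app", "p@ss:word", "localhost", 27017));
    }
}
```

The resulting string could then be passed to MongoClients.create(...) in place of the literal, with the individual parts read from configuration rather than source code.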

We have prepared the configuration that establishes the connection to the database. Now it's time to create the object that will be stored in it. In MongoDB, data is stored in structures called documents. We can reuse the example from my previous article on R2DBC.

@Document
public class User {

    @Id
    private String id;

    @NotBlank
    private String name;

    private Integer score;

    // Constructors (including User(name, score)), getters, setters, equals, hashCode, etc.
}

Now that we've defined our database document, the next step is to create a repository that will allow us to perform operations on this document. By extending ReactiveMongoRepository, we can quickly create our UserRepository, which will give us access to all the necessary CRUD methods like findAll, findById, save, and delete.

package dev.gokhana.userservice.repository;

import dev.gokhana.userservice.model.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface UserRepository extends ReactiveMongoRepository<User, String> {}

The next step in our development process involves creating a service that can handle all of the necessary operations. To accomplish this, we can inject UserRepository into our service. For the sake of example, we can also make ReactiveMongoTemplate available in the service. However, before we dive into those details, let's take a closer look at how UserRepository is used. With this repository, we can perform all CRUD operations, with each operation returning User object(s) as either a Mono or a Flux.

package dev.gokhana.userservice.service;

import com.mongodb.client.result.DeleteResult;
import dev.gokhana.userservice.model.User;
import dev.gokhana.userservice.repository.UserRepository;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;

@Service
public class UserServiceImpl implements UserService {

    private final UserRepository userRepository;
    private final ReactiveMongoTemplate template;

    public UserServiceImpl(UserRepository userRepository, ReactiveMongoTemplate template) {
        this.userRepository = userRepository;
        this.template = template;
    }

    @Override
    public Mono<User> getUserById(String id) {
        return userRepository.findById(id);
    }

    @Override
    public Flux<User> getUsers() {
        return userRepository.findAll();
    }

    @Override
    public Mono<User> saveUser(User userDTO) {
        return userRepository.save(userDTO);
    }

    @Override
    public Mono<User> updateUser(String id, User userDTO) {
        return userRepository.findById(id).flatMap(user -> {
            userDTO.setId(user.getId()); // if there is something else to update do it here.
            return userRepository.save(userDTO);
        });
    }

    @Override
    public Mono<Void> deleteUser(String id) {
        return userRepository.deleteById(id);
    }
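The updateUser method only saves when findById actually emits a user. For an unknown id, the resulting Mono completes empty and nothing is written. As a rough analogy, the same shape can be sketched with Optional and an in-memory map. This is a blocking stand-in rather than Reactor code, and UpdateFlowSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Blocking, in-memory stand-in for the repository (id -> name); illustration only.
final class UpdateFlowSketch {

    private final Map<String, String> namesById = new HashMap<>();

    // Plays the role of findById: empty when the id is unknown.
    Optional<String> findById(String id) {
        return Optional.ofNullable(namesById.get(id));
    }

    // Plays the role of save: stores the value and returns it.
    String save(String id, String name) {
        namesById.put(id, name);
        return name;
    }

    // Same shape as findById(id).flatMap(user -> save(...)):
    // the save step runs only when the lookup produced a value.
    Optional<String> update(String id, String newName) {
        return findById(id).map(existing -> save(id, newName));
    }

    public static void main(String[] args) {
        UpdateFlowSketch store = new UpdateFlowSketch();
        store.save("1", "Gokhan");
        System.out.println(store.update("1", "Betul"));  // existing id: the name is replaced
        System.out.println(store.update("42", "Zuhtu")); // unknown id: empty, nothing is inserted
    }
}
```

An empty result is what lets a caller distinguish "updated" from "not found" without throwing an exception.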

If we want full control over our queries, ReactiveMongoTemplate comes into play. We have already injected it into our service, and it lets us build and execute database operations ourselves.

    @Override
    public Mono<Long> deleteByName(String name) {
        return template.remove(query(where("name").is(name)), User.class)
                .map(DeleteResult::getDeletedCount);
    }
}

In this example, we used the remove method of ReactiveMongoTemplate. The method deletes every user whose name field equals the given parameter and returns the number of deleted documents. We can express any other query we need in the same way.
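The contract of deleteByName (remove every match, report the count) can be illustrated with a small in-memory sketch. It is a blocking stand-in built on a plain map and does not use the MongoDB API; DeleteByNameSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the users collection (id -> name); illustration only.
final class DeleteByNameSketch {

    private DeleteByNameSketch() {
    }

    // Removes every entry whose name equals the argument and returns how many were removed.
    static long deleteByName(Map<String, String> namesById, String name) {
        int before = namesById.size();
        namesById.values().removeIf(name::equals);
        return before - namesById.size();
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("1", "Gokhan");
        users.put("2", "Betul");
        users.put("3", "Gokhan");
        System.out.println(deleteByName(users, "Gokhan")); // both matching entries are removed
        System.out.println(deleteByName(users, "Nobody")); // no match: the count is 0
    }
}
```

Note that a delete matching nothing still produces a count of 0, not an absent result, so any caller that wants to treat "nothing deleted" specially has to inspect the number.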

Next, we need functional endpoints that can handle requests for all of our service methods. Our first step is to create a UserHandler component, which contains the request and response handling for each operation (everything except routing).

package dev.gokhana.userservice.controller;

import dev.gokhana.userservice.controller.validation.ValidatorHandler;
import dev.gokhana.userservice.model.User;
import dev.gokhana.userservice.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.Map;
import java.util.Optional;

import static org.springframework.web.reactive.function.BodyInserters.fromValue;

@Component
public class UserHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(UserHandler.class);

    private final ValidatorHandler validator;
    private final UserService userService;

    public UserHandler(ValidatorHandler validator, UserService userService) {
        this.validator = validator;
        this.userService = userService;
    }

    public Mono<ServerResponse> getAll(ServerRequest request) {
        return userService.getUsers().collectList().flatMap(users -> {
            if (users.isEmpty()) {
                return ServerResponse.noContent().build();
            }
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                    .body(fromValue(users));
        });
    }

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.getUserById(id)
                .flatMap(user -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                        .body(fromValue(user)))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(User.class)
                .doOnNext(validator::validate)
                .flatMap(userService::saveUser)
                // doOnNext instead of doOnSuccess: doOnSuccess is also invoked with null
                // when the Mono completes empty (for example, a request without a body)
                .doOnNext(userSaved -> LOGGER.info("User saved with id: {} ", userSaved.getId()))
                .doOnError(e -> LOGGER.error("Error in saveUser method", e))
                .flatMap(user -> ServerResponse.created(getToUri(user)).bodyValue(user));
    }

    public Mono<ServerResponse> updateUser(ServerRequest request) {
        Mono<User> userMono = request.bodyToMono(User.class);
        String id = request.pathVariable("id");
        return userMono
                .flatMap(user -> userService.updateUser(id, user))
                .flatMap(user -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                        .body(fromValue(user)))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        String id = request.pathVariable("id");
        // Look the user up first: then(...) always emits its response, so a
        // switchIfEmpty placed after it would never produce the 404.
        return userService.getUserById(id)
                .flatMap(user -> userService.deleteUser(id)
                        .then(ServerResponse.noContent().build()))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> deleteUserByName(ServerRequest request) {
        Optional<String> name = request.queryParam("name");
        if (name.isEmpty()) {
            return ServerResponse.badRequest().contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(Map.of("error", "There is no name in the field"));
        }
        // deleteByName always emits a count, so decide on the number of deleted documents
        return userService.deleteByName(name.get())
                .flatMap(deleted -> deleted > 0
                        ? ServerResponse.noContent().build()
                        : ServerResponse.notFound().build());
    }

    private URI getToUri(User userSaved) {
        // Location header points at the created resource under the API prefix
        return UriComponentsBuilder.fromPath("/api/v1/users/{id}")
                .buildAndExpand(userSaved.getId()).toUri();
    }
}

With the handler in place, the overall structure is ready. Our next step is to map incoming requests to these handler methods using RouterFunctions.

package dev.gokhana.userservice.controller;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RoutingHandler {

    private static final String API = "/api/v1/users";
    private static final String ID = "/{id}";

    @Bean
    public RouterFunction<ServerResponse> userRouter(UserHandler userHandler) {
        return route(GET(API), userHandler::getAll)
                .andRoute(POST(API).and(accept(MediaType.APPLICATION_JSON)), userHandler::createUser)
                .andRoute(GET(API + ID), userHandler::getUserById)
                .andRoute(PUT(API + ID).and(accept(MediaType.APPLICATION_JSON)), userHandler::updateUser)
                .andRoute(DELETE(API + ID), userHandler::deleteUser)
                .andRoute(DELETE(API).and(RequestPredicates.queryParam("name", StringUtils::hasText)), userHandler::deleteUserByName);
    }
}
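Router functions are evaluated in the order they are composed, and the first route whose predicates all match handles the request. To make that resolution step concrete, here is a toy route table in plain Java. It is only an illustration of the idea and not how Spring implements RouterFunction. RouteOrderSketch and its methods are hypothetical, and the boolean flag stands in for the queryParam("name", ...) predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy route table for illustration only; not Spring's RouterFunction implementation.
final class RouteOrderSketch {

    private static final class Route {
        final String method;
        final String pattern;
        final boolean requiresName;
        final String handler;

        Route(String method, String pattern, boolean requiresName, String handler) {
            this.method = method;
            this.pattern = pattern;
            this.requiresName = requiresName;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    RouteOrderSketch route(String method, String pattern, boolean requiresName, String handler) {
        routes.add(new Route(method, pattern, requiresName, handler));
        return this;
    }

    // Walks the routes in registration order and returns the first handler whose predicates all hold.
    Optional<String> resolve(String method, String path, boolean hasName) {
        for (Route route : routes) {
            if (route.method.equals(method)
                    && matches(route.pattern, path)
                    && (!route.requiresName || hasName)) {
                return Optional.of(route.handler);
            }
        }
        return Optional.empty();
    }

    // A {placeholder} segment matches any non-empty segment; all other segments must be equal.
    static boolean matches(String pattern, String path) {
        String[] expected = pattern.split("/");
        String[] actual = path.split("/");
        if (expected.length != actual.length) {
            return false;
        }
        for (int i = 0; i < expected.length; i++) {
            boolean placeholder = expected[i].startsWith("{") && expected[i].endsWith("}");
            if (placeholder ? actual[i].isEmpty() : !expected[i].equals(actual[i])) {
                return false;
            }
        }
        return true;
    }

    // Mirrors the six routes registered in userRouter, with handler names as plain strings.
    static RouteOrderSketch userRoutes() {
        return new RouteOrderSketch()
                .route("GET", "/api/v1/users", false, "getAll")
                .route("POST", "/api/v1/users", false, "createUser")
                .route("GET", "/api/v1/users/{id}", false, "getUserById")
                .route("PUT", "/api/v1/users/{id}", false, "updateUser")
                .route("DELETE", "/api/v1/users/{id}", false, "deleteUser")
                .route("DELETE", "/api/v1/users", true, "deleteUserByName");
    }
}
```

In this sketch, a DELETE on /api/v1/users without a name parameter resolves to no handler at all, which mirrors how a request that matches none of the registered predicates is not dispatched to any handler method.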

At this point, our requests are routed to the right handlers, and we have a reactive application that integrates with MongoDB end to end.

Next, we can set up a test with WebTestClient and a mocked repository, so that no running database is required.

package dev.gokhana.userservice;

import dev.gokhana.userservice.model.User;
import dev.gokhana.userservice.repository.UserRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;

import static org.mockito.Mockito.when;

@SpringBootTest(classes = UserServiceApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {"job.autorun.enabled=false"})
class UserServiceApplicationTests {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserRepository userRepository;

    @Test
    public void findAllTest() {
        List<User> users = Arrays.asList(new User("Gokhan", 15),
                new User("Corç", 25));
        Flux<User> flux = Flux.fromIterable(users);
        when(userRepository.findAll())
                .thenReturn(flux);
        // The path needs its leading slash to match the routes under /api/v1/users
        webTestClient.get().uri("/api/v1/users")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class)
                .isEqualTo(users);
    }
}

Having finished all of our processes, we're ready to start our service and begin sending requests.

POST http://localhost:8083/api/v1/users
Content-Type: application/json

{
  "name": "Zühtü",
  "score": 52
}

After sending a request to create a new user, we should receive a response with a status code of 201 Created. This response will also contain the user's ID and other details.

{
  "id": "61ed90c1d656605b62e19acd",
  "name": "Zühtü",
  "score": 52
}

Conclusion

With reactive programming, we can perform asynchronous, non-blocking operations with fewer threads and hardware resources. In this article, we developed an end-to-end reactive application on top of MongoDB. In my opinion, applications built with reactive MongoDB seem to be more ready for production environments than those built on R2DBC.

 

Gökhan Ayrancıoğlu


Sr. Software Engineer @Heycar | Tech Blogger

 

Copyright © 2023 All rights reserved
