Developing Reactive Applications with Spring Reactive MongoDB
We can use MongoDB with Spring Data Reactive MongoDB to develop reactive applications. In my experience, NoSQL stores seem better suited to reactive programming in complex production environments than R2DBC-based relational setups.
Spring Data Reactive MongoDB provides a reactive MongoDB connector designed specifically for building reactive NoSQL applications. As a result, our MongoDB operations run asynchronously and without blocking, which can improve performance and scalability.
In addition to Reactive MongoDB, I recommend reading my earlier articles on the concepts used here: the theory of reactive programming, reactive endpoints with Mono and Flux, writing a reactive web client (for testing purposes) and, if you are interested, reactive relational databases.
In this article, we'll build a complete user service using Spring Data Reactive MongoDB. We'll use functional endpoints in Spring WebFlux for our controller layer, and combine custom queries via ReactiveMongoTemplate with the ready-made methods of ReactiveMongoRepository in our service layer. You can access all the code in the GitHub repository.
Project Dependencies and Environment Setup
As we always emphasize, if you are developing a reactive application, all its components should be reactive.
    implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
With a docker-compose.yaml file, we can easily prepare MongoDB and start a MongoDB instance that is ready for use.
    version: '3.7'
    services:
      mongodb_container:
        image: mongo:latest
        environment:
          MONGO_INITDB_ROOT_USERNAME: root
          MONGO_INITDB_ROOT_PASSWORD: mongopw
        ports:
          - 27017:27017
        volumes:
          - mongodb_data_container:/data/db
    volumes:
      mongodb_data_container:
Getting our project up and running with MongoDB is a breeze. Running "docker compose up -d" in the directory that contains docker-compose.yaml starts the MongoDB instance in the background, ready for use.
docker compose up -d
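Because the -d flag returns control before the container has necessarily finished starting, it can be handy to check that the published port accepts connections. The following is a minimal sketch using only the JDK; the class name PortCheck and its method are hypothetical helpers, not part of the project. An open port only shows that something is listening on 27017, not that MongoDB has finished initializing.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortCheck {

    private PortCheck() {
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections yet.
            return false;
        }
    }

    public static void main(String[] args) {
        // 27017 is the port published in docker-compose.yaml.
        System.out.println("MongoDB port reachable: " + isReachable("localhost", 27017, 1000));
    }
}
```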
Developing Reactive Mongo Application
To connect our reactive application to the MongoDB instance we've set up, we need some configuration. We create a reactive database connection in the MongoConfiguration class, which extends AbstractReactiveMongoConfiguration. The class is annotated with @EnableMongoRepositories to tell Spring that we'll be working with MongoDB repositories (Spring Data also ships a reactive-specific counterpart, @EnableReactiveMongoRepositories). Within the configuration, we override two methods: getDatabaseName() supplies the database name and reactiveMongoClient() supplies the MongoClient. The class also exposes a ReactiveMongoTemplate bean and a CommandLineRunner that seeds a few sample users unless the job.autorun.enabled property is set to false.
    package dev.gokhana.userservice.configuration;

    import com.mongodb.reactivestreams.client.MongoClient;
    import com.mongodb.reactivestreams.client.MongoClients;
    import dev.gokhana.userservice.model.User;
    import dev.gokhana.userservice.repository.UserRepository;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
    import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
    import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
    import reactor.core.publisher.Flux;

    import java.util.concurrent.ThreadLocalRandom;

    @Configuration
    @EnableMongoRepositories
    public class MongoConfiguration extends AbstractReactiveMongoConfiguration {

        // Name of the database that holds our documents.
        @Override
        protected String getDatabaseName() {
            return "users";
        }

        // Reactive driver client; the credentials match docker-compose.yaml.
        @Override
        @Bean
        public MongoClient reactiveMongoClient() {
            return MongoClients.create("mongodb://root:mongopw@localhost:27017");
        }

        @Bean
        public ReactiveMongoTemplate reactiveMongoTemplate() {
            return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
        }

        // Seeds sample data on startup unless job.autorun.enabled=false.
        @Bean
        @ConditionalOnProperty(prefix = "job.autorun", name = "enabled", havingValue = "true", matchIfMissing = true)
        public CommandLineRunner loadData(UserRepository repository) {
            return (args) -> {
                // save a couple of users
                var users = Flux.just(
                        new User("Gökhan", ThreadLocalRandom.current().nextInt(1, 100)),
                        new User("Betül", ThreadLocalRandom.current().nextInt(1, 100)),
                        new User("Zühtü", ThreadLocalRandom.current().nextInt(1, 100))
                );
                repository.saveAll(users).subscribe();
            };
        }
    }
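The connection string in reactiveMongoClient() is hardcoded, which is fine for a local demo. If the credentials were supplied from outside, they would need to be percent-encoded, because characters such as '@', ':' or '/' in a user name or password would otherwise break the URI. Below is a minimal sketch of that step using only the JDK; the MongoUri class and its methods are hypothetical and not part of the project.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that assembles a MongoDB connection string. */
final class MongoUri {

    private MongoUri() {
    }

    /** Percent-encodes a credential so reserved characters cannot break the URI. */
    static String encode(String value) {
        // URLEncoder targets HTML forms and writes spaces as '+', so convert them to %20.
        // A literal '+' in the input has already become %2B at this point.
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds mongodb://user:password@host:port with encoded credentials. */
    static String build(String user, String password, String host, int port) {
        return "mongodb://" + encode(user) + ":" + encode(password) + "@" + host + ":" + port;
    }
}
```

With the values from docker-compose.yaml, MongoUri.build("root", "mongopw", "localhost", 27017) yields the same string that is passed to MongoClients.create(...) in the configuration. The user name and password could just as well be read from environment variables of your choosing.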
We have prepared the configuration that connects to the database. Now it's time to create the object that will be stored in it. MongoDB stores data in structures called documents. We can follow the same example we used in my previous article on R2DBC.
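    @Document
    public class User {

        // MongoDB document identifier
        @Id
        private String id;

        // Bean Validation constraint: the name must contain at least one non-whitespace character
        @NotBlank
        private String name;

        private Integer score;

        // Getter, Setter, hashCode etc..
    }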
Now that we've defined our database document, the next step is to create a repository that will allow us to perform operations on this document. By extending ReactiveMongoRepository, we can quickly create our UserRepository, which will give us access to all the necessary CRUD methods like findAll, findById, save, and delete.
    package dev.gokhana.userservice.repository;

    import dev.gokhana.userservice.model.User;
    import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

    public interface UserRepository extends ReactiveMongoRepository<User, String> {
    }
The next step is to create a service that handles all of the necessary operations. To accomplish this, we inject UserRepository into our service. For the sake of example, we also make ReactiveMongoTemplate available in the service. Before we dive into those details, let's take a closer look at how UserRepository is used. With this repository, we can perform all CRUD operations, and every operation returns its result as either a Mono (zero or one element) or a Flux (zero or more elements).
    package dev.gokhana.userservice.service;

    import com.mongodb.client.result.DeleteResult;
    import dev.gokhana.userservice.model.User;
    import dev.gokhana.userservice.repository.UserRepository;
    import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import static org.springframework.data.mongodb.core.query.Criteria.where;
    import static org.springframework.data.mongodb.core.query.Query.query;

    @Service
    public class UserServiceImpl implements UserService {

        private final UserRepository userRepository;
        private final ReactiveMongoTemplate template;

        public UserServiceImpl(UserRepository userRepository, ReactiveMongoTemplate template) {
            this.userRepository = userRepository;
            this.template = template;
        }

        @Override
        public Mono<User> getUserById(String id) {
            return userRepository.findById(id);
        }

        @Override
        public Flux<User> getUsers() {
            return userRepository.findAll();
        }

        @Override
        public Mono<User> saveUser(User userDTO) {
            return userRepository.save(userDTO);
        }

        @Override
        public Mono<User> updateUser(String id, User userDTO) {
            // Completes empty when no user with this id exists.
            return userRepository.findById(id)
                    .flatMap(user -> {
                        userDTO.setId(user.getId()); // if there is something else to update do it here.
                        return userRepository.save(userDTO);
                    });
        }

        @Override
        public Mono<Void> deleteUser(String id) {
            return userRepository.deleteById(id);
        }

        // The class continues with deleteByName.
If we want to build the queries ourselves, ReactiveMongoTemplate comes into play. The template, which we have already injected into our service, lets us describe and execute every operation by hand.
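        @Override
        public Mono<Long> deleteByName(String name) {
            // Remove every User document whose "name" field equals the argument
            // and emit how many documents were deleted.
            return template.remove(query(where("name").is(name)), User.class)
                    .map(DeleteResult::getDeletedCount);
        }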
In the example, we used the remove method of ReactiveMongoTemplate. The method deletes every user whose name field equals the given parameter and returns the number of deleted documents. In the same way, we can express any query we need.
Moving forward, we need to prepare functional endpoints that can handle requests for all of our service methods. Our first step is to create a UserHandler component, which contains the request and response handling for each endpoint (everything except routing).
    package dev.gokhana.userservice.controller;

    import dev.gokhana.userservice.controller.validation.ValidatorHandler;
    import dev.gokhana.userservice.model.User;
    import dev.gokhana.userservice.service.UserService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.MediaType;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import org.springframework.web.util.UriComponentsBuilder;
    import reactor.core.publisher.Mono;

    import java.net.URI;
    import java.util.Map;
    import java.util.Optional;

    import static org.springframework.web.reactive.function.BodyInserters.fromValue;

    @Component
    public class UserHandler {

        private static final Logger LOGGER = LoggerFactory.getLogger(UserHandler.class);

        private final ValidatorHandler validator;
        private final UserService userService;

        public UserHandler(ValidatorHandler validator, UserService userService) {
            this.validator = validator;
            this.userService = userService;
        }

        public Mono<ServerResponse> getAll(ServerRequest request) {
            return userService.getUsers()
                    .collectList()
                    .flatMap(users -> {
                        if (users.isEmpty()) {
                            return ServerResponse.noContent().build();
                        }
                        return ServerResponse.ok()
                                .contentType(MediaType.APPLICATION_JSON)
                                .body(fromValue(users));
                    });
        }

        public Mono<ServerResponse> getUserById(ServerRequest request) {
            String id = request.pathVariable("id");
            return userService.getUserById(id)
                    .flatMap(contact -> ServerResponse.ok()
                            .contentType(MediaType.APPLICATION_JSON)
                            .body(fromValue(contact)))
                    .switchIfEmpty(ServerResponse.notFound().build());
        }

        public Mono<ServerResponse> createUser(ServerRequest request) {
            return request.bodyToMono(User.class)
                    .doOnNext(validator::validate)
                    .flatMap(userService::saveUser)
                    .doOnSuccess(userSaved -> LOGGER.info("User saved with id: {} ", userSaved.getId()))
                    .doOnError(e -> LOGGER.error("Error in saveUser method", e))
                    .flatMap(user -> ServerResponse.created(getToUri(user)).bodyValue(user));
        }

        public Mono<ServerResponse> updateUser(ServerRequest request) {
            Mono<User> userMono = request.bodyToMono(User.class);
            String id = request.pathVariable("id");
            return userMono
                    .flatMap(user -> userService.updateUser(id, user))
                    .flatMap(user -> ServerResponse.ok()
                            .contentType(MediaType.APPLICATION_JSON)
                            .body(fromValue(user)))
                    .switchIfEmpty(ServerResponse.notFound().build());
        }

        public Mono<ServerResponse> deleteUser(ServerRequest request) {
            String id = request.pathVariable("id");
            // deleteById completes without a value whether or not the document existed,
            // so a trailing switchIfEmpty(notFound) could never fire; we always answer 204.
            return userService.deleteUser(id)
                    .then(ServerResponse.noContent().build());
        }

        public Mono<ServerResponse> deleteUserByName(ServerRequest request) {
            Optional<String> name = request.queryParam("name");
            if (name.isEmpty()) {
                return ServerResponse.badRequest()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(Map.of("error", "There is no name in the field"));
            }
            // Use the deleted count to decide between 204 No Content and 404 Not Found.
            return userService.deleteByName(name.get())
                    .flatMap(deleted -> deleted > 0
                            ? ServerResponse.noContent().build()
                            : ServerResponse.notFound().build());
        }

        private URI getToUri(User userSaved) {
            return UriComponentsBuilder.fromPath("/{id}")
                    .buildAndExpand(userSaved.getId())
                    .toUri();
        }
    }
With the handler in place, the overall structure is ready. Our next step is to map each route to its handler method using RouterFunctions.
    package dev.gokhana.userservice.controller;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.MediaType;
    import org.springframework.util.StringUtils;
    import org.springframework.web.reactive.function.server.RequestPredicates;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.ServerResponse;

    import static org.springframework.web.reactive.function.server.RequestPredicates.*;
    import static org.springframework.web.reactive.function.server.RouterFunctions.route;

    @Configuration
    public class RoutingHandler {

        private static final String API = "/api/v1/users";
        private static final String ID = "/{id}";

        @Bean
        public RouterFunction<ServerResponse> userRouter(UserHandler userHandler) {
            return route(GET(API), userHandler::getAll)
                    .andRoute(POST(API).and(accept(MediaType.APPLICATION_JSON)), userHandler::createUser)
                    .andRoute(GET(API + ID), userHandler::getUserById)
                    .andRoute(PUT(API + ID).and(accept(MediaType.APPLICATION_JSON)), userHandler::updateUser)
                    .andRoute(DELETE(API + ID), userHandler::deleteUser)
                    // Matches only when a non-blank "name" query parameter is present.
                    .andRoute(DELETE(API).and(RequestPredicates.queryParam("name", StringUtils::hasText)),
                            userHandler::deleteUserByName);
        }
    }
At this point, we have a fully operational structure that allows us to effectively route our requests, as well as a reactive application that integrates with MongoDB in an end-to-end manner.
Moving forward, we can easily set up a test by using WebTestClient and mocking the repository, so that no running database is needed.
    package dev.gokhana.userservice;

    import dev.gokhana.userservice.model.User;
    import dev.gokhana.userservice.repository.UserRepository;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.boot.test.mock.mockito.MockBean;
    import org.springframework.test.web.reactive.server.WebTestClient;
    import reactor.core.publisher.Flux;

    import java.util.Arrays;
    import java.util.List;

    import static org.mockito.Mockito.when;

    @SpringBootTest(classes = UserServiceApplication.class,
            webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
            properties = {"job.autorun.enabled=false"}) // skip the data-loading runner
    class UserServiceApplicationTests {

        @Autowired
        private WebTestClient webTestClient;

        @MockBean
        private UserRepository userRepository;

        @Test
        public void findAllTest() {
            List<User> users = Arrays.asList(
                    new User("Gokhan", 15),
                    new User("Corç", 25));
            Flux<User> flux = Flux.fromIterable(users);

            // The mocked repository returns our fixed users instead of querying MongoDB.
            when(userRepository.findAll()).thenReturn(flux);

            webTestClient.get()
                    .uri("/api/v1/users") // leading slash so the path resolves against the base URL
                    .exchange()
                    .expectStatus().isOk()
                    .expectBodyList(User.class)
                    .isEqualTo(users); // relies on User implementing equals
        }
    }
Having finished all of our processes, we're ready to start our service and begin sending requests.
    POST http://localhost:8083/api/v1/users
    Content-Type: application/json

    {
      "name": "Zühtü",
      "score": 52
    }
After sending a request to create a new user, we should receive a response with a status code of 201 Created. This response will also contain the user's ID and other details.
    {
      "id": "61ed90c1d656605b62e19acd",
      "name": "Zühtü",
      "score": 52
    }
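The same requests can also be built from Java. The sketch below uses the JDK's java.net.http API and assumes the service listens on port 8083, as in the request above; the UserApiRequests class and its method names are hypothetical. It only builds the requests, so it can be read and compiled without a running service.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical request factory for the user endpoints. */
final class UserApiRequests {

    // Assumption: host and port are taken from the example request in the article.
    static final String BASE_URL = "http://localhost:8083/api/v1/users";

    private UserApiRequests() {
    }

    /** Builds POST /api/v1/users with a JSON body. */
    static HttpRequest createUser(String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
    }

    /** Builds DELETE /api/v1/users?name=... with the name percent-encoded. */
    static HttpRequest deleteByName(String name) {
        // Non-ASCII letters and spaces must be encoded before they go into the query string.
        String encoded = URLEncoder.encode(name, StandardCharsets.UTF_8).replace("+", "%20");
        return HttpRequest.newBuilder(URI.create(BASE_URL + "?name=" + encoded))
                .DELETE()
                .build();
    }
}
```

To actually send a request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and inspect statusCode() and body() on the result; for a successful create, the status should be 201.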
Conclusion
With reactive programming, we can perform asynchronous, non-blocking operations with fewer threads and fewer hardware resources. In this article, we've developed an end-to-end reactive application on top of MongoDB that works this way. In my opinion, reactive applications built on MongoDB currently seem more production-ready than those built on R2DBC.