WebFlux 에서 JPA 사용하기 WebFlux 에서 JPA 를 굳이 사용해보려는 이유는 Sharding Spherer 를 WebFlux 에 적용하기 위해서이다.
이번포스팅에선 WebMVC 와 WebFlux 의 성능비교와 함께 WebFlux 환경에서 JPA 의 트랜잭션 기능이 모두 동작하는지 테스트하는것을 목표로한다.
R2DBC 로 샤딩을 처리하려면 AbstractRoutingConnectionFactory 를 사용하면 된다. 하지만 샤딩에 대한 조회 및 병합은 직접 구현해야 한다.
아래와 같이 1.5 초동안 sleep 처리된 함수를 동시에 1000번 수행하는 테스트를 진행한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 @Transactional public OrderDto addRandomOrder (Long accountId) { OrderEntity entity = new OrderEntity ( RandomTestUtil.generateRandomString(10 ), accountId); try { Thread.sleep(1500 ); } catch (InterruptedException e) { e.printStackTrace(); } entity = orderRepository.save(entity); return toDto(entity); }
WebFlux 에선 아래 코드로 Blocking 함수를 감싸 조치한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class ReactUtil { public static <T> Flux<T> blockingToFlux (Supplier<List<T>> supplier) { return Flux.defer(() -> Flux.fromIterable(supplier.get())) .subscribeOn(Schedulers.boundedElastic()); } public static <T> Mono<T> blockingToMono (Supplier<T> supplier) { return Mono.fromSupplier(supplier) .subscribeOn(Schedulers.boundedElastic()); } } ... @PostMapping("/{accountId}") public Mono<OrderDto> addRandomOrder (@PathVariable Long accountId) { return ReactUtil.blockingToMono(() -> { return orderService.addRandomOrder(accountId); }); }
테스트 테스트는 k6 를 통해 진행한다. 대기시간 1초로 동안 1000개의 vuser 가 요청을 수행한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import http from 'k6/http' ;export let options = { vus : 1000 , duration : '1s' , }; export default function ( ) { const url = 'http://localhost:8080/order' ; const payload = JSON .stringify ({ }); const params = { headers : { 'Content-Type' : 'application/json' , }, }; http.post (url, payload, params); }
사실 위 코드를 사용해 MVC 와 WebFlux 를 비교하는것은 큰 의미는 없다. 현재 동작중인 컴퓨터에서 Tomcat Thread pool 는 200개 넘게 만들 수 있지만 Thread.sleep 을 동시에 실행하는 횟수에 제한이 있기 때문이다.
Thread.sleep 을 통해 OS 타이머에 접근해 연산하는 과정에 횟수 제한이 있기 때문이라 생각된다.
Spring MVC 에선 Thread.sleep 를 동시에 50개, Spring WebFlux 에선 Thread.sleep 를 동시에 100개 실행할 수 있다. 이 차이 때문에 Spring WebFlux 가 2배정도 빠르게 수행된다.
하지만 아래와 같이 Thread.sleep 이 아닌 Mono.delay 를 사용하고 NIO 함수끼리 묶게 될 경우 압도적인 성능차이가 발생한다.
1 2 3 4 5 6 7 8 9 10 @PostMapping public Mono<OrderDto> addRandomOrder () { Mono<OrderDto> result = ReactUtil.blockingToMono(() -> { AccountDto accountDto = accountService.getRandomAccount(); return orderService.addRandomOrder(accountDto.getAccountId()); }); return Mono.delay(Duration.ofSeconds(1 )) .doOnNext(t -> System.out.println("timeover" )) .then(result); }
수행결과는 아래와 같다.
SpringMVC + Thread.sleep 조합 (31초, 1992개)
SpringWebFlux + Thread.sleep 조합 (17초, 2698)
SpringWebFlux + Mono.delay 조합 (1.7초, 3847)
Tomcat 의 경우 요청대기풀에서 최대 31초간 블락된 사용자가 있을 수 있다(시간제한으로 중단된 요청도 있음).
Mono.delay 내부에서도 Thread.sleep 과 마찬자기로 OS 에서 제공하는 타이머를 사용하겠지만 프레임워크에서 구현된 NIO 로직을 통해 효율적으로 진행되는것으로 판단된다.
트랜잭션 테스트 동시에 3개 요청을 보내 status 값을 체크한 뒤 이후과정을 진행하는 테스트 작성 한번은 200OK, 뒤에 요청되는 두번은 400 에러가 발생하는치 테스트한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import http from 'k6/http' ;import { check, group } from 'k6' ;export const options = { vus : 3 , }; const orderId = '1018674689176219648' ;export default function ( ) { const url = `http://localhost:8080/order/status/${orderId} ` ; const params = { headers : { 'Content-Type' : 'application/json' , }, }; let responses = []; group ('Sending 3 concurrent requests' , () => { responses = http.batch ([ ['PUT' , url, null , params], ['PUT' , url, null , params], ['PUT' , url, null , params], ]); }); let success = 0 ; let badRequest = 0 ; responses.forEach ((response ) => { if (response.status === 200 ) { success++; } else if (response.status === 400 ) { badRequest++; } }); check (success, { 'One request succeeded' : (s ) => s === 1 }); check (badRequest, { 'Two requests failed with 400 error' : (b ) => b === 2 }); }
running -> block(1sec) -> complete 하는 과정으로 진행
1 2 3 4 5 6 7 8 9 10 11 12 @PutMapping("/status/{id}") public Mono<OrderDto> updateStatus (@PathVariable Long id) { Mono<OrderDto> changeStatusRunning = ReactUtil.blockingToMono(() -> orderService.changeOrderStatusRunning(id)); Mono<Long> doSomethingDeploy = Mono.delay(Duration.ofSeconds(1 )) .doOnNext(t -> System.out.println("timeover" )); Mono<OrderDto> changeStatusComplete = ReactUtil.blockingToMono(() -> orderService.changeOrderStatusComplete(id)); return changeStatusRunning .then(doSomethingDeploy) .then(changeStatusComplete); }
orderService.changeOrderStatusRunning 메서드안에선 Perssimistc Lock 을 사용하여 타 스레드의 동시접근을 막았다.
1 2 3 4 @Lock(LockModeType.PESSIMISTIC_WRITE) @QueryHints(@QueryHint(name = AvailableSettings.JAKARTA_LOCK_TIMEOUT, value ="5000")) @Query("SELECT oe FROM OrderEntity oe WHERE oe.id = :id") Optional<OrderEntity> findByIdForUpdate (@Param("id") Long id) ;
데모코드
https://github.com/Kouzie/spring-reactive-demo/tree/master/spring-react-jpa https://github.com/Kouzie/spring-boot-demo/tree/main/sharding-demo/sharding-sphere