flatMapMany
Mono์์ ์ฌ๋ฌ ๊ฐ ์์๋ฅผ ์์ฑํ ๋ ์ฌ์ฉ
๐ Mono์์ ๋ฐํ๋ ๊ฒฐ๊ณผ๋ฅผ ์ฌ๋ฌ ๊ฐ์ ์์๋ก ๋ณํํ์ฌ Flux๋ก ์ฒ๋ฆฌํ ๋ ์ ์ฉ
fromIterable
Iterable ํ์ ์ ๋ฐ์ดํฐ(list, set, map ๋ฑ์ ์ปฌ๋ ์ )๋ฅผ Flux๋ก ๋ณํ
๐ ์ปฌ๋ ์ ๋ด์ ์์๋ค์ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ผ๋ก ๋ณํํ์ฌ ๋น๋๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌ
map, mapNotNull
map์ onNext ์ด๋ฒคํธ๋ฅผ ๋ฐ์์ ๊ฐ์ ๋ณ๊ฒฝํ๊ณ ์๋๋ก ์ ๋ฌ
Flux.range(1, 5) .map(value -> value * 2) .doOnNext(value -> { log.info("doOnNext: " + value); }) .subscribe();โ
mapNotNull์ ๋ณ๊ฒฝ๋ ๊ฐ์ด null์ธ ๊ฒฝ์ฐ ํํฐ
๐ Flux์์ null ๊ฐ์ onNext๋ก ์ ๋ฌํ๋ฉด ์๋ฌ ๋ฐ์
Flux.range(1, 5) .mapNotNull(value -> { if (value % 2 == 0) { return value; } return null; }) .doOnNext(value -> { log.info("doOnNext: " + value); }) .subscribe();โ
doOnXX
doOnSubscribe, doOnNext, doOnComplete, doOnError ๋ฑ
๊ฐ๊ฐ์ ์ด๋ฒคํธ๋ฅผ ํ๋ฆ์ ์ํฅ์ ์ฃผ์ง ์๊ณ ์์์ ๋ด๋ ค์ค๋ ์ด๋ฒคํธ์ ๋ํด์ ๋ก๊น ์ด๋ ์ถ๊ฐ ์์ ๊ฐ๋ฅ
Flux.range(1, 5)
.map(value -> value * 2)
.doOnNext(value -> {
log.info("doOnNext: " + value);
})
.doOnComplete(() -> {
log.info("doOnComplete");
})
.doOnSubscribe(subscription -> {
log.info("doOnSubscribe");
})
.doOnRequest(value -> {
log.info("doOnRequest: " + value);
})
.map(value -> value / 2)
.subscribe();
โด๏ธ ์ด๋ฒคํธ ๋ฐ์ ์์
1. doOnSubscribe : Flux๊ฐ ๊ตฌ๋ ๋ ๋ ๋ฐ์
2. doOnRequest : ๊ตฌ๋ ์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์์ฒญํ ๋ ๋ฐ์ ๐ ๊ตฌ๋ ์ด ์์๋ ๋ ๊ตฌ๋ ์๊ฐ publisher์๊ฒ ์ต๋ํ ๋ง์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ๊ณ ์ ํจ (Long.MAX_VALUE)
3. ๋ฐ์ดํฐ ๋ฐํ ๋ฐ ์ฒ๋ฆฌ
- Flux.range(1, 5)์์ 1~5๊น์ง์ ๊ฐ์ด ์์ฐจ์ ์ผ๋ก ๋ฐํ๋๋ฉฐ map ์ฐ์ฐ์์์ ๊ฐ ๊ฐ์ด 2๋ฐฐ๋ก ๋ณํ
- doOnNext ์ฐ์ฐ์์์ ๋ณํ๋ ๊ฐ ๊ฐ์ด ๋ก๊ทธ์ ๊ธฐ๋ก๋จ
4. doOnComplete : ๋ชจ๋ ๊ฐ์ด ์ฑ๊ณต์ ์ผ๋ก ๋ฐํ๋๊ณ ์๋ฃ๋์์ ๋ ๋ฐ์
โณ๏ธ ๊ฒฐ๊ณผ๊ฐ
doOnSubscribe
doOnRequest: 9223372036854775807
doOnNext: 2
doOnNext: 4
doOnNext: 6
doOnNext: 8
doOnNext: 10
doOnComplete
flatMap
onNext ์ด๋ฒคํธ๋ฅผ ๋ฐ์์ publisher๋ฅผ ๋ฐํํ๊ณ publisher์ ์ด๋ฒคํธ๋ฅผ ์๋๋ก ์ ๋ฌ
์ฌ๋ฌ publisher๋ฅผ ์กฐํฉํด์ผํ๋ ๊ฒฝ์ฐ ์ ์ฉ
Flux.range(1, 5)
.flatMap(value -> {
return Flux.range(1, 2)
.map(value2 -> value + ", " + value2)
.publishOn(Schedulers.parallel());
})
.doOnNext(value -> {
log.info("doOnNext: " + value);
})
.subscribe();
- flatMap์ ๊ฐ ์์๋ฅผ ์ฒ๋ฆฌํ ๋๋ง๋ค ๋ด๋ถ์ ์ผ๋ก ์๋ก์ด Flux ์์ฑ
- publisher๊ฐ ๋น๋๊ธฐ๋ก ๋์ํ ๋ ์์ ๋ณด์ฅ X
- publishOn(Schedulers.parallel())์ ์ํด ๋ด๋ถ Flux์ ์์ ์ด ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌ๋จ
โณ๏ธ ๊ฒฐ๊ณผ๊ฐ
doOnNext: 1, 1
doOnNext: 1, 2
doOnNext: 2, 1
doOnNext: 2, 2
doOnNext: 3, 1
doOnNext: 3, 2
doOnNext: 4, 1
doOnNext: 4, 2
doOnNext: 5, 1
doOnNext: 5, 2
๐จ flatMap๊ณผ map์ ์ฐจ์ด?
1. ๋ณํ ๋ฒ์
๐น map : ๊ฐ ์์๋ฅผ ๋ณํํ์ฌ ์๋ก์ด ๊ฐ ์์ฑ ๐๋จ์ํ ๋งคํ ์์
๐น flatMap : ๊ฐ ์์๋ฅผ ๋ณํํ๊ณ ๊ทธ ๊ฒฐ๊ณผ๋ก๋ถํฐ ์ฌ๋ฌ ๊ฐ์ Flux๋ Mono ์์ฑํ์ฌ ํ๋์ ์คํธ๋ฆผ์ผ๋ก ๋ณํฉ ๐ ๋ณ๋ ฌ์ฒ๋ฆฌ๋ ๋ค์ํ ๋น๋๊ธฐ ์์ ์ ์ ๋ฆฌ
2. ์ถ๋ ฅ ์คํธ๋ฆผ ํํ
๐น map : ๊ฐ ์์๋ฅผ ๋ณํํ๊ณ ์๋์ ์์์ ๋์ผํ ํ์ ์ ์คํธ๋ฆผ ์์ฑ
๐น flatMap : ๋ณํ ๊ฒฐ๊ณผ๋ก ์์ฑ๋ ์ฌ๋ฌ ๊ฐ์ ์คํธ๋ฆผ์ ํ๋๋ก ๋ณํฉํ์ฌ ํ๋์ ์คํธ๋ฆผ ์์ฑ
concatMap
๊ฐ ์ ๋ ฅ ์์์ ๋ํด ๋น๋๊ธฐ ์์ ์ ์์ฐจ์ ์ผ๋ก ์คํ
๋ด๋ถ์ ์ผ๋ก ์์๋ฅผ ๋ณด์ฅํ๋ฉฐ, ํ ์์ ์ด ์๋ฃ๋ ํ ๋ค์ ์์ ์ ์์
flatMapSequential
๋น๋๊ธฐ ์์ ์ ์์ฐจ์ ์ผ๋ก ์ํํ ๋ ์ฌ์ฉ
flatMap๊ณผ ๋ค๋ฅด๊ฒ ์์ ๋ณด์ฅ
๐ flatMapSequential๊ณผ concatMap์ ์ฐจ์ด
concatMap์ ์ธ์๋ก ์ง์ ๋ ํจ์์์ ๋ฆฌํดํ๋ Publisher์ ์คํธ๋ฆผ์ด ๋ค ๋๋ ํ์ ๊ทธ๋ค์ ๋์ด์ค๋ ๊ฐ์Publisher ์คํธ๋ฆผ์ ์ฒ๋ฆฌ
๐ ๊ฒฐ๊ณผ ๋ค์ ๋๋ฆฌ๊ฒ ๋ฐํํ๋ฏ๋ก ์์ ๊ฐ์ ์์กด์ฑ์ด ๊ฐํ ๊ฒฝ์ฐ, ์์๊ฐ ๋ณด์ฅ๋์ด์ผ ํ ๋ ์ฌ์ฉ๋จ
flatMapSequential์ ์ผ๋จ ์ค๋ ๋๋ก ๊ตฌ๋ ํ๊ณ ๊ฒฐ๊ณผ๋ ์์์ ๋ง๊ฒ ๋ฆฌํดํ๋ ์ญํ
๐ ๋น๋๊ธฐ ์์ ์ด ๋ณ๋ ฌ๋ก ์คํ๋๋๋ผ๋ ์์๊ฐ ์ค์ํ ๊ฒฝ์ฐ ์ฌ์ฉ๋จ
filter
onNext ์ด๋ฒคํธ๋ฅผ ๋ฐ์์ boolean ๋ฐํํด์ true๋ผ๋ฉด onNext ์ด๋ฒคํธ ์ ํํ๊ณ false๋ฉด ํํฐ
๐ ๋ฐ์ดํฐ ์คํธ๋ฆผ์์ ์กฐ๊ฑด์ ๋ง์กฑํ๋ ์์๋ง ํต๊ณผ์ํค๋ฉฐ ์กฐ๊ฑด์ ๋ง์ง ์๋ ์์๋ ํํฐ๋งํ์ฌ ์ ์ธํด ํ์ํ ๋ถ๋ถ๋ง ์ ํ์ ์ผ๋ก ์ฒ๋ฆฌ
Flux.range(1, 10)
.filter(value -> value % 2 == 0) // ์ง์๋ง ํํฐ๋ง
.subscribe(System.out::println);
collectList
next ์ด๋ฒคํธ๊ฐ ์ ๋ฌ๋๋ฉด ๋ด๋ถ์ item์ ์ ์ฅํ๊ณ complete ์ด๋ฒคํธ๊ฐ ์ ๋ฌ๋๋ฉด ์ ์ฅํ๋ item๋ค์ list ํํ๋ก ๋ง๋ค์ด ์๋์ onNext ๋ฐํ
๐ Flux๋ฅผ Mono๋ก ๋ฐ๊ฟ ๋ ์ ์ฉ
Flux.range(1, 3)
.map(value -> value * 10)
.collectList()
.subscribe(list -> System.out.println("Collected list: " + list));
// ์ถ๋ ฅ: Collected list: [10, 20, 30]
reduce
๋ฐ์ดํฐ๋ฅผ ์ง๊ณ(aggregate)ํ ๋ ์ฌ์ฉ๋๋ ์ฐ์ฐ์๋ก, ์ฃผ๋ก ์คํธ๋ฆผ์ ๋ชจ๋ ์์๋ฅผ ํ๋์ ๊ฐ์ผ๋ก ๊ฒฐํฉํ๋ ๋ฐ ์ฌ์ฉ
์ฐธ๊ณ ์๋ฃ
'Spring > Spring WebFlux' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[๐ Reactive] 2. Reactive Streams (0) | 2024.12.16 |
---|---|
[๐ Reactive] 1. Reactive ์์คํ ๊ณผ Reactive ํ๋ก๊ทธ๋๋ฐ (0) | 2024.12.16 |
[WebFlux] flatMap๊ณผ map์ ์ฐจ์ด (0) | 2024.07.17 |
[WebFlux] Reactive Streams (0) | 2024.07.03 |
[WebFlux] ๋น๋๊ธฐ/๋๊ธฐ, non-blocking/blocking, CompletableFuture (0) | 2024.07.03 |