Spring Framework/Spring WebFlux

[WebFlux] Reactor ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์ •๋ฆฌ

soogoori 2024. 7. 17. 17:59

flatMapMany

flatMapMany

Mono์—์„œ ์—ฌ๋Ÿฌ ๊ฐœ ์š”์†Œ๋ฅผ ์ƒ์„ฑํ•  ๋•Œ ์‚ฌ์šฉ 
๐Ÿ‘‰ Mono์—์„œ ๋ฐ˜ํ™˜๋œ ๊ฒฐ๊ณผ๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์š”์†Œ๋กœ ๋ณ€ํ™˜ํ•˜์—ฌ Flux๋กœ ์ฒ˜๋ฆฌํ•  ๋•Œ ์œ ์šฉ

 

 

 

fromIterable

Iterable ํƒ€์ž…์˜ ๋ฐ์ดํ„ฐ(list, set, map ๋“ฑ์˜ ์ปฌ๋ ‰์…˜)๋ฅผ Flux๋กœ ๋ณ€ํ™˜ 
๐Ÿ‘‰ ์ปฌ๋ ‰์…˜ ๋‚ด์˜ ์š”์†Œ๋“ค์„ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์œผ๋กœ ๋ณ€ํ™˜ํ•˜์—ฌ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌ

 

 

map, mapNotNull

map์€ onNext ์ด๋ฒคํŠธ๋ฅผ ๋ฐ›์•„์„œ ๊ฐ’์„ ๋ณ€๊ฒฝํ•˜๊ณ  ์•„๋ž˜๋กœ ์ „๋‹ฌ 
Flux.range(1, 5)
    .map(value -> value * 2)
    .doOnNext(value -> {
    	log.info("doOnNext: " + value);
    })
	.subscribe();โ€‹
mapNotNull์€ ๋ณ€๊ฒฝ๋œ ๊ฐ’์ด null์ธ ๊ฒฝ์šฐ ํ•„ํ„ฐ 
๐Ÿ‘‰ Flux์—์„œ null ๊ฐ’์„ onNext๋กœ ์ „๋‹ฌํ•˜๋ฉด ์—๋Ÿฌ ๋ฐœ์ƒ
Flux.range(1, 5)
    .mapNotNull(value -> {
        if (value % 2 == 0) {
        	return value;
    	}
    	return null;
    })
    .doOnNext(value -> {
    	log.info("doOnNext: " + value);
    })
	.subscribe();โ€‹

 

 

 

doOnXX

doOnSubscribe, doOnNext, doOnComplete, doOnError ๋“ฑ

๊ฐ๊ฐ์˜ ์ด๋ฒคํŠธ๋ฅผ ํ๋ฆ„์— ์˜ํ–ฅ์„ ์ฃผ์ง€ ์•Š๊ณ  ์œ„์—์„œ ๋‚ด๋ ค์˜ค๋Š” ์ด๋ฒคํŠธ์— ๋Œ€ํ•ด์„œ ๋กœ๊น…์ด๋‚˜ ์ถ”๊ฐ€ ์ž‘์—… ๊ฐ€๋Šฅ 

 

Flux.range(1, 5)
    .map(value -> value * 2)
    .doOnNext(value -> {
    	log.info("doOnNext: " + value);
    })
    .doOnComplete(() -> {
    	log.info("doOnComplete");
    })
    .doOnSubscribe(subscription -> {
    	log.info("doOnSubscribe");
    })
    .doOnRequest(value -> {
    	log.info("doOnRequest: " + value);
    })
    .map(value -> value / 2)
.subscribe();

 

โœด๏ธ ์ด๋ฒคํŠธ ๋ฐœ์ƒ ์ˆœ์„œ 

1. doOnSubscribe : Flux๊ฐ€ ๊ตฌ๋…๋  ๋•Œ ๋ฐœ์ƒ

2. doOnRequest : ๊ตฌ๋…์ž๊ฐ€ ๋ฐ์ดํ„ฐ๋ฅผ ์š”์ฒญํ•  ๋•Œ ๋ฐœ์ƒ ๐Ÿ‘‰ ๊ตฌ๋…์ด ์‹œ์ž‘๋  ๋•Œ ๊ตฌ๋…์ž๊ฐ€ publisher์—๊ฒŒ ์ตœ๋Œ€ํ•œ ๋งŽ์€ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›๊ณ ์ž ํ•จ (Long.MAX_VALUE)

3. ๋ฐ์ดํ„ฐ ๋ฐœํ–‰ ๋ฐ ์ฒ˜๋ฆฌ

- Flux.range(1, 5)์—์„œ 1~5๊นŒ์ง€์˜ ๊ฐ’์ด ์ˆœ์ฐจ์ ์œผ๋กœ ๋ฐœํ–‰๋˜๋ฉฐ map ์—ฐ์‚ฐ์ž์—์„œ ๊ฐ ๊ฐ’์ด 2๋ฐฐ๋กœ ๋ณ€ํ™˜
- doOnNext ์—ฐ์‚ฐ์ž์—์„œ ๋ณ€ํ™˜๋œ ๊ฐ ๊ฐ’์ด ๋กœ๊ทธ์— ๊ธฐ๋ก๋จ 

 

4. doOnComplete : ๋ชจ๋“  ๊ฐ’์ด ์„ฑ๊ณต์ ์œผ๋กœ ๋ฐœํ–‰๋˜๊ณ  ์™„๋ฃŒ๋˜์—ˆ์„ ๋•Œ ๋ฐœ์ƒ 


โœณ๏ธ ๊ฒฐ๊ณผ๊ฐ’ 

doOnSubscribe
doOnRequest: 9223372036854775807
doOnNext: 2
doOnNext: 4
doOnNext: 6
doOnNext: 8
doOnNext: 10
doOnComplete

 

 

flatMap

onNext ์ด๋ฒคํŠธ๋ฅผ ๋ฐ›์•„์„œ publisher๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ณ  publisher์˜ ์ด๋ฒคํŠธ๋ฅผ ์•„๋ž˜๋กœ ์ „๋‹ฌ
์—ฌ๋Ÿฌ publisher๋ฅผ ์กฐํ•ฉํ•ด์•ผํ•˜๋Š” ๊ฒฝ์šฐ ์œ ์šฉ 

 

Flux.range(1, 5)
    .flatMap(value -> {
    	return Flux.range(1, 2)
    		.map(value2 -> value + ", " + value2)
    		.publishOn(Schedulers.parallel());
    })
    .doOnNext(value -> {
    	log.info("doOnNext: " + value);
    })
	.subscribe();

 

  • flatMap์€ ๊ฐ ์š”์†Œ๋ฅผ ์ฒ˜๋ฆฌํ•  ๋•Œ๋งˆ๋‹ค ๋‚ด๋ถ€์ ์œผ๋กœ ์ƒˆ๋กœ์šด Flux ์ƒ์„ฑ 
  • publishOn(Schedulers.parallel())์— ์˜ํ•ด ๋‚ด๋ถ€ Flux์˜ ์ž‘์—…์ด ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌ๋จ

 

โœณ๏ธ ๊ฒฐ๊ณผ๊ฐ’

doOnNext: 1, 1
doOnNext: 1, 2
doOnNext: 2, 1
doOnNext: 2, 2
doOnNext: 3, 1
doOnNext: 3, 2
doOnNext: 4, 1
doOnNext: 4, 2
doOnNext: 5, 1
doOnNext: 5, 2

 


๐Ÿšจ flatMap๊ณผ map์˜ ์ฐจ์ด?

1. ๋ณ€ํ™˜ ๋ฒ”์œ„ 
๐Ÿ”น map : ๊ฐ ์š”์†Œ๋ฅผ ๋ณ€ํ™˜ํ•˜์—ฌ ์ƒˆ๋กœ์šด ๊ฐ’ ์ƒ์„ฑ ๐Ÿ‘‰๋‹จ์ˆœํ•œ ๋งคํ•‘ ์ž‘์—…
๐Ÿ”น flatMap : ๊ฐ ์š”์†Œ๋ฅผ ๋ณ€ํ™˜ํ•˜๊ณ  ๊ทธ ๊ฒฐ๊ณผ๋กœ๋ถ€ํ„ฐ ์—ฌ๋Ÿฌ ๊ฐœ์˜ Flux๋‚˜ Mono ์ƒ์„ฑํ•˜์—ฌ ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ์œผ๋กœ ๋ณ‘ํ•ฉ ๐Ÿ‘‰ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ๋‚˜ ๋‹ค์–‘ํ•œ ๋น„๋™๊ธฐ ์ž‘์—…์— ์œ ๋ฆฌ 

2. ์ถœ๋ ฅ ์ŠคํŠธ๋ฆผ ํ˜•ํƒœ
๐Ÿ”น map : ๊ฐ ์š”์†Œ๋ฅผ ๋ณ€ํ™˜ํ•˜๊ณ  ์›๋ž˜์˜ ์š”์†Œ์™€ ๋™์ผํ•œ ํƒ€์ž…์˜ ์ŠคํŠธ๋ฆผ ์ƒ์„ฑ
๐Ÿ”น flatMap : ๋ณ€ํ™˜ ๊ฒฐ๊ณผ๋กœ ์ƒ์„ฑ๋œ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ŠคํŠธ๋ฆผ์„ ํ•˜๋‚˜๋กœ ๋ณ‘ํ•ฉํ•˜์—ฌ ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ ์ƒ์„ฑ 

 

 

 

filter

onNext ์ด๋ฒคํŠธ๋ฅผ ๋ฐ›์•„์„œ boolean ๋ฐ˜ํ™˜ํ•ด์„œ true๋ผ๋ฉด onNext ์ด๋ฒคํŠธ ์ „ํŒŒํ•˜๊ณ  false๋ฉด ํ•„ํ„ฐ

๐Ÿ‘‰ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์—์„œ ์กฐ๊ฑด์„ ๋งŒ์กฑํ•˜๋Š” ์š”์†Œ๋งŒ ํ†ต๊ณผ์‹œํ‚ค๋ฉฐ ์กฐ๊ฑด์— ๋งž์ง€ ์•Š๋Š” ์š”์†Œ๋Š” ํ•„ํ„ฐ๋งํ•˜์—ฌ ์ œ์™ธํ•ด ํ•„์š”ํ•œ ๋ถ€๋ถ„๋งŒ ์„ ํƒ์ ์œผ๋กœ ์ฒ˜๋ฆฌ  

 

Flux.range(1, 10)
    .filter(value -> value % 2 == 0) // ์ง์ˆ˜๋งŒ ํ•„ํ„ฐ๋ง
    .subscribe(System.out::println);

 

collectList

next ์ด๋ฒคํŠธ๊ฐ€ ์ „๋‹ฌ๋˜๋ฉด ๋‚ด๋ถ€์— item์„ ์ €์žฅํ•˜๊ณ  complete ์ด๋ฒคํŠธ๊ฐ€ ์ „๋‹ฌ๋˜๋ฉด ์ €์žฅํ–ˆ๋˜ item๋“ค์„ list ํ˜•ํƒœ๋กœ ๋งŒ๋“ค์–ด ์•„๋ž˜์— onNext ๋ฐœํ–‰

๐Ÿ‘‰ Flux๋ฅผ Mono๋กœ ๋ฐ”๊ฟ€ ๋•Œ ์œ ์šฉ

 

Flux.range(1, 3)
    .map(value -> value * 10)
    .collectList()
    .subscribe(list -> System.out.println("Collected list: " + list));
    
// ์ถœ๋ ฅ: Collected list: [10, 20, 30]

 

 

์ฐธ๊ณ ์ž๋ฃŒ

https://tech.kakao.com/posts/350