Reactor๋ ?
๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ
Spring Framework ํ์ ์ํด ๊ฐ๋ฐ๋ ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ฆ์ ๊ตฌํ์ฒด๋ก์ Spring WebFlux ๊ธฐ๋ฐ์ ๋ฆฌ์กํฐ๋ธ ์ดํ๋ฆฌ์ผ์ด์ ์ ์ ์ํ๊ธฐ ์ํ ํต์ฌ ์ญํ ์ ๋ด๋นํ๋ค.
โณ๏ธ Reactor์ ํน์ง
- Reactive Streams
- Non-Blocking
- Java's Functional API
- Flux[N]
- Mono[0|1]
- Microservices
- Backpressure
Mono ๊ธฐ๋ณธ ์์
public class Example {
public static void main(String[] args) {
Mono
.empty()
.subscribe(
none -> System.out.println("# emitted onNext signal"),
error -> {},
() -> System.out.println("# emitted onComplete signal")
);
}
}
- empty() Operator๋ฅผ ์ฌ์ฉํ๋ฉด ๋ด๋ถ์ ์ผ๋ก emitํ ๋ฐ์ดํฐ๊ฐ ์๋ ๊ฒ์ผ๋ก ๊ฐ์ฃผํ์ฌ ๊ณง๋ฐ๋ก onComplete signal ์ ์ก
- empty() Operator๋ ์ด๋ค ํน์ ์์
์ ํตํด ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌ๋ฐ์ ํ์๋ ์์ง๋ง ์์
์ด ๋๋ฌ์์ ์๋ฆฌ๊ณ ์ด์ ๋ฐ๋ฅธ ํ์ฒ๋ฆฌ๋ฅผ ํ๊ณ ์ถ์ ๋ ์ฌ์ฉ
- subscribe() ๋ฉ์๋
- ์ฒซ ๋ฒ์งธ ๋๋ค ํํ์์ Publisher๊ฐ onNext signal์ ์ ์กํ๋ฉด ์คํ๋จ ๐ Subscriber๊ฐ Publisher๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌ๋ฐ๊ธฐ ์ํด ์ฌ์ฉ๋จ
- ๋ ๋ฒ์งธ ๋๋ค ํํ์์ Publisher๊ฐ onError signal์ ์ ์กํ๋ฉด ์คํ๋จ
- ์ธ ๋ฒ์งธ ๋๋ค ํํ์์ Publisher๊ฐ onComplete signal์ ์ ์กํ๋ฉด ์คํ๋จ
Flux ๊ธฐ๋ณธ ์์
public class Example {
public static void main(String[] args) {
Flux.fromArray(new Integer[]{3, 6, 7, 9})
.filter(num -> num > 6)
.map(num -> num * 2)
.subscribe(Sytstem.out::println);
}
}
public class Example {
public static void main(String[] args) {
Flux<String> flux =
Mono.justOrEmpty("Steve")
.concatWith(Mono.justOrEmpty("Jobs"));
flux.subscribe(System.out::println);
}
}
- justOrEmpty()์ ํ๋ผ๋ฏธํฐ๋ก null์ด ์ ๋ฌ๋๋ฉด ๋ด๋ถ์ ์ผ๋ก empty() Operator ํธ์ถ
- ๋ ๊ฐ์ Mono์์ emitํ๋ ๋ฐ์ดํฐ๋ฅผ ํ๋์ ๋ฐ์ดํฐ ์์ค๋ก ์ฐ๊ฒฐํ์ฌ ์๋ก์ด Flux ๋ฆฌํด
- ๋ฌธ์์ด์ฒ๋ผ ๋ฐ์ดํฐ ์์ฒด๋ฅผ ์ด์ด ๋ถ์ฌ์ ํ๋์ ๋ฐ์ดํฐ๋ฅผ emit ํ๋ ๊ฒ ์๋๋ผ emitํ ๋ฐ์ดํฐ๋ฅผ ์ผ๋ ฌ๋ก ์ค ์ธ์์ ํ๋์ ๋ฐ์ดํฐ ์์ค๋ฅผ ๋ง๋ ํ์ ์ฐจ๋ก์ฐจ๋ก ๋ฐ์ดํฐ emit
- concatWith() ๋ ๋ ๊ฐ์ ๋ฐ์ดํฐ ์์ค๋ง ์ฐ๊ฒฐ ๊ฐ๋ฅ
- concat()์ ์ฌ๋ฌ ๊ฐ์ ๋ฐ์ดํฐ ์์ค๋ฅผ ์ํ๋ ๋งํผ ์ฐ๊ฒฐ ๊ฐ๋ฅ
Cold / Hot Sequence
โณ๏ธ Cold Sequence
- Subscriber๊ฐ ๊ตฌ๋ ํ ๋๋ง๋ค ๋ฐ์ดํฐ ํ๋ฆ์ด ์ฒ์๋ถํฐ ๋ค์ ์์๋๋ sequence
โณ๏ธ Hot Sequence
- ๊ตฌ๋ ์ด ๋ฐ์ํ ์์ ์ด์ ์ Publisher๋ก๋ถํฐ emit๋ ๋ฐ์ดํฐ๋ Subscriber๊ฐ ์ ๋ฌ๋ฐ์ง ๋ชปํ๊ณ , ๊ตฌ๋ ์ด ๋ฐ์ํ ์์ ์ดํ์ emit๋ ๋ฐ์ดํฐ๋ง ๋ฐ์ ์ ์์
- share(), cache() ๋ฑ์ operator๋ฅผ ์ฌ์ฉํด์ cold sequence๋ฅผ hot sequence๋ก ๋ณํ ๊ฐ๋ฅ
Backpressure
Publisher๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ ๋ฐ์ดํฐ๋ฅผ ์์ ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ์ํ ์๋จ
โณ๏ธ Backpressure ์ฒ๋ฆฌ ๋ฐฉ์
๐ท ๋ฐ์ดํฐ ๊ฐ์ ์ ์ด
- Subscriber๊ฐ request() ๋ฉ์๋๋ฅผ ํตํด ์ ์ ํ ๋ฐ์ดํฐ ๊ฐ์ ์์ฒญ
public class Example {
public static void main(String[] args) {
Flux.range(1, 5)
.doOnRequest(data -> log.info("# doOnRequest : {}", data))
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@SneakyThrows
@Override
protected void hookOnNext(Integer value) {
// Publisher์ ๋ฐ์ดํฐ emit ์๋๋ณด๋ค Subscriber์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์๋๊ฐ ๋ ๋๋ฆฐ ๊ฒ ์๋ฎฌ๋ ์ด์
Thread.sleep(2000L); // 2์ด ์ง์ฐ ์๊ฐ
log.info("# hookOnNext: {}", value);
request(1);
}
});
}
}
๐ท Backpressure ์ ๋ต ์ฌ์ฉ
- IGNORE ์ ๋ต
- Backpressure ์ ์ฉ X
- Downstream์์์ Backpressure ์์ฒญ์ด ๋ฌด์๋๊ธฐ ๋๋ฌธ์ IllegalStateException ๋ฐ์
- ERROR ์ ๋ต
- Downstream์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์๋๊ฐ ๋๋ ค์ Upstream์ emit ์๋๋ฅผ ๋ฐ๋ผ๊ฐ์ง ๋ชปํ ๊ฒฝ์ฐ IllegalStateException ๋ฐ์
- Publisher๋ Error signal์ subscriber์๊ฒ ์ ์กํ๊ณ ์ญ์ ํ ๋ฐ์ดํฐ๋ ํ๊ธฐ
- DROP ์ ๋ต
- Publisher๊ฐ Downstream์ผ๋ก ์ ๋ฌํ ๋ฐ์ดํฐ๊ฐ ๋ฒํผ์ ๊ฐ๋ ์ฐฐ ๊ฒฝ์ฐ, ๋ฒํผ ๋ฐ์์ ๋๊ธฐ ์ค์ธ ๋ฐ์ดํฐ ์ค์์ ๋จผ์ emit๋ ๋ฐ์ดํฐ๋ถํฐ Drop (ํ๊ธฐ)
- LATEST ์ ๋ต
- Publisher๊ฐ Downstream์ผ๋ก ์ ๋ฌํ ๋ฐ์ดํฐ๊ฐ ๋ฒํผ์ ๊ฐ๋ ์ฐฐ ๊ฒฝ์ฐ, ๋ฒํผ ๋ฐ์์ ๋๊ธฐ ์ค์ธ ๋ฐ์ดํฐ ์ค์์ ๊ฐ์ฅ ์ต๊ทผ์ (๋์ค์) emit๋ ๋ฐ์ดํฐ๋ถํฐ ๋ฒํผ์ ์ฑ์
- BUFFER ์ ๋ต
- ๋ฒํผ์ ๋ฐ์ดํฐ๋ฅผ ํ๊ธฐํ์ง ์๊ณ , ๋ฒํผ๋ง(๋ฐ์ดํฐ๋ฅผ ์์ ๋์๋ค๊ฐ ์ ์ก)์ ํ๋ ์ ๋ต, ๋ฒํผ๊ฐ ๊ฐ๋์ฐจ๋ฉด ๋ฒํผ ๋ด์ ๋ฐ์ดํฐ๋ฅผ ํ๊ธฐํ๋ ์ ๋ต, ๋ฒํผ๊ฐ ๊ฐ๋ ์ฐจ๋ฉด ์๋ฌ๋ฅผ ๋ฐ์์ํค๋ ์ ๋ต ๋ฑ์ผ๋ก ๊ตฌ๋ถ
'Spring > Spring WebFlux' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[๐ Reactive] Operators ไธ (0) | 2024.12.24 |
---|---|
[๐ Reactive] 6. Sinks์ Scheduler (0) | 2024.12.23 |
[๐ Reactive] 4. Reactive ํ๋ก๊ทธ๋๋ฐ์ ์ํ ์ฌ์ ์ง์ (1) | 2024.12.18 |
[๐ Reactive] 3. Blocking I/O์ Non-Blocking I/O (0) | 2024.12.17 |
[๐ Reactive] 2. Reactive Streams (0) | 2024.12.16 |