Sinks ๋ ?
Reactor์์ ์ ๊ณตํ๋ Sinks๋ ๋น๋๊ธฐ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๊ฒ ๋ฐํํ ์ ์๋ ์ ํธ๋ฆฌํฐ๋ก, Publisher ๋ฐ Subscriber ์ญํ ์ ์ง์ํ๋ค.
์ผ๋ฐ์ ์ผ๋ก generate() operator๋ create() operator๋ ์ฑ๊ธ์ค๋ ๋ ๊ธฐ๋ฐ์์ signal์ ์ ์กํ๋ ๋ฐ ์ฌ์ฉํ๋ ๋ฐ๋ฉด,
Sinks๋ ๋ฉํฐ์ค๋ ๋ ๋ฐฉ์์ผ๋ก signal์ ์ ์กํด๋ ์ค๋ ๋ ์์ ์ฑ์ ๋ณด์ฅํ๊ธฐ ๋๋ฌธ์ ์๊ธฐ์น ์์ ๋์์ผ๋ก ์ด์ด์ง๋ ๊ฒ์ ๋ฐฉ์งํด์ค๋ค.
๐ก์ค๋ ๋ ์์ ์ฑ์ด๋ ?
ํจ์๋ ๋ณ์ ๊ฐ์ ๊ณต์ ์์์ ๋์ ์ ๊ทผํ ๊ฒฝ์ฐ์๋ ํ๋ก๊ทธ๋จ์ ์คํ์ ๋ฌธ์ ๊ฐ ์์์ ์๋ฏธํ๋ค.
๋์ ์ ๊ทผ์ ๊ฐ์งํ๊ณ , ๋์ ์ ๊ทผํ๋ ์ค๋ ๋ ์ค ํ๋๊ฐ ๋น ๋ฅด๊ฒ ์คํจํจ์ผ๋ก์จ ์ค๋ ๋ ์์ ์ฑ์ ๋ณด์ฅํ๋ค.
Sinks ์ข ๋ฅ ๋ฐ ํน์ง
โณ๏ธ Sinks.One
- Sinks.one() ๋ฉ์๋๋ฅผ ์ฌ์ฉํด์ ํ ๊ฑด์ ๋ฐ์ดํฐ๋ฅผ ์ ์ก
- Sinks.One์ผ๋ก ์๋ฌด๋ฆฌ ๋ง์ ์์ ๋ฐ์ดํฐ๋ฅผ emitํ๋ค ํ๋๋ผ๋ ์ฒ์ emitํ ๋ฐ์ดํฐ๋ ์ ์์ ์ผ๋ก emit๋์ง๋ง ๋๋จธ์ง ๋ฐ์ดํฐ๋ค์ drop๋จ
โณ๏ธ Sinks.Many
- Sinks.many() ๋ฉ์๋๋ฅผ ์ฌ์ฉํด์ ์ฌ๋ฌ ๊ฑด์ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๊ฐ์ง ๋ฐฉ์์ผ๋ก ์ ์ก
- ManySpec์ด๋ผ๋ ์ธํฐํ์ด์ค๋ฅผ ๋ฆฌํด ๐ManySpec์ ๋ฐ์ดํฐ emit์ ์ํ ์ฌ๋ฌ ๊ฐ์ง ๊ธฐ๋ฅ ์ ์
- UnicastSpec์ ๋จ ํ๋์ Subscriber์๊ฒ๋ง ๋ฐ์ดํฐ๋ฅผ emit
- MulticastSpec์ ํ๋ ์ด์์ Subscriber์๊ฒ ๋ฐ์ดํฐ๋ฅผ emit
- MulticastReplaySpec์ emit๋ ๋ฐ์ดํฐ ์ค์์ ํน์ ์์ ์ผ๋ก ๋๋๋ฆฐ ๋ฐ์ดํฐ๋ถํฐ emit
Scheduler ๋ ?
Reactor์์ ์ฌ์ฉ๋๋ Scheduler๋ Reactor Sequence์์ ์ฌ์ฉ๋๋(๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ์ํด ์ฌ์ฉ๋๋) ์ค๋ ๋๋ฅผ ๊ด๋ฆฌํด์ฃผ๋ ๊ด๋ฆฌ์ ์ญํ ์ ํ๋ค.
์ฆ, ์ด๋ค ์ค๋ ๋์์ ๋ฌด์์ ์ฒ๋ฆฌํ ์ง ์ ์ดํจ์ผ๋ก์จ ๊ฐ๋ฐ์๊ฐ ์ง์ ์ค๋ ๋๋ฅผ ์ ์ดํด์ผํ๋ ๋ถ๋ด์ ๋์ด์ค๋ค.
โณ๏ธ ๋ฌผ๋ฆฌ์ ์ค๋ ๋
- ํ๋์จ์ด์ ๊ด๋ จ๋ ์ค๋ ๋ (CPU ์ฝ์ด)
- ๋ณ๋ ฌ์ฑ๊ณผ ๊ด๋ จ ์์
- ๋ฌผ๋ฆฌ์ ์ธ ์ค๋ ๋๊ฐ ์ค์ ๋ก ๋์์ ์คํ๋๊ธฐ ๋๋ฌธ์ ์ฌ๋ฌ ์์ ์ ๋์์ ์ฒ๋ฆฌ
โณ๏ธ ๋ ผ๋ฆฌ์ ์ค๋ ๋
- ํ๋ก์ธ์ค ๋ด์์ ์คํ๋๋ ์ธ๋ถ ์์ ์ ๋จ์
- ๋์์ฑ๊ณผ ๊ด๋ จ ์์
Scheduler๋ฅผ ์ํ ์ ์ฉ Operator
โณ๏ธ subscribeOn()
- ๊ตฌ๋ ์ด ๋ฐ์ํ ์งํ ์คํ๋ ์ค๋ ๋๋ฅผ ์ง์
- ์๋ณธ Publisher์ ๋์์ ์ํํ๊ธฐ ์ํ ์ค๋ ๋
@Slf4j
public class Example{
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
}
- boundedElastic์ด๋ผ๋ ์ ํ์ Scheduler ์ง์
- doOnNext๋ฅผ ์ฌ์ฉํด ์๋ณธ Flux์์ emit๋๋ ๋ฐ์ดํฐ๋ฅผ ๋ก๊ทธ๋ก ์ถ๋ ฅ
- doOnSubscribe๋ฅผ ์ฌ์ฉํด ๊ตฌ๋
์ด ๋ฐ์ํ ์์ ์ ์ถ๊ฐ์ ์ธ ์ด๋ค ์ฒ๋ฆฌ๊ฐ ํ์ํ ๊ฒฝ์ฐ ํด๋น ์ฒ๋ฆฌ ๋์ ์ถ๊ฐ
- ํ์ฌ ์ฌ๊ธฐ์๋ ๊ตฌ๋ ์ด ๋ฐ์ํ ์์ ์ ์คํ๋๋ ์ค๋ ๋๊ฐ ๋ฌด์์ธ์ง ํ์ธ
- ๊ตฌ๋ ์ด ๋ฐ์ํ ์งํ๋ถํฐ๋ ์๋ณธ Flux์ ๋์์ ์ฒ๋ฆฌํ๋ ์ค๋ ๋๊ฐ ๋ณ๊ฒฝ
โณ๏ธ publishOn()
- downstream์ผ๋ก signal์ ์ ์กํ ๋ ์คํ๋๋ ์ค๋ ๋๋ฅผ ์ ์ดํ๋ ์ญํ
- publishOn()์ ๊ธฐ์ค์ผ๋ก ์๋์ชฝ์ธ Downstream์ ์คํ ์ค๋ ๋๋ฅผ ๋ณ๊ฒฝ
- Scheduler๋ฅผ ์ง์ ํจ์ผ๋ก์ ํด๋น Scheduler์ ํน์ฑ์ ๊ฐ์ง ์ค๋ ๋๋ก ๋ณ๊ฒฝ
@Slf4j
public class Example{
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
}
โณ๏ธ parallel()
- subscribeOn()๊ณผ publishOn()์ ๋์์ฑ์ ๊ฐ์ง๋ ๋ ผ๋ฆฌ์ ์ธ ์ค๋ ๋์ ํด๋น
- parallel()์ ๋ณ๋ ฌ์ฑ์ ๊ฐ์ง๋ ๋ฌผ๋ฆฌ์ ์ธ ์ค๋ ๋์ ํด๋น
- ๋ผ์ด๋ ๋ก๋น ๋ฐฉ์์ผ๋ก CPU ์ฝ์ด ๊ฐ์๋งํผ์ ์ค๋ ๋๋ฅผ ๋ณ๋ ฌ๋ก ์คํ
@Slf4j
public class Example{
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel()
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
- parallel()์ emit๋๋ ๋ฐ์ดํฐ๋ฅผ CPU์ ๋ ผ๋ฆฌ์ ์ธ ์ฝ์ด ์์ ๋ง๊ฒ ์ฌ์ ์ ๊ณจ๊ณ ๋ฃจ ๋ถ๋ฐฐํ๋ ์ญํ ์ํ
- ์ค์ ๋ก ๋ณ๋ ฌ ์์ ์ ์ํํ ์ค๋ ๋์ ํ ๋น์ runOn()์ด ๋ด๋น
publishOn()๊ณผ subscribeOn()์ ๋์ ์ดํด
- publishOn()๊ณผ subscribeOn()์ ์ฌ์ฉํ์ง ์์ ๊ฒฝ์ฐ ๋ชจ๋ main ์ค๋ ๋์์ ์คํ๋จ
- ํ ๊ฐ ์ด์์ publishOn()์ ์ฌ์ฉํ๋ฉด ์คํ ์ค๋ ๋๋ฅผ ๋ชฉ์ ์ ๋ง๊ฒ ์ ์ ํ๊ฒ ๋ถ๋ฆฌ ๊ฐ๋ฅ
- subscribeOn()์ ๊ตฌ๋ ์ด ๋ฐ์ํ ์งํ์ ์คํ๋ ์ค๋ ๋๋ฅผ ์ง์ ํ๊ธฐ ๋๋ฌธ์ fromArray()๋ A ์ค๋ ๋์์ ์คํ๋จ
Scheduler์ ์ข ๋ฅ
โณ๏ธ Schedulers.immediate()
- ๋ณ๋์ ์ค๋ ๋๋ฅผ ์ถ๊ฐ์ ์ผ๋ก ์์ฑํ์ง ์๊ณ , ํ์ฌ ์ค๋ ๋์์ ์์ ์ ์ฒ๋ฆฌํ๊ณ ์ ํ ๋ ์ฌ์ฉ
๐โ๏ธ ํ์ฌ ์ค๋ ๋์์ ์์ ์ ์ฒ๋ฆฌํ๊ณ ์ถ๋ค๋ฉด publishOn()์ ํ ๋ฒ๋ง ์ฌ์ฉํ๋ฉด ๋์ง ์ ๊ตณ์ด Schedulers.immediate()์ ์ฌ์ฉํ๋์ ??
๐ publishOn()์ด ์ฌ๋ฌ ๋ฒ ํธ์ถ๋ ์ํฉ์์, ์ดํ ์์ ๋ค์ด ํ์ฌ ์ค๋ ๋์์ ๊ณ์ ์คํ๋๊ธฐ๋ฅผ ๋ณด์ฅํ๋ ค๋ ๊ฒฝ์ฐ ์ฌ์ฉํ๊ฑฐ๋ ์ค์ผ์ค๋ง ์ ํ ์์ด ํ์ฌ ์ค๋ ๋์์ ์คํํ๊ณ ์ ํ๋ ์๋ ๋ช ํํ ํํํ๊ณ ์ ํจ !
โณ๏ธ Schedulers.single()
- ์ค๋ ๋ ํ๋๋ง ์์ฑํด์ Scheduler๊ฐ ์ ๊ฑฐ๋๊ธฐ ์ ๊น์ง ์ฌ์ฌ์ฉ
- ํ๋์ ์ค๋ ๋๋ก ๋ค์์ ์์ ์ ์ฒ๋ฆฌํด์ผ ๋๋ฏ๋ก ์ง์ฐ ์๊ฐ์ด ์งง์ ์์ ์ ์ฒ๋ฆฌํ๋ ๊ฒ์ด ํจ๊ณผ์
โณ๏ธ Schedulers.newSingle()
- ํธ์ถํ ๋๋ง๋ค ์๋ก์ด ์ค๋ ๋ ํ๋๋ฅผ ์์ฑ
- Schedulers.newSingle("new-single", true)
- ์ฒซ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ์๋ ์์ฑํ ์ค๋ ๋์ ์ด๋ฆ ์ง์
- ๋ ๋ฒ์งธ ํ๋ผ๋ฏธํฐ์๋ ์ด ์ค๋ ๋๋ฅผ ๋ฐ๋ชฌ ์ค๋ ๋๋ก ๋์ํ๊ฒ ํ ์ง ์ฌ๋ถ ์ค์
โณ๏ธ Schedulers.boundedElastic()
- ExecutorService ๊ธฐ๋ฐ์ ์ค๋ ๋ ํ์ ์์ฑํ ํ, ๊ทธ ์์์ ์ ํด์ง ์๋งํผ์ ์ค๋ ๋๋ฅผ ์ฌ์ฉํ์ฌ ์์ ์ ์ฒ๋ฆฌํ๊ณ ์์ ์ด ์ข ๋ฃ๋ ์ค๋ ๋๋ ๋ฐ๋ฉํ์ฌ ์ฌ์ฌ์ฉ
- Blocking I/O ์์ ์ ์ต์ ํ๋์ด ์์
โณ๏ธ Schedulers.parallel()
- Non-Blocking I/O์ ์ต์ ํ๋์ด ์์
- CPU ์ฝ์ด ์๋งํผ์ ์ค๋ ๋ ์์ฑ
โณ๏ธ Schedulers.fromExecutorService()
- ๊ธฐ์กด์ ์ด๋ฏธ ์ฌ์ฉํ๊ณ ์๋ ExecutorService๊ฐ ์๋ค๋ฉด ์ด ExecutorService๋ก๋ถํฐ Scheduler๋ฅผ ์์ฑ
- ์ด ๋ฐฉ์ ๊ถ์ฅ X
โณ๏ธ Schedulers.newXXXX()
- ์ค๋ ๋ ์ด๋ฆ, ์์ฑ ๊ฐ๋ฅํ ๋ํดํธ ์ค๋ ๋ ๊ฐ์, ์ค๋ ๋์ ์ ํด ์๊ฐ, ๋ฐ๋ชฌ ์ค๋ ๋๋ก์ ๋์ ์ฌ๋ถ ๋ฑ์ ์ง์ ์ง์ ํด์ ์ปค์คํ ์ค๋ ๋ ํ์ ์๋ก ์์ฑ
'Spring > Spring WebFlux' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[๐ Reactive] Spring WebFlux (0) | 2024.12.26 |
---|---|
[๐ Reactive] Operators ไธ (0) | 2024.12.24 |
[๐ Reactive] 5. Reactor (0) | 2024.12.19 |
[๐ Reactive] 4. Reactive ํ๋ก๊ทธ๋๋ฐ์ ์ํ ์ฌ์ ์ง์ (1) | 2024.12.18 |
[๐ Reactive] 3. Blocking I/O์ Non-Blocking I/O (0) | 2024.12.17 |