Sequence 변환 Operator
✳️ map
- upstream에서 emit된 데이터를 mapper function을 사용해 변환한 후, downstream으로 emit
- map() operator 내부에서 에러 발생 시 sequence가 종료되지 않고 계속 진행되도록 하는 기능 지원
Mono<String> mono = Mono.just("Reactor")
.map(String::toUpperCase);
mono.subscribe(System.out::println); // 출력: REACTOR
✳️ flatMap
- upstream에서 emit된 데이터가 Inner Sequence에서 평탄화 작업을 거치면서 하나의 Sequence로 병합되어 downstream으로 emit
public class Example {
public static void main(String[] args) {
Flux
.just("Good", "Bad")
.flatMap(feeling -> Flux
.just("Morning", "Afternoon", "Evening")
.map(time -> feeling + " " + time))
.subscribe(log::info);
}
}
// 출력
// Good Morning
// Good Afternoon
// Good Evening
// Bad Morning
// Bad Afternoon
// Bad Evening
✳️ concat
- 파라미터로 입력되는 Publisher의 Sequence를 연결해서 데이터를 순차적으로 emit
- 먼저 입력된 Publisher의 Sequence가 종료될 때까지 나머지 Publisher의 Sequence는 subscribe되지 않고 대기
import reactor.core.publisher.Flux;
public class ConcatExample {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux<String> concatenatedFlux = Flux.concat(flux1, flux2);
concatenatedFlux.subscribe(System.out::println);
// 출력: A, B, C, D, E, F
}
}
✳️ merge
- 파라미터로 입력되는 Publisher의 Sequence에서 emit된 데이터를 interleave(도착하는 순서대로 섞어서 방출) 방식으로 병합
- 모든 Publisher의 Sequence가 즉시 subscribe 👉 순서 보장 X
public class MergeExample {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(50));
Flux<String> mergedFlux = Flux.merge(flux1, flux2);
mergedFlux.toStream().forEach(System.out::println);
// 출력 순서: D, A, E, B, F, C (순서 보장되지 않음)
}
}
- 입력되는 Publisher가 데이터를 emit하는 시간 주기를 다르게 설정함으로써 emit하는 시간이 빠른 데이터부터 차례대로 emit
✳️ zip
- 파라미터로 입력되는 Publisher Sequence에서 emit된 데이터를 결합
- 각 Publisher가 데이터를 하나씩 emit할 때까지 기다렸다가 결합
public class ZipVsMerge {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("1", "2", "3");
Flux<String> zippedFlux = Flux.zip(flux1, flux2, (f1, f2) -> f1 + f2);
zippedFlux.subscribe(System.out::println);
// 출력: A1, B2, C3
}
}
✳️ and
- Mono의 Complete signal과 파라미터로 입력된 Publisher의 Complete Signal을 결합하여 새로운 Mono<Void> 반환
- 모든 작업이 끝난 시점에 최종적으로 후처리 작업 수행
👉 모든 sequence가 종료되길 기다렸다가 최종적으로 onComplete signal만 전송
✳️ collectList
- Flux에서 emit된 데이터를 모아서 List로 변환한 후, 변환된 List를 emit하는 Mono 반환
- Upstream sequence가 비어 있다면 비어 있는 List를 Downstream으로 emit
public class CollectListWithMap {
public static void main(String[] args) {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
flux.map(i -> i * 2) // 각 숫자를 두 배로 변환
.collectList()
.subscribe(list -> System.out.println("Transformed List: " + list));
// 출력: Transformed List: [2, 4, 6, 8, 10]
}
}
✳️ collectMap
- Flux에서 emit된 데이터를 기반으로 key와 value를 생성하여 Map의 Element로 추가한 후, 최종적으로 Map을 emit하는 Mono를 반환
- Upstream sequence가 비어 있다면 비어 있는 Map을 Downstream으로 emit
public class CollectMapExample {
public static void main(String[] args) {
Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
// 이름을 키로 하고 길이를 값으로 매핑
Mono<Map<String, Integer>> nameLengthMap = names.collectMap(
name -> name, // 키: 이름 자체
name -> name.length() // 값: 이름의 길이
);
// 결과 구독
nameLengthMap.subscribe(map -> {
System.out.println(map);
// 출력: {Alice=5, Bob=3, Charlie=7}
});
}
}
Sequence 내부 동작 확인을 위한 Operator
Upstream Publisher에서 emit되는 데이터의 변경 없이 부수 효과만을 수행하기 위한 Operator로, doOnXXXX()로 시작
🌟 부수효과 : 함수가 정해진 결과를 돌려주는 것 이외의 어떤 일을 하게 되는 것
- Consumer 또는 Runnable 타입의 함수형 인터페이스를 파라미터로 가지기 때문에 별도의 리턴값 X
- Upstream Publisher로부터 emit되는 데이터를 이용해 Upstream Publisher의 내부 동작을 엿볼 수 있으며 로그를 출력하는 등의 디버깅 용도로 많이 사용됨
- 데이터 emit 과정에서 error가 발생하면 해당 에러에 대한 알림을 전송하는 로직 적용 가능
✳️ doOnXXXX() operator 목록
- doOnSubscribe()
- Publisher가 구독 중일 때 트리거되는 동작 추가
- doOnRequest()
- Publisher가 요청을 수신할 때 트리거되는 동작 추가
- doOnNext()
- Publisher가 데이터를 emit할 때 트리거되는 동작 추가
- doOnComplete()
- Publisher가 성공적으로 완료되었을 때 트리거되는 동작 추가
- doOnError()
- Publisher가 에러가 발생한 상태로 종료되었을 때 트리거되는 동작 추가
- doOnCancel()
- Publisher가 취소되었을 때 트리거되는 동작 추가
- doOnTerminate()
- Publisher가 성공적으로 완료되었을 때 또는 에러가 발생한 상태로 종료되었을 때 트리거되는 동작 추가
- doOnEach()
- Publisher가 데이터를 emit할 때 성공적으로 완료되었을 때, 에러가 발생한 상태로 종료되었을 때 트리거되는 동작 추가
- doOnDiscard()
- Upstream에 있는 전체 Operator 체인의 동작 중에서 Operator에 의해 폐기되는 요소를 조건부로 정리
- doAfterTerminate()
- Downstream을 성공적으로 완료한 직후 또는 에러가 발생하여 Publisher가 종료된 직후에 트리거되는 동작 추가
- doFirst()
- Publisher가 구독되기 전에 트리거되는 동작 추가
- doFinally()
- 에러를 포함해서 어떤 이유이든 간에 Publisher가 종료된 후 트리거되는 동작 추가
'Spring > Spring WebFlux' 카테고리의 다른 글
[📕 Reactive] Spring WebFlux (0) | 2024.12.26 |
---|---|
[📕 Reactive] Operators 上 (0) | 2024.12.24 |
[📕 Reactive] 6. Sinks와 Scheduler (0) | 2024.12.23 |
[📕 Reactive] 5. Reactor (0) | 2024.12.19 |
[📕 Reactive] 4. Reactive 프로그래밍을 위한 사전 지식 (1) | 2024.12.18 |