Operator란 ?
Reactive Stream의 데이터 흐름을 처리하거나 변환하는 역할을 하는 메서드
🔷 특징
- 비동기 처리
- 조합 가능성
- 지연 실행 (구독이 호출되어야 처리)
- 데이터 흐름을 단계적으로 처리
Sequence 생성을 위한 Operator
✳️ justOrEmpty
- just()의 확장 operator로서, emit할 데이터가 null인 경우 NullPooinException이 발생하지 않고, onComplete signal을 전송
just()는 Reactor에서 간단하고 정적인 데이터를 Reactive Stream으로 변환할 때 유용
- Hot Publisher이기 때문에 Subscriber의 구독 여부와는 상관없이 데이터를 emit.
- 구독이 발생하면 emit된 데이터를 다시 replay해서 Subscriber에게 전달
✳️ fromIterable
- Iterable에 포함된 데이터를 emit하는 Flux를 생성
✳️ fromStream
- Stream에 포함되는 데이터를 emit하는 Flux를 생성
✳️ range
- n부터 1씩 증가한 연속된 수를 m개 emit하는 Flux 생성
Flux<Integer> flux = Flux.range(1, 5); // 1부터 5개 숫자 (1, 2, 3, 4, 5) 발행
flux.subscribe(System.out::println);
✳️ defer
- Operator를 선언한 시점에 데이터를 emit한 것이 아니라 구독하는 시점에 데이터를 emit하는 Flux 또는 Mono를 생성
- 데이터 emit을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit하여 불필요한 프로세스 줄임
Mono<String> mono = Mono.defer(() -> Mono.just("Hello, " + System.currentTimeMillis()));
mono.subscribe(System.out::println); // 출력: Hello, <현재 타임스탬프>
mono.subscribe(System.out::println); // 출력: Hello, <다른 타임스탬프>
✳️ using
- 파라미터로 전달받은 resource를 emit하는 Flux 생성
- 첫 번째 파라미터는 읽어 올 resource
- 두 번째 파라미터는 읽어 온 resource를 emit하는 Flux
- 세 번째 파라미터는 종료 signal이 발생할 경우 resource를 해제하는 등의 후처리를 할 수 있게 함
public class UsingExample {
public static void main(String[] args) {
Mono<String> mono = Mono.using(
// 리소스 생성
() -> Files.newBufferedReader(Paths.get("test.txt")),
// 리소스를 사용하여 데이터를 읽기
reader -> Mono.fromCallable(() -> reader.readLine()),
// 리소스 정리 (파일 스트림 닫기)
reader -> {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
);
mono.subscribe(System.out::println);
}
}
✳️ generate
- signal 이벤트를 발생시키며, 동기적으로 데이터를 하나씩 순차적으로 emit하고자 할 경우 사용됨
- 데이터의 생성방식은 동기적이지만, 생성된 데이터는 backpressure 메커니즘에 따라 비동기적으로 소비됨
static <T, S> Flux<T> generate(
Callable<S> stateSupplier, // 초기상태 생성하는 함수
BiFunction<S, SynchronousSink<T>, S> generator, // 상태 반환하는 핵심 로직
Consumer<? super S> stateConsumer // 스트림이 완료되거나 에러로 종료된 후 상태 정리
)
Flux<Integer> flux = Flux.generate(
() -> 1, // 초기 상태
(state, sink) -> {
sink.next(state); // 현재 상태값 방출
if (state == 5) { // 종료 조건
sink.complete();
}
return state + 1; // 다음 상태 계산
}
);
flux.subscribe(System.out::println);
// 출력: 1 2 3 4 5
✳️ create
- signal 이벤트를 발생시키며, 한 번에 여러 건의 데이터를 비동기적으로 emit하고자 할 경우 사용됨
@Slf4j
public class Example{
static int start = 1;
static int end = 4;
public static void main(String[] args) throws InterruptedException {
Flux.create((FluxSink<Integer> sink) -> {
sink.onRequest(n -> {
log.info("# requested: " + n);
try {
Thread.sleep(500L);
for(int i = start; i <= end; i++) {
sink.next(i);
}
start += 4;
end += 4;
} catch(InterruptedException e) {}
});
sink.onDispose(() -> {
log.info("# clean up");
});
}, FluxSink.OverflowStrategy.Drop)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel(), 2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3000L);
}
}
- Backpressure 전략 중 DROP 전략 지정
- publishOn()으로 Scheduler를 설정하면서 prefetch 수를 2로 지정
- start와 end 변수를 사용해서 매번 4개의 데이터를 emit
🔷 동작 과정
- 구독이 발생하면 publishOn()에서 설정한 숫자만큼의 데이터 요청
- create() 내부에서 sink.onRequest() 메서드의 람다 표현식 실행
- 요청한 개수보다 2개 더 많은 데이터 emit 👉 sink.next(i)
- subscriber가 emit된 데이터를 전달받아 로그 출력
- 이 때 backpressure DROP 전략이 적용되었으므로 요청 개수를 초과하여 emit된 데이터는 DROP됨
- 다시 publishOn()에서 지정한 숫자만큼의 데이터 요청
- create() 내부에서 onComplete signal이 발생하지 않았기 때문에 2~6 과정을 반복하다가 설정한 지연 시간이 지나면 main 스레드가 종료되어 코드 실행이 종료됨
Sequence 필터링 Operator
✳️ filter
- upstream에서 emit된 데이터 중에서 조건에 일치하는 데이터만 downstream으로 emit
- 동기적으로 조건 평가
Flux<Integer> flux = Flux.range(1, 10)
.filter(i -> i % 2 != 0); // 홀수만 필터링
flux.subscribe(System.out::println);
// 출력: 1 3 5 7 9
💡 filterWhen()은 비동기 조건을 기반으로 스트림의 요소 평가
Flux<Integer> flux = Flux.range(1, 10);
Flux<Integer> evenNumbers = flux
.filterWhen(i -> Mono.just(i % 2 == 0)); // 비동기적으로 짝수인지 확인
evenNumbers.subscribe(System.out::println);
// 출력: 2 4 6 8 10
✳️ skip
- upstream에서 emit된 데이터 중에서 파라미터로 입력받은 숫자만큼 건너뛴 후 나머지 데이터를 downstream으로 emit
- 파라미터로 시간을 지정하면 지정한 시간 내에 emit된 데이터를 건너 뛴 후 나머지 데이터 emit
Flux<Integer> flux = Flux.range(1, 10);
flux.skip(3) // 처음 3개의 요소 건너뛰기
.subscribe(System.out::println);
// 출력: 4 5 6 7 8 9 10
✳️ take
- upstream에서 emit되는 데이터 중에서 파라미터로 입력받은 숫자만큼만 downstream으로 emit
- 파라미터로 시간을 지정하면 upstream에서 emit되는 데이터 중에서 파라미터로 입력한 시간 내에 emit된 데이터만 downstream으로 emit
- takeLast()
- upstream에서 emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 마지막에 emit된 데이터를 downstream으로 emit
- takeUntil()
- 파라미터로 입력한 람다 표현식이 true가 될 때까지 upstream에서 emit된 데이터를 downstream으로 emit
- upstream에서 emit된 데이터에는 Predicate을 평가할 때 사용한 데이터가 포함됨
- takeWhile()
- 파라미터로 입력한 람다 표현식이 true가 되는 동안에만 upstream에서 emit된 데이터를 downstream으로 emit
- Predicate을 평가할 때 사용한 데이터가 downstream으로 emit되지 않음
Flux<Long> flux = Flux.interval(Duration.ofMillis(100)).take(500, TimeUnit.MILLISECONDS);
flux.subscribe(System.out::println);
// 출력: 0 1 2 3 4 (0부터 4까지의 값은 100ms 간격으로 방출되고, 500ms 후에 종료)
// takeLast()
Flux<Integer> flux = Flux.range(1, 10); // 1부터 10까지 방출
flux.takeLast(3) // 마지막 3개 요소만 방출
.subscribe(System.out::println);
// 출력:
// 8
// 9
// 10
// takeWhile()
Flux<Integer> flux = Flux.range(1, 10);
flux.takeWhile(i -> i < 5) // 값이 5 미만인 동안만 취함
.subscribe(System.out::println);
// 출력: 1 2 3 4
// takeUntil()
Flux<Integer> flux = Flux.range(1, 10);
flux.takeUntil(i -> i == 5) // 5가 될 때까지 취함
.subscribe(System.out::println);
// 출력: 1 2 3 4
✳️ next
- upstream에서 emit된 데이터 중에서 첫 번째 데이터만 downstream으로 emit
- 만약 upstream에서 emit되는 데이터가 empty라면 downstream으로 empty Mono를 emit
Flux<Integer> flux = Flux.just(10, 20, 30, 40);
Mono<Integer> mono = flux.next(); // 첫 번째 요소만 Mono로 변환
mono.subscribe(System.out::println);
// 출력: 10
'Spring > Spring WebFlux' 카테고리의 다른 글
[📕 Reactive] Operators 中 (0) | 2025.01.02 |
---|---|
[📕 Reactive] Spring WebFlux (0) | 2024.12.26 |
[📕 Reactive] 6. Sinks와 Scheduler (0) | 2024.12.23 |
[📕 Reactive] 5. Reactor (0) | 2024.12.19 |
[📕 Reactive] 4. Reactive 프로그래밍을 위한 사전 지식 (1) | 2024.12.18 |